Avoid streaming the transaction which are skipped (in corner cases)

Started by Dilip Kumar, about 3 years ago. 24 messages
#1 Dilip Kumar
dilipbalaut@gmail.com
1 attachment(s)

During DecodeCommit(), to decide whether to skip a transaction, we use
ReadRecPtr, whereas in ReorderBufferCanStartStreaming() we use EndRecPtr
to decide whether to stream. Generally this does not create a problem,
but if the commit record itself adds some changes to the transaction
(e.g. a snapshot) and "start_decoding_at" falls between ReadRecPtr and
EndRecPtr, then streaming will decide to stream the transaction while
DecodeCommit will decide to skip it. To handle this case,
ReorderBufferForget() calls stream_abort().

So ideally, if we are planning to skip a transaction we should never
stream it in the first place, and then there is no need to send a stream
abort for a skipped transaction.

In this patch I have fixed the skip condition in the streaming case and
also added an assert inside ReorderBufferForget() to ensure that such a
transaction has never been streamed.
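
To make the corner case concrete, here is a minimal standalone sketch (not
PostgreSQL code; the LSN values are invented and the skip predicate is only
modeled on SnapBuildXactNeedsSkip(), which compares an LSN against
start_decoding_at). It shows how a commit record whose start lies before
start_decoding_at but whose end lies at or beyond it makes the two checks
disagree:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;        /* stand-in for PostgreSQL's XLogRecPtr */

/* Modeled on SnapBuildXactNeedsSkip(): skip anything before start_decoding_at. */
static bool
xact_needs_skip(XLogRecPtr start_decoding_at, XLogRecPtr ptr)
{
    return ptr < start_decoding_at;
}

int
main(void)
{
    /* Hypothetical LSNs: the commit record spans [ReadRecPtr, EndRecPtr). */
    XLogRecPtr  ReadRecPtr = 0x1000;            /* start of the commit record */
    XLogRecPtr  EndRecPtr = 0x1040;             /* end of the commit record */
    XLogRecPtr  start_decoding_at = 0x1020;     /* already confirmed up to here */

    /* DecodeCommit() checks the record's start LSN ... */
    bool        commit_skips = xact_needs_skip(start_decoding_at, ReadRecPtr);

    /* ... but the pre-patch streaming check used the record's end LSN. */
    bool        stream_check_skips = xact_needs_skip(start_decoding_at, EndRecPtr);

    /* Prints "commit skips: 1, streaming check skips: 0": the two disagree. */
    printf("commit skips: %d, streaming check skips: %d\n",
           commit_skips, stream_check_skips);
    return 0;
}

With such LSNs the start-based check skips the transaction while the end-based
check does not, which is exactly the window the patch closes.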

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

v1-0001-Fix-thinko-in-when-to-stream-a-transaction.patch (text/x-patch)
From 20cc1084c4943bdaf23753f2a7d9add22097ed95 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Fri, 25 Nov 2022 13:11:44 +0530
Subject: [PATCH v1] Fix thinko in when to stream a transaction

Actually, during DecodeCommit() for skipping a transaction we use
ReadRecPtr to check whether to skip this transaction or not.  Whereas
in ReorderBufferCanStartStreaming() we use EndRecPtr to check whether
to stream or not.  Generally it will not create a problem, but if the
commit record itself is adding some changes to the transaction (e.g. snapshot)
and the start_decoding_at is in between ReadRecPtr and EndRecPtr, then
streaming will decide to stream the transaction whereas DecodeCommit will
decide to skip it.  And for handling this case in ReorderBufferForget() we
call stream_abort() in order to abort any streamed changes.  So ideally, if
we are planning to skip the transaction we should never stream it, hence there
is no need to stream abort such a transaction in case of skip.

In this patch I have fixed the skip condition in the streaming case and also
added an assert inside ReorderBufferForget() to ensure that such a
transaction has never been streamed.
---
 src/backend/replication/logical/reorderbuffer.c | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381..ddd5db0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2942,9 +2942,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* the transaction which is being skipped shouldn't have been streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3919,7 +3918,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
-- 
1.8.3.1

#2 Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Dilip Kumar (#1)
Re: Avoid streaming the transaction which are skipped (in corner cases)

Excellent catch. We were looking at this code last week and wondered
about the purpose of this abort. Probably we should have some macro or
function to decide whether to skip a transaction based on the log record.
That would avoid using different values in different places.

On Fri, Nov 25, 2022 at 1:35 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

During DecodeCommit() for skipping a transaction we use ReadRecPtr to
check whether to skip this transaction or not. Whereas in
ReorderBufferCanStartStreaming() we use EndRecPtr to check whether to
stream or not. Generally it will not create a problem but if the
commit record itself is adding some changes to the transaction(e.g.
snapshot) and if the "start_decoding_at" is in between ReadRecPtr and
EndRecPtr then streaming will decide to stream the transaction where
as DecodeCommit will decide to skip it. And for handling this case in
ReorderBufferForget() we call stream_abort().

So ideally if we are planning to skip the transaction we should never
stream it hence there is no need to stream abort such transaction in
case of skip.

In this patch I have fixed the skip condition in the streaming case
and also added an assert inside ReorderBufferForget() to ensure that
the transaction should have never been streamed.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

--
Best Wishes,
Ashutosh Bapat

#3 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#1)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Fri, Nov 25, 2022 at 1:35 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

During DecodeCommit() for skipping a transaction we use ReadRecPtr to
check whether to skip this transaction or not. Whereas in
ReorderBufferCanStartStreaming() we use EndRecPtr to check whether to
stream or not. Generally it will not create a problem but if the
commit record itself is adding some changes to the transaction(e.g.
snapshot) and if the "start_decoding_at" is in between ReadRecPtr and
EndRecPtr then streaming will decide to stream the transaction where
as DecodeCommit will decide to skip it. And for handling this case in
ReorderBufferForget() we call stream_abort().

The other cases are probably the ones where we don't have the
FilterByOrigin or dbid check, for example,
XLOG_HEAP2_NEW_CID/XLOG_XACT_INVALIDATIONS. We don't actually send
anything for such cases except empty start/stop messages. Can we add some
flag to the txn which says that there is at least one change, like DML,
that we want to stream? Then we can use that flag to decide whether to
stream or not.

--
With Regards,
Amit Kapila.

#4 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Ashutosh Bapat (#2)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Fri, Nov 25, 2022 at 4:04 PM Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:

Excellent catch. We were looking at this code last week and wondered
the purpose of this abort. Probably we should have some macro or
function to decided whether to skip a transaction based on log record.
That will avoid using different values in different places.

We do have a common function, i.e. SnapBuildXactNeedsSkip(), but there
are two problems: 1) it depends on the input parameter, so the result
may vary based on the LSN that is passed in; 2) the check is based only
on the LSN, whereas there are other factors (dbid and originid) based on
which a transaction could also be skipped during DecodeCommit. So I
think one possible solution could be to remember the dbid and originid
in ReorderBufferTXN as soon as we get the first change that has valid
values for them, and then, as you suggested, have a common function that
is used by both streaming and DecodeCommit to decide whether to skip
or not.
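
To illustrate the idea (purely hypothetical: the 'dbid' and 'origin_id' fields
on ReorderBufferTXN and the helper below do not exist anywhere, and the thread
later settled on a txn_flags-based approach instead), such a common skip check
could look roughly like this:

/*
 * Hypothetical sketch only (would live in decode.c next to the existing
 * filters): assumes ReorderBufferTXN grew 'dbid' and 'origin_id' fields
 * captured from the first change that carried valid values for them.
 */
static bool
SkipThisTransaction(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                    XLogRecPtr origptr)
{
    /* The same LSN-based check DecodeCommit() relies on today. */
    if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, origptr))
        return true;

    /* Skip changes that belong to another database. */
    if (OidIsValid(txn->dbid) && txn->dbid != ctx->slot->data.database)
        return true;

    /* Skip changes filtered out by their replication origin. */
    if (FilterByOrigin(ctx, txn->origin_id))
        return true;

    return false;
}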

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#5 Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#3)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Fri, Nov 25, 2022 at 5:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Nov 25, 2022 at 1:35 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

During DecodeCommit() for skipping a transaction we use ReadRecPtr to
check whether to skip this transaction or not. Whereas in
ReorderBufferCanStartStreaming() we use EndRecPtr to check whether to
stream or not. Generally it will not create a problem but if the
commit record itself is adding some changes to the transaction(e.g.
snapshot) and if the "start_decoding_at" is in between ReadRecPtr and
EndRecPtr then streaming will decide to stream the transaction where
as DecodeCommit will decide to skip it. And for handling this case in
ReorderBufferForget() we call stream_abort().

The other cases are probably where we don't have FilterByOrigin or
dbid check, for example, XLOG_HEAP2_NEW_CID/XLOG_XACT_INVALIDATIONS.
We anyway actually don't send anything for such cases except empty
start/stop messages. Can we add some flag to txn which says that there
is at least one change like DML that we want to stream?

We can probably think of using txn_flags for this purpose.

Then we can
use that flag to decide whether to stream or not.

The other possibility here is to use ReorderBufferTXN's base_snapshot.
Normally, we don't stream unless the base_snapshot is set. However,
there are a few challenges: (a) the base_snapshot is set in
SnapBuildProcessChange, which is called before DecodeInsert and similar
APIs, so even if we filter out the insert due to origin or db_id, the
base_snapshot will already be set; (b) we currently set it to execute
invalidations even when there are no changes to send downstream.

For (a), I guess we can split SnapBuildProcessChange such that the part
where we set the base snapshot is done after DecodeInsert and similar
APIs decide to queue the change. I am less sure about point (b);
ideally, if we don't need a snapshot to execute invalidations, I think
we can avoid setting base_snapshot even in that case. Then we can use it
as a check for whether we want to stream anything.

I feel this approach requires a lot more changes and is a bit riskier
compared to having a flag in txn_flags.

--
With Regards,
Amit Kapila.

#6 Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#4)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Sat, Nov 26, 2022 at 10:59 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Fri, Nov 25, 2022 at 4:04 PM Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:

Excellent catch. We were looking at this code last week and wondered
the purpose of this abort. Probably we should have some macro or
function to decided whether to skip a transaction based on log record.
That will avoid using different values in different places.

We do have a common function i.e. SnapBuildXactNeedsSkip() but there
are two problems 1) it has a dependency on the input parameter so the
result may vary based on the input 2) this is only checked based on
the LSN but there are other factors dbid and originid based on those
also transaction could be skipped during DecodeCommit. So I think one
possible solution could be to remember a dbid and originid in
ReorderBufferTXN as soon as we get the first change which has valid
values for these parameters.

But is the required information, say 'dbid', available in all records?
For example, what about XLOG_XACT_INVALIDATIONS? The other thing to
consider in this regard is that if we are planning to use the additional
information mentioned in my other email to decide whether to stream or
not, then the additional checks may be redundant anyway. It is a good
idea to have a common check at both places, but if not, we can at least
add some comments to say why the check is different.

--
With Regards,
Amit Kapila.

#7 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#5)
1 attachment(s)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Sat, Nov 26, 2022 at 12:15 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Nov 25, 2022 at 5:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Nov 25, 2022 at 1:35 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

During DecodeCommit() for skipping a transaction we use ReadRecPtr to
check whether to skip this transaction or not. Whereas in
ReorderBufferCanStartStreaming() we use EndRecPtr to check whether to
stream or not. Generally it will not create a problem but if the
commit record itself is adding some changes to the transaction(e.g.
snapshot) and if the "start_decoding_at" is in between ReadRecPtr and
EndRecPtr then streaming will decide to stream the transaction where
as DecodeCommit will decide to skip it. And for handling this case in
ReorderBufferForget() we call stream_abort().

The other cases are probably where we don't have FilterByOrigin or
dbid check, for example, XLOG_HEAP2_NEW_CID/XLOG_XACT_INVALIDATIONS.
We anyway actually don't send anything for such cases except empty
start/stop messages. Can we add some flag to txn which says that there
is at least one change like DML that we want to stream?

We can probably think of using txn_flags for this purpose.

In the attached patch I have used txn_flags to record whether a
transaction has any streamable change; a transaction will not be
selected for streaming unless it has at least one streamable change.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

v2-0001-Fix-thinko-in-when-to-stream-a-transaction.patch (text/x-patch)
From f330d37f6ac1930cde1d2773dcd568b9a35454c9 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Fri, 25 Nov 2022 13:11:44 +0530
Subject: [PATCH v2] Fix thinko in when to stream a transaction

Actually, during DecodeCommit() for skipping a transaction we use
ReadRecPtr to check whether to skip this transaction or not.  Whereas
in ReorderBufferCanStartStreaming() we use EndRecPtr to check whether
to stream or not.  Generally it will not create a problem, but if the
commit record itself is adding some changes to the transaction (e.g. snapshot)
and the start_decoding_at is in between ReadRecPtr and EndRecPtr, then
streaming will decide to stream the transaction whereas DecodeCommit will
decide to skip it.  And for handling this case in ReorderBufferForget() we
call stream_abort() in order to abort any streamed changes.  So ideally, if
we are planning to skip the transaction we should never stream it, hence there
is no need to stream abort such a transaction in case of skip.

Along with that, we also skip the transaction if the transaction's dbid is not
the same as the slot's dbid or if it is filtered by origin id.  So in corner
cases it is possible that we might stream the transaction but later it will be
skipped in DecodeCommit.

To fix that, do not select any transaction for streaming unless it has at least
one streamable change; if it has a streamable change then we can safely
select it for streaming as it will not be skipped by DecodeCommit.
---
 src/backend/replication/logical/reorderbuffer.c | 34 +++++++++++++++++++++----
 src/include/replication/reorderbuffer.h         | 23 +++++++++++------
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381..e1a031d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -793,6 +793,30 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		return;
 	}
 
+	/*
+	 * If there are any streamable changes getting queued then get the top
+	 * transaction and mark it has streamable change.  This is required for
+	 * streaming in-progress transactions, the in-progress transaction will
+	 * not be selected for streaming unless it has at least one streamable
+	 * change.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE)
+	{
+		ReorderBufferTXN *toptxn;
+
+		/* get the top transaction */
+		if (txn->toptxn != NULL)
+			toptxn = txn->toptxn;
+		else
+			toptxn = txn;
+
+		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+	}
+
 	change->lsn = lsn;
 	change->txn = txn;
 
@@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* the transaction which is being skipped shouldn't have been streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3502,7 +3525,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+			rbtxn_has_streamable_change(txn))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3919,7 +3943,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc..9766c9f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
-#define RBTXN_PREPARE             0x0040
-#define RBTXN_SKIPPED_PREPARE	  0x0080
+#define RBTXN_HAS_CATALOG_CHANGES 	0x0001
+#define RBTXN_IS_SUBXACT          	0x0002
+#define RBTXN_IS_SERIALIZED       	0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 	0x0008
+#define RBTXN_IS_STREAMED         	0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE  	0x0020
+#define RBTXN_PREPARE             	0x0040
+#define RBTXN_SKIPPED_PREPARE	  	0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Has this transaction contains streamable change? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1

#8 shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Dilip Kumar (#7)
RE: Avoid streaming the transaction which are skipped (in corner cases)

On Sun, Nov 27, 2022 1:33 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Sat, Nov 26, 2022 at 12:15 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Nov 25, 2022 at 5:38 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Nov 25, 2022 at 1:35 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

During DecodeCommit() for skipping a transaction we use ReadRecPtr to
check whether to skip this transaction or not. Whereas in
ReorderBufferCanStartStreaming() we use EndRecPtr to check whether to
stream or not. Generally it will not create a problem but if the
commit record itself is adding some changes to the transaction(e.g.
snapshot) and if the "start_decoding_at" is in between ReadRecPtr and
EndRecPtr then streaming will decide to stream the transaction where
as DecodeCommit will decide to skip it. And for handling this case in
ReorderBufferForget() we call stream_abort().

The other cases are probably where we don't have FilterByOrigin or
dbid check, for example, XLOG_HEAP2_NEW_CID/XLOG_XACT_INVALIDATIONS.
We anyway actually don't send anything for such cases except empty
start/stop messages. Can we add some flag to txn which says that there
is at least one change like DML that we want to stream?

We can probably think of using txn_flags for this purpose.

In the attached patch I have used txn_flags to identify whether it has
any streamable change or not and the transaction will not be selected
for streaming unless it has at least one streamable change.

Thanks for your patch.

I saw that the patch added a check when selecting the largest transaction, but
in addition to ReorderBufferCheckMemoryLimit(), the transaction can also be
streamed in ReorderBufferProcessPartialChange(). Should we add the check in
this function, too?

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9a58c4bfb9..108737b02f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -768,7 +768,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(txn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }

Regards,
Shi yu

#9 Dilip Kumar
dilipbalaut@gmail.com
In reply to: shiy.fnst@fujitsu.com (#8)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Mon, Nov 28, 2022 at 1:46 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Thanks for your patch.

I saw that the patch added a check when selecting largest transaction, but in
addition to ReorderBufferCheckMemoryLimit(), the transaction can also be
streamed in ReorderBufferProcessPartialChange(). Should we add the check in
this function, too?

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9a58c4bfb9..108737b02f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -768,7 +768,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(txn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }

You are right, we need this in ReorderBufferProcessPartialChange() as
well. I will fix this in the next version.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#10 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#9)
1 attachment(s)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Mon, Nov 28, 2022 at 3:19 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Nov 28, 2022 at 1:46 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Thanks for your patch.

I saw that the patch added a check when selecting largest transaction, but in
addition to ReorderBufferCheckMemoryLimit(), the transaction can also be
streamed in ReorderBufferProcessPartialChange(). Should we add the check in
this function, too?

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9a58c4bfb9..108737b02f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -768,7 +768,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(txn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }

You are right we need this in ReorderBufferProcessPartialChange() as
well. I will fix this in the next version.

Fixed this in the attached patch.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

v3-0001-Fix-thinko-in-when-to-stream-a-transaction.patch (text/x-patch)
From d00e15b37dbfa0152d32800e3d4dd0982962b6f8 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Fri, 25 Nov 2022 13:11:44 +0530
Subject: [PATCH v3] Fix thinko in when to stream a transaction

Actually, during DecodeCommit() for skipping a transaction we use
ReadRecPtr to check whether to skip this transaction or not.  Whereas
in ReorderBufferCanStartStreaming() we use EndRecPtr to check whether
to stream or not.  Generally it will not create a problem, but if the
commit record itself is adding some changes to the transaction (e.g. snapshot)
and the start_decoding_at is in between ReadRecPtr and EndRecPtr, then
streaming will decide to stream the transaction whereas DecodeCommit will
decide to skip it.  And for handling this case in ReorderBufferForget() we
call stream_abort() in order to abort any streamed changes.  So ideally, if
we are planning to skip the transaction we should never stream it, hence there
is no need to stream abort such a transaction in case of skip.

Along with that, we also skip the transaction if the transaction's dbid is not
the same as the slot's dbid or if it is filtered by origin id.  So in corner
cases it is possible that we might stream the transaction but later it will be
skipped in DecodeCommit.

To fix that, do not select any transaction for streaming unless it has at least
one streamable change; if it has a streamable change then we can safely
select it for streaming as it will not be skipped by DecodeCommit.
---
 src/backend/replication/logical/reorderbuffer.c | 39 ++++++++++++++++++++-----
 src/include/replication/reorderbuffer.h         | 23 ++++++++++-----
 2 files changed, 47 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381..a581ab5 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -753,7 +753,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	/*
 	 * Stream the transaction if it is serialized before and the changes are
-	 * now complete in the top-level transaction.
+	 * now complete in the top-level transaction and it has a streamable change
 	 *
 	 * The reason for doing the streaming of such a transaction as soon as we
 	 * get the complete change for it is that previously it would have reached
@@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(txn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }
 
@@ -793,6 +794,30 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		return;
 	}
 
+	/*
+	 * If there are any streamable changes getting queued then get the top
+	 * transaction and mark it has streamable change.  This is required for
+	 * streaming in-progress transactions, the in-progress transaction will
+	 * not be selected for streaming unless it has at least one streamable
+	 * change.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE)
+	{
+		ReorderBufferTXN *toptxn;
+
+		/* get the top transaction */
+		if (txn->toptxn != NULL)
+			toptxn = txn->toptxn;
+		else
+			toptxn = txn;
+
+		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+	}
+
 	change->lsn = lsn;
 	change->txn = txn;
 
@@ -2942,9 +2967,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* the transaction which is being skipped shouldn't have been streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3502,7 +3526,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+			rbtxn_has_streamable_change(txn))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3919,7 +3944,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc..9766c9f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
-#define RBTXN_PREPARE             0x0040
-#define RBTXN_SKIPPED_PREPARE	  0x0080
+#define RBTXN_HAS_CATALOG_CHANGES 	0x0001
+#define RBTXN_IS_SUBXACT          	0x0002
+#define RBTXN_IS_SERIALIZED       	0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 	0x0008
+#define RBTXN_IS_STREAMED         	0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE  	0x0020
+#define RBTXN_PREPARE             	0x0040
+#define RBTXN_SKIPPED_PREPARE	  	0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Has this transaction contains streamable change? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1

#11 houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Dilip Kumar (#10)
RE: Avoid streaming the transaction which are skipped (in corner cases)

On Tuesday, November 29, 2022 12:08 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Hi,

On Mon, Nov 28, 2022 at 3:19 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Nov 28, 2022 at 1:46 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

Thanks for your patch.

I saw that the patch added a check when selecting largest
transaction, but in addition to ReorderBufferCheckMemoryLimit(), the
transaction can also be streamed in
ReorderBufferProcessPartialChange(). Should we add the check in this
function, too?

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9a58c4bfb9..108737b02f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -768,7 +768,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(txn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }

You are right we need this in ReorderBufferProcessPartialChange() as
well. I will fix this in the next version.

Fixed this in the attached patch.

Thanks for updating the patch.

I have a few comments about the patch.

1.

1.1.
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* the transaction which is being skipped shouldn't have been streamed */
+	Assert(!rbtxn_is_streamed(txn));
1.2
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(txn))
 		ReorderBufferStreamTXN(rb, toptxn);

In the above two places, I think we should do the check on the top-level
transaction (e.g. toptxn) because the patch only sets the flag for the
top-level transaction.

2.

+	/*
+	 * If there are any streamable changes getting queued then get the top
+	 * transaction and mark it has streamable change.  This is required for
+	 * streaming in-progress transactions, the in-progress transaction will
+	 * not be selected for streaming unless it has at least one streamable
+	 * change.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE)

I think a transaction that contains REORDER_BUFFER_CHANGE_MESSAGE can also be
considered streamable. Is there a reason that we don't check for it here?

Best regards,
Hou zj

#12 Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Dilip Kumar (#10)
Re: Avoid streaming the transaction which are skipped (in corner cases)

Hi Dilip,

On Tue, Nov 29, 2022 at 9:38 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

You are right we need this in ReorderBufferProcessPartialChange() as
well. I will fix this in the next version.

Fixed this in the attached patch.

I focused my attention on the SnapBuildXactNeedsSkip() usages and I see
they are passing different end points of the WAL record:

1. decode.c logicalmsg_decode 594: SnapBuildXactNeedsSkip(builder, buf->origptr)))
2. decode.c DecodeTXNNeedSkip 1250: return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
3. reorderbuffer.c AssertTXNLsnOrder 897: if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
4. reorderbuffer.c ReorderBufferCanStartStreaming 3922: !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
5. snapbuild.c SnapBuildXactNeedsSkip 429: SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)

The first two use the record's origin pointer and the third and fourth use
its end pointer. You have fixed the fourth one. Do we need to fix the third
one as well?

Probably we need to create two wrappers (macros) around
SnapBuildXactNeedsSkip(), one which accepts an XLogRecordBuffer and the
other an XLogReaderState, and then use those. That way at least the
logic of which XLogRecPtr to use is unified.
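
For example (just a sketch of the idea; neither macro exists, and the
reader-based one here follows the patch in using the record's start LSN), the
two wrappers could look like:

/* Hypothetical wrappers; neither macro exists in the tree today. */

/* Skip check keyed on the start LSN of the record in an XLogRecordBuffer. */
#define SnapBuildRecordBufferNeedsSkip(builder, buf) \
    SnapBuildXactNeedsSkip(builder, (buf)->origptr)

/* Skip check keyed on the record currently held by an XLogReaderState. */
#define SnapBuildReaderNeedsSkip(builder, reader) \
    SnapBuildXactNeedsSkip(builder, (reader)->ReadRecPtr)

Both funnel into the record's start LSN, which is what DecodeTXNNeedSkip()
already uses, so the callers in decode.c and reorderbuffer.c would agree by
construction.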

--
Best Wishes,
Ashutosh Bapat

#13 Amit Kapila
amit.kapila16@gmail.com
In reply to: Ashutosh Bapat (#12)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Fri, Dec 2, 2022 at 4:58 PM Ashutosh Bapat
<ashutosh.bapat.oss@gmail.com> wrote:

Hi Dilip,

On Tue, Nov 29, 2022 at 9:38 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

You are right we need this in ReorderBufferProcessPartialChange() as
well. I will fix this in the next version.

Fixed this in the attached patch.

I focused my attention on SnapBuildXactNeedsSkip() usages and I see
they are using different end points of WAL record
1 decode.c logicalmsg_decode 594: SnapBuildXactNeedsSkip(builder, buf->origptr)))
2 decode.c DecodeTXNNeedSkip 1250: return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
3 reorderbuffer.c AssertTXNLsnOrder 897: if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
4 reorderbuffer.c ReorderBufferCanStartStreaming 3922: !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
5 snapbuild.c SnapBuildXactNeedsSkip 429: SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)

The first two are using origin ptr and the last two are using end ptr.
you have fixed the fourth one. Do we need to fix the third one as
well?

I think we can change the third one as well, but I haven't tested it.
Adding Sawada-San for his inputs, as that check was added in commit
16b1fe0037. In any case, I think we can do that as a separate patch
because it is not directly related to the streaming case we are trying
to solve as part of this patch.

Probably we need to create two wrappers (macros) around
SnapBuildXactNeedsSkip(), one which accepts a XLogRecordBuffer and
other which accepts XLogReaderState. Then use those. That way at least
we have logic unified as to which XLogRecPtr to use.

I don't know how that would be an improvement because both of those have
the start and end locations of the record.

--
With Regards,
Amit Kapila.

#14 Amit Kapila
amit.kapila16@gmail.com
In reply to: houzj.fnst@fujitsu.com (#11)
1 attachment(s)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Tue, Nov 29, 2022 at 12:23 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Tuesday, November 29, 2022 12:08 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

I have few comments about the patch.

1.

1.1.
-       /* For streamed transactions notify the remote node about the abort. */
-       if (rbtxn_is_streamed(txn))
-               rb->stream_abort(rb, txn, lsn);
+       /* the transaction which is being skipped shouldn't have been streamed */
+       Assert(!rbtxn_is_streamed(txn));
1.2
-               rbtxn_is_serialized(txn))
+               rbtxn_is_serialized(txn) &&
+               rbtxn_has_streamable_change(txn))
ReorderBufferStreamTXN(rb, toptxn);

In the above two places, I think we should do the check for the top-level
transaction(e.g. toptxn) because the patch only set flag for the top-level
transaction.

Among these, the first one seems okay because that path will check both
the transaction and its subtransactions, and none of those should be
marked as streamed. I have fixed the second one in the attached patch.

2.

+       /*
+        * If there are any streamable changes getting queued then get the top
+        * transaction and mark it has streamable change.  This is required for
+        * streaming in-progress transactions, the in-progress transaction will
+        * not be selected for streaming unless it has at least one streamable
+        * change.
+        */
+       if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+               change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+               change->action == REORDER_BUFFER_CHANGE_DELETE ||
+               change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+               change->action == REORDER_BUFFER_CHANGE_TRUNCATE)

I think that a transaction that contains REORDER_BUFFER_CHANGE_MESSAGE can also be
considered as streamable. Is there a reason that we don't check it here ?

No, I don't see any reason not to do this check for
REORDER_BUFFER_CHANGE_MESSAGE.

Apart from the above, I have slightly adjusted the comments in the
attached. Do let me know what you think of the attached.

--
With Regards,
Amit Kapila.

Attachments:

v4-0001-Avoid-unnecessary-streaming-of-transactions-durin.patch (application/octet-stream)
From 5b0cfa3f72ee1ffeced4bffe0bcbc4a8551a47b8 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Sat, 3 Dec 2022 12:11:29 +0530
Subject: [PATCH v4] Avoid unnecessary streaming of transactions during logical
 replication.

After restart, we don't perform streaming of an in-progress transaction if
it was previously decoded and confirmed by client. To achieve that we were
comparing the END location of the WAL record being decoded with the WAL
location we have already decoded and confirmed by the client. While
decoding the commit record, to decide whether to process and send the
complete transaction, we compare its START location with the WAL location
we have already decoded and confirmed by the client. Now, if we need to
queue some change in the transaction while decoding the commit record
(e.g. snapshot), it is possible that we decide to stream the transaction
but later commit processing decides to skip it. In such a case, we would
needlessly send the changes and later when we decide to skip it, we will
send stream abort.

We also sometimes decide to stream the changes when we actually just need
to process them locally like a change for invalidations. This will lead us
to send empty streams. To avoid this, while queuing each change for
decoding, we remember whether the transaction has any change that actually
needs to be sent downstream and use that information later to decide
whether to stream the transaction or not.

Author: Dilip Kumar
Reviewed-by: Hou Zhijie, Ashutosh Bapat, Shi yu, Amit Kapila
Discussion: https://postgr.es/m/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
---
 .../replication/logical/reorderbuffer.c       | 42 +++++++++++++++----
 src/include/replication/reorderbuffer.h       | 23 ++++++----
 2 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381f2d..b50844b0c8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -695,9 +695,9 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
  * Record the partial change for the streaming of in-progress transactions.  We
  * can stream only complete changes so if we have a partial change like toast
  * table insert or speculative insert then we mark such a 'txn' so that it
- * can't be streamed.  We also ensure that if the changes in such a 'txn' are
- * above logical_decoding_work_mem threshold then we stream them as soon as we
- * have a complete change.
+ * can't be streamed.  We also ensure that if the changes in such a 'txn' can
+ * be streamed and are above logical_decoding_work_mem threshold then we stream
+ * them as soon as we have a complete change.
  */
 static void
 ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
@@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(toptxn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }
 
@@ -793,6 +794,29 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		return;
 	}
 
+	/*
+	 * The changes that are sent downstream are considered streamable.  We
+	 * remember such transactions so that only those will later be considered
+	 * for streaming.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
+		change->action == REORDER_BUFFER_CHANGE_MESSAGE)
+	{
+		ReorderBufferTXN *toptxn;
+
+		/* get the top transaction */
+		if (txn->toptxn != NULL)
+			toptxn = txn->toptxn;
+		else
+			toptxn = txn;
+
+		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+	}
+
 	change->lsn = lsn;
 	change->txn = txn;
 
@@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* this transaction mustn't be streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3502,7 +3525,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+			rbtxn_has_streamable_change(txn))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3919,7 +3943,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc4f9..c700b55b1c 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
-#define RBTXN_PREPARE             0x0040
-#define RBTXN_SKIPPED_PREPARE	  0x0080
+#define RBTXN_HAS_CATALOG_CHANGES 	0x0001
+#define RBTXN_IS_SUBXACT          	0x0002
+#define RBTXN_IS_SERIALIZED       	0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 	0x0008
+#define RBTXN_IS_STREAMED         	0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE  	0x0020
+#define RBTXN_PREPARE             	0x0040
+#define RBTXN_SKIPPED_PREPARE	  	0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Does this transaction contain streamable changes? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
2.28.0.windows.1

#15 houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#14)
RE: Avoid streaming the transaction which are skipped (in corner cases)

On Saturday, December 3, 2022 7:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Nov 29, 2022 at 12:23 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Tuesday, November 29, 2022 12:08 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

I have few comments about the patch.

1.

1.1.
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* the transaction which is being skipped shouldn't have been streamed */
+	Assert(!rbtxn_is_streamed(txn));

1.2
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(txn))
 		ReorderBufferStreamTXN(rb, toptxn);

In the above two places, I think we should do the check for the
top-level transaction(e.g. toptxn) because the patch only set flag for
the top-level transaction.

Among these, the first one seems okay because it will check both the transaction
and its subtransactions from that path and none of those should be marked as
streamed. I have fixed the second one in the attached patch.

2.

+	/*
+	 * If there are any streamable changes getting queued then get the top
+	 * transaction and mark it has streamable change.  This is required for
+	 * streaming in-progress transactions, the in-progress transaction will
+	 * not be selected for streaming unless it has at least one streamable
+	 * change.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE)

I think that a transaction that contains REORDER_BUFFER_CHANGE_MESSAGE
can also be considered as streamable. Is there a reason that we don't check it
here?

No, I don't see any reason not to do this check for
REORDER_BUFFER_CHANGE_MESSAGE.

Apart from the above, I have slightly adjusted the comments in the attached. Do
let me know what you think of the attached.

Thanks for updating the patch. It looks good to me.

Best regards,
Hou zj

#16 Amit Kapila
amit.kapila16@gmail.com
In reply to: houzj.fnst@fujitsu.com (#15)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Sun, Dec 4, 2022 at 5:14 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Saturday, December 3, 2022 7:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

Apart from the above, I have slightly adjusted the comments in the attached. Do
let me know what you think of the attached.

Thanks for updating the patch. It looks good to me.

I feel the function name ReorderBufferLargestTopTXN() is slightly
misleading because it also checks some of the streaming properties
(like whether the TXN has partial changes and whether it contains any
streamable change). Shall we rename it to
ReorderBufferLargestStreamableTopTXN() or something like that?

The other point to consider is whether we need to have a test case for
this patch. I think that before this patch, if the size of the DDL
changes in a transaction exceeded logical_decoding_work_mem, empty
streams would be output by the plugin, but after this patch there won't
be any such stream.

--
With Regards,
Amit Kapila.

#17 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#16)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Mon, Dec 5, 2022 at 8:59 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Sun, Dec 4, 2022 at 5:14 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Saturday, December 3, 2022 7:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

Apart from the above, I have slightly adjusted the comments in the attached. Do
let me know what you think of the attached.

Thanks for updating the patch. It looks good to me.

I feel the function name ReorderBufferLargestTopTXN() is slightly
misleading because it also checks some of the streaming properties
(like whether the TXN has partial changes and whether it contains any
streamable change). Shall we rename it to
ReorderBufferLargestStreamableTopTXN() or something like that?

Yes, that makes sense.

The other point to consider is whether we need to have a test case for
this patch. I think before this patch if the size of DDL changes in a
transaction exceeds logical_decoding_work_mem, the empty streams will
be output in the plugin but after this patch, there won't be any such
stream.

Yes, we can do that; I will make these two changes.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#18 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#17)
1 attachment(s)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Mon, Dec 5, 2022 at 9:21 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Dec 5, 2022 at 8:59 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Sun, Dec 4, 2022 at 5:14 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Saturday, December 3, 2022 7:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

Apart from the above, I have slightly adjusted the comments in the attached. Do
let me know what you think of the attached.

Thanks for updating the patch. It looks good to me.

I feel the function name ReorderBufferLargestTopTXN() is slightly
misleading because it also checks some of the streaming properties
(like whether the TXN has partial changes and whether it contains any
streamable change). Shall we rename it to
ReorderBufferLargestStreamableTopTXN() or something like that?

Yes that makes sense

I have done this change in the attached patch.

The other point to consider is whether we need to have a test case for
this patch. I think before this patch if the size of DDL changes in a
transaction exceeds logical_decoding_work_mem, the empty streams will
be output in the plugin but after this patch, there won't be any such
stream.

I tried this test, but I think generating 64k of data with just CID
messages would make the test case really big. I also tried using
multiple sessions, such that one session fills the reorder buffer but
contains partial changes so that we try to stream another transaction,
but it is not possible in an automated test to consistently generate the
partial change.

I think we need something like this [1] so that we can better control
the streaming.

[1]: /messages/by-id/OSZPR01MB631042582805A8E8615BC413FD329@OSZPR01MB6310.jpnprd01.prod.outlook.com

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

v5-0001-Avoid-unnecessary-streaming-of-transactions-durin.patch (text/x-patch)
From d15b33d74952a78c4050ed7f1235bbb675643478 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Sat, 3 Dec 2022 12:11:29 +0530
Subject: [PATCH v5] Avoid unnecessary streaming of transactions during logical
 replication.

After restart, we don't perform streaming of an in-progress transaction if
it was previously decoded and confirmed by client. To achieve that we were
comparing the END location of the WAL record being decoded with the WAL
location we have already decoded and confirmed by the client. While
decoding the commit record, to decide whether to process and send the
complete transaction, we compare its START location with the WAL location
we have already decoded and confirmed by the client. Now, if we need to
queue some change in the transaction while decoding the commit record
(e.g. snapshot), it is possible that we decide to stream the transaction
but later commit processing decides to skip it. In such a case, we would
needlessly send the changes and later when we decide to skip it, we will
send stream abort.

We also sometimes decide to stream the changes when we actually just need
to process them locally like a change for invalidations. This will lead us
to send empty streams. To avoid this, while queuing each change for
decoding, we remember whether the transaction has any change that actually
needs to be sent downstream and use that information later to decide
whether to stream the transaction or not.

Author: Dilip Kumar
Reviewed-by: Hou Zhijie, Ashutosh Bapat, Shi yu, Amit Kapila
Discussion: https://postgr.es/m/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
---
 src/backend/replication/logical/reorderbuffer.c | 51 ++++++++++++++++++-------
 src/include/replication/reorderbuffer.h         | 23 +++++++----
 2 files changed, 53 insertions(+), 21 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381..6e11056 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -695,9 +695,9 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
  * Record the partial change for the streaming of in-progress transactions.  We
  * can stream only complete changes so if we have a partial change like toast
  * table insert or speculative insert then we mark such a 'txn' so that it
- * can't be streamed.  We also ensure that if the changes in such a 'txn' are
- * above logical_decoding_work_mem threshold then we stream them as soon as we
- * have a complete change.
+ * can't be streamed.  We also ensure that if the changes in such a 'txn' can
+ * be streamed and are above logical_decoding_work_mem threshold then we stream
+ * them as soon as we have a complete change.
  */
 static void
 ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
@@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
 		!(rbtxn_has_partial_change(toptxn)) &&
-		rbtxn_is_serialized(txn))
+		rbtxn_is_serialized(txn) &&
+		rbtxn_has_streamable_change(toptxn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }
 
@@ -793,6 +794,29 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		return;
 	}
 
+	/*
+	 * The changes that are sent downstream are considered streamable.  We
+	 * remember such transactions so that only those will later be considered
+	 * for streaming.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
+		change->action == REORDER_BUFFER_CHANGE_MESSAGE)
+	{
+		ReorderBufferTXN *toptxn;
+
+		/* get the top transaction */
+		if (txn->toptxn != NULL)
+			toptxn = txn->toptxn;
+		else
+			toptxn = txn;
+
+		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+	}
+
 	change->lsn = lsn;
 	change->txn = txn;
 
@@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* this transaction mustn't be streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3460,14 +3483,15 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
 }
 
 /*
- * Find the largest toplevel transaction to evict (by streaming).
+ * Find the largest streamable toplevel transaction to evict (by streaming).
  *
  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
  * should give us the same transaction (because we don't update memory account
  * for subtransaction with streaming, so it's always 0). But we can simply
  * iterate over the limited number of toplevel transactions that have a base
  * snapshot. There is no use of selecting a transaction that doesn't have base
- * snapshot because we don't decode such transactions.
+ * snapshot because we don't decode such transactions.  Also we do not select
+ * the transaction which doesn't have any streamable change.
  *
  * Note that, we skip transactions that contains incomplete changes. There
  * is a scope of optimization here such that we can select the largest
@@ -3483,7 +3507,7 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
  * the subxact from where we streamed the last change.
  */
 static ReorderBufferTXN *
-ReorderBufferLargestTopTXN(ReorderBuffer *rb)
+ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 {
 	dlist_iter	iter;
 	Size		largest_size = 0;
@@ -3502,7 +3526,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+			rbtxn_has_streamable_change(txn))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3547,7 +3572,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		 * memory by streaming, if possible.  Otherwise, spill to disk.
 		 */
 		if (ReorderBufferCanStartStreaming(rb) &&
-			(txn = ReorderBufferLargestTopTXN(rb)) != NULL)
+			(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
 		{
 			/* we know there has to be one, because the size is not zero */
 			Assert(txn && !txn->toptxn);
@@ -3919,7 +3944,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc..c700b55 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
-#define RBTXN_PREPARE             0x0040
-#define RBTXN_SKIPPED_PREPARE	  0x0080
+#define RBTXN_HAS_CATALOG_CHANGES 	0x0001
+#define RBTXN_IS_SUBXACT          	0x0002
+#define RBTXN_IS_SERIALIZED       	0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 	0x0008
+#define RBTXN_IS_STREAMED         	0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE  	0x0020
+#define RBTXN_PREPARE             	0x0040
+#define RBTXN_SKIPPED_PREPARE	  	0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Does this transaction contain streamable changes? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1

#19Amit Kapila
amit.kapila16@gmail.com
In reply to: Dilip Kumar (#18)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Mon, Dec 5, 2022 at 3:41 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Dec 5, 2022 at 9:21 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Mon, Dec 5, 2022 at 8:59 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Sun, Dec 4, 2022 at 5:14 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Saturday, December 3, 2022 7:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

Apart from the above, I have slightly adjusted the comments in the attached. Do
let me know what you think of the attached.

Thanks for updating the patch. It looks good to me.

I feel the function name ReorderBufferLargestTopTXN() is slightly
misleading because it also checks some of the streaming properties
(like whether the TXN has partial changes and whether it contains any
streamable change). Shall we rename it to
ReorderBufferLargestStreamableTopTXN() or something like that?

Yes that makes sense

I have done this change in the attached patch.

The other point to consider is whether we need to have a test case for
this patch. I think before this patch if the size of DDL changes in a
transaction exceeds logical_decoding_work_mem, the empty streams will
be output in the plugin but after this patch, there won't be any such
stream.

I tried this test, but I think generating 64kB of data with just CID
messages would make the test case really big. I tried using multiple
sessions such that one session fills the reorder buffer but contains
partial changes, so that we try to stream another transaction, but it is
not possible in an automated test to consistently generate the partial
change.

I also don't see a way to achieve it in an automated way because both
toast and speculative inserts are part of one statement, so we need a
real concurrent test to make it happen. Can anyone else think of a way
to achieve it?

I think we need something like this[1] so that we can better control
the streaming.

+1. The additional advantage would be that we can generate parallel
apply and new streaming tests with much lesser data. Shi-San, can you
please start a new thread for the GUC patch proposed by you as
indicated by Dilip?

--
With Regards,
Amit Kapila.

#20shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Amit Kapila (#19)
RE: Avoid streaming the transaction which are skipped (in corner cases)

On Mon, Dec 5, 2022 6:57 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Dec 5, 2022 at 3:41 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

I think we need something like this[1] so that we can better control
the streaming.

+1. The additional advantage would be that we can generate parallel
apply and new streaming tests with much lesser data. Shi-San, can you
please start a new thread for the GUC patch proposed by you as
indicated by Dilip?

OK, I started a new thread for it. [1]

[1]: /messages/by-id/OSZPR01MB63104E7449DBE41932DB19F1FD1B9@OSZPR01MB6310.jpnprd01.prod.outlook.com

Regards,
Shi yu

#21Amit Kapila
amit.kapila16@gmail.com
In reply to: shiy.fnst@fujitsu.com (#20)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Tue, Dec 6, 2022 at 11:55 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

On Mon, Dec 5, 2022 6:57 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Dec 5, 2022 at 3:41 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

I think we need something like this[1] so that we can better control
the streaming.

+1. The additional advantage would be that we can generate parallel
apply and new streaming tests with much lesser data. Shi-San, can you
please start a new thread for the GUC patch proposed by you as
indicated by Dilip?

OK, I started a new thread for it. [1]

Thanks. I think it is better to go ahead with this patch and once we
decide what is the right thing to do in terms of GUC then we can try
to add additional tests for this. Anyway, it is not that the code
added by this patch is not getting covered by existing tests. What do
you think?

--
With Regards,
Amit Kapila.

#22Dilip Kumar
dilipbalaut@gmail.com
In reply to: Amit Kapila (#21)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Wed, Dec 7, 2022 at 9:28 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Dec 6, 2022 at 11:55 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

On Mon, Dec 5, 2022 6:57 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Dec 5, 2022 at 3:41 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

I think we need something like this[1] so that we can better control
the streaming.

+1. The additional advantage would be that we can generate parallel
apply and new streaming tests with much lesser data. Shi-San, can you
please start a new thread for the GUC patch proposed by you as
indicated by Dilip?

OK, I started a new thread for it. [1]

Thanks. I think it is better to go ahead with this patch and once we
decide what is the right thing to do in terms of GUC then we can try
to add additional tests for this. Anyway, it is not that the code
added by this patch is not getting covered by existing tests. What do
you think?

That makes sense to me.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#23shiy.fnst@fujitsu.com
shiy.fnst@fujitsu.com
In reply to: Dilip Kumar (#22)
RE: Avoid streaming the transaction which are skipped (in corner cases)

On Wed, Dec 7, 2022 12:01 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Dec 7, 2022 at 9:28 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Dec 6, 2022 at 11:55 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

On Mon, Dec 5, 2022 6:57 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Dec 5, 2022 at 3:41 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

I think we need something like this[1] so that we can better control
the streaming.

+1. The additional advantage would be that we can generate parallel
apply and new streaming tests with much lesser data. Shi-San, can you
please start a new thread for the GUC patch proposed by you as
indicated by Dilip?

OK, I started a new thread for it. [1]

Thanks. I think it is better to go ahead with this patch and once we
decide what is the right thing to do in terms of GUC then we can try
to add additional tests for this. Anyway, it is not that the code
added by this patch is not getting covered by existing tests. What do
you think?

That makes sense to me.

+1

Regards,
Shi yu

#24Amit Kapila
amit.kapila16@gmail.com
In reply to: shiy.fnst@fujitsu.com (#23)
Re: Avoid streaming the transaction which are skipped (in corner cases)

On Wed, Dec 7, 2022 at 9:35 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

On Wed, Dec 7, 2022 12:01 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Thanks. I think it is better to go ahead with this patch and once we
decide what is the right thing to do in terms of GUC then we can try
to add additional tests for this. Anyway, it is not that the code
added by this patch is not getting covered by existing tests. What do
you think?

That makes sense to me.

+1

Pushed.

--
With Regards,
Amit Kapila.