[PATCH] Provide more information to filter_prepare

Started by Markus Wanner, almost 5 years ago, 26 messages
#1 Markus Wanner
markus.wanner@enterprisedb.com
1 attachment(s)

Hi,

currently, only the gid is passed on to the filter_prepare callback.
While we probably should not pass a full ReorderBufferTXN (as we do for
most other output plugin callbacks), a bit more information would be
nice, I think.

Attached is a patch that adds the xid (still lacking docs changes). The
question about stream_prepare being optional made me think about whether
an output plugin needs to know if changes have been already streamed
prior to a prepare. Maybe not? Any other information you think the
output plugin might find useful to decide whether or not to skip the
prepare?

If you are okay with adding just the xid, I'll add docs changes to the
patch provided.

Regards

Markus

Attachments:

0001-add-xid-arg-to-filter_prepare_v1.patch (text/x-patch; charset=UTF-8)
From: Markus Wanner <markus@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for the
 output plugin.

---
 contrib/test_decoding/test_decoding.c     |  4 +++-
 src/backend/replication/logical/decode.c  | 17 +++++++++++------
 src/backend/replication/logical/logical.c |  5 +++--
 src/include/replication/logical.h         |  3 ++-
 src/include/replication/output_plugin.h   |  1 +
 5 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.1

#2 Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#1)
Re: [PATCH] Provide more information to filter_prepare

On Tue, Mar 9, 2021 at 2:14 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

> Hi,
>
> currently, only the gid is passed on to the filter_prepare callback.
> While we probably should not pass a full ReorderBufferTXN (as we do for
> most other output plugin callbacks), a bit more information would be
> nice, I think.

How can the proposed 'xid' parameter be useful? What exactly do plugins
want to do with it?

--
With Regards,
Amit Kapila.

#3 Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#2)
Re: [PATCH] Provide more information to filter_prepare

On 10.03.21 11:18, Amit Kapila wrote:

> On Tue, Mar 9, 2021 at 2:14 PM Markus Wanner
> <markus.wanner@enterprisedb.com> wrote:
>
>> currently, only the gid is passed on to the filter_prepare callback.
>> While we probably should not pass a full ReorderBufferTXN (as we do for
>> most other output plugin callbacks), a bit more information would be
>> nice, I think.
>
> How can the proposed 'xid' parameter be useful? What exactly do plugins
> want to do with it?

The xid is the very basic identifier for transactions in Postgres. Any
output plugin that interacts with Postgres in any way slightly more
interesting than "filter by gid prefix" is very likely to come across a
TransactionId.

It allows for basics like checking if the transaction to decode still is
in progress, for example. Or in a much more complex scenario, decide on
whether or not to filter based on properties the extension stored during
processing the transaction.
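
As a rough illustration of the second scenario, the sketch below keeps a small per-transaction table keyed by xid and consults it from a filter callback with the proposed signature. The `TransactionId` typedef, the opaque `LogicalDecodingContext`, the fixed-size table, and the names `remember_txn` and `sketch_filter_prepare` are stand-ins invented for this example and are not taken from PostgreSQL or from the patch. A real plugin would include the server headers and would probably use a hash table.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins so the sketch compiles outside the server tree (assumption). */
typedef uint32_t TransactionId;
typedef struct LogicalDecodingContext LogicalDecodingContext;

#define MAX_TRACKED 64

/* Hypothetical per-transaction state the plugin recorded earlier. */
typedef struct TrackedTxn
{
    TransactionId xid;
    bool          skip_two_phase;
} TrackedTxn;

static TrackedTxn tracked[MAX_TRACKED];
static size_t     n_tracked = 0;

/* Record a decision for xid; returns false if the table is full. */
bool
remember_txn(TransactionId xid, bool skip_two_phase)
{
    if (n_tracked >= MAX_TRACKED)
        return false;
    tracked[n_tracked].xid = xid;
    tracked[n_tracked].skip_two_phase = skip_two_phase;
    n_tracked++;
    return true;
}

/* Callback shape from the patch: returning true means "skip at PREPARE". */
bool
sketch_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
                      const char *gid)
{
    (void) ctx;
    (void) gid;                 /* this decision depends on the xid only */

    for (size_t i = 0; i < n_tracked; i++)
    {
        if (tracked[i].xid == xid)
            return tracked[i].skip_two_phase;
    }
    return false;               /* unknown xid: do not filter */
}
```

With only the gid available, the same lookup would first need a gid-to-xid mapping, which is the extra step the xid argument avoids.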

Regards

Markus

#4 Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#3)
Re: [PATCH] Provide more information to filter_prepare

On Wed, Mar 10, 2021 at 4:26 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

> On 10.03.21 11:18, Amit Kapila wrote:
>
>> On Tue, Mar 9, 2021 at 2:14 PM Markus Wanner
>> <markus.wanner@enterprisedb.com> wrote:
>>
>>> currently, only the gid is passed on to the filter_prepare callback.
>>> While we probably should not pass a full ReorderBufferTXN (as we do for
>>> most other output plugin callbacks), a bit more information would be
>>> nice, I think.
>>
>> How can the proposed 'xid' parameter be useful? What exactly do plugins
>> want to do with it?
>
> The xid is the very basic identifier for transactions in Postgres. Any
> output plugin that interacts with Postgres in any way slightly more
> interesting than "filter by gid prefix" is very likely to come across a
> TransactionId.
>
> It allows for basics like checking if the transaction to decode still is
> in progress, for example.

But this happens when we are decoding the prepare, so it is clear that the
transaction is prepared. Why is any additional check needed?

> Or in a much more complex scenario, decide on
> whether or not to filter based on properties the extension stored during
> processing the transaction.

What in this can't be done with the GID, and how can the XID achieve it?

--
With Regards,
Amit Kapila.

#5 Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#4)
Re: [PATCH] Provide more information to filter_prepare

On 11.03.21 04:58, Amit Kapila wrote:

> But this happens when we are decoding the prepare, so it is clear that the
> transaction is prepared. Why is any additional check needed?

An output plugin cannot assume the transaction is still prepared and
uncommitted at the point in time it gets to decode the prepare.
Therefore, the transaction may or may not be still in progress.
However, my point is that the xid is a more generally useful
identifier than the gid.

> What in this can't be done with the GID, and how can the XID achieve it?

It's a convenience. Of course, an output plugin could look up the xid
via the gid. But why force it to have to do that when the xid would be
so readily available? (Especially given that seems rather expensive.
Or how would an extension look up the xid by gid?)
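
To make the cost argument concrete, here is a rough sketch of what a plugin-side gid-to-xid lookup might look like if only the gid were passed: one string comparison per known prepared transaction. The `PreparedEntry` array and `lookup_xid_by_gid` are hypothetical. This is not how the server stores prepared transactions, and the thread does not name any existing helper that output plugins could call for this.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins so the sketch compiles outside the server tree (assumption). */
typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical plugin-side record of a prepared transaction. */
typedef struct PreparedEntry
{
    TransactionId xid;
    const char   *gid;
} PreparedEntry;

/*
 * Linear scan with one strcmp per entry.
 * Returns InvalidTransactionId when the gid is not found.
 */
TransactionId
lookup_xid_by_gid(const PreparedEntry *entries, size_t n, const char *gid)
{
    for (size_t i = 0; i < n; i++)
    {
        if (strcmp(entries[i].gid, gid) == 0)
            return entries[i].xid;
    }
    return InvalidTransactionId;
}
```

Passing the xid directly makes this scan, and the bookkeeping needed to keep such a table current, unnecessary.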

The initial versions by Nikhil clearly did include it (actually a full
ReorderBufferTXN, which I think would be even better). I'm not clear on
your motivations to restrict the API. What's clear to me is that the
more information Postgres exposes to plugins and extensions, the easier
it becomes to extend Postgres. (Modulo perhaps API stability
considerations. A TransactionId clearly is not a concern in that area.
Especially given we expose the entire ReorderBufferTXN struct for
other callbacks.)

Regards

Markus

#6 Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#5)
Re: [PATCH] Provide more information to filter_prepare

On Thu, Mar 11, 2021 at 2:44 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

> On 11.03.21 04:58, Amit Kapila wrote:
>
>> But this happens when we are decoding the prepare, so it is clear that the
>> transaction is prepared. Why is any additional check needed?
>
> An output plugin cannot assume the transaction is still prepared and
> uncommitted at the point in time it gets to decode the prepare.
> Therefore, the transaction may or may not be still in progress.
> However, my point is that the xid is a more generally useful
> identifier than the gid.
>
>> What in this can't be done with the GID, and how can the XID achieve it?
>
> It's a convenience. Of course, an output plugin could look up the xid
> via the gid. But why force it to have to do that when the xid would be
> so readily available?

I am not suggesting doing any such look-up. It is just that the use of
additional parameter(s) for deciding whether to decode at prepare time
or to decode later as a regular one-phase transaction is not clear to
me. Now, it is possible that your argument is right that passing
additional information gives flexibility to plugin authors and we
should just do what you are saying, or maybe go even a step further and
pass ReorderBufferTXN, but I am not completely sure about this point
because I haven't heard of any concrete use case.

Would anyone else like to weigh in here?

--
With Regards,
Amit Kapila.

#7 Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#6)
Re: [PATCH] Provide more information to filter_prepare

On Sat, Mar 13, 2021 at 3:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

> On Thu, Mar 11, 2021 at 2:44 PM Markus Wanner
> <markus.wanner@enterprisedb.com> wrote:
>
>> On 11.03.21 04:58, Amit Kapila wrote:
>>
>>> But this happens when we are decoding the prepare, so it is clear that the
>>> transaction is prepared. Why is any additional check needed?
>>
>> An output plugin cannot assume the transaction is still prepared and
>> uncommitted at the point in time it gets to decode the prepare.
>> Therefore, the transaction may or may not be still in progress.
>> However, my point is that the xid is a more generally useful
>> identifier than the gid.
>>
>>> What in this can't be done with the GID, and how can the XID achieve it?
>>
>> It's a convenience. Of course, an output plugin could look up the xid
>> via the gid. But why force it to have to do that when the xid would be
>> so readily available?
>
> I am not suggesting doing any such look-up. It is just that the use of
> additional parameter(s) for deciding whether to decode at prepare time
> or to decode later as a regular one-phase transaction is not clear to
> me. Now, it is possible that your argument is right that passing
> additional information gives flexibility to plugin authors and we
> should just do what you are saying, or maybe go even a step further and
> pass ReorderBufferTXN, but I am not completely sure about this point
> because I haven't heard of any concrete use case.

During a discussion of GIDs in the nearby thread [1], it came up that
replication solutions might want to generate a different GID based
on the xid for two-phase transactions, so it seems this patch has a
use case.
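
One way a replication solution could derive a GID from the xid is sketched below. The format string, the `node_id` parameter, and the function name `build_gid` are assumptions made purely for illustration. Neither this thread nor the patch prescribes a naming scheme.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in so the sketch compiles outside the server tree (assumption). */
typedef uint32_t TransactionId;

/*
 * Write "node_<node_id>_xid_<xid>" into buf.
 * Returns false if the result did not fit into buflen bytes.
 */
bool
build_gid(char *buf, size_t buflen, uint32_t node_id, TransactionId xid)
{
    int n = snprintf(buf, buflen, "node_%u_xid_%u",
                     (unsigned) node_id, (unsigned) xid);

    return n >= 0 && (size_t) n < buflen;
}
```

Combining the xid with a node identifier keeps the generated GID distinct even when the original GID is reused, provided the node identifiers themselves are unique.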

Markus, feel free to update the docs; you might want to mention the
use case for the XID. Also, feel free to add an open item on the PG-14
Open Items page [2].

[1]: /messages/by-id/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
[2]: https://wiki.postgresql.org/wiki/PostgreSQL_14_Open_Items

--
With Regards,
Amit Kapila.

#8 Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#7)
1 attachment(s)
Re: [PATCH] Provide more information to filter_prepare

Hello Amit,

On 21.03.21 11:53, Amit Kapila wrote:

> During a discussion of GIDs in the nearby thread [1], it came up that
> replication solutions might want to generate a different GID based
> on the xid for two-phase transactions, so it seems this patch has a
> use case.

Thank you for reconsidering this patch. I updated it to include the
required adjustments to the documentation. Please review.

> Markus, feel free to update the docs; you might want to mention the
> use case for the XID. Also, feel free to add an open item on the PG-14
> Open Items page [2].

Yes, will add.

Regards

Markus

Attachments:

0001-add-xid-arg-to-filter_prepare_v2.patch (text/x-patch; charset=UTF-8)
From: Markus Wanner <markus.wanner@enterprisedb.com>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c     |  4 ++-
 doc/src/sgml/logicaldecoding.sgml         | 33 +++++++++++++++--------
 src/backend/replication/logical/decode.c  | 17 +++++++-----
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h         |  3 ++-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..16a819a1004 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,29 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
        <command>COMMIT PREPARED</command> time. To signal that
        decoding should be skipped, return <literal>true</literal>;
        <literal>false</literal> otherwise. When the callback is not
-       defined, <literal>false</literal> is assumed (i.e. nothing is
-       filtered).
+       defined, <literal>false</literal> is assumed (i.e. no filtering, all
+       transactions using two-phase commit are decoded in two phases as well).
 <programlisting>
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              TransactionId xid,
                                               const char *gid);
 </programlisting>
-      The <parameter>ctx</parameter> parameter has the same contents as for the
-      other callbacks. The <parameter>gid</parameter> is the identifier that later
-      identifies this transaction for <command>COMMIT PREPARED</command> or
-      <command>ROLLBACK PREPARED</command>.
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  For some systems, the <parameter>gid</parameter> may
+       be sufficient.  Note however that it could be used multiple times,
+       losing its uniqueness.  Therefore, other systems combine
+       the <parameter>xid</parameter> with a node identifier to form a
+       globally unique transaction identifier.  The later <command>COMMIT
+       PREPARED</command> or <command>ROLLBACK PREPARED</command> carries both
+       identifiers again, providing an output plugin the choice of what to
+       use.
      </para>
      <para>
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback has to provide the same static answer for a given pair
+       of <parameter>xid</parameter> and <parameter>gid</parameter> every time
+       it is called.
      </para>
      </sect3>
 
@@ -1219,9 +1228,11 @@ stream_commit_cb(...);  &lt;-- commit of the streamed transaction
    </para>
 
    <para>
-    Optionally the output plugin can specify a name pattern in the
-    <function>filter_prepare_cb</function> and transactions with gid containing
-    that name pattern will not be decoded as a two-phase commit transaction.
+    Optionally the output plugin can define filtering rules via
+    <function>filter_prepare_cb</function> to decode only specific transaction
+    in two phases.  This can be achieved by pattern matching on the
+    <parameter>gid</parameter> or via lookups using the
+    <parameter>xid</parameter>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.2

#9 Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Markus Wanner (#8)
1 attachment(s)
Re: [PATCH] Provide more information to filter_prepare

On 22.03.21 09:50, Markus Wanner wrote:

> Thank you for reconsidering this patch.  I updated it to include the
> required adjustments to the documentation.  Please review.

I tweaked the wording in the docs a bit, resulting in a v3 of this patch.

Regards

Markus

Attachments:

0001-add-xid-arg-to-filter_prepare_v3.patch (text/x-patch; charset=UTF-8)
From: Markus Wanner <markus@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c     |  4 ++-
 doc/src/sgml/logicaldecoding.sgml         | 34 +++++++++++++++--------
 src/backend/replication/logical/decode.c  | 17 ++++++++----
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h         |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..f3ac84aa85a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,30 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
        <command>COMMIT PREPARED</command> time. To signal that
        decoding should be skipped, return <literal>true</literal>;
        <literal>false</literal> otherwise. When the callback is not
-       defined, <literal>false</literal> is assumed (i.e. nothing is
-       filtered).
+       defined, <literal>false</literal> is assumed (i.e. no filtering, all
+       transactions using two-phase commit are decoded in two phases as well).
 <programlisting>
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              TransactionId xid,
                                               const char *gid);
 </programlisting>
-      The <parameter>ctx</parameter> parameter has the same contents as for the
-      other callbacks. The <parameter>gid</parameter> is the identifier that later
-      identifies this transaction for <command>COMMIT PREPARED</command> or
-      <command>ROLLBACK PREPARED</command>.
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  For some systems, the <parameter>gid</parameter> may
+       be sufficient.  However, reuse of the same <parameter>gid</parameter>
+       for example by a downstream node using multiple subscriptions may lead
+       to it not being a unique identifier.  Therefore, other systems combine
+       the <parameter>xid</parameter> with a node identifier to form a
+       globally unique transaction identifier.  The later <command>COMMIT
+       PREPARED</command> or <command>ROLLBACK PREPARED</command> carries both
+       identifiers, providing an output plugin the choice of what to use.
      </para>
      <para>
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked several times per transaction to decode and
+       must provide the same static answer for a given pair of
+       <parameter>xid</parameter> and <parameter>gid</parameter> every time
+       it is called.
      </para>
      </sect3>
 
@@ -1219,9 +1229,11 @@ stream_commit_cb(...);  &lt;-- commit of the streamed transaction
    </para>
 
    <para>
-    Optionally the output plugin can specify a name pattern in the
-    <function>filter_prepare_cb</function> and transactions with gid containing
-    that name pattern will not be decoded as a two-phase commit transaction.
+    Optionally the output plugin can define filtering rules via
+    <function>filter_prepare_cb</function> to decode only specific transaction
+    in two phases.  This can be achieved by pattern matching on the
+    <parameter>gid</parameter> or via lookups using the
+    <parameter>xid</parameter>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.2

#10Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#9)
Re: [PATCH] Provide more information to filter_prepare

On Thu, Mar 25, 2021 at 2:07 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 22.03.21 09:50, Markus Wanner wrote:

thank you for reconsidering this patch. I updated it to include the
required adjustments to the documentation. Please review.

I tweaked the wording in the docs a bit, resulting in a v3 of this patch.

One minor comment:
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked several times per transaction to decode and
+       must provide the same static answer for a given pair of

Why do you think that this callback can be invoked several times per
transaction? I think it could be called at most two times, once at
prepare time, then at commit or rollback time. So, I think using
'multiple' instead of 'several' times is better.

--
With Regards,
Amit Kapila.

#11Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#10)
Re: [PATCH] Provide more information to filter_prepare

On Mon, Mar 29, 2021 at 11:42 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Mar 25, 2021 at 2:07 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 22.03.21 09:50, Markus Wanner wrote:

thank you for reconsidering this patch. I updated it to include the
required adjustments to the documentation. Please review.

I tweaked the wording in the docs a bit, resulting in a v3 of this patch.

One minor comment:
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked several times per transaction to decode and
+       must provide the same static answer for a given pair of

Why do you think that this callback can be invoked several times per
transaction? I think it could be called at most two times, once at
prepare time, then at commit or rollback time. So, I think using
'multiple' instead of 'several' times is better.

+       to it not being a unique identifier.  Therefore, other systems combine
+       the <parameter>xid</parameter> with a node identifier to form a
+       globally unique transaction identifier.

What exactly is the node identifier here? Is it a publisher or
subscriber node id? We might want to be a bit more explicit here?

--
With Regards,
Amit Kapila.

#12Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#11)
1 attachment(s)
Re: [PATCH] Provide more information to filter_prepare

On 29.03.21 08:23, Amit Kapila wrote:

On Mon, Mar 29, 2021 at 11:42 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

Why do you think that this callback can be invoked several times per
transaction? I think it could be called at most two times, once at
prepare time, then at commit or rollback time. So, I think using
'multiple' instead of 'several' times is better.

Thank you for reviewing.

That's fine with me, I just wanted to provide an explanation for why the
callback needs to be stable. (I would not want to limit us in the docs
to guarantee it is called only twice. 'multiple' sounds generic enough,
I changed it to that word.)
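
To illustrate the stability requirement, here is a minimal standalone sketch, not the patch itself. It assumes a stand-in `TransactionId` typedef and a hypothetical `sketch_filter_prepare` that omits the `LogicalDecodingContext` argument so it compiles without PostgreSQL headers. The point is that the answer is a pure function of (xid, gid), so the call at prepare time and any later call at commit or rollback time agree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for PostgreSQL's TransactionId so the sketch compiles on its own. */
typedef uint32_t TransactionId;

/*
 * Hypothetical filter using the (xid, gid) argument order from the patch,
 * without the LogicalDecodingContext.  Returns true to skip two-phase
 * decoding.  The result depends only on its arguments, never on how often
 * or when it is called, so repeated calls for one transaction agree.
 */
bool
sketch_filter_prepare(TransactionId xid, const char *gid)
{
	assert(gid != NULL);
	(void) xid;				/* not needed for this gid-only rule */

	/* Same rule test_decoding uses: filter GIDs containing "_nodecode". */
	return strstr(gid, "_nodecode") != NULL;
}
```

A filter that consulted a counter, the clock, or mutable state could answer differently at PREPARE and at COMMIT PREPARED, which is exactly what the documentation wording is meant to rule out.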

What exactly is the node identifier here? Is it a publisher or
subscriber node id? We might want to be a bit more explicit here?

Good point. I clarified this to speak of the origin node (given this is
not necessarily the direct provider when using chained replication).

An updated patch is attached.

Regards

Markus

Attachments:

0001-add-xid-arg-to-filter_prepare_v4.patch (text/x-patch; charset=UTF-8)
From: Markus Wanner <markus@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output
 plugins

---
 contrib/test_decoding/test_decoding.c     |  4 ++-
 doc/src/sgml/logicaldecoding.sgml         | 34 +++++++++++++++--------
 src/backend/replication/logical/decode.c  | 17 ++++++++----
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h         |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..f3ac84aa85a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,30 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
        <command>COMMIT PREPARED</command> time. To signal that
        decoding should be skipped, return <literal>true</literal>;
        <literal>false</literal> otherwise. When the callback is not
-       defined, <literal>false</literal> is assumed (i.e. nothing is
-       filtered).
+       defined, <literal>false</literal> is assumed (i.e. no filtering, all
+       transactions using two-phase commit are decoded in two phases as well).
 <programlisting>
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              TransactionId xid,
                                               const char *gid);
 </programlisting>
-      The <parameter>ctx</parameter> parameter has the same contents as for the
-      other callbacks. The <parameter>gid</parameter> is the identifier that later
-      identifies this transaction for <command>COMMIT PREPARED</command> or
-      <command>ROLLBACK PREPARED</command>.
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  For some systems, the <parameter>gid</parameter> may
+       be sufficient.  However, reuse of the same <parameter>gid</parameter>
+       for example by a downstream node using multiple subscriptions may lead
+       to it not being a unique identifier.  Therefore, other systems combine
+       the <parameter>xid</parameter> with a node identifier to form a
+       globally unique transaction identifier.  The later <command>COMMIT
+       PREPARED</command> or <command>ROLLBACK PREPARED</command> carries both
+       identifiers, providing an output plugin the choice of what to use.
      </para>
      <para>
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked several times per transaction to decode and
+       must provide the same static answer for a given pair of
+       <parameter>xid</parameter> and <parameter>gid</parameter> every time
+       it is called.
      </para>
      </sect3>
 
@@ -1219,9 +1229,11 @@ stream_commit_cb(...);  &lt;-- commit of the streamed transaction
    </para>
 
    <para>
-    Optionally the output plugin can specify a name pattern in the
-    <function>filter_prepare_cb</function> and transactions with gid containing
-    that name pattern will not be decoded as a two-phase commit transaction.
+    Optionally the output plugin can define filtering rules via
+    <function>filter_prepare_cb</function> to decode only specific transaction
+    in two phases.  This can be achieved by pattern matching on the
+    <parameter>gid</parameter> or via lookups using the
+    <parameter>xid</parameter>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.2

#13Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#11)
1 attachment(s)
Re: [PATCH] Provide more information to filter_prepare

Sorry, git tricked me. Here's the patch including actual changes.

Regards

Markus

Attachments:

0001-add-xid-arg-to-filter_prepare_v5.patch (text/x-patch; charset=UTF-8)
From: Markus Wanner <markus@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c     |  4 ++-
 doc/src/sgml/logicaldecoding.sgml         | 35 ++++++++++++++++-------
 src/backend/replication/logical/decode.c  | 17 +++++++----
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h         |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 44 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..84717ae93e5 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,31 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
        <command>COMMIT PREPARED</command> time. To signal that
        decoding should be skipped, return <literal>true</literal>;
        <literal>false</literal> otherwise. When the callback is not
-       defined, <literal>false</literal> is assumed (i.e. nothing is
-       filtered).
+       defined, <literal>false</literal> is assumed (i.e. no filtering, all
+       transactions using two-phase commit are decoded in two phases as well).
 <programlisting>
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              TransactionId xid,
                                               const char *gid);
 </programlisting>
-      The <parameter>ctx</parameter> parameter has the same contents as for the
-      other callbacks. The <parameter>gid</parameter> is the identifier that later
-      identifies this transaction for <command>COMMIT PREPARED</command> or
-      <command>ROLLBACK PREPARED</command>.
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  For some systems, the <parameter>gid</parameter> may
+       be sufficient.  However, reuse of the same <parameter>gid</parameter>
+       for example by a downstream node using multiple subscriptions may lead
+       to it not being a unique identifier.  Therefore, other systems combine
+       the <parameter>xid</parameter> with an identifier of the origin node to
+       form a globally unique transaction identifier.  The later
+       <command>COMMIT PREPARED</command> or <command>ROLLBACK
+       PREPARED</command> carries both identifiers, providing an output plugin
+       the choice of what to use.
      </para>
      <para>
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked multiple times per transaction to decode
+       and must provide the same static answer for a given pair of
+       <parameter>xid</parameter> and <parameter>gid</parameter> every time
+       it is called.
      </para>
      </sect3>
 
@@ -1219,9 +1230,11 @@ stream_commit_cb(...);  &lt;-- commit of the streamed transaction
    </para>
 
    <para>
-    Optionally the output plugin can specify a name pattern in the
-    <function>filter_prepare_cb</function> and transactions with gid containing
-    that name pattern will not be decoded as a two-phase commit transaction.
+    Optionally the output plugin can define filtering rules via
+    <function>filter_prepare_cb</function> to decode only specific transaction
+    in two phases.  This can be achieved by pattern matching on the
+    <parameter>gid</parameter> or via lookups using the
+    <parameter>xid</parameter>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.2

#14Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#12)
Re: [PATCH] Provide more information to filter_prepare

On Mon, Mar 29, 2021 at 12:57 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 29.03.21 08:23, Amit Kapila wrote:

On Mon, Mar 29, 2021 at 11:42 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

What exactly is the node identifier here? Is it a publisher or
subscriber node id? We might want to be a bit more explicit here?

Good point. I clarified this to speak of the origin node (given this is
not necessarily the direct provider when using chained replication).

This might or might not be valid for all logical replication solutions,
but in the publisher-subscriber model it would easily lead to
duplicate identifiers and block replication. For example, suppose
there are two subscriptions on Node-B for two publications on Node-A,
and the publications cover different tables, tab-1 and tab-2. A
prepared transaction that operates on both tables will then generate
the same GID. This will block forever if someone has set
synchronous_standby_names for both subscriptions, because the PREPARE
won't finish until both subscribers have prepared the transaction, and
due to the conflict one of the subscribers will never finish the
prepare. I thought it might be better to use the subscriber-id (or the
unique replication-origin-id of a subscription) together with the
origin node's xid, as that would minimize the chances of any such
collision. We can still reach this situation if the user prepares a
transaction with the same name as the one we have generated, but we
can advise users not to do this, or we can generate an internal
prepared transaction name starting with pg_* and disallow
user-supplied prepared transaction names starting with pg_, as we do
in some other cases.
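
A rough standalone sketch of that idea follows. Everything in it is an assumption for illustration: the `SubscriptionId` typedef, the function names, and the `pg_gid_%u_%u` format string are made up here and are not taken from the patch or from any existing PostgreSQL API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TransactionId;		/* stand-in for the PostgreSQL type */
typedef uint32_t SubscriptionId;	/* hypothetical subscriber identifier */

/*
 * Build a GID from the subscription id and the origin node's xid.
 * Returns the number of characters written, or -1 if buf is too small.
 */
int
sketch_build_gid(char *buf, size_t buflen,
				 SubscriptionId subid, TransactionId origin_xid)
{
	int			n;

	assert(buf != NULL && buflen > 0);
	n = snprintf(buf, buflen, "pg_gid_%u_%u",
				 (unsigned) subid, (unsigned) origin_xid);
	return (n < 0 || (size_t) n >= buflen) ? -1 : n;
}

/* True if a user-supplied GID would collide with the reserved pg_ prefix. */
bool
sketch_gid_is_reserved(const char *gid)
{
	assert(gid != NULL);
	return strncmp(gid, "pg_", 3) == 0;
}
```

With this scheme, two subscriptions applying the same origin transaction get distinct GIDs because the subscription id differs, and rejecting user GIDs that start with pg_ keeps user-chosen names from colliding with the generated ones.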

--
With Regards,
Amit Kapila.

#15Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#14)
Re: [PATCH] Provide more information to filter_prepare

On 29.03.21 11:13, Amit Kapila wrote:

This might or might not be valid for all logical replication solutions,
but in the publisher-subscriber model it would easily lead to
duplicate identifiers and block replication. For example, suppose
there are two subscriptions on Node-B for two publications on Node-A,
and the publications cover different tables, tab-1 and tab-2. A
prepared transaction that operates on both tables will then generate
the same GID.

I think you are misunderstanding. This is about a globally unique
identifier for a transaction, which has nothing to do with a GID used to
prepare a transaction. This *needs* to be the same for what logically
is the same transaction.

What GID a downstream subscriber uses when receiving messages from some
non-Postgres-provided output plugin clearly is out of scope for this
documentation. The point is to highlight how the xid can be useful for
filter_prepare. And that serves transaction identification purposes.
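
As a minimal sketch of what an xid-based lookup could look like on the output plugin side: the struct, the function names, and the linear skip list below are hypothetical and only illustrate combining an origin node identifier with the xid into one transaction key. They are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;		/* stand-in for the PostgreSQL type */

/* Hypothetical globally unique transaction key: origin node plus xid. */
typedef struct SketchGlobalTxnId
{
	uint32_t	origin_node;
	TransactionId xid;
} SketchGlobalTxnId;

/*
 * Return true if (origin_node, xid) appears in the skip list, i.e. the
 * plugin would not decode this transaction in two phases.  A real plugin
 * would more likely use a hash table; a linear scan keeps the sketch short.
 */
bool
sketch_should_skip(const SketchGlobalTxnId *skip, size_t n,
				   uint32_t origin_node, TransactionId xid)
{
	size_t		i;

	assert(skip != NULL || n == 0);
	for (i = 0; i < n; i++)
	{
		if (skip[i].origin_node == origin_node && skip[i].xid == xid)
			return true;
	}
	return false;
}
```

The same xid from a different origin node does not match, which is why the node identifier is part of the key.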

Regards

Markus

#16Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#15)
Re: [PATCH] Provide more information to filter_prepare

On Mon, Mar 29, 2021 at 3:11 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 29.03.21 11:13, Amit Kapila wrote:

This might or might not be valid for all logical replication solutions,
but in the publisher-subscriber model it would easily lead to
duplicate identifiers and block replication. For example, suppose
there are two subscriptions on Node-B for two publications on Node-A,
and the publications cover different tables, tab-1 and tab-2. A
prepared transaction that operates on both tables will then generate
the same GID.

I think you are misunderstanding. This is about a globally unique
identifier for a transaction, which has nothing to do with a GID used to
prepare a transaction. This *needs* to be the same for what logically
is the same transaction.

Okay, but just in the previous sentence ("However, reuse of the same
<parameter>gid</parameter> for example by a downstream node using
multiple subscriptions may lead to it not being a unique
identifier."), you have explained how sending a GID identifier can
lead to a non-unique identifier for multiple subscriptions. And then
in the next line, the way you are suggesting to generate GID by use of
XID seems to have the same problem, so that caused confusion for me.

--
With Regards,
Amit Kapila.

#17Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#16)
1 attachment(s)
Re: [PATCH] Provide more information to filter_prepare

On 29.03.21 11:53, Amit Kapila wrote:

Okay, but just in the previous sentence ("However, reuse of the same
<parameter>gid</parameter> for example by a downstream node using
multiple subscriptions may lead to it not being a unique
identifier."), you have explained how sending a GID identifier can
lead to a non-unique identifier for multiple subscriptions.

Maybe the example of the downstream node is a bad one. I understand
that it can cause confusion. Let's drop that example and focus on
the output plugin side. v6 attached.

And then
in the next line, the way you are suggesting to generate GID by use of
XID seems to have the same problem, so that caused confusion for me.

It was not intended as a suggestion for how to generate GIDs at all.
Hopefully dropping that bad example will make it less likely to
appear related to GID generation on the subscriber.

Regards

Markus

Attachments:

0001-add-xid-arg-to-filter_prepare_v6.patch (text/x-patch; charset=UTF-8)
From: Markus Wanner <markus@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c     |  4 ++-
 doc/src/sgml/logicaldecoding.sgml         | 34 +++++++++++++++--------
 src/backend/replication/logical/decode.c  | 17 ++++++++----
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h         |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..57f4165d06b 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,30 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
        <command>COMMIT PREPARED</command> time. To signal that
        decoding should be skipped, return <literal>true</literal>;
        <literal>false</literal> otherwise. When the callback is not
-       defined, <literal>false</literal> is assumed (i.e. nothing is
-       filtered).
+       defined, <literal>false</literal> is assumed (i.e. no filtering, all
+       transactions using two-phase commit are decoded in two phases as well).
 <programlisting>
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              TransactionId xid,
                                               const char *gid);
 </programlisting>
-      The <parameter>ctx</parameter> parameter has the same contents as for the
-      other callbacks. The <parameter>gid</parameter> is the identifier that later
-      identifies this transaction for <command>COMMIT PREPARED</command> or
-      <command>ROLLBACK PREPARED</command>.
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  For some systems, the <parameter>gid</parameter> may
+       be sufficient.  However, reuse of the same <parameter>gid</parameter>
+       may lead to it not being a unique identifier.  Therefore, other systems
+       combine the <parameter>xid</parameter> with an identifier of the origin
+       node to form a globally unique transaction identifier.  The later
+       <command>COMMIT PREPARED</command> or <command>ROLLBACK
+       PREPARED</command> carries both identifiers, providing an output plugin
+       the choice of what to use.
      </para>
      <para>
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked multiple times per transaction to decode
+       and must provide the same static answer for a given pair of
+       <parameter>xid</parameter> and <parameter>gid</parameter> every time
+       it is called.
      </para>
      </sect3>
 
@@ -1219,9 +1229,11 @@ stream_commit_cb(...);  &lt;-- commit of the streamed transaction
    </para>
 
    <para>
-    Optionally the output plugin can specify a name pattern in the
-    <function>filter_prepare_cb</function> and transactions with gid containing
-    that name pattern will not be decoded as a two-phase commit transaction.
+    Optionally the output plugin can define filtering rules via
+    <function>filter_prepare_cb</function> to decode only specific transaction
+    in two phases.  This can be achieved by pattern matching on the
+    <parameter>gid</parameter> or via lookups using the
+    <parameter>xid</parameter>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.2

#18vignesh C
vignesh21@gmail.com
In reply to: Markus Wanner (#17)
Re: [PATCH] Provide more information to filter_prepare

On Mon, Mar 29, 2021 at 3:30 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 29.03.21 11:53, Amit Kapila wrote:

Okay, but just in the previous sentence ("However, reuse of the same
<parameter>gid</parameter> for example by a downstream node using
multiple subscriptions may lead to it not being a unique
identifier."), you have explained how sending a GID identifier can
lead to a non-unique identifier for multiple subscriptions.

Maybe the example of the downstream node is a bad one. I understand
that it can cause confusion. Let's drop that example and focus on
the output plugin side. v6 attached.

And then
in the next line, the way you are suggesting to generate GID by use of
XID seems to have the same problem, so that caused confusion for me.

It was not intended as a suggestion for how to generate GIDs at all.
Hopefully dropping that bad example will make it less likely to
appear related to GID generation on the subscriber.

Thanks for the updated patch. The patch applies cleanly, and make check and
make check-world pass. The code changes look fine to me.

In the documentation, I did not understand the part marked in bold
below:
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  For some systems, the <parameter>gid</parameter> may
+       be sufficient.  However, reuse of the same <parameter>gid</parameter>
+       may lead to it not being a unique identifier.  *Therefore, other systems
+       combine the <parameter>xid</parameter> with an identifier of the origin
+       node to form a globally unique transaction identifier.*  The later
+       <command>COMMIT PREPARED</command> or <command>ROLLBACK
+       PREPARED</command> carries both identifiers, providing an output plugin
+       the choice of what to use.

I know that in publisher/subscriber decoding, the prepared transaction
gid will be modified to either pg_xid_origin or pg_xid_subid (this is still
being discussed in the logical decoding of two-phase transactions thread
and is not yet finalized) so that the subscriber does not end up with the
same gid name.
But in the filter_prepare_cb callback, by stating "other systems ..." it is
not very clear who will change the GID. Are we referring to
publisher/subscriber decoding?

Regards,
Vignesh

#19Markus Wanner
markus.wanner@enterprisedb.com
In reply to: vignesh C (#18)
Re: [PATCH] Provide more information to filter_prepare

On 29.03.21 12:18, vignesh C wrote:

But in the filter_prepare_cb callback, by stating "other systems ..." it is
not very clear who will change the GID. Are we referring to
publisher/subscriber decoding?

Thanks for your feedback. This is not about GIDs at all, but just about
identifying a transaction. I'm out of ideas on how else to phrase that.
Any suggestion?

Maybe we should not try to give examples and reference other systems,
but just leave it at:

The <parameter>ctx</parameter> parameter has the same contents as for
the other callbacks. The parameters <parameter>xid</parameter>
and <parameter>gid</parameter> provide two different ways to identify
the transaction. The later <command>COMMIT PREPARED</command> or
<command>ROLLBACK PREPARED</command> carries both identifiers,
providing an output plugin the choice of what to use.

That is a sufficient explanation in my opinion. What do you think?
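
As a rough illustration of the two ways to identify the transaction (a sketch under stated assumptions, not code from the patch): a callback with the new signature could match a pattern on the gid, as test_decoding does with "_nodecode", or look the xid up in plugin-owned state. SketchFilterState and sketch_filter_prepare are hypothetical names; TransactionId and LogicalDecodingContext are reduced to stand-ins so the example compiles outside the PostgreSQL tree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins so the sketch compiles outside the PostgreSQL tree. */
typedef uint32_t TransactionId;
typedef struct LogicalDecodingContext
{
	void	   *output_plugin_private;	/* plugin-owned state */
} LogicalDecodingContext;

/* Hypothetical plugin state: xids that should not be decoded at PREPARE. */
typedef struct SketchFilterState
{
	const TransactionId *skip_xids;
	size_t		nskip;
} SketchFilterState;

/*
 * Return true to skip decoding at PREPARE time, false to decode the
 * transaction in two phases.
 */
bool
sketch_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
					  const char *gid)
{
	const SketchFilterState *state;
	size_t		i;

	assert(ctx != NULL && gid != NULL);

	/* Option 1: pattern match on the gid. */
	if (strstr(gid, "_nodecode") != NULL)
		return true;

	/* Option 2: lookup by xid in state the plugin maintains itself. */
	state = ctx->output_plugin_private;
	if (state == NULL)
		return false;
	for (i = 0; i < state->nskip; i++)
	{
		if (state->skip_xids[i] == xid)
			return true;
	}
	return false;
}
```

Both branches depend only on the (xid, gid) pair and on state that is assumed not to change while the transaction is in flight, so repeated calls for the same transaction give the same answer.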

Regards

Markus

#20vignesh C
vignesh21@gmail.com
In reply to: Markus Wanner (#19)
Re: [PATCH] Provide more information to filter_prepare

On Mon, Mar 29, 2021 at 4:22 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 29.03.21 12:18, vignesh C wrote:

But in the filter_prepare_cb callback, by stating "other systems ..." it is
not very clear who will change the GID. Are we referring to
publisher/subscriber decoding?

Thanks for your feedback. This is not about GIDs at all, but just about
identifying a transaction. I'm out of ideas on how else to phrase that.
Any suggestion?

Maybe we should not try to give examples and reference other systems,
but just leave it at:

The <parameter>ctx</parameter> parameter has the same contents as for
the other callbacks. The parameters <parameter>xid</parameter>
and <parameter>gid</parameter> provide two different ways to identify
the transaction. The later <command>COMMIT PREPARED</command> or
<command>ROLLBACK PREPARED</command> carries both identifiers,
providing an output plugin the choice of what to use.

That is a sufficient explanation in my opinion. What do you think?

The above content looks sufficient to me. We already explain this
earlier: "The optional filter_prepare_cb callback is called to
determine whether data that is part of the current two-phase commit
transaction should be considered for decoding at this prepare stage or
later as a regular one-phase transaction at COMMIT PREPARED time."
That helps in understanding what filter_prepare_cb means.
Thoughts?
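
The quoted sentence can be sketched as a small decision function (an illustration modeled loosely on the FilterPrepare helper in the patch, not a copy of it). SketchContext, SketchDecodePoint, and all sketch_* functions are hypothetical names; the real context struct and callback table are reduced to two fields here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's TransactionId. */
typedef uint32_t TransactionId;

struct SketchContext;
typedef bool (*SketchFilterPrepareCB) (struct SketchContext *ctx,
									   TransactionId xid, const char *gid);

/* Hypothetical, heavily reduced decoding context. */
typedef struct SketchContext
{
	bool		twophase;		/* two-phase decoding enabled? */
	SketchFilterPrepareCB filter_prepare_cb;	/* optional, may be NULL */
} SketchContext;

typedef enum SketchDecodePoint
{
	SKETCH_DECODE_AT_PREPARE,
	SKETCH_DECODE_AT_COMMIT_PREPARED
} SketchDecodePoint;

/* true means: do not decode at PREPARE time. */
bool
sketch_filter(SketchContext *ctx, TransactionId xid, const char *gid)
{
	assert(ctx != NULL);

	if (!ctx->twophase)
		return true;			/* cannot decode at prepare time at all */
	if (ctx->filter_prepare_cb == NULL)
		return false;			/* no callback: nothing is filtered */
	return ctx->filter_prepare_cb(ctx, xid, gid);
}

/* The same question is asked at PREPARE and again at COMMIT PREPARED. */
SketchDecodePoint
sketch_decode_point(SketchContext *ctx, TransactionId xid, const char *gid)
{
	return sketch_filter(ctx, xid, gid)
		? SKETCH_DECODE_AT_COMMIT_PREPARED
		: SKETCH_DECODE_AT_PREPARE;
}

/* Example callback for the sketch: filter transactions with odd xids. */
bool
sketch_skip_odd_xids(SketchContext *ctx, TransactionId xid, const char *gid)
{
	(void) ctx;
	(void) gid;
	return (xid % 2) != 0;
}
```

Because the decision is evaluated once at PREPARE and again at COMMIT PREPARED or ROLLBACK PREPARED, a callback that changed its answer in between would leave the two stages disagreeing about how the transaction is decoded. That is why the documentation asks for the same static answer for a given pair of xid and gid.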

Regards,
Vignesh

#21Markus Wanner
markus.wanner@enterprisedb.com
In reply to: vignesh C (#20)
1 attachment(s)
Re: [PATCH] Provide more information to filter_prepare

On 29.03.21 13:04, vignesh C wrote:

The above content looks sufficient to me.

Good, thanks. Based on that, I'm adding v7 of the patch.

Regards

Markus

Attachments:

0001-add-concurrent_abort-callback_v7.patch (text/x-patch; charset=UTF-8)
From: Markus Wanner <markus@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c     |  4 +++-
 doc/src/sgml/logicaldecoding.sgml         | 29 ++++++++++++++---------
 src/backend/replication/logical/decode.c  | 17 ++++++++-----
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h         |  3 ++-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 38 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..da23f89ca32 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,25 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
        <command>COMMIT PREPARED</command> time. To signal that
        decoding should be skipped, return <literal>true</literal>;
        <literal>false</literal> otherwise. When the callback is not
-       defined, <literal>false</literal> is assumed (i.e. nothing is
-       filtered).
+       defined, <literal>false</literal> is assumed (i.e. no filtering, all
+       transactions using two-phase commit are decoded in two phases as well).
 <programlisting>
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              TransactionId xid,
                                               const char *gid);
 </programlisting>
-      The <parameter>ctx</parameter> parameter has the same contents as for the
-      other callbacks. The <parameter>gid</parameter> is the identifier that later
-      identifies this transaction for <command>COMMIT PREPARED</command> or
-      <command>ROLLBACK PREPARED</command>.
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  The later <command>COMMIT PREPARED</command> or
+       <command>ROLLBACK PREPARED</command> carries both identifiers,
+       providing an output plugin the choice of what to use.
      </para>
      <para>
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked multiple times per transaction to decode
+       and must provide the same static answer for a given pair of
+       <parameter>xid</parameter> and <parameter>gid</parameter> every time
+       it is called.
      </para>
      </sect3>
 
@@ -1219,9 +1224,11 @@ stream_commit_cb(...);  &lt;-- commit of the streamed transaction
    </para>
 
    <para>
-    Optionally the output plugin can specify a name pattern in the
-    <function>filter_prepare_cb</function> and transactions with gid containing
-    that name pattern will not be decoded as a two-phase commit transaction.
+    Optionally the output plugin can define filtering rules via
+    <function>filter_prepare_cb</function> to decode only specific transaction
+    in two phases.  This can be achieved by pattern matching on the
+    <parameter>gid</parameter> or via lookups using the
+    <parameter>xid</parameter>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.2

#22vignesh C
vignesh21@gmail.com
In reply to: Markus Wanner (#21)
Re: [PATCH] Provide more information to filter_prepare

On Mon, Mar 29, 2021 at 4:46 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 29.03.21 13:04, vignesh C wrote:

The above content looks sufficient to me.

Good, thanks. Based on that, I'm adding v7 of the patch.

Thanks for the updated patch.

@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+                                                const char *gid)
 {
        if (strstr(gid, "_nodecode") != NULL)
                return true;

Currently there is one test that filters prepared transactions whose gid
contains "_nodecode". I'm not sure if we can have any tests based on xid.
I'm sure you have thought about it: have you intentionally not
written any tests because it would be difficult to predict the xid? I just
wanted to confirm my understanding.

Regards,
Vignesh

#23Markus Wanner
markus.wanner@enterprisedb.com
In reply to: vignesh C (#22)
Re: [PATCH] Provide more information to filter_prepare

On 29.03.21 14:00, vignesh C wrote:

have you intentionally not
written any tests because it would be difficult to predict the xid? I just
wanted to confirm my understanding.

Yeah, that's the reason this is hard to test with a regression
test. It might be possible to come up with a TAP test for this, but I
doubt that's worth it, as it's a pretty trivial addition.

Regards

Markus

#24vignesh C
vignesh21@gmail.com
In reply to: Markus Wanner (#23)
Re: [PATCH] Provide more information to filter_prepare

On Mon, Mar 29, 2021 at 5:40 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 29.03.21 14:00, vignesh C wrote:

have you intentionally not
written any tests because it would be difficult to predict the xid? I just
wanted to confirm my understanding.

Yeah, that's the reason this is hard to test with a regression
test. It might be possible to come up with a TAP test for this, but I
doubt that's worth it, as it's a pretty trivial addition.

Thanks, I don't have any more comments.

Regards,
Vignesh

#25Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#21)
Re: [PATCH] Provide more information to filter_prepare

On Mon, Mar 29, 2021 at 4:46 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 29.03.21 13:04, vignesh C wrote:

The above content looks sufficient to me.

Good, thanks. Based on that, I'm adding v7 of the patch.

Pushed. In the last version, you have named the patch incorrectly.

--
With Regards,
Amit Kapila.

#26Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#25)
Re: [PATCH] Provide more information to filter_prepare

On 30.03.21 10:33, Amit Kapila wrote:

Pushed. In the last version, you have named the patch incorrectly.

Thanks a lot, Amit!

Regards

Markus