[PATCH] add concurrent_abort callback for output plugin

Started by Markus Wanner, almost 5 years ago, 31 messages
#1 Markus Wanner
markus.wanner@enterprisedb.com
1 attachment(s)

Hi,

here is another tidbit from our experience with using logical decoding.
The attached patch adds a callback to notify the output plugin of a
concurrent abort. I'll continue to describe the problem in more detail
and how this additional callback solves it.

Streamed transactions as well as two-phase commit transactions may get
decoded before they finish. At the point the begin_cb is invoked and
first changes are delivered to the output plugin, it is not necessarily
known whether the transaction will commit or abort.

This leads to the possibility of the transaction getting aborted
concurrently with logical decoding. In that case, the decoder is likely
to error out on a catalog scan that conflicts with the abort of the
transaction. The reorderbuffer sports a PG_CATCH block to clean up.
However, it does not currently inform the output plugin. From its point
of view, the transaction is left dangling until another one comes along
or until the final ROLLBACK or ROLLBACK PREPARED record from WAL gets
decoded. Therefore, what the output plugin might see in this case is:

* filter_prepare_cb (txn A)
* begin_prepare_cb (txn A)
* apply_change (txn A)
* apply_change (txn A)
* apply_change (txn A)
* begin_cb (txn B)

In other words, in this example, only the begin_cb of the following
transaction implicitly tells the output plugin that txn A could not be
fully decoded. And there's no upper bound on when that may
happen. (It could also be another filter_prepare_cb, if the subsequent
transaction happens to be a two-phase transaction as well. Or an
explicit rollback_prepared_cb or stream_abort if there's no other
transaction in between.)
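A minimal sketch of the implicit bookkeeping this forces on an output plugin today. All names and types below are hypothetical simplifications (xids are plain 32-bit integers, 0 stands in for "no transaction"), not PostgreSQL's actual API. The point is that the plugin can only notice that txn A was abandoned once the next transaction's begin arrives:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for TransactionId; 0 means "no transaction". */
typedef uint32_t Xid;

/* Hypothetical per-plugin state: the transaction currently being decoded. */
typedef struct PluginState
{
	Xid		open_xid;
} PluginState;

/*
 * Called when a new transaction starts being decoded (begin_cb or
 * begin_prepare_cb in this simplified model).  Returns the xid of a
 * previous transaction that was left dangling, i.e. one the plugin must
 * now assume got aborted concurrently, or 0 if there was none.
 */
static Xid
plugin_begin(PluginState *state, Xid xid)
{
	Xid		dangling = 0;

	/* Only now, possibly much later, do we learn about the abort. */
	if (state->open_xid != 0 && state->open_xid != xid)
		dangling = state->open_xid;

	state->open_xid = xid;
	return dangling;
}

/* Called from prepare_cb or commit_cb: the transaction was fully decoded. */
static void
plugin_end(PluginState *state, Xid xid)
{
	assert(state->open_xid == xid);
	state->open_xid = 0;
}
```

With an explicit concurrent abort callback, the plugin could clear open_xid right away instead of relying on the check in plugin_begin().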

An alternative and arguably cleaner approach for streamed transactions
may be to directly invoke stream_abort. However, the lsn argument
passed could not be that of the abort record, as that's not known at the
point in time of the concurrent abort. Plus, this seems like a bad fit
for two-phase commit transactions.

Again, this callback is especially important for output plugins that
invoke further actions on downstream nodes that delay the COMMIT
PREPARED of a transaction upstream, e.g. until prepared on other nodes.
Up until now, the output plugin has no way to learn about a concurrent
abort of the currently decoded (2PC or streamed) transaction (perhaps
short of continued polling on the transaction status).

I also think it generally improves the API by allowing the output plugin
to rely on such a callback, rather than having to implicitly deduce this
from other callbacks.

Thoughts or comments? If this is agreed on, I can look into adding
tests (concurrent aborts are not currently covered, it seems).

Regards

Markus

Attachments:

0001-add-concurrent_abort-callback_v1.patch (text/x-patch; charset=UTF-8)
From: Markus Wanner <markus.wanner@enterprisedb.com>
Date: Thu, 11 Feb 2021 13:49:55 +0100
Subject: [PATCH] Add a concurrent_abort callback for the output plugin.

Logical decoding of a prepared or streamed transaction may fail if the
transaction got aborted after invoking the begin_cb (and likely having
sent some changes via change_cb), but before the necessary catalog scans
could be performed.  In this case, decoding the transaction is neither
possible nor necessary (given it got rolled back).

To give the output plugin a chance to clean up the aborted transaction as
well, introduce a concurrent_abort callback.  It is only ever invoked to
terminate unfinished transactions, not for normal aborts.

Adjust contrib/test_decoding to define a concurrent_abort callback.
---
 contrib/test_decoding/test_decoding.c         | 29 ++++++++++++
 doc/src/sgml/logicaldecoding.sgml             | 43 +++++++++++++++++-
 src/backend/replication/logical/logical.c     | 45 +++++++++++++++++++
 .../replication/logical/reorderbuffer.c       |  6 +++
 src/include/replication/output_plugin.h       |  9 ++++
 src/include/replication/reorderbuffer.h       |  7 +++
 6 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..a5dd80e0957 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -83,6 +83,9 @@ static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn,
 								  XLogRecPtr prepare_lsn);
+static void pg_decode_concurrent_abort_txn(LogicalDecodingContext *ctx,
+										   ReorderBufferTXN *txn, bool streaming,
+										   XLogRecPtr lsn);
 static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
 										  ReorderBufferTXN *txn,
 										  XLogRecPtr commit_lsn);
@@ -137,6 +140,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pg_decode_change;
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
+	cb->concurrent_abort_cb = pg_decode_concurrent_abort_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
@@ -386,6 +390,31 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_concurrent_abort_txn(LogicalDecodingContext *ctx,
+							   ReorderBufferTXN *txn, bool streaming,
+							   XLogRecPtr lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool		xact_wrote_changes = txndata->xact_wrote_changes;
+
+	if (data->skip_empty_xacts && !xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	if (data->include_xids)
+		appendStringInfo(ctx->out, "<concurrent abort> %u", txn->xid);
+	else
+		appendStringInfoString(ctx->out, "<concurrent abort>");
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
 /* COMMIT PREPARED callback */
 static void
 pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..c44dee55a02 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -441,6 +441,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
     LogicalDecodeBeginPrepareCB begin_prepare_cb;
     LogicalDecodePrepareCB prepare_cb;
+    LogicalDecodeConcurrentAbortCB concurrent_abort_cb;
     LogicalDecodeCommitPreparedCB commit_prepared_cb;
     LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
     LogicalDecodeStreamStartCB stream_start_cb;
@@ -459,7 +460,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>,
      <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
-     and <function>shutdown_cb</function> are optional.
+     <function>shutdown_cb</function>, and
+     <function>concurrent_abort_cb</function> are optional.
      If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
@@ -847,6 +849,45 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-concurrent-abort">
+     <title>Concurrent Abort Callback</title>
+
+     <para>
+      The optional <function>concurrent_abort_cb</function> callback is called
+      whenever a transaction got aborted in the middle of sending its changes
+      to the output plugin.  This can happen in the case of decoding a
+      two-phase commit transaction or with streaming enabled, and only if the
+      transaction got aborted after its <function>begin_cb</function> has been
+      invoked.  This is not ever the case for normal single-phase transactions
+      that are not streamed.
+<programlisting>
+typedef void (*LogicalDecodeConcurrentAbortCB) (struct LogicalDecodingContext *ctx,
+                                                ReorderBufferTXN *txn,
+                                                bool streaming,
+                                                XLogRecPtr abort_lsn);
+</programlisting>
+      The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
+      have the same contents as for the <function>begin_cb</function>
+      and <function>commit_cb</function> callbacks.
+      The <parameter>streaming</parameter> parameter indicates whether this
+      happened during streaming or at the <command>PREPARE</command> of a
+      two-phase transaction.  In case of a two-phase transaction,
+      the <parameter>abort_lsn</parameter> parameter additionally points to
+      the end of the prepare record in WAL, it is invalid for streaming
+      transactions.
+     </para>
+
+     <note>
+      <para>
+       This is just an early indication for the output plugin.  For
+       concurrently aborted transactions, either
+       the <function>rollback_prepared_cb</function> or
+       the <function>stream_abort_cb</function> callback will be invoked once
+       the decoder reaches the corresponding WAL records.
+      </para>
+     </note>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-commit-prepared">
      <title>Transaction Commit Prepared Callback</title>
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..4b99fe5cc4e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -66,6 +66,8 @@ static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *t
 									   XLogRecPtr commit_lsn);
 static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 										 XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
+static void concurrent_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+										bool streaming, XLogRecPtr end_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							  Relation relation, ReorderBufferChange *change);
 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -276,6 +278,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->prepare = prepare_cb_wrapper;
 	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
 	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+	ctx->reorder->concurrent_abort = concurrent_abort_cb_wrapper;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -1005,6 +1008,48 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+concurrent_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							bool streaming, XLogRecPtr end_lsn)
+{
+	LogicalDecodingContext	   *ctx = cache->private_data;
+	LogicalErrorCallbackState	state;
+	ErrorContextCallback		errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/*
+	 * We're only supposed to call this when either two-phase commits are
+	 * supported or if we're streaming.
+	 */
+	Assert(ctx->twophase || streaming);
+
+	/* The callback is optional, invoke only if provided. */
+	if (ctx->callbacks.concurrent_abort_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "concurrent_abort";
+	state.report_location = txn->final_lsn; /* beginning of record attempted to
+											   decode */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.concurrent_abort_cb(ctx, txn, streaming, end_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c291b05a423..a6d044b870b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2488,6 +2488,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			errdata = NULL;
 			curtxn->concurrent_abort = true;
 
+			/*
+			 * Call the cleanup hook to inform the output plugin that the
+			 * transaction just started had to be aborted.
+			 */
+			rb->concurrent_abort(rb, txn, streaming, commit_lsn);
+
 			/* Reset the TXN so that it is allowed to stream remaining data. */
 			ReorderBufferResetTXN(rb, txn, snapshot_now,
 								  command_id, prev_lsn,
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..e122dc48139 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -77,6 +77,14 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+/*
+ * Called for decoding errors due to concurrent aborts.
+ */
+typedef void (*LogicalDecodeConcurrentAbortCB) (struct LogicalDecodingContext *ctx,
+												ReorderBufferTXN *txn,
+												bool streaming,
+												XLogRecPtr end_lsn);
+
 /*
  * Called for the generic logical decoding messages.
  */
@@ -227,6 +235,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodePrepareCB prepare_cb;
 	LogicalDecodeCommitPreparedCB commit_prepared_cb;
 	LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+	LogicalDecodeConcurrentAbortCB concurrent_abort_cb;
 
 	/* streaming of changes */
 	LogicalDecodeStreamStartCB stream_start_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6ab..153474baa84 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -460,6 +460,12 @@ typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
 												 XLogRecPtr prepare_end_lsn,
 												 TimestampTz prepare_time);
 
+/* concurrent abort callback signature */
+typedef void (*ReorderBufferConcurrentAbortCB) (ReorderBuffer *rb,
+												ReorderBufferTXN *txn,
+												bool streaming,
+												XLogRecPtr end_lsn);
+
 /* start streaming transaction callback signature */
 typedef void (*ReorderBufferStreamStartCB) (
 											ReorderBuffer *rb,
@@ -559,6 +565,7 @@ struct ReorderBuffer
 	ReorderBufferPrepareCB prepare;
 	ReorderBufferCommitPreparedCB commit_prepared;
 	ReorderBufferRollbackPreparedCB rollback_prepared;
+	ReorderBufferConcurrentAbortCB concurrent_abort;
 
 	/*
 	 * Callbacks to be called when streaming a transaction.
-- 
2.30.2

#2 Andres Freund
andres@anarazel.de
In reply to: Markus Wanner (#1)
Re: [PATCH] add concurrent_abort callback for output plugin

Hi,

On 2021-03-25 10:07:28 +0100, Markus Wanner wrote:

> This leads to the possibility of the transaction getting aborted concurrent
> to logical decoding. In that case, it is likely for the decoder to error on
> a catalog scan that conflicts with the abort of the transaction. The
> reorderbuffer sports a PG_CATCH block to cleanup.

FWIW, I am seriously suspicious of the code added as part of
7259736a6e5b7c75 and plan to look at it after the code freeze. I can't
really see this code surviving as is. The tableam.h changes, the bsysscan
stuff, ... Leaving correctness aside, the code bloat and performance
effects alone seem problematic.

> Again, this callback is especially important for output plugins that invoke
> further actions on downstream nodes that delay the COMMIT PREPARED of a
> transaction upstream, e.g. until prepared on other nodes. Up until now, the
> output plugin has no way to learn about a concurrent abort of the currently
> decoded (2PC or streamed) transaction (perhaps short of continued polling on
> the transaction status).

You may have only meant it as a shorthand: But imo output plugins have
absolutely no business "invoking actions downstream".

> diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
> index c291b05a423..a6d044b870b 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -2488,6 +2488,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
>  			errdata = NULL;
>  			curtxn->concurrent_abort = true;
>  
> +			/*
> +			 * Call the cleanup hook to inform the output plugin that the
> +			 * transaction just started had to be aborted.
> +			 */
> +			rb->concurrent_abort(rb, txn, streaming, commit_lsn);
> +
>  			/* Reset the TXN so that it is allowed to stream remaining data. */
>  			ReorderBufferResetTXN(rb, txn, snapshot_now,
>  								  command_id, prev_lsn,

I don't think this would be ok, errors thrown in the callback wouldn't
be handled as they would be in other callbacks.

Greetings,

Andres Freund

#3 Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Andres Freund (#2)
Re: [PATCH] add concurrent_abort callback for output plugin

On 25.03.21 21:21, Andres Freund wrote:

> ... the code added as part of 7259736a6e5b7c75 ...

That's the streaming part, which can be thought of as a more general
variant of the two-phase decoding in that it allows multiple "flush
points" (invocations of ReorderBufferProcessTXN). That's unlike the
PREPARE of a two-phase commit, where the reorderbuffer can be sure
there's no further change to be processed after the PREPARE, nor any
invocation of ReorderBufferProcessTXN before that first one at PREPARE
time. With that in mind, I'm surprised support for streaming got
committed before 2PC. It clearly has different use cases, though.

However, I'm sure your inputs on how to improve and cleanup the
implementation will be appreciated. The single tiny problem this patch
addresses is the same for 2PC and streaming decoding: the output plugin
currently has no way to learn about a concurrent abort of a transaction
still being decoded, at the time this happens.

Both 2PC and streaming require the reorderbuffer to (possibly) forward
changes prior to the transaction's commit. That's the whole point of
these two features. Therefore, I don't think we can get around
concurrent aborts.

> You may have only meant it as a shorthand: But imo output plugins have
> absolutely no business "invoking actions downstream".

From my point of view, that's the raison d'être for an output plugin.
Even if it does so merely by forwarding messages. But yeah, of course a
whole bunch of other components and changes are needed to implement the
kind of global two-phase commit system I tried to describe.

I'm open to suggestions on how to reference that use case.

>> diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
>> index c291b05a423..a6d044b870b 100644
>> --- a/src/backend/replication/logical/reorderbuffer.c
>> +++ b/src/backend/replication/logical/reorderbuffer.c
>> @@ -2488,6 +2488,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
>>  			errdata = NULL;
>>  			curtxn->concurrent_abort = true;
>>  
>> +			/*
>> +			 * Call the cleanup hook to inform the output plugin that the
>> +			 * transaction just started had to be aborted.
>> +			 */
>> +			rb->concurrent_abort(rb, txn, streaming, commit_lsn);
>> +
>>  			/* Reset the TXN so that it is allowed to stream remaining data. */
>>  			ReorderBufferResetTXN(rb, txn, snapshot_now,
>>  								  command_id, prev_lsn,
>
> I don't think this would be ok, errors thrown in the callback wouldn't
> be handled as they would be in other callbacks.

That's a good point. Maybe the CATCH block should only set a flag,
allowing for the callback to be invoked outside of it.
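A rough, self-contained sketch of that flag-based approach. It assumes setjmp/longjmp as a stand-in for PG_TRY/PG_CATCH (which are built on sigsetjmp/siglongjmp), and every function name here is hypothetical. The real catch block in ReorderBufferProcessTXN does more than this, e.g. clearing errdata and setting curtxn->concurrent_abort as in the hunk above:

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the innermost active PG_TRY block. */
static jmp_buf *catch_target = NULL;

/* Counts invocations of the (hypothetical) concurrent abort callback. */
static int	callback_calls = 0;

/* Stand-in for a catalog scan that errors out due to a concurrent abort. */
static void
scan_catalog(bool aborted)
{
	assert(catch_target != NULL);
	if (aborted)
		longjmp(*catch_target, 1);	/* like ereport(ERROR, ...) */
}

/* Stand-in for the output plugin's concurrent_abort callback. */
static void
concurrent_abort_callback(void)
{
	callback_calls++;
}

/* Returns true if the transaction was found to be concurrently aborted. */
static bool
process_txn(bool aborted)
{
	jmp_buf		buf;
	volatile bool concurrent_abort = false;

	catch_target = &buf;
	if (setjmp(buf) == 0)
	{
		/* "PG_TRY": decode changes, which may involve catalog scans */
		scan_catalog(aborted);
	}
	else
	{
		/* "PG_CATCH": only set a flag, do not call out to the plugin */
		concurrent_abort = true;
	}
	catch_target = NULL;		/* "PG_END_TRY" */

	/*
	 * Outside the catch block, an error raised by the callback would be
	 * handled like an error from any other callback.
	 */
	if (concurrent_abort)
		concurrent_abort_callback();

	return concurrent_abort;
}
```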

Regards

Markus my-callbacks-do-not-throw-error Wanner

#4 Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#1)
Re: [PATCH] add concurrent_abort callback for output plugin

On Thu, Mar 25, 2021 at 2:37 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

> In other words, in this example, only the begin_cb of the following
> transaction implicitly tells the output plugin that txn A could not be
> fully decoded. And there's no upper time boundary on when that may
> happen. (It could also be another filter_prepare_cb, if the subsequent
> transaction happens to be a two-phase transaction as well. Or an
> explicit rollback_prepared_cb or stream_abort if there's no other
> transaction in between.)
>
> An alternative and arguably cleaner approach for streamed transactions
> may be to directly invoke stream_abort. However, the lsn argument
> passed could not be that of the abort record, as that's not known at the
> point in time of the concurrent abort. Plus, this seems like a bad fit
> for two-phase commit transactions.
>
> Again, this callback is especially important for output plugins that
> invoke further actions on downstream nodes that delay the COMMIT
> PREPARED of a transaction upstream, e.g. until prepared on other nodes.
> Up until now, the output plugin has no way to learn about a concurrent
> abort of the currently decoded (2PC or streamed) transaction (perhaps
> short of continued polling on the transaction status).

I think as you have noted that stream_abort or rollback_prepared will
be sent (the remaining changes in-between will be skipped) as we
decode them from WAL so it is not clear to me how it causes any delays
as opposed to where we don't detect concurrent abort say because after
that we didn't access catalog table.

--
With Regards,
Amit Kapila.

#5 Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#4)
Re: [PATCH] add concurrent_abort callback for output plugin

On 26.03.21 04:28, Amit Kapila wrote:

> I think as you have noted that stream_abort or rollback_prepared will
> be sent (the remaining changes in-between will be skipped) as we
> decode them from WAL

Yes, but as outlined, too late. Multiple other transactions may get
decoded until the decoder reaches the ROLLBACK PREPARED. Thus,
effectively, the output plugin currently needs to deduce that a
transaction got aborted concurrently from one out of half a dozen other
callbacks that may trigger right after that transaction, because it will
only get closed properly much later.

> so it is not clear to me how it causes any delays
> as opposed to where we don't detect concurrent abort say because after
> that we didn't access catalog table.

You're assuming very little traffic, where the ROLLBACK PREPARED follows
the PREPARE immediately in WAL. On a busy system, chances for that to
happen are rather low.

(I think the same is true for streaming and stream_abort being sent only
at the time the decoder reaches the ROLLBACK record in WAL. However, I
did not try. Unlike 2PC, where this actually bit me.)

Regards

Markus

#6 Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#5)
Re: [PATCH] add concurrent_abort callback for output plugin

On Fri, Mar 26, 2021 at 2:42 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

> On 26.03.21 04:28, Amit Kapila wrote:
>
>> I think as you have noted that stream_abort or rollback_prepared will
>> be sent (the remaining changes in-between will be skipped) as we
>> decode them from WAL
>
> Yes, but as outlined, too late. Multiple other transactions may get
> decoded until the decoder reaches the ROLLBACK PREPARED. Thus,
> effectively, the output plugin currently needs to deduce that a
> transaction got aborted concurrently from one out of half a dozen other
> callbacks that may trigger right after that transaction, because it will
> only get closed properly much later.
>
>> so it is not clear to me how it causes any delays
>> as opposed to where we don't detect concurrent abort say because after
>> that we didn't access catalog table.
>
> You're assuming very little traffic, where the ROLLBACK PREPARED follows
> the PREPARE immediately in WAL. On a busy system, chances for that to
> happen are rather low.

No, I am not assuming that. I am just trying to explain to you that we
will not necessarily be able to detect a concurrent abort in each and
every case. Say a transaction operates on one relation and the
concurrent abort happens after the first access of that relation; then
we won't access the catalog and hence won't detect the abort. In such
cases, you will get the abort only when it appears in WAL. So, why try
to get it earlier in some cases when that is not guaranteed in every
case? Also, what will you do when you receive the actual Rollback? Maybe
the plugin can throw it away by checking in some way that it has already
aborted the transaction; if so, that sounds a bit awkward to me.

The other related thing is that it may not be a good idea to finish the
transaction before we see its actual WAL record, because after the
client (say, a subscriber) finishes the xact, it sends the updated LSN
location, based on which we update the slot LSNs from where decoding
will start next time after a restart. So, with bad timing, it might try
to decode the contents of the same transaction again. But maybe, for
concurrent aborts, the plugin could arrange things such that the client
won't send an updated LSN.

> (I think the same is true for streaming and stream_abort being sent only
> at the time the decoder reaches the ROLLBACK record in WAL. However, I
> did not try.

Yes, both streaming and 2PC behave in a similar way in this regard.

--
With Regards,
Amit Kapila.

#7 Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#6)
Re: [PATCH] add concurrent_abort callback for output plugin

On 26.03.21 11:19, Amit Kapila wrote:

> No, I am not assuming that. I am just trying to describe you that it
> is not necessary that we will be able to detect concurrent abort in
> each and every case.

Sure. Nor am I claiming that would be necessary or that the patch
changed anything about it.

As it stands, assuming the output plugin basically just forwards the
events and the subscriber tries to replicate them as is, the following
would happen on the subscriber for a concurrently aborted two-phase
transaction:

* start a transaction (begin_prepare_cb)
* apply changes for it (change_cb)
* digress to other, unrelated transactions (leaving unspecified what
exactly happens to the opened transaction)
* attempt to rollback a transaction that has not ever been prepared
(rollback_prepared_cb)

The point of the patch is for the output plugin to get proper
transaction entry and exit callbacks. Even in the unfortunate case of a
concurrent abort. It offers the output plugin a clean way to learn that
the decoder stopped decoding for the current transaction and it won't
possibly see a prepare_cb for it (despite the decoder having passed the
PREPARE record in WAL).

> The other related thing is it may not be a good idea to finish the
> transaction

You're speaking subscriber side here. And yes, I agree, the subscriber
should not abort the transaction at a concurrent_abort. I never claimed
it should.

If you are curious, in our case I made the subscriber PREPARE the
transaction at its end when receiving a concurrent_abort notification,
so that the subscriber:

* can hop out of that started transaction and safely proceed
to process events for other transactions, and
* has the transaction in the appropriate state for processing the
subsequent rollback_prepared_cb, once that gets through

That's probably not ideal in the sense that subscribers do unnecessary
work. However, it pretty closely replicates the transaction's state as
it was on the origin at any given point in time (by LSN).
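To make the subscriber-side handling described above a bit more concrete, here is a hedged sketch of a per-transaction state machine. The state names and handler functions are hypothetical and only model the ordering of events, not actual SQL execution on the subscriber:

```c
#include <assert.h>

/* Hypothetical subscriber-side state of one remote two-phase transaction. */
typedef enum RemoteTxnState
{
	TXN_NONE,					/* no remote transaction open */
	TXN_IN_PROGRESS,			/* begin_prepare_cb seen, applying changes */
	TXN_PREPARED				/* prepared locally, awaiting the outcome */
} RemoteTxnState;

static void
on_begin_prepare(RemoteTxnState *st)
{
	assert(*st == TXN_NONE);
	*st = TXN_IN_PROGRESS;		/* e.g. BEGIN on the subscriber */
}

static void
on_change(RemoteTxnState *st)
{
	assert(*st == TXN_IN_PROGRESS);
	/* apply the change here */
}

/*
 * A regular prepare_cb and a concurrent_abort notification are handled
 * alike: PREPARE what has been applied so far, which lets the subscriber
 * move on to events of other transactions.
 */
static void
on_prepare_or_concurrent_abort(RemoteTxnState *st)
{
	assert(*st == TXN_IN_PROGRESS);
	*st = TXN_PREPARED;			/* e.g. PREPARE TRANSACTION */
}

/* The later rollback_prepared_cb then finds a prepared transaction. */
static void
on_rollback_prepared(RemoteTxnState *st)
{
	assert(*st == TXN_PREPARED);
	*st = TXN_NONE;				/* e.g. ROLLBACK PREPARED */
}
```

Without the concurrent_abort notification, the rollback_prepared_cb would arrive while the state is still TXN_IN_PROGRESS, which is exactly the "rollback of a transaction that has not ever been prepared" case listed earlier.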

Regards

Markus

#8 Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#7)
Re: [PATCH] add concurrent_abort callback for output plugin

On Fri, Mar 26, 2021 at 5:50 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

> If you are curious, in our case I made the subscriber PREPARE the
> transaction at its end when receiving a concurrent_abort notification,
> so that the subscriber:
>
> * can hop out of that started transaction and safely proceed
> to process events for other transactions, and
> * has the transaction in the appropriate state for processing the
> subsequent rollback_prepared_cb, once that gets through
>
> That's probably not ideal in the sense that subscribers do unnecessary
> work.

Isn't it better to send prepare from the publisher in such a case so
that subscribers can know about it when rollback prepared arrives? I
think we have already done the same (sent prepare, exactly to handle
the case you have described above) for *streamed* transactions.

--
With Regards,
Amit Kapila.

#9 Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#8)
Re: [PATCH] add concurrent_abort callback for output plugin

On 27.03.21 07:37, Amit Kapila wrote:

> Isn't it better to send prepare from the publisher in such a case so
> that subscribers can know about it when rollback prepared arrives?

That's exactly what this callback allows (among other options). It
provides a way for the output plugin to react to a transaction aborting
while it is being decoded. This would not be possible without this
additional callback.

Also note that I would like to retain the option to do some basic
protocol validity checks. Certain messages only make sense within a
transaction ('U'pdate, 'C'ommit). Others are only valid outside of a
transaction ('B'egin, begin_prepare_cb). This is only possible if the
output plugin has a callback for every entry into and exit out of a
transaction (being decoded). This used to be the case prior to 2PC
decoding and this patch re-establishes that.
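To make this kind of check concrete, here is a minimal, self-contained C sketch of a subscriber-side state machine. The message kinds are hypothetical stand-ins for the output plugin callbacks (they are not pgoutput's wire format), and MSG_CONCURRENT_ABORT models the callback proposed in this thread. Without an exit message, the dangling sequence discussed earlier (begin_prepare_cb, some changes, then another transaction's begin) is indistinguishable from a protocol violation; with one, the stream validates cleanly.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical message kinds, one per output plugin callback that
 * enters, stays inside, or leaves a transaction. Illustrative only.
 */
typedef enum
{
    MSG_BEGIN,            /* begin_cb */
    MSG_BEGIN_PREPARE,    /* begin_prepare_cb */
    MSG_CHANGE,           /* change_cb, e.g. an 'U'pdate */
    MSG_COMMIT,           /* commit_cb */
    MSG_PREPARE,          /* prepare_cb */
    MSG_CONCURRENT_ABORT  /* the proposed concurrent_abort callback */
} MsgKind;

/* Returns true if every message arrives in a valid transaction state. */
static bool
protocol_is_valid(const MsgKind *msgs, size_t n)
{
    bool in_txn = false;

    for (size_t i = 0; i < n; i++)
    {
        switch (msgs[i])
        {
            case MSG_BEGIN:
            case MSG_BEGIN_PREPARE:
                if (in_txn)
                    return false;   /* entry while still inside a txn */
                in_txn = true;
                break;
            case MSG_CHANGE:
                if (!in_txn)
                    return false;   /* change outside any txn */
                break;
            case MSG_COMMIT:
            case MSG_PREPARE:
            case MSG_CONCURRENT_ABORT:
                if (!in_txn)
                    return false;   /* exit without a matching entry */
                in_txn = false;
                break;
        }
    }
    return true;
}
```

The sequence begin_prepare, change, change, begin is rejected, while inserting MSG_CONCURRENT_ABORT before the second begin makes it valid.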

I think we have already done the same (sent prepare, exactly to handle
the case you have described above) for *streamed* transactions.

Where can I find that? ISTM streaming transactions have the same issue:
the output plugin does not (or only implicitly) learn about a concurrent
abort of the transaction.

Regards

Markus

#10Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#9)
Re: [PATCH] add concurrent_abort callback for output plugin

On Mon, Mar 29, 2021 at 12:36 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 27.03.21 07:37, Amit Kapila wrote:

Isn't it better to send prepare from the publisher in such a case so
that subscribers can know about it when rollback prepared arrives?

That's exactly what this callback allows (among other options). It
provides a way for the output plugin to react to a transaction aborting
while it is being decoded. This would not be possible without this
additional callback.

You don't need an additional callback for that if we do what I am
suggesting above.

Also note that I would like to retain the option to do some basic
protocol validity checks. Certain messages only make sense within a
transaction ('U'pdate, 'C'ommit). Others are only valid outside of a
transaction ('B'egin, begin_prepare_cb). This is only possible if the
output plugin has a callback for every entry into and exit out of a
transaction (being decoded). This used to be the case prior to 2PC
decoding and this patch re-establishes that.

I think we have already done the same (sent prepare, exactly to handle
the case you have described above) for *streamed* transactions.

Where can I find that? ISTM streaming transactions have the same issue:
the output plugin does not (or only implicitly) learn about a concurrent
abort of the transaction.

One option is to test it; otherwise, see the comments atop
DecodePrepare() ("Note that we don't skip prepare even if have
detected concurrent abort because it is quite possible that ....")
which explain this.

--
With Regards,
Amit Kapila.

#11Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#10)
Re: [PATCH] add concurrent_abort callback for output plugin

On 29.03.21 11:33, Amit Kapila wrote:

You don't need an additional callback for that if we do what I am
suggesting above.

Ah, are you suggesting a different change, then? To make two-phase
transactions always send PREPARE even if concurrently aborted? In that
case, sorry, I misunderstood.

I'm perfectly fine with that approach as well (even though it removes
flexibility compared to the concurrent abort callback, as the comment
above DecodePrepare indicates, i.e. "not impossible to optimize the
concurrent abort case").

One option is to test it; otherwise, see the comments atop
DecodePrepare() ("Note that we don't skip prepare even if have
detected concurrent abort because it is quite possible that ....")
which explain this.

Thanks for this pointer, very helpful.

Regards

Markus

#12Ajin Cherian
itsajin@gmail.com
In reply to: Amit Kapila (#10)
Re: [PATCH] add concurrent_abort callback for output plugin

On Mon, Mar 29, 2021 at 8:33 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Mar 29, 2021 at 12:36 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 27.03.21 07:37, Amit Kapila wrote:

Isn't it better to send prepare from the publisher in such a case so
that subscribers can know about it when rollback prepared arrives?

Nice catch, Markus.

Interesting suggestion Amit. Let me try and code this.

regards,
Ajin Cherian
Fujitsu Australia

#13Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Ajin Cherian (#12)
Re: [PATCH] add concurrent_abort callback for output plugin

On 29.03.21 13:02, Ajin Cherian wrote:

Nice catch, Markus.
Interesting suggestion Amit. Let me try and code this.

Thanks, Ajin. Please consider this concurrent_abort callback as well.
I think it provides more flexibility for the output plugin and I would
therefore prefer it over a solution that hides this. Hiding it clearly
makes all potential optimizations impossible, as the output plugin then
cannot distinguish between a proper PREPARE and a bail-out PREPARE (one
that does not fully replicate the PREPARE as on the origin node, either,
which I think is dangerous).

Regards

Markus

#14Ajin Cherian
itsajin@gmail.com
In reply to: Markus Wanner (#13)
1 attachment(s)
Re: [PATCH] add concurrent_abort callback for output plugin

On Mon, Mar 29, 2021 at 10:09 PM Markus Wanner <
markus.wanner@enterprisedb.com> wrote:

On 29.03.21 13:02, Ajin Cherian wrote:

Nice catch, Markus.
Interesting suggestion Amit. Let me try and code this.

Thanks, Ajin. Please consider this concurrent_abort callback as well.
I think it provides more flexibility for the output plugin and I would
therefore prefer it over a solution that hides this. Hiding it clearly
makes all potential optimizations impossible, as the output plugin then
cannot distinguish between a proper PREPARE and a bail-out PREPARE (one
that does not fully replicate the PREPARE as on the origin node, either,
which I think is dangerous).

I understand your concern Markus, but I will leave it to one of the
committers to decide on the new callback.
For now, I've created a patch that addresses the problem reported using the
existing callbacks.
Do have a look if this fixes the problem reported.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v1-0001-Make-sure-a-prepare-is-sent-when-decoder-detects-.patch (application/octet-stream)
From 1ba7219e8ff26148c14cdef6b54bfa480adf5cf0 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 30 Mar 2021 00:16:35 -0400
Subject: [PATCH v1] Make sure a prepare is sent when decoder detects a
 concurrent abort.

While decoding a prepared transaction, and a concurrent abort of the transaction
being decoded is detected, the decoding is stopped. But this fix makes sure that
even if the decoding is aborted, the PREPARE is sent out. This ensures that
when the ROLLBACK PREPARED is eventually sent downstream, there is a corresponding
prepared transaction to rollback.
---
 src/backend/replication/logical/reorderbuffer.c | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 127f2c4..1442af1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2664,6 +2664,13 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 
 	ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
 						txn->commit_time, txn->origin_id, txn->origin_lsn);
+
+	/*
+	 * If the transaction has been concurrently aborted, make sure we send
+	 * prepare here.
+	 */
+	if (txn->concurrent_abort)
+		rb->prepare(rb, txn, txn->final_lsn);
 }
 
 /*
-- 
1.8.3.1

#15Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Ajin Cherian (#14)
Re: [PATCH] add concurrent_abort callback for output plugin

Hello Ajin,

On 30.03.21 06:48, Ajin Cherian wrote:

For now, I've created a patch that addresses the problem reported using
the existing callbacks.

Thanks.

Do have a look if this fixes the problem reported.

Yes, this replaces the PREPARE I would do from the concurrent_abort
callback with a direct call to rb->prepare. However, it misses the most
important part: documentation. This is clearly surprising behavior for
a transaction that is not fully decoded and is guaranteed to get
aborted.
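The behavior that needs documenting can be modelled in a few lines. The following is a simplified, hypothetical C model, not the reorderbuffer.c code: FakeTxn and FakeOutput are invented stand-ins, replay() plays the role of ReorderBufferReplay(), and prepare_txn() mirrors the two lines the patch adds to ReorderBufferPrepare(). It shows that after a concurrent abort the output side still receives a PREPARE even though only part of the changes were sent.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, heavily simplified stand-in for ReorderBufferTXN. */
typedef struct
{
    int  nchanges;          /* changes recorded in WAL for this txn */
    int  abort_after;       /* simulate a concurrent abort at change N; -1 = never */
    bool concurrent_abort;  /* set when replay bails out */
} FakeTxn;

/* Counts what the output plugin would have been handed. */
typedef struct
{
    int changes_sent;
    int prepares_sent;
} FakeOutput;

/* Stand-in for ReorderBufferReplay(): stops early on a concurrent abort. */
static void
replay(FakeTxn *txn, FakeOutput *out)
{
    for (int i = 0; i < txn->nchanges; i++)
    {
        if (i == txn->abort_after)
        {
            /* the real code catches the catalog-scan error at this point */
            txn->concurrent_abort = true;
            return;
        }
        out->changes_sent++;
    }
    out->prepares_sent++;   /* normal path: replay ends with prepare_cb */
}

/* Mirrors the control flow of the patched ReorderBufferPrepare(). */
static void
prepare_txn(FakeTxn *txn, FakeOutput *out)
{
    replay(txn, out);

    /* Send the prepare anyway so the downstream learns about the GID. */
    if (txn->concurrent_abort)
        out->prepares_sent++;
}
```

In the aborted case, a single change and one PREPARE reach the output side, which is exactly the situation the documentation should warn about.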

Regards

Markus

#16Ajin Cherian
itsajin@gmail.com
In reply to: Markus Wanner (#15)
Re: [PATCH] add concurrent_abort callback for output plugin

On Tue, Mar 30, 2021 at 5:30 PM Markus Wanner <
markus.wanner@enterprisedb.com> wrote:

Hello Ajin,

On 30.03.21 06:48, Ajin Cherian wrote:

For now, I've created a patch that addresses the problem reported using
the existing callbacks.

Thanks.

Do have a look if this fixes the problem reported.

Yes, this replaces the PREPARE I would do from the concurrent_abort
callback with a direct call to rb->prepare. However, it misses the most
important part: documentation. This is clearly surprising behavior for
a transaction that is not fully decoded and is guaranteed to get
aborted.

Where do you suggest this be documented? From an externally visible point
of view, I don't see much of a surprise.
A transaction that was prepared and rolled back can be decoded as
prepared and rolled back, with incomplete changes.
Are you suggesting more comments in code?

regards,
Ajin Cherian
Fujitsu Australia

#17Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Ajin Cherian (#16)
Re: [PATCH] add concurrent_abort callback for output plugin

On 30.03.21 09:39, Ajin Cherian wrote:

Where do you suggest this be documented? From an externally visible
point of view, I don't see much of a surprise.

If you start to think about the option of committing a prepared
transaction from a different node, the danger becomes immediately
apparent: A subscriber doesn't even know that the transaction is not
complete. How could it possibly know it's futile to COMMIT PREPARE it?
I think it's not just surprising, but outright dangerous to pretend to
have prepared the transaction while potentially missing some of the changes.

(Essentially: do not assume the ROLLBACK PREPARED will make it to the
subscriber. There's no such guarantee. The provider may crash, burn,
and vanish before that happens.)

So I suggest documenting this as a caveat for the prepare callback,
because with this patch that's the callback which may be invoked for an
incomplete transaction without the output plugin knowing.
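To illustrate the risk, a subscriber that can tell which prepared transactions are incomplete could refuse a later COMMIT PREPARED for them while still accepting ROLLBACK PREPARED. The sketch below is hypothetical: the fixed-size table, the incomplete flag and all function names are invented for the example, and how the subscriber learns about the incomplete state (for instance via a concurrent_abort notification) is left open.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_PREPARED 16
#define GID_LEN      64

/* Hypothetical subscriber-side bookkeeping for prepared transactions. */
typedef struct
{
    char gid[GID_LEN];
    bool incomplete;    /* prepared after a concurrent abort upstream */
    bool in_use;
} PreparedEntry;

static PreparedEntry prepared[MAX_PREPARED];

/* Record a PREPARE received from the publisher. */
static bool
remember_prepare(const char *gid, bool incomplete)
{
    for (int i = 0; i < MAX_PREPARED; i++)
    {
        if (!prepared[i].in_use)
        {
            strncpy(prepared[i].gid, gid, GID_LEN - 1);
            prepared[i].gid[GID_LEN - 1] = '\0';
            prepared[i].incomplete = incomplete;
            prepared[i].in_use = true;
            return true;
        }
    }
    return false;   /* table full */
}

static PreparedEntry *
find_prepared(const char *gid)
{
    for (int i = 0; i < MAX_PREPARED; i++)
        if (prepared[i].in_use && strcmp(prepared[i].gid, gid) == 0)
            return &prepared[i];
    return NULL;
}

/* COMMIT PREPARED is only safe if every change actually arrived. */
static bool
may_commit_prepared(const char *gid)
{
    PreparedEntry *e = find_prepared(gid);

    return e != NULL && !e->incomplete;
}

/* ROLLBACK PREPARED is acceptable for any known GID. */
static bool
rollback_prepared(const char *gid)
{
    PreparedEntry *e = find_prepared(gid);

    if (e == NULL)
        return false;
    e->in_use = false;
    return true;
}
```

Without some such marker the subscriber has no way to make this distinction, which is why the caveat matters for the prepare callback.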

Regards

Markus

#18Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#15)
Re: [PATCH] add concurrent_abort callback for output plugin

On Tue, Mar 30, 2021 at 12:00 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

Hello Ajin,

On 30.03.21 06:48, Ajin Cherian wrote:

For now, I've created a patch that addresses the problem reported using
the existing callbacks.

Thanks.

Do have a look if this fixes the problem reported.

Yes, this replaces the PREPARE I would do from the concurrent_abort
callback with a direct call to rb->prepare.

That sounds clearly like the better choice, because having
concurrent_abort() internally try to prepare the transaction seems a
bit ugly. Moreover, if we want to go that route, it needs to
distinguish between the rollback-to-savepoint and rollback cases as
well.

Now, we could try to find a way to detect such cases and send a
rollback instead of prepare/rollback prepared. Some more checks would
also be required so that, if we have streamed the transaction, we send
stream_abort. I am not saying that all this is impossible, but I don't
think all such checks are worth making.
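For comparison, the alternative outlined here could be sketched as a decision function. This is a hypothetical illustration of the extra checks, not code from any posted patch: on the PREPARE record, a concurrently aborted transaction would produce a stream_abort if it was streamed and a plain rollback otherwise. The sketch also assumes the decoder keeps that state around, since it would have to suppress the later ROLLBACK PREPARED for a transaction that was never prepared downstream.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical actions the decoder could take at the PREPARE record. */
typedef enum
{
    EMIT_PREPARE,       /* prepare_cb (or stream_prepare_cb) as usual */
    EMIT_STREAM_ABORT,  /* streamed txn: tell the downstream to discard it */
    EMIT_ROLLBACK       /* never prepared downstream: plain rollback */
} AbortAction;

/* Hypothetical per-transaction state the decoder would need to keep. */
typedef struct
{
    bool concurrent_abort;
    bool streamed;
} TxnState;

/* Alternative to always sending a prepare; NOT what the patch implements. */
static AbortAction
action_on_prepare(const TxnState *txn)
{
    if (!txn->concurrent_abort)
        return EMIT_PREPARE;
    if (txn->streamed)
        return EMIT_STREAM_ABORT;
    return EMIT_ROLLBACK;
}

/*
 * Extra check at ROLLBACK PREPARED time: if no prepare was sent, there
 * is nothing to roll back downstream.
 */
static bool
forward_rollback_prepared(const TxnState *txn)
{
    return !txn->concurrent_abort;
}
```

Every branch here is a place where decoder and subscriber must agree, which illustrates why always sending the prepare is the simpler choice.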

However, it misses the most
important part: documentation. This is clearly surprising behavior for
a transaction that is not fully decoded and is guaranteed to get
aborted.

Yeah, I guess that makes sense to me. I think we can document it in
the docs [1] where we explained two-phase decoding. I think we can add
a point about concurrent aborts at the end of the points mentioned in the
following paragraph: "The users that want to decode prepared
transactions need to be careful ....."

[1]: https://www.postgresql.org/docs/devel/logicaldecoding-two-phase-commits.html

--
With Regards,
Amit Kapila.

#19Ajin Cherian
itsajin@gmail.com
In reply to: Markus Wanner (#17)
1 attachment(s)
Re: [PATCH] add concurrent_abort callback for output plugin

On Tue, Mar 30, 2021 at 7:10 PM Markus Wanner <
markus.wanner@enterprisedb.com> wrote:

On 30.03.21 09:39, Ajin Cherian wrote:

Where do you suggest this be documented? From an externally visible
point of view, I don't see much of a surprise.

So I suggest documenting this as a caveat for the prepare callback,
because with this patch that's the callback which may be invoked for an
incomplete transaction without the output plugin knowing.

I found some existing documentation about concurrent aborts
and updated it.
Patch attached.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v2-0001-Make-sure-a-prepare-is-sent-when-decoder-detects-.patch (application/octet-stream)
From 38f482566cffd5ebaa2c5f836d1715dabb021e9f Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 30 Mar 2021 00:16:35 -0400
Subject: [PATCH v2] Make sure a prepare is sent when decoder detects a
 concurrent abort.

While decoding a prepared transaction, and a concurrent abort of the transaction
being decoded is detected, the decoding is stopped. But this fix makes sure that
even if the decoding is aborted, the PREPARE is sent out. This ensures that
when the ROLLBACK PREPARED is eventually sent downstream, there is a corresponding
prepared transaction to rollback.
---
 doc/src/sgml/logicaldecoding.sgml               | 10 ++++++----
 src/backend/replication/logical/reorderbuffer.c |  7 +++++++
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d..d2f8d39 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -545,12 +545,14 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      executed within that transaction. A transaction that is prepared for
      a two-phase commit using <command>PREPARE TRANSACTION</command> will
      also be decoded if the output plugin callbacks needed for decoding
-     them are provided. It is possible that the current transaction which
+     them are provided. It is possible that the current prepared transaction which
      is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
      command. In that case, the logical decoding of this transaction will
-     be aborted too. We will skip all the changes of such a transaction once
-     the abort is detected and abort the transaction when we read WAL for
-     <command>ROLLBACK PREPARED</command>.
+     be aborted too. All the changes of such a transaction is skipped once
+     the abort is detected and the <function>prepare_cb</function> callback is invoked.
+     This could result in a prepared transaction with incomplete changes.
+     This is done so that eventually when the <command>ROLLBACK PREPARED</command>
+     is decoded, there is a corresponding prepared transaction with a matching gid.
     </para>
 
     <note>
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 127f2c4..1442af1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2664,6 +2664,13 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 
 	ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
 						txn->commit_time, txn->origin_id, txn->origin_lsn);
+
+	/*
+	 * If the transaction has been concurrently aborted, make sure we send
+	 * prepare here.
+	 */
+	if (txn->concurrent_abort)
+		rb->prepare(rb, txn, txn->final_lsn);
 }
 
 /*
-- 
1.8.3.1

#20Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#18)
Re: [PATCH] add concurrent_abort callback for output plugin

On 30.03.21 11:02, Amit Kapila wrote:

On Tue, Mar 30, 2021 at 12:00 PM Markus Wanner

Yes, this replaces the PREPARE I would do from the concurrent_abort
callback with a direct call to rb->prepare.

Because having concurrent_abort()
internally try to prepare the transaction seems a bit ugly. Moreover, if
we want to go that route, it needs to distinguish between the
rollback-to-savepoint and rollback cases as well.

Just to clarify: of course, the concurrent_abort callback only sends a
message to the subscriber, which then (in our current implementation)
upon reception of the concurrent_abort message opts to prepare the
transaction. Different implementations would be possible.

I would recommend this more explicit API and communication over hiding
the concurrent abort in a prepare callback.

Regards

Markus

#21Markus Wanner
markus@bluegap.ch
In reply to: Ajin Cherian (#19)
Re: [PATCH] add concurrent_abort callback for output plugin

On 30.03.21 11:12, Ajin Cherian wrote:

I found some existing documentation about concurrent
aborts and updated it.

Thanks.

I just noticed as of PG13, concurrent_abort is part of ReorderBufferTXN,
so it seems the prepare_cb (or stream_prepare_cb) can actually figure out
that a concurrent abort happened (and the transaction may be incomplete).
That's good and indeed makes an additional callback unnecessary.

I recommend mentioning that field in the documentation as well.
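Because the flag is available, a prepare callback can act on it directly. The following is a standalone sketch: TxnView is a hypothetical stand-in carrying only the gid and concurrent_abort members of ReorderBufferTXN, and the message text is invented. In a real plugin the body would sit in a callback with the prepare_cb signature (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn), append to ctx->out between OutputPluginPrepareWrite() and OutputPluginWrite(), and quote the GID properly (e.g. with quote_literal_cstr()).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-in for ReorderBufferTXN with only the two members
 * this sketch needs. The real struct has many more.
 */
typedef struct
{
    const char *gid;
    bool        concurrent_abort;
} TxnView;

/*
 * Sketch of a prepare callback body: formats the message into buf.
 * The GID is not escaped here; a real plugin must quote it.
 */
static int
format_prepare(const TxnView *txn, char *buf, size_t buflen)
{
    /* Flag prepares that may carry only part of the transaction's changes. */
    return snprintf(buf, buflen, "PREPARE TRANSACTION '%s'%s",
                    txn->gid,
                    txn->concurrent_abort ? " (incomplete: concurrent abort)" : "");
}
```

The downstream can then decide for itself how to treat such a prepare, e.g. by never committing it.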

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d..d2f8d39 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -545,12 +545,14 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
executed within that transaction. A transaction that is prepared for
a two-phase commit using <command>PREPARE TRANSACTION</command> will
also be decoded if the output plugin callbacks needed for decoding
-     them are provided. It is possible that the current transaction which
+     them are provided. It is possible that the current prepared transaction which
is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
command. In that case, the logical decoding of this transaction will
-     be aborted too. We will skip all the changes of such a transaction once
-     the abort is detected and abort the transaction when we read WAL for
-     <command>ROLLBACK PREPARED</command>.
+     be aborted too. All the changes of such a transaction is skipped once

typo: changes [..] *are* skipped, plural.

+     the abort is detected and the <function>prepare_cb</function> callback is invoked.
+     This could result in a prepared transaction with incomplete changes.

... "in which case the <literal>concurrent_abort</literal> field of the
passed <literal>ReorderBufferTXN</literal> struct is set.", as a proposal?

+     This is done so that eventually when the <command>ROLLBACK PREPARED</command>
+     is decoded, there is a corresponding prepared transaction with a matching gid.
</para>

<note>

Everything else sounds good to me.

Regards

Markus

#22Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Markus Wanner (#20)
Re: [PATCH] add concurrent_abort callback for output plugin

On 30.03.21 11:54, Markus Wanner wrote:

I would recommend this more explicit API and communication over hiding
the concurrent abort in a prepare callback.

I realized we already have ReorderBufferTXN's concurrent_abort flag,
so I agree the prepare_cb is sufficient and revoke this recommendation
(and the concurrent_abort callback patch).

Regards

Markus

#23Ajin Cherian
itsajin@gmail.com
In reply to: Markus Wanner (#21)
1 attachment(s)
Re: [PATCH] add concurrent_abort callback for output plugin

On Tue, Mar 30, 2021 at 10:29 PM Markus Wanner <markus@bluegap.ch> wrote:

I just noticed as of PG13, concurrent_abort is part of ReorderBufferTXN,
so it seems the prepare_cb (or stream_prepare_cb) can actually figure out
that a concurrent abort happened (and the transaction may be incomplete).
That's good and indeed makes an additional callback unnecessary.

I recommend mentioning that field in the documentation as well.

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d..d2f8d39 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -545,12 +545,14 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
executed within that transaction. A transaction that is prepared for
a two-phase commit using <command>PREPARE TRANSACTION</command> will
also be decoded if the output plugin callbacks needed for decoding
-     them are provided. It is possible that the current transaction which
+     them are provided. It is possible that the current prepared transaction which
is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
command. In that case, the logical decoding of this transaction will
-     be aborted too. We will skip all the changes of such a transaction once
-     the abort is detected and abort the transaction when we read WAL for
-     <command>ROLLBACK PREPARED</command>.
+     be aborted too. All the changes of such a transaction is skipped once

typo: changes [..] *are* skipped, plural.

Updated.

+     the abort is detected and the <function>prepare_cb</function> callback is invoked.
+     This could result in a prepared transaction with incomplete changes.

... "in which case the <literal>concurrent_abort</literal> field of the
passed <literal>ReorderBufferTXN</literal> struct is set.", as a proposal?

+     This is done so that eventually when the <command>ROLLBACK PREPARED</command>
+     is decoded, there is a corresponding prepared transaction with a matching gid.

</para>

<note>

Everything else sounds good to me.

Updated.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:

v3-0001-Make-sure-a-prepare-is-sent-when-decoder-detects-.patch (application/octet-stream)
From afcb7662e45f2f52cc62a4b06e05604ac97f70b5 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 30 Mar 2021 00:16:35 -0400
Subject: [PATCH v3] Make sure a prepare is sent when decoder detects a
 concurrent abort.

While decoding a prepared transaction, and a concurrent abort of the transaction
being decoded is detected, the decoding is stopped. But this fix makes sure that
even if the decoding is aborted, the PREPARE is sent out. This ensures that
when the ROLLBACK PREPARED is eventually sent downstream, there is a corresponding
prepared transaction to rollback.
---
 doc/src/sgml/logicaldecoding.sgml               | 12 ++++++++----
 src/backend/replication/logical/reorderbuffer.c |  7 +++++++
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d..e3f6008 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -545,12 +545,16 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      executed within that transaction. A transaction that is prepared for
      a two-phase commit using <command>PREPARE TRANSACTION</command> will
      also be decoded if the output plugin callbacks needed for decoding
-     them are provided. It is possible that the current transaction which
+     them are provided. It is possible that the current prepared transaction which
      is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
      command. In that case, the logical decoding of this transaction will
-     be aborted too. We will skip all the changes of such a transaction once
-     the abort is detected and abort the transaction when we read WAL for
-     <command>ROLLBACK PREPARED</command>.
+     be aborted too. All the changes of such a transaction are skipped once
+     the abort is detected and the <function>prepare_cb</function> callback is invoked.
+     This could result in a prepared transaction with incomplete changes, in which case
+     the <literal>concurrent_abort</literal> field of the passed
+     <literal>ReorderBufferTXN</literal> struct is set.
+     This is done so that eventually when the <command>ROLLBACK PREPARED</command>
+     is decoded, there is a corresponding prepared transaction with a matching gid.
     </para>
 
     <note>
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 127f2c4..1442af1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2664,6 +2664,13 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 
 	ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
 						txn->commit_time, txn->origin_id, txn->origin_lsn);
+
+	/*
+	 * If the transaction has been concurrently aborted, make sure we send
+	 * prepare here.
+	 */
+	if (txn->concurrent_abort)
+		rb->prepare(rb, txn, txn->final_lsn);
 }
 
 /*
-- 
1.8.3.1

#24Amit Kapila
amit.kapila16@gmail.com
In reply to: Ajin Cherian (#23)
1 attachment(s)
Re: [PATCH] add concurrent_abort callback for output plugin

On Wed, Mar 31, 2021 at 6:42 AM Ajin Cherian <itsajin@gmail.com> wrote:

Updated.

I have slightly adjusted the comments, docs, and commit message. What
do you think about the attached?

--
With Regards,
Amit Kapila.

Attachments:

v4-0001-Ensure-to-send-a-prepare-after-we-detect-concurre.patch (application/octet-stream)
From f6e973a6d79b867b1e6d23a9de945685200f1fa8 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 31 Mar 2021 09:49:03 +0530
Subject: [PATCH v4] Ensure to send a prepare after we detect concurrent abort
 during decoding.

It is possible that while decoding a prepared transaction, it gets aborted
concurrently via a ROLLBACK PREPARED command. In that case, we were
skipping all the changes and directly sending Rollback Prepared when we
find the same in WAL. However, the downstream has no idea of the GID of
such a transaction. So, ensure to send prepare even when a concurrent
abort is detected.

Author: Ajin Cherian
Reviewed-by: Markus Wanner, Amit Kapila
Discussion: https://postgr.es/m/f82133c6-6055-b400-7922-97dae9f2b50b@enterprisedb.com
---
 doc/src/sgml/logicaldecoding.sgml               | 17 +++++++++++------
 src/backend/replication/logical/reorderbuffer.c |  8 ++++++++
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index da23f89ca32..f11d8aff13d 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -545,12 +545,17 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      executed within that transaction. A transaction that is prepared for
      a two-phase commit using <command>PREPARE TRANSACTION</command> will
      also be decoded if the output plugin callbacks needed for decoding
-     them are provided. It is possible that the current transaction which
-     is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
-     command. In that case, the logical decoding of this transaction will
-     be aborted too. We will skip all the changes of such a transaction once
-     the abort is detected and abort the transaction when we read WAL for
-     <command>ROLLBACK PREPARED</command>.
+     them are provided. It is possible that the current prepared transaction
+     which is being decoded is aborted concurrently via a
+     <command>ROLLBACK PREPARED</command> command. In that case, the logical
+     decoding of this transaction will be aborted too. All the changes of such
+     a transaction are skipped once the abort is detected and the
+     <function>prepare_cb</function> callback is invoked. This could result in
+     a prepared transaction with incomplete changes, in which case the
+     <literal>concurrent_abort</literal> field of the passed
+     <literal>ReorderBufferTXN</literal> struct is set. This is done so that
+     eventually when the <command>ROLLBACK PREPARED</command> is decoded, there
+     is a corresponding prepared transaction with a matching gid.
     </para>
 
     <note>
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 127f2c4b168..52d06285a21 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2664,6 +2664,14 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 
 	ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
 						txn->commit_time, txn->origin_id, txn->origin_lsn);
+
+	/*
+	 * We send the prepare for the concurrently aborted xacts so that later
+	 * when rollback prepared is decoded and sent, the downstream should be
+	 * able to rollback such a xact. See comments atop DecodePrepare.
+	 */
+	if (txn->concurrent_abort)
+		rb->prepare(rb, txn, txn->final_lsn);
 }
 
 /*
-- 
2.28.0.windows.1

#25Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#24)
Re: [PATCH] add concurrent_abort callback for output plugin

On 31.03.21 06:39, Amit Kapila wrote:

I have slightly adjusted the comments, docs, and commit message. What
do you think about the attached?

Thank you both, Amit and Ajin. This looks good to me.

Only one minor gripe:

+     a prepared transaction with incomplete changes, in which case the
+     <literal>concurrent_abort</literal> field of the passed
+     <literal>ReorderBufferTXN</literal> struct is set. This is done so that
+     eventually when the <command>ROLLBACK PREPARED</command> is decoded, there
+     is a corresponding prepared transaction with a matching gid.

The last sentence there now seems to relate to just the setting of
"concurrent_abort", rather than the whole reason to invoke the
prepare_cb. And the reference to the "gid" is a bit lost. Maybe:

"Thus even in case of a concurrent abort, enough information is
provided to the output plugin for it to properly deal with the
<command>ROLLBACK PREPARED</command> once that is decoded."

Alternatively, state earlier in the docs that the gid would otherwise
be missing (similar to how the commit message describes it).

Regards

Markus

#26Ajin Cherian
itsajin@gmail.com
In reply to: Markus Wanner (#25)
Re: [PATCH] add concurrent_abort callback for output plugin

On Wed, Mar 31, 2021 at 5:25 PM Markus Wanner <
markus.wanner@enterprisedb.com> wrote:

The last sentence there now seems to relate to just the setting of
"concurrent_abort", rather than the whole reason to invoke the
prepare_cb. And the reference to the "gid" is a bit lost. Maybe:

"Thus even in case of a concurrent abort, enough information is
provided to the output plugin for it to properly deal with the
<command>ROLLBACK PREPARED</command> once that is decoded."

Alternatively, state earlier in the docs that the gid would otherwise
be missing (similar to how the commit message describes it).

I'm fine with Amit's changes and like Markus's last suggestion as well.

regards,
Ajin Cherian
Fujitsu Australia

#27Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#25)
1 attachment(s)
Re: [PATCH] add concurrent_abort callback for output plugin

On Wed, Mar 31, 2021 at 11:55 AM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 31.03.21 06:39, Amit Kapila wrote:

I have slightly adjusted the comments, docs, and commit message. What
do you think about the attached?

Thank you both, Amit and Ajin. This looks good to me.

Only one minor gripe:

+     a prepared transaction with incomplete changes, in which case the
+     <literal>concurrent_abort</literal> field of the passed
+     <literal>ReorderBufferTXN</literal> struct is set. This is done so that
+     eventually when the <command>ROLLBACK PREPARED</command> is decoded, there
+     is a corresponding prepared transaction with a matching gid.

The last sentence there now seems to relate to just the setting of
"concurrent_abort", rather than the whole reason to invoke the
prepare_cb. And the reference to the "gid" is a bit lost. Maybe:

"Thus even in case of a concurrent abort, enough information is
provided to the output plugin for it to properly deal with the
<command>ROLLBACK PREPARED</command> once that is decoded."

Okay, changed the patch accordingly.

--
With Regards,
Amit Kapila.

Attachments:

v5-0001-Ensure-to-send-a-prepare-after-we-detect-concurre.patch (application/octet-stream)
From c35e3e21792173693a2e79df43f519c86de2e246 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 31 Mar 2021 09:49:03 +0530
Subject: [PATCH v5] Ensure to send a prepare after we detect concurrent abort
 during decoding.

It is possible that while decoding a prepared transaction, it gets aborted
concurrently via a ROLLBACK PREPARED command. In that case, we were
skipping all the changes and directly sending Rollback Prepared when we
find the same in WAL. However, the downstream has no idea of the GID of
such a transaction. So, ensure to send prepare even when a concurrent
abort is detected.

Author: Ajin Cherian
Reviewed-by: Markus Wanner, Amit Kapila
Discussion: https://postgr.es/m/f82133c6-6055-b400-7922-97dae9f2b50b@enterprisedb.com
---
 doc/src/sgml/logicaldecoding.sgml               | 15 +++++++++------
 src/backend/replication/logical/reorderbuffer.c |  8 ++++++++
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index da23f89..5d049cd 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -545,12 +545,15 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      executed within that transaction. A transaction that is prepared for
      a two-phase commit using <command>PREPARE TRANSACTION</command> will
      also be decoded if the output plugin callbacks needed for decoding
-     them are provided. It is possible that the current transaction which
-     is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
-     command. In that case, the logical decoding of this transaction will
-     be aborted too. We will skip all the changes of such a transaction once
-     the abort is detected and abort the transaction when we read WAL for
-     <command>ROLLBACK PREPARED</command>.
+     them are provided. It is possible that the current prepared transaction
+     which is being decoded is aborted concurrently via a
+     <command>ROLLBACK PREPARED</command> command. In that case, the logical
+     decoding of this transaction will be aborted too. All the changes of such
+     a transaction are skipped once the abort is detected and the
+     <function>prepare_cb</function> callback is invoked. Thus even in case of
+     a concurrent abort, enough information is provided to the output plugin
+     for it to properly deal with <command>ROLLBACK PREPARED</command> once
+     that is decoded.
     </para>
 
     <note>
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 127f2c4..52d0628 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2664,6 +2664,14 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 
 	ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
 						txn->commit_time, txn->origin_id, txn->origin_lsn);
+
+	/*
+	 * We send the prepare for the concurrently aborted xacts so that later
+	 * when rollback prepared is decoded and sent, the downstream should be
+	 * able to rollback such a xact. See comments atop DecodePrepare.
+	 */
+	if (txn->concurrent_abort)
+		rb->prepare(rb, txn, txn->final_lsn);
 }
 
 /*
-- 
1.8.3.1
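
The effect of the v5 change can be illustrated with a small, self-contained C model. Everything in it (`ModelDownstream`, `ds_prepare`, `ds_rollback_prepared`, `decode_concurrently_aborted_prepare`, `rollback_finds_gid`) is hypothetical and is not PostgreSQL code. It only assumes what the commit message states: the downstream can execute ROLLBACK PREPARED only for a GID it has previously received in a prepare. It deliberately ignores begin_prepare and the individual changes.

```c
/*
 * Hypothetical, self-contained model (not PostgreSQL source) of why the
 * decoder still sends a prepare after detecting a concurrent abort.
 */
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define GID_LEN 64              /* model limit, not PostgreSQL's GIDSIZE */

typedef struct ModelDownstream
{
    char gid[GID_LEN];          /* GID of the transaction in prepared state */
    bool has_prepared;          /* true once a prepare has been applied */
} ModelDownstream;

/* Apply a prepare message: remember the GID on the downstream. */
static void
ds_prepare(ModelDownstream *ds, const char *gid)
{
    assert(strlen(gid) < GID_LEN);
    strncpy(ds->gid, gid, GID_LEN - 1);
    ds->gid[GID_LEN - 1] = '\0';
    ds->has_prepared = true;
}

/* Apply ROLLBACK PREPARED; false means the downstream never saw this GID. */
static bool
ds_rollback_prepared(ModelDownstream *ds, const char *gid)
{
    if (!ds->has_prepared || strcmp(ds->gid, gid) != 0)
        return false;
    ds->has_prepared = false;
    return true;
}

/*
 * Decode a prepared transaction whose replay was cut short by a concurrent
 * abort.  The remaining changes are skipped; send_prepare_after_abort models
 * the "if (txn->concurrent_abort) rb->prepare(...)" check added by v5.
 */
static void
decode_concurrently_aborted_prepare(ModelDownstream *ds, const char *gid,
                                    bool send_prepare_after_abort)
{
    if (send_prepare_after_abort)
        ds_prepare(ds, gid);
}

/* PREPARE TRANSACTION is decoded, later ROLLBACK PREPARED is decoded. */
static bool
rollback_finds_gid(bool send_prepare_after_abort)
{
    ModelDownstream ds = { "", false };
    const char *gid = "demo_gid";

    decode_concurrently_aborted_prepare(&ds, gid, send_prepare_after_abort);
    return ds_rollback_prepared(&ds, gid);
}
```

With `send_prepare_after_abort` false (the behaviour before this patch), `rollback_finds_gid` returns false because the downstream never learned the GID; with it true, the rollback finds the prepared transaction.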

#28Markus Wanner
markus.wanner@enterprisedb.com
In reply to: Amit Kapila (#27)
Re: [PATCH] add concurrent_abort callback for output plugin

On 31.03.21 15:18, Amit Kapila wrote:

On Wed, Mar 31, 2021 at 11:55 AM Markus Wanner

The last sentence there now seems to relate to just the setting of
"concurrent_abort", rather than the whole reason to invoke the
prepare_cb. And the reference to the "gid" is a bit lost. Maybe:

"Thus even in case of a concurrent abort, enough information is
provided to the output plugin for it to properly deal with the
<command>ROLLBACK PREPARED</command> once that is decoded."

Okay, changed the patch accordingly.

That's fine with me. I didn't necessarily mean to eliminate the hint to
the concurrent_abort field, but it's more concise that way. Thank you.

Regards

Markus

#29Amit Kapila
amit.kapila16@gmail.com
In reply to: Markus Wanner (#28)
Re: [PATCH] add concurrent_abort callback for output plugin

On Wed, Mar 31, 2021 at 7:20 PM Markus Wanner
<markus.wanner@enterprisedb.com> wrote:

On 31.03.21 15:18, Amit Kapila wrote:

On Wed, Mar 31, 2021 at 11:55 AM Markus Wanner

The last sentence there now seems to relate to just the setting of
"concurrent_abort", rather than the whole reason to invoke the
prepare_cb. And the reference to the "gid" is a bit lost. Maybe:

"Thus even in case of a concurrent abort, enough information is
provided to the output plugin for it to properly deal with the
<command>ROLLBACK PREPARED</command> once that is decoded."

Okay, changed the patch accordingly.

That's fine with me.

Pushed.

--
With Regards,
Amit Kapila.

#30Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#29)
1 attachment(s)
Re: [PATCH] add concurrent_abort callback for output plugin

Hi,

While testing another WIP patch [1], a clashing GID problem was found,
which gives us apply worker errors like:

2021-04-26 10:07:12.883 AEST [22055] ERROR: transaction identifier "pg_gid_16403_608" is already in use
2021-04-26 10:08:05.149 AEST [22124] ERROR: transaction identifier "pg_gid_16403_757" is already in use

These GID clashes were traced back to a problem of the
concurrent-abort logic: when "streaming" is enabled, the
concurrent-abort logic was always sending "prepare" even though a
"stream_prepare" had already been sent.

PSA a patch to correct this.

------
[1]: /messages/by-id/CAHut+PuB07xOgJLnDhvbtp0t_qMDhjDD+kO+2yB+r6tgfaR-5Q@mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v1-0001-Fix-concurrent-abort-for-when-streaming.patch (application/octet-stream)
From e109c0c813b882d4438a40ccbe25903ffbde14a5 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 26 Apr 2021 10:57:47 +1000
Subject: [PATCH v1] Fix concurrent abort for when streaming.

When a concurrent-abort was detected ReorderBufferPrepare is sending "prepare".
But when using streaming there was already a "stream_prepare" sent for same tx.

The double sending of prepares would cause a GID clash detected in worker.c, giving
errors like: ERROR:  transaction identifier "pg_gid_16403_596" is already in use

This patch prevents the double-sending of prepares when streaming.
---
 src/backend/replication/logical/reorderbuffer.c | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 341bef5..21b5f11 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2690,8 +2690,11 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 	 * We send the prepare for the concurrently aborted xacts so that later
 	 * when rollback prepared is decoded and sent, the downstream should be
 	 * able to rollback such a xact. See comments atop DecodePrepare.
+	 *
+	 * Note, for the concurrent_abort + streaming case a streaming_prepare was
+	 * already sent within the ReorderBufferReplay call above.
 	 */
-	if (txn->concurrent_abort)
+	if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
 		rb->prepare(rb, txn, txn->final_lsn);
 }
 
-- 
1.8.3.1
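
The double-prepare problem and the guard added by this patch can be modelled the same way. This is a hypothetical sketch, not reorderbuffer.c: `ModelDownstream`, `ds_prepare` and `decode_aborted_prepare` are invented names, and the `streamed` flag stands in for `rbtxn_is_streamed(txn)`. Following the comment in the patch, it assumes that for a streamed transaction `ReorderBufferReplay` has already sent stream_prepare before the concurrent-abort check runs.

```c
/*
 * Hypothetical, self-contained model (not PostgreSQL source) of the
 * double prepare for streamed, concurrently aborted transactions.
 */
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define GID_LEN 64              /* model limit, not PostgreSQL's GIDSIZE */

typedef struct ModelDownstream
{
    char gid[GID_LEN];          /* GID currently in prepared state */
    bool has_prepared;
    int  nprepares;             /* successfully applied prepares */
} ModelDownstream;

/*
 * Apply a prepare or stream_prepare message.  Returns false on a GID clash,
 * the model's version of "transaction identifier ... is already in use".
 */
static bool
ds_prepare(ModelDownstream *ds, const char *gid)
{
    assert(strlen(gid) < GID_LEN);
    if (ds->has_prepared && strcmp(ds->gid, gid) == 0)
        return false;
    strncpy(ds->gid, gid, GID_LEN - 1);
    ds->gid[GID_LEN - 1] = '\0';
    ds->has_prepared = true;
    ds->nprepares++;
    return true;
}

/*
 * Decode a concurrently aborted prepared transaction.  Returns false if
 * the downstream reported a GID clash.
 */
static bool
decode_aborted_prepare(ModelDownstream *ds, const char *gid, bool streamed,
                       bool with_v1_fix)
{
    bool ok = true;

    /* Replay step: a streamed transaction already ends with stream_prepare. */
    if (streamed)
        ok = ds_prepare(ds, gid);

    /* v5 check "if (txn->concurrent_abort)"; v1 adds "&& !streamed". */
    if (!with_v1_fix || !streamed)
        ok = ds_prepare(ds, gid) && ok;

    return ok;
}
```

Calling `decode_aborted_prepare` with `streamed` true and `with_v1_fix` false makes the second prepare clash on the GID, mirroring the "already in use" errors reported above. With the fix, both a streamed and a non-streamed transaction receive exactly one prepare.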

#31Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#30)
Re: [PATCH] add concurrent_abort callback for output plugin

On Mon, Apr 26, 2021 at 7:05 AM Peter Smith <smithpb2250@gmail.com> wrote:

Hi,

While testing another WIP patch [1], a clashing GID problem was found,
which gives us apply worker errors like:

2021-04-26 10:07:12.883 AEST [22055] ERROR: transaction identifier "pg_gid_16403_608" is already in use
2021-04-26 10:08:05.149 AEST [22124] ERROR: transaction identifier "pg_gid_16403_757" is already in use

These GID clashes were traced back to a problem of the
concurrent-abort logic: when "streaming" is enabled, the
concurrent-abort logic was always sending "prepare" even though a
"stream_prepare" had already been sent.

PSA a patch to correct this.

Your patch looks good to me, so pushed! Thanks for the report and patch.

--
With Regards,
Amit Kapila.