From: Markus Wanner <markus.wanner@enterprisedb.com>
Date: Thu, 11 Feb 2021 13:49:55 +0100
Subject: [PATCH] Add a concurrent_abort callback for the output plugin.

Logical decoding of a prepared or streamed transaction may fail if the
transaction got aborted after invoking the begin_cb (and likely having
sent some changes via change_cb), but before the necessary catalog scans
could be performed.  In this case, decoding the transaction is neither
possible nor necessary (given it got rolled back).

To give the output plugin a chance to clean up the aborted transaction as
well, introduce a concurrent_abort callback.  It is only ever invoked to
terminate unfinished transactions, not for normal aborts.

Adjust contrib/test_decoding to define a concurrent_abort callback.
---
 contrib/test_decoding/test_decoding.c         | 29 ++++++++++++
 doc/src/sgml/logicaldecoding.sgml             | 43 +++++++++++++++++-
 src/backend/replication/logical/logical.c     | 45 +++++++++++++++++++
 .../replication/logical/reorderbuffer.c       |  6 +++
 src/include/replication/output_plugin.h       |  9 ++++
 src/include/replication/reorderbuffer.h       |  7 +++
 6 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..a5dd80e0957 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -83,6 +83,9 @@ static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn,
 								  XLogRecPtr prepare_lsn);
+static void pg_decode_concurrent_abort_txn(LogicalDecodingContext *ctx,
+										   ReorderBufferTXN *txn, bool streaming,
+										   XLogRecPtr lsn);
 static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
 										  ReorderBufferTXN *txn,
 										  XLogRecPtr commit_lsn);
@@ -137,6 +140,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pg_decode_change;
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
+	cb->concurrent_abort_cb = pg_decode_concurrent_abort_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
@@ -386,6 +390,31 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_concurrent_abort_txn(LogicalDecodingContext *ctx,
+							   ReorderBufferTXN *txn, bool streaming,
+							   XLogRecPtr lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool		xact_wrote_changes = txndata->xact_wrote_changes;
+
+	if (data->skip_empty_xacts && !xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	if (data->include_xids)
+		appendStringInfo(ctx->out, "<concurrent abort> %u", txn->xid);
+	else
+		appendStringInfoString(ctx->out, "<concurrent abort>");
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
 /* COMMIT PREPARED callback */
 static void
 pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..c44dee55a02 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -441,6 +441,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
     LogicalDecodeBeginPrepareCB begin_prepare_cb;
     LogicalDecodePrepareCB prepare_cb;
+    LogicalDecodeConcurrentAbortCB concurrent_abort_cb;
     LogicalDecodeCommitPreparedCB commit_prepared_cb;
     LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
     LogicalDecodeStreamStartCB stream_start_cb;
@@ -459,7 +460,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>,
      <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
-     and <function>shutdown_cb</function> are optional.
+     <function>shutdown_cb</function>, and
+     <function>concurrent_abort_cb</function> are optional.
      If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
@@ -847,6 +849,45 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-concurrent-abort">
+     <title>Concurrent Abort Callback</title>
+
+     <para>
+      The optional <function>concurrent_abort_cb</function> callback is called
+      whenever a transaction got aborted in the middle of sending its changes
+      to the output plugin.  This can happen in the case of decoding a
+      two-phase commit transaction or with streaming enabled, and only if the
+      transaction got aborted after its <function>begin_cb</function> has been
+      invoked.  This is never the case for normal single-phase transactions
+      that are not streamed.
+<programlisting>
+typedef void (*LogicalDecodeConcurrentAbortCB) (struct LogicalDecodingContext *ctx,
+                                                ReorderBufferTXN *txn,
+                                                bool streaming,
+                                                XLogRecPtr abort_lsn);
+</programlisting>
+      The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
+      have the same contents as for the <function>begin_cb</function>
+      and <function>commit_cb</function> callbacks.
+      The <parameter>streaming</parameter> parameter indicates whether this
+      happened during streaming or at the <command>PREPARE</command> of a
+      two-phase transaction.  In case of a two-phase transaction,
+      the <parameter>abort_lsn</parameter> parameter additionally points to
+      the end of the prepare record in WAL; it is invalid for streaming
+      transactions.
+     </para>
+
+     <note>
+      <para>
+       This is just an early indication for the output plugin.  For
+       concurrently aborted transactions, either
+       the <function>rollback_prepared_cb</function> or
+       the <function>stream_abort_cb</function> callback will be invoked once
+       the decoder reaches the corresponding WAL records.
+      </para>
+     </note>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-commit-prepared">
      <title>Transaction Commit Prepared Callback</title>
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..4b99fe5cc4e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -66,6 +66,8 @@ static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *t
 									   XLogRecPtr commit_lsn);
 static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 										 XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
+static void concurrent_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+										bool streaming, XLogRecPtr end_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							  Relation relation, ReorderBufferChange *change);
 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -276,6 +278,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->prepare = prepare_cb_wrapper;
 	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
 	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+	ctx->reorder->concurrent_abort = concurrent_abort_cb_wrapper;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -1005,6 +1008,48 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+concurrent_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							bool streaming, XLogRecPtr end_lsn)
+{
+	LogicalDecodingContext	   *ctx = cache->private_data;
+	LogicalErrorCallbackState	state;
+	ErrorContextCallback		errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/*
+	 * We're only supposed to call this when either two-phase commits are
+	 * supported or if we're streaming.
+	 */
+	Assert(ctx->twophase || streaming);
+
+	/* The callback is optional, invoke only if provided. */
+	if (ctx->callbacks.concurrent_abort_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "concurrent_abort";
+	state.report_location = txn->final_lsn; /* beginning of record attempted to
+											   decode */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.concurrent_abort_cb(ctx, txn, streaming, end_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c291b05a423..a6d044b870b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2488,6 +2488,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			errdata = NULL;
 			curtxn->concurrent_abort = true;
 
+			/*
+			 * Call the cleanup hook to inform the output plugin that decoding
+			 * of this transaction stopped because it got concurrently aborted.
+			 */
+			rb->concurrent_abort(rb, txn, streaming, commit_lsn);
+
 			/* Reset the TXN so that it is allowed to stream remaining data. */
 			ReorderBufferResetTXN(rb, txn, snapshot_now,
 								  command_id, prev_lsn,
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..e122dc48139 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -77,6 +77,14 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+/*
+ * Called for decoding errors due to concurrent aborts.
+ */
+typedef void (*LogicalDecodeConcurrentAbortCB) (struct LogicalDecodingContext *ctx,
+												ReorderBufferTXN *txn,
+												bool streaming,
+												XLogRecPtr end_lsn);
+
 /*
  * Called for the generic logical decoding messages.
  */
@@ -227,6 +235,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodePrepareCB prepare_cb;
 	LogicalDecodeCommitPreparedCB commit_prepared_cb;
 	LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+	LogicalDecodeConcurrentAbortCB concurrent_abort_cb;
 
 	/* streaming of changes */
 	LogicalDecodeStreamStartCB stream_start_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6ab..153474baa84 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -460,6 +460,12 @@ typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
 												 XLogRecPtr prepare_end_lsn,
 												 TimestampTz prepare_time);
 
+/* concurrent abort callback signature */
+typedef void (*ReorderBufferConcurrentAbortCB) (ReorderBuffer *rb,
+												ReorderBufferTXN *txn,
+												bool streaming,
+												XLogRecPtr end_lsn);
+
 /* start streaming transaction callback signature */
 typedef void (*ReorderBufferStreamStartCB) (
 											ReorderBuffer *rb,
@@ -559,6 +565,7 @@ struct ReorderBuffer
 	ReorderBufferPrepareCB prepare;
 	ReorderBufferCommitPreparedCB commit_prepared;
 	ReorderBufferRollbackPreparedCB rollback_prepared;
+	ReorderBufferConcurrentAbortCB concurrent_abort;
 
 	/*
 	 * Callbacks to be called when streaming a transaction.
-- 
2.30.2

