From 06973f7b57a9c186e53400e8815d8edf7a6bc047 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Mon, 18 Nov 2019 16:32:34 +0530
Subject: [PATCH v6 04/12] Gracefully handle concurrent aborts of uncommitted
 transactions that are being decoded alongside.

When a transaction aborts, its changes are considered unnecessary for
other transactions. That means the changes may be either cleaned up by
vacuum or removed from HOT chains (thus made inaccessible through
indexes), and there may be other such consequences.

When decoding committed transactions this is not an issue, and we
never decode transactions that abort before the decoding starts.

But for in-progress transactions, this may cause failures when the
output plugin consults catalogs (both system and user-defined).

We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend decoding a
specific uncommitted transaction. The decoding logic on the receipt
of such an sqlerrcode aborts the ongoing decoding and returns
gracefully.
---
 doc/src/sgml/logicaldecoding.sgml             |  5 +-
 src/backend/access/heap/heapam.c              | 41 ++++++++++++++++
 src/backend/access/index/genam.c              | 49 +++++++++++++++++++
 .../replication/logical/reorderbuffer.c       |  8 +--
 src/backend/utils/time/snapmgr.c              | 25 +++++++++-
 src/include/utils/snapmgr.h                   |  4 +-
 6 files changed, 124 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index ace21ec8e5..319349a92d 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -432,7 +432,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
 ALTER TABLE user_catalog_table SET (user_catalog_table = true);
 CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
 </programlisting>
-     Any actions leading to transaction ID assignment are prohibited. That, among others,
+     Note that access to user catalog tables or regular system catalog tables
+     in the output plugins has to be done via the <literal>systable_*</literal> scan APIs only.
+     Access via the <literal>heap_*</literal> scan APIs will error out.
+     Additionally, any actions leading to transaction ID assignment are prohibited. That, among others,
      includes writing to tables, performing DDL changes, and
      calling <literal>txid_current()</literal>.
     </para>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 7b8490d4e5..2d4ef48069 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1303,6 +1303,15 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg_internal("only heap AM is supported")));
 
+	/*
+	 * We don't expect direct calls to heap_getnext with valid
+	 * CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(scan->rs_base.rs_rd) ||
+		  RelationIsUsedAsCatalogTable(scan->rs_base.rs_rd))))
+		elog(ERROR, "unexpected heap_getnext call during logical decoding");
+
 	/* Note: no locking manipulations needed */
 
 	HEAPDEBUG_1;				/* heap_getnext( info ) */
@@ -1421,6 +1430,14 @@ heap_fetch(Relation relation,
 	OffsetNumber offnum;
 	bool		valid;
 
+	/*
+	 * We don't expect direct calls to heap_fetch with valid
+	 * CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+		elog(ERROR, "unexpected heap_fetch call during logical decoding");
+
 	/*
 	 * Fetch and pin the appropriate page of the relation.
 	 */
@@ -1535,6 +1552,14 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 	bool		valid;
 	bool		skip;
 
+	/*
+	 * We don't expect direct calls to heap_hot_search_buffer with
+	 * valid CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+		elog(ERROR, "unexpected heap_hot_search_buffer call during logical decoding");
+
 	/* If this is not the first call, previous call returned a (live!) tuple */
 	if (all_dead)
 		*all_dead = first_call;
@@ -1682,6 +1707,14 @@ heap_get_latest_tid(TableScanDesc sscan,
 	 */
 	Assert(ItemPointerIsValid(tid));
 
+	/*
+	 * We don't expect direct calls to heap_get_latest_tid with valid
+	 * CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+		elog(ERROR, "unexpected heap_get_latest_tid call during logical decoding");
+
 	/*
 	 * Loop to chase down t_ctid links.  At top of loop, ctid is the tuple we
 	 * need to examine, and *tid is the TID we will return if ctid turns out
@@ -5481,6 +5514,14 @@ heap_finish_speculative(Relation relation, ItemPointer tid)
 	ItemId		lp = NULL;
 	HeapTupleHeader htup;
 
+	/*
+	 * We don't expect calls here with valid CheckXidAlive for regular tables.
+	 * XXX message says heap_hot_search but this is heap_finish_speculative?
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+		elog(ERROR, "unexpected heap_hot_search call during logical decoding");
+
 	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 	page = (Page) BufferGetPage(buffer);
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index c16eb05416..5644b8d41a 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -28,6 +28,7 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -477,6 +478,22 @@ systable_getnext(SysScanDesc sysscan)
 		}
 	}
 
+	/*
+	 * If CheckXidAlive is valid, then we check if it aborted. If it did, we
+	 * error out.  Instead of directly checking the abort status, we check
+	 * that it is neither in progress nor committed.  That is because after
+	 * a system crash, the status of transactions that were running at the
+	 * time might not have been marked.  So we need to consider them as
+	 * aborted.  Refer to the detailed comments in snapmgr.c where the
+	 * variable is declared.
+	 */
+	if (TransactionIdIsValid(CheckXidAlive) &&
+			!TransactionIdIsInProgress(CheckXidAlive) &&
+			!TransactionIdDidCommit(CheckXidAlive))
+			ereport(ERROR,
+				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+				 errmsg("transaction aborted during system catalog scan")));
+
 	return htup;
 }
 
@@ -513,6 +530,22 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
 											sysscan->slot,
 											freshsnap);
 
+	/*
+	 * If CheckXidAlive is valid, then we check if it aborted. If it did, we
+	 * error out.  Instead of directly checking the abort status, we check
+	 * that it is neither in progress nor committed.  That is because after
+	 * a system crash, the status of transactions that were running at the
+	 * time might not have been marked.  So we need to consider them as
+	 * aborted.  Refer to the detailed comments in snapmgr.c where the
+	 * variable is declared.
+	 */
+	if (TransactionIdIsValid(CheckXidAlive) &&
+			!TransactionIdIsInProgress(CheckXidAlive) &&
+			!TransactionIdDidCommit(CheckXidAlive))
+			ereport(ERROR,
+				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+				 errmsg("transaction aborted during system catalog scan")));
+
 	return result;
 }
 
@@ -639,6 +672,22 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
 	if (htup && sysscan->iscan->xs_recheck)
 		elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
 
+	/*
+	 * If CheckXidAlive is valid, then we check if it aborted. If it did, we
+	 * error out.  Instead of directly checking the abort status, we check
+	 * that it is neither in progress nor committed.  That is because after
+	 * a system crash, the status of transactions that were running at the
+	 * time might not have been marked.  So we need to consider them as
+	 * aborted.  Refer to the detailed comments in snapmgr.c where the
+	 * variable is declared.
+	 */
+	if (TransactionIdIsValid(CheckXidAlive) &&
+			!TransactionIdIsInProgress(CheckXidAlive) &&
+			!TransactionIdDidCommit(CheckXidAlive))
+			ereport(ERROR,
+				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+				 errmsg("transaction aborted during system catalog scan")));
+
 	return htup;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 531897cf05..2da0a23a7e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -692,7 +692,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 		/* setup snapshot to allow catalog access */
-		SetupHistoricSnapshot(snapshot_now, NULL);
+		SetupHistoricSnapshot(snapshot_now, NULL, xid);
 		PG_TRY();
 		{
 			rb->message(rb, txn, lsn, false, prefix, message_size, message);
@@ -1551,7 +1551,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	ReorderBufferBuildTupleCidHash(rb, txn);
 
 	/* setup the initial snapshot */
-	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
 
 	/*
 	 * Decoding needs access to syscaches et al., which in turn use
@@ -1802,7 +1802,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
 
 					/* and continue with the new one */
-					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
 					break;
 
 				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -1822,7 +1822,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						snapshot_now->curcid = command_id;
 
 						TeardownHistoricSnapshot(false);
-						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
 					}
 
 					break;
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 1c063c592c..93a0c048c5 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -153,6 +153,13 @@ static Snapshot SecondarySnapshot = NULL;
 static Snapshot CatalogSnapshot = NULL;
 static Snapshot HistoricSnapshot = NULL;
 
+/*
+ * An xid value pointing to a possibly ongoing (sub)transaction.
+ * Currently used in logical decoding.  It's possible that such transactions
+ * can get aborted while the decoding is ongoing.
+ */
+TransactionId CheckXidAlive = InvalidTransactionId;
+
 /*
  * These are updated by GetSnapshotData.  We initialize them this way
  * for the convenience of TransactionIdIsInProgress: even in bootstrap
@@ -2029,10 +2036,14 @@ MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
  * access to behave just like it did at a certain point in the past.
  *
+ * If a valid xid is passed in, we check if it is uncommitted and track it in
+ * CheckXidAlive.  This is to re-check XID status while accessing catalog.
+ *
  * Needed for logical decoding.
  */
 void
-SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
+SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids,
+					  TransactionId snapshot_xid)
 {
 	Assert(historic_snapshot != NULL);
 
@@ -2041,8 +2052,17 @@ SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
 
 	/* setup (cmin, cmax) lookup hash */
 	tuplecid_data = tuplecids;
-}
 
+	/*
+	 * setup CheckXidAlive if it's not committed yet. We don't check
+	 * if the xid aborted. That will happen during catalog access.
+	 */
+	if (TransactionIdIsValid(snapshot_xid) &&
+		!TransactionIdDidCommit(snapshot_xid))
+		CheckXidAlive = snapshot_xid;
+	else
+		CheckXidAlive = InvalidTransactionId;
+}
 
 /*
  * Make catalog snapshots behave normally again.
@@ -2052,6 +2072,7 @@ TeardownHistoricSnapshot(bool is_error)
 {
 	HistoricSnapshot = NULL;
 	tuplecid_data = NULL;
+	CheckXidAlive = InvalidTransactionId;
 }
 
 bool
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index b28d13ce84..12f737b21a 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -145,8 +145,10 @@ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
 
 /* Support for catalog timetravel for logical decoding */
 struct HTAB;
+extern TransactionId CheckXidAlive;
 extern struct HTAB *HistoricSnapshotGetTupleCids(void);
-extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids);
+extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids,
+								  TransactionId snapshot_xid);
 extern void TeardownHistoricSnapshot(bool is_error);
 extern bool HistoricSnapshotActive(void);
 
-- 
2.20.1

