From 2f4d36456430e0974d9348f9c4ea5ece6656c544 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 22 Feb 2017 00:57:33 +0100
Subject: [PATCH 3/4] Fix xl_running_xacts usage in snapshot builder

Due to a race condition, an xl_running_xacts record might list
transactions that are no longer running. The previous coding tried to
get around this with additional locking, but that did not work correctly
for commits. Instead, combine decoded commits and multiple
xl_running_xacts records to reach a consistent snapshot.

This also reverts changes made to GetRunningTransactionData() and
LogStandbySnapshot() by b89e151 as the additional locking does not help.
---
 src/backend/replication/logical/snapbuild.c | 65 ++++++++++++++++++++++++-----
 src/backend/storage/ipc/procarray.c         |  5 ++-
 src/backend/storage/ipc/standby.c           | 19 ---------
 3 files changed, 57 insertions(+), 32 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 143e8ec..40937fb 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1221,7 +1221,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *	  simply track the number of in-progress toplevel transactions and
 	 *	  lower it whenever one commits or aborts. When that number
 	 *	  (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
-	 *	  to CONSISTENT.
+	 *	  to CONSISTENT. Sometimes we might get an xl_running_xacts record
+	 *	  in which all tracked transactions have already finished. In that
+	 *	  case we need to restart tracking and use the previously collected
+	 *	  committed transactions to purge transactions that were mistakenly
+	 *	  marked as running in xl_running_xacts because of a race condition
+	 *	  in LogStandbySnapshot().
 	 *	  NB: We need to search running.xip when seeing a transaction's end to
 	 *	  make sure it's a toplevel transaction and it's been one of the
 	 *	  initially running ones.
@@ -1286,9 +1291,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 * b) first encounter of a useable xl_running_xacts record. If we had
 	 * found one earlier we would either track running transactions (i.e.
 	 * builder->running.xcnt != 0) or be consistent (this function wouldn't
-	 * get called).
+	 * get called). However, it's possible that we did not see the end of
+	 * all transactions that were marked as running in xl_running_xacts, so
+	 * if we get a new record that says all of them have finished but we are
+	 * not consistent yet, we need to restart the tracking while taking
+	 * previously seen transactions into account.
 	 */
-	else if (!builder->running.xcnt)
+	else if (!builder->running.xcnt ||
+			 running->oldestRunningXid > builder->running.xmax)
 	{
 		int			off;
 
@@ -1326,20 +1336,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		builder->running.xmin = builder->running.xip[0];
 		builder->running.xmax = builder->running.xip[running->xcnt - 1];
 
+
 		/* makes comparisons cheaper later */
 		TransactionIdRetreat(builder->running.xmin);
 		TransactionIdAdvance(builder->running.xmax);
 
 		builder->state = SNAPBUILD_FULL_SNAPSHOT;
 
-		ereport(LOG,
-			(errmsg("logical decoding found initial starting point at %X/%X",
-					(uint32) (lsn >> 32), (uint32) lsn),
-			 errdetail_plural("%u transaction needs to finish.",
-							  "%u transactions need to finish.",
-							  builder->running.xcnt,
-							  (uint32) builder->running.xcnt)));
-
 		/*
 		 * Iterate through all xids, wait for them to finish.
 		 *
@@ -1359,9 +1362,49 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 			if (TransactionIdIsCurrentTransactionId(xid))
 				elog(ERROR, "waiting for ourselves");
 
+			/*
+			 * This isn't required for the correctness of decoding, but to allow
+			 * isolationtester to notice that we're currently waiting for
+			 * something.
+			 */
 			XactLockTableWait(xid, NULL, NULL, XLTW_None);
 		}
 
+		/*
+		 * Because of the race condition in LogStandbySnapshot(), the
+		 * transactions recorded in xl_running_xacts as running might have
+		 * already committed by the time the xl_running_xacts record was
+		 * written to WAL. Use the information about decoded transactions that
+		 * we gathered so far to update our idea about what's still running.
+		 *
+		 * We can use SnapBuildEndTxn directly as it only does the transaction
+		 * running check and handling without any additional side effects.
+		 */
+		for (off = 0; off < builder->committed.xcnt; off++)
+			SnapBuildEndTxn(builder, lsn, builder->committed.xip[off]);
+
+		/* Report which action we actually did here. */
+		if (!builder->running.xcnt)
+		{
+			ereport(LOG,
+				(errmsg("logical decoding found initial starting point at %X/%X",
+						(uint32) (lsn >> 32), (uint32) lsn),
+				 errdetail_plural("%u transaction needs to finish.",
+								  "%u transactions need to finish.",
+								  builder->running.xcnt,
+								  (uint32) builder->running.xcnt)));
+		}
+		else
+		{
+			ereport(LOG,
+				(errmsg("logical decoding moved initial starting point to %X/%X",
+						(uint32) (lsn >> 32), (uint32) lsn),
+				 errdetail_plural("%u transaction needs to finish.",
+								  "%u transactions need to finish.",
+								  builder->running.xcnt,
+								  (uint32) builder->running.xcnt)));
+		}
+
 		/* nothing could have built up so far, so don't perform cleanup */
 		return false;
 	}
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cd14667..4ea81f8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2055,12 +2055,13 @@ GetRunningTransactionData(void)
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
 
+	/* We don't release XidGenLock here, the caller is responsible for that */
+	LWLockRelease(ProcArrayLock);
+
 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
 	Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
 
-	/* We don't release the locks here, the caller is responsible for that */
-
 	return CurrentRunningXacts;
 }
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 6259070..f461f21 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -933,27 +933,8 @@ LogStandbySnapshot(void)
 	 */
 	running = GetRunningTransactionData();
 
-	/*
-	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
-	 * For Hot Standby this can be done before inserting the WAL record
-	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
-	 * the clog. For logical decoding, though, the lock can't be released
-	 * early because the clog might be "in the future" from the POV of the
-	 * historic snapshot. This would allow for situations where we're waiting
-	 * for the end of a transaction listed in the xl_running_xacts record
-	 * which, according to the WAL, has committed before the xl_running_xacts
-	 * record. Fortunately this routine isn't executed frequently, and it's
-	 * only a shared lock.
-	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
-		LWLockRelease(ProcArrayLock);
-
 	recptr = LogCurrentRunningXacts(running);
 
-	/* Release lock if we kept it longer ... */
-	if (wal_level >= WAL_LEVEL_LOGICAL)
-		LWLockRelease(ProcArrayLock);
-
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
 
-- 
2.7.4

