From 501e6c9842e225a90c52b8bf3f91adba03d38390 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 22 Feb 2017 00:57:33 +0100
Subject: [PATCH 3/3] Fix xl_running_xacts usage in snapshot builder

Due to a race condition, an xl_running_xacts record might list transactions
that are no longer running. The previous coding tried to get around this with
additional locking, but that did not work correctly for commits. Instead,
try combining decoded commits and multiple xl_running_xacts records to reach
a consistent snapshot.
---
 src/backend/replication/logical/snapbuild.c | 79 +++++++++++++++++++++++------
 src/backend/storage/ipc/standby.c           |  7 +--
 2 files changed, 64 insertions(+), 22 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0b10044..e5f9a33 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1262,7 +1262,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 *	  simply track the number of in-progress toplevel transactions and
 	 *	  lower it whenever one commits or aborts. When that number
 	 *	  (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
-	 *	  to CONSISTENT.
+	 *	  to CONSISTENT. Sometimes we might get an xl_running_xacts record
+	 *	  which has all tracked transactions as finished. We'll need to
+	 *	  restart tracking in that case and use the previously collected
+	 *	  committed transactions to purge transactions mistakenly marked as
+	 *	  running in xl_running_xacts as a result of the race condition in
+	 *	  LogStandbySnapshot().
 	 *	  NB: We need to search running.xip when seeing a transaction's end to
 	 *	  make sure it's a toplevel transaction and it's been one of the
 	 *	  initially running ones.
@@ -1327,11 +1332,17 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 * b) first encounter of a useable xl_running_xacts record. If we had
 	 * found one earlier we would either track running transactions (i.e.
 	 * builder->running.xcnt != 0) or be consistent (this function wouldn't
-	 * get called).
+	 * get called). However, it's possible that we could not see all
+	 * transactions that were marked as running in xl_running_xacts, so if
+	 * we get a new one that says all were closed but we are not consistent
+	 * yet, we need to restart the tracking while taking previously seen
+	 * transactions into account.
 	 */
-	else if (!builder->running.xcnt)
+	else if (!builder->running.xcnt ||
+			 running->oldestRunningXid > builder->running.xmax)
 	{
 		int			off;
+		int			purge_running = builder->running.xcnt > 0;
 
 		/*
 		 * We only care about toplevel xids as those are the ones we
@@ -1367,26 +1378,15 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		builder->running.xmin = builder->running.xip[0];
 		builder->running.xmax = builder->running.xip[running->xcnt - 1];
 
+
 		/* makes comparisons cheaper later */
 		TransactionIdRetreat(builder->running.xmin);
 		TransactionIdAdvance(builder->running.xmax);
 
 		builder->state = SNAPBUILD_FULL_SNAPSHOT;
 
-		ereport(LOG,
-			(errmsg("logical decoding found initial starting point at %X/%X",
-					(uint32) (lsn >> 32), (uint32) lsn),
-			 errdetail_plural("%u transaction needs to finish.",
-							  "%u transactions need to finish.",
-							  builder->running.xcnt,
-							  (uint32) builder->running.xcnt)));
-
 		/*
-		 * Iterate through all xids, wait for them to finish.
-		 *
-		 * This isn't required for the correctness of decoding, but to allow
-		 * isolationtester to notice that we're currently waiting for
-		 * something.
+		 * Iterate through all xids and do additional checking/purging.
 		 */
 		for (off = 0; off < builder->running.xcnt; off++)
 		{
@@ -1400,9 +1400,56 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 			if (TransactionIdIsCurrentTransactionId(xid))
 				elog(ERROR, "waiting for ourselves");
 
+			/*
+			 * Use gathered info about committed transactions to purge
+			 * committed transactions recorded in xl_running_xacts as running
+			 * because of the race condition in LogStandbySnapshot(). This may
+			 * be slow but it should be called at most once per slot
+			 * initialization.
+			 */
+			if (purge_running)
+			{
+				int i;
+
+				for (i = 0; i < builder->committed.xcnt; i++)
+				{
+					if (builder->committed.xip[i] == xid)
+					{
+						SnapBuildEndTxn(builder, lsn, xid);
+						continue;
+					}
+				}
+			}
+
+			/*
+			 * This isn't required for the correctness of decoding, but to allow
+			 * isolationtester to notice that we're currently waiting for
+			 * something.
+			 */
 			XactLockTableWait(xid, NULL, NULL, XLTW_None);
 		}
 
+		if (!purge_running)
+		{
+			ereport(LOG,
+				(errmsg("logical decoding found initial starting point at %X/%X",
+						(uint32) (lsn >> 32), (uint32) lsn),
+				 errdetail_plural("%u transaction needs to finish.",
+								  "%u transactions need to finish.",
+								  builder->running.xcnt,
+								  (uint32) builder->running.xcnt)));
+		}
+		else
+		{
+			ereport(LOG,
+				(errmsg("logical decoding moved initial starting point to %X/%X",
+						(uint32) (lsn >> 32), (uint32) lsn),
+				 errdetail_plural("%u transaction needs to finish.",
+								  "%u transactions need to finish.",
+								  builder->running.xcnt,
+								  (uint32) builder->running.xcnt)));
+		}
+
 		/* nothing could have built up so far, so don't perform cleanup */
 		return false;
 	}
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 6259070..9b41a28 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -945,15 +945,10 @@ LogStandbySnapshot(void)
 	 * record. Fortunately this routine isn't executed frequently, and it's
 	 * only a shared lock.
 	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
-		LWLockRelease(ProcArrayLock);
+	LWLockRelease(ProcArrayLock);
 
 	recptr = LogCurrentRunningXacts(running);
 
-	/* Release lock if we kept it longer ... */
-	if (wal_level >= WAL_LEVEL_LOGICAL)
-		LWLockRelease(ProcArrayLock);
-
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
 
-- 
2.7.4

