SERIALIZABLE on standby servers

Started by Thomas Munro about 9 years ago, 13 messages
#1 Thomas Munro
thomas.munro@enterprisedb.com
1 attachment(s)

Hi hackers

Here is an experimental WIP patch to allow SERIALIZABLE READ ONLY
DEFERRABLE transactions on standby servers without serialisation
anomalies, based loosely on an old email from Kevin Grittner[1]. I'm
not sure how far this is from what he had in mind or whether I've
misunderstood something fundamental here, but I hope this can at least
serve as a starting point and we can try to get something into
Postgres 10.

The patch works by teaching the standby how to do something similar
to what SERIALIZABLE READ ONLY DEFERRABLE does on the primary server,
with some help from the primary server in the form of extra
information in the WAL.

The basic idea is: the standby should wait until a point in the
transaction stream where it can take a snapshot and either (1) there
were no concurrent read/write SERIALIZABLE transactions running on the
primary, or (2) the last concurrent read/write SERIALIZABLE
transaction at snapshot time has now ended without creating a
dangerous cycle with our transaction.
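
The rule above can be sketched as a small verdict function. This is
only a simplified model, not code from the patch: the function
snapshot_verdict and its three inputs are hypothetical, and only the
enum mirrors the SnapshotSafety type that the patch adds.

```c
#include <assert.h>
#include <stdbool.h>

/* Mirrors the SnapshotSafety enum added by the patch. */
typedef enum
{
    SNAPSHOT_SAFE,
    SNAPSHOT_UNSAFE,
    SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/*
 * Hypothetical helper: classify a snapshot given how many read/write
 * SERIALIZABLE transactions were running on the primary at snapshot time,
 * how many of those are still running, and whether one of them has created
 * a dangerous cycle with the snapshot's transaction.
 */
static SnapshotSafety
snapshot_verdict(int rw_at_snapshot, int rw_still_running, bool dangerous_cycle)
{
    assert(rw_still_running >= 0 && rw_still_running <= rw_at_snapshot);

    if (dangerous_cycle)
        return SNAPSHOT_UNSAFE;         /* take a new snapshot later */
    if (rw_still_running == 0)
        return SNAPSHOT_SAFE;           /* case (1), or case (2) resolved */
    return SNAPSHOT_SAFETY_UNKNOWN;     /* case (2): keep waiting */
}
```

Case (1) is the call with rw_at_snapshot == 0; case (2) starts out as
SNAPSHOT_SAFETY_UNKNOWN and becomes SNAPSHOT_SAFE once the last
concurrent transaction has ended without a dangerous cycle.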

In case (1), the primary sometimes adds an extra
xl_xact_snapshot_safety struct to commit messages which says 'a
snapshot taken after this commit and before any future SSI commits is
safe, because there are no concurrent read/write SSI transactions at
this moment'.

In case (2), the xl_xact_snapshot_safety struct embedded in a commit
record instead says 'a snapshot taken after this commit and before any
future SSI commits is of unknown safety, because there are concurrent
transactions; I'll tell you when it has been determined; please
remember this token'. The token (which happens to be a CSN but that
is not important) will appear in a future independent snapshot safety
message which says whether a snapshot taken at that time is safe or
unsafe.
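
To make the token handshake concrete, here is a rough standalone model
of the standby side. It is a sketch under assumptions: StandbyState,
Waiter, replay_commit, take_snapshot and replay_safety_record are
hypothetical names, and the model ignores locking and process wakeups
entirely. It only shows how a waiter remembers a token and is later
resolved by a record carrying the same token.

```c
#include <assert.h>
#include <stdint.h>

typedef enum
{
    SNAPSHOT_SAFE,
    SNAPSHOT_UNSAFE,
    SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/* Hypothetical: safety of a snapshot taken after the last replayed commit. */
typedef struct
{
    uint64_t        last_token;
    SnapshotSafety  last_safety;
} StandbyState;

/* Hypothetical: a backend that took a snapshot and may be waiting. */
typedef struct
{
    uint64_t        token;      /* token remembered at snapshot time */
    SnapshotSafety  safety;     /* UNKNOWN while still waiting */
} Waiter;

/* Replay a commit record with embedded snapshot safety information. */
static void
replay_commit(StandbyState *s, uint64_t token, SnapshotSafety safety)
{
    s->last_token = token;
    s->last_safety = safety;
}

/* A backend takes a snapshot and remembers the current token and state. */
static Waiter
take_snapshot(const StandbyState *s)
{
    Waiter w = { s->last_token, s->last_safety };

    return w;
}

/* Replay a standalone safety record: resolve every waiter on that token. */
static void
replay_safety_record(StandbyState *s, Waiter *waiters, int nwaiters,
                     uint64_t token, SnapshotSafety verdict)
{
    assert(verdict != SNAPSHOT_SAFETY_UNKNOWN);

    for (int i = 0; i < nwaiters; i++)
    {
        if (waiters[i].safety == SNAPSHOT_SAFETY_UNKNOWN &&
            waiters[i].token == token)
            waiters[i].safety = verdict;
    }

    /* Later snapshots also benefit if this is still the latest token. */
    if (s->last_token == token)
        s->last_safety = verdict;
}
```

A waiter whose token matches becomes runnable as soon as the verdict
record is replayed, even if newer commit records have been replayed in
the meantime.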

Note that xl_xact_snapshot_safety is embedded in the commit messages
(for SSI transactions only), but also appears as its own WAL record to
report the final state of a token from an earlier commit. So if you
do a lot of non-overlapping writable SSI transactions, you'll get just
a few extra bytes in each commit record, but overlapping transactions
will generate a stream of extra snapshot safety messages, one for each
commit involved.

In order to generate follow-up snapshot safety messages, the patch
creates 'hypothetical' transactions on the primary whenever a
writable SSI transaction commits, so that it can figure out whether
such a transaction would conflict. These phantom transactions are
proxies for any transaction that may be created on any standby at the
same point in the transaction stream (with respect to SSI commits),
and they survive in memory just until they are found to be safe or
unsafe.
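
The life cycle of such a hypothetical transaction can be modelled
roughly as follows. This is a simplified sketch, not the patch's code:
HypotheticalXact, hypothetical_create, hypothetical_writer_finished
and NO_OUT_CONFLICT are made-up names, and the unsafe test is my
reading of the existing read-only check in ReleasePredicateLocks (a
concurrent writer commits with a conflict out to something that
committed at or before the snapshot), so treat it as an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum
{
    SNAPSHOT_SAFE,
    SNAPSHOT_UNSAFE,
    SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/* Hypothetical marker: the finished writer had no rw-conflict out. */
#define NO_OUT_CONFLICT UINT64_MAX

/* Hypothetical stand-in for a read-only SERIALIZABLEXACT proxy. */
typedef struct
{
    uint64_t        snapshot_seqno; /* last SSI commit before the snapshot */
    int             pending;        /* concurrent r/w xacts still running */
    SnapshotSafety  safety;
} HypotheticalXact;

/* Created when an SSI transaction commits with this commit sequence number. */
static HypotheticalXact
hypothetical_create(uint64_t commit_seqno, int concurrent_rw)
{
    HypotheticalXact h = {
        commit_seqno,
        concurrent_rw,
        concurrent_rw == 0 ? SNAPSHOT_SAFE : SNAPSHOT_SAFETY_UNKNOWN
    };

    return h;
}

/*
 * One concurrent r/w transaction has finished.  Returns true once the
 * verdict is final, which is when a follow-up WAL record could be emitted.
 */
static bool
hypothetical_writer_finished(HypotheticalXact *h, bool committed,
                             uint64_t earliest_out_conflict_commit)
{
    assert(h->pending > 0);
    h->pending--;

    if (h->safety == SNAPSHOT_SAFETY_UNKNOWN)
    {
        if (committed && earliest_out_conflict_commit <= h->snapshot_seqno)
            h->safety = SNAPSHOT_UNSAFE;
        else if (h->pending == 0)
            h->safety = SNAPSHOT_SAFE;
    }
    return h->safety != SNAPSHOT_SAFETY_UNKNOWN;
}
```

In this model the proxy needs no predicate locks of its own; it only
has to watch how each concurrent writer ends.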

Example of use:

T1 on primary: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
T1 on primary: INSERT INTO foo VALUES ('x');
T2 on primary: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
T2 on primary: INSERT INTO foo VALUES ('x');
T2 on primary: COMMIT;
T3 on standby: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;
T3 on standby: SELECT * FROM foo;
T3 on standby: <...waits...>
T1 on primary: COMMIT;
T3 on standby: <...continues...>

Not tested much and certainly has bugs and many details to sort out,
but first... is this sound or could it be made so? Is there a better
way?

[1]: /messages/by-id/4D3735E30200002500039869@gw.wicourts.gov

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

ssi-standby-v1.patch (application/octet-stream)
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 91d27d0..6e9c95c 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -116,6 +116,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xl_snapshot_safety =
+			(xl_xact_snapshot_safety *) data;
+
+		parsed->snapshot_token = xl_snapshot_safety->token;
+		parsed->snapshot_safety = xl_snapshot_safety->safety;
+
+		data += sizeof(xl_xact_snapshot_safety);
+	}
 }
 
 void
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e11b229..7cb5562 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5114,6 +5114,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_snapshot_safety xl_snapshot_safety;
 
 	uint8		info;
 
@@ -5187,6 +5188,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	if (IsolationIsSerializable())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY;
+		GetHypotheticalSnapshotSafety(&xl_snapshot_safety.token,
+									  &xl_snapshot_safety.safety);
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -5231,6 +5239,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+		XLogRegisterData((char *) (&xl_snapshot_safety),
+						 sizeof(xl_xact_snapshot_safety));
+
 	/* we allow filtering by xacts */
 	XLogIncludeOrigin();
 
@@ -5400,12 +5412,20 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		TransactionIdAsyncCommitTree(
 							  xid, parsed->nsubxacts, parsed->subxacts, lsn);
 
+		/* Coordinate atomic update of snapshot safety and ProcArray. */
+		if (parsed->snapshot_token != 0)
+			BeginHypotheticalSnapshotReplay(parsed->snapshot_token,
+											parsed->snapshot_safety);
+
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
 		ExpireTreeKnownAssignedTransactionIds(
 						  xid, parsed->nsubxacts, parsed->subxacts, max_xid);
 
+		if (parsed->snapshot_token != 0)
+			CompleteHypotheticalSnapshotReplay();
+
 		/*
 		 * Send any cache invalidations attached to the commit. We must
 		 * maintain the same order of invalidation then release locks as
@@ -5575,6 +5595,34 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
 	}
 }
 
+XLogRecPtr
+XactLogSnapshotSafetyRecord(uint64 token, SnapshotSafety safety)
+{
+	xl_xact_snapshot_safety snapshot_safety;
+
+	snapshot_safety.token = token;
+	snapshot_safety.safety = safety;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety));
+	return XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY);
+}
+
+static void
+xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety)
+{
+	/*
+	 * Any earlier COMMIT record must have carried a snapshot safety message
+	 * with the same token as this record, and had safety ==
+	 * SNAPSHOT_SAFETY_UNKNOWN.  This new independent snapshot safety message
+	 * reports that the safety is now known.  We will wake any backend that is
+	 * waiting to learn if the snapshot is safe.
+	 */
+	if (standbyState >= STANDBY_INITIALIZED)
+		NotifyHypotheticalSnapshotSafety(snapshot_safety->token,
+										 snapshot_safety->safety);
+}
+
 void
 xact_redo(XLogReaderState *record)
 {
@@ -5639,6 +5687,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec =
+			(xl_xact_snapshot_safety *) XLogRecGetData(record);
+
+		xact_redo_snapshot_safety(xlrec);
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index defafa5..9852e5b 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -567,14 +567,6 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source)
 			GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction");
 			return false;
 		}
-		/* Can't go to serializable mode while recovery is still active */
-		if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress())
-		{
-			GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
-			GUC_check_errmsg("cannot use serializable mode in a hot standby");
-			GUC_check_errhint("You can use REPEATABLE READ instead.");
-			return false;
-		}
 	}
 
 	*extra = malloc(sizeof(int));
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 24ed21b..ff67019 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -277,6 +277,7 @@
 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0)
 
 /*
  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -427,7 +428,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize);
 static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
-									  TransactionId sourcexid);
+									  TransactionId sourcexid,
+									  SnapshotSafety *snapshot_safety);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 						  PREDICATELOCKTARGETTAG *parent);
@@ -1207,6 +1209,9 @@ InitPredicateLocks(void)
 		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
 		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
 		PredXact->OldCommittedSxact->pid = 0;
+		SHMQueueInit(&PredXact->snapshotSafetyWaitList);
+		PredXact->LastReplayedHypotheticalSnapshotToken = 0;
+		PredXact->LastReplayedHypotheticalSnapshotSafety = 0;
 	}
 	/* This never changes, so let's keep a local copy. */
 	OldCommittedSxact = PredXact->OldCommittedSxact;
@@ -1491,6 +1496,7 @@ static Snapshot
 GetSafeSnapshot(Snapshot origSnapshot)
 {
 	Snapshot	snapshot;
+	SnapshotSafety snapshot_safety;
 
 	Assert(XactReadOnly && XactDeferrable);
 
@@ -1503,10 +1509,39 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * one passed to it, but we avoid assuming that here.
 		 */
 		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-													   InvalidTransactionId);
+														 InvalidTransactionId,
+														 &snapshot_safety);
 
-		if (MySerializableXact == InvalidSerializableXact)
-			return snapshot;	/* no concurrent r/w xacts; it's safe */
+		if (RecoveryInProgress())
+		{
+			/*
+			 * Check if the most recently replayed COMMIT record was either
+			 * known to be safe because it had no concurrent r/w xacts on the
+			 * primary, or has subsequently been declared safe by a snapshot
+			 * safety record.
+			 */
+			if (snapshot_safety == SNAPSHOT_SAFE)
+				return snapshot;
+			else if (snapshot_safety == SNAPSHOT_UNSAFE)
+			{
+				/*
+				 * TODO: Can this happen?  A SNAPSHOT_UNSAFE state can only be
+				 * generated by ReleasePredicateLocks *after* a commit record
+				 * which would establish a new hypothetical snapshot.  So we
+				 * can never take a snapshot here that is already known to be
+				 * unsafe; that is, there can never be a commit record with
+				 * unknown followed by snapshot safety record that immediately
+				 * marks it unsafe, because there must be a new commit in
+				 * between.  Right?
+				 */
+				continue;
+			}
+		}
+		else
+		{
+			if (MySerializableXact == InvalidSerializableXact)
+				return snapshot;	/* no concurrent r/w xacts; it's safe */
+		}
 
 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -1514,20 +1549,48 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * Wait for concurrent transactions to finish. Stop early if one of
 		 * them marked us as conflicted.
 		 */
-		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
-				 SxactIsROUnsafe(MySerializableXact)))
+		if (RecoveryInProgress())
 		{
-			LWLockRelease(SerializableXactHashLock);
-			ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
-			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			/*
+			 * Running on a standby.  Wait for the primary to tell us the
+			 * result of testing a hypothetical transaction whose
+			 * serializability matches the snapshot we have.
+			 */
+			Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN);
+			Assert(!SHMQueueIsDetached(&MyProc->safetyLinks));
+			while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			if (MyProc->snapshotSafety == SNAPSHOT_SAFE)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
-		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
-
-		if (!SxactIsROUnsafe(MySerializableXact))
+		else
 		{
-			LWLockRelease(SerializableXactHashLock);
-			break;				/* success */
+			/*
+			 * Running on primary.  Wait for a signal from one of the backends
+			 * that we possibly conflict with.
+			 */
+			MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+			while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+					 SxactIsROUnsafe(MySerializableXact)))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+
+			if (!SxactIsROUnsafe(MySerializableXact))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
 
 		LWLockRelease(SerializableXactHashLock);
@@ -1542,13 +1605,64 @@ GetSafeSnapshot(Snapshot origSnapshot)
 	/*
 	 * Now we have a safe snapshot, so we don't need to do any further checks.
 	 */
-	Assert(SxactIsROSafe(MySerializableXact));
+	Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact));
 	ReleasePredicateLocks(false);
 
 	return snapshot;
 }
 
 /*
+ * When the primary server has determined the safety of a hypothetical
+ * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a
+ * COMMIT record, it emits a WAL record that causes the recovery process on
+ * standbys to call this function.  Here, we will wake up any backend that is
+ * currently waiting in GetSafeSnapshot to learn about the safety of a
+ * snapshot taken after that point in the transaction stream.
+ */
+void
+NotifyHypotheticalSnapshotSafety(uint64 token, SnapshotSafety safety)
+{
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	Assert(AmStartupProcess());
+
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+	/*
+	 * Walk the list of processes that are waiting in GetSafeSnapshot on a
+	 * standby, and find any that are waiting to learn the safety of a
+	 * snapshot taken at a point in time when this token appeared on the most
+	 * recently replayed SSI transaction.  If we find any of those, tell them
+	 * the final status and wake them up.
+	 */
+	proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+								   &PredXact->snapshotSafetyWaitList,
+								   offsetof(PGPROC, safetyLinks));
+	while (proc != NULL)
+	{
+		next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+									   &proc->safetyLinks,
+									   offsetof(PGPROC, safetyLinks));
+		if (proc->waitSnapshotToken == token)
+		{
+			SHMQueueDelete(&proc->safetyLinks);
+			proc->snapshotSafety = safety;
+			ProcSendSignal(proc->pid);
+		}
+		proc = next;
+	}
+
+	/*
+	 * If this happens to be the most recently replayed snapshot token then
+	 * remember this safety value.
+	 */
+	if (PredXact->LastReplayedHypotheticalSnapshotToken == token)
+		PredXact->LastReplayedHypotheticalSnapshotSafety = safety;
+	LWLockRelease(SerializableXactHashLock);
+}
+
+/*
  * Acquire a snapshot that can be used for the current transaction.
  *
  * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
@@ -1570,7 +1684,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 	 * check_XactIsoLevel() if default_transaction_isolation is set to
 	 * serializable, so phrase the hint accordingly.
 	 */
-	if (RecoveryInProgress())
+	if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot use serializable mode in a hot standby"),
@@ -1586,7 +1700,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 		return GetSafeSnapshot(snapshot);
 
 	return GetSerializableTransactionSnapshotInt(snapshot,
-												 InvalidTransactionId);
+												 InvalidTransactionId,
+												 NULL);
 }
 
 /*
@@ -1616,7 +1731,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
-	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
+	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid, NULL);
 }
 
 /*
@@ -1627,10 +1742,15 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
  * source xact is still running after we acquire SerializableXactHashLock.
  * We do that by calling ProcArrayInstallImportedXmin.
+ *
+ * If snapshot_safety is a non-NULL pointer, then the safety of this snapshot
+ * if used on a standby server is written to it.  SNAPSHOT_SAFE means it may
+ * be used as is; SNAPSHOT_SAFETY_UNKNOWN means the caller must wait.
  */
 static Snapshot
 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
-									  TransactionId sourcexid)
+									  TransactionId sourcexid,
+									  SnapshotSafety *snapshot_safety)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
@@ -1641,8 +1761,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
 
-	Assert(!RecoveryInProgress());
-
 	/*
 	 * Since all parts of a serializable transaction must use the same
 	 * snapshot, it is too late to establish one after a parallel operation
@@ -1683,6 +1801,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 		}
 	} while (!sxact);
 
+	/*
+	 * Note the snapshot safety information for standbys.  This can be used to
+	 * know if the returned snapshot is already known to be safe/unsafe, or if
+	 * we must wait for notification of the final safety determination.
+	 */
+	if (snapshot_safety != NULL)
+	{
+		*snapshot_safety = PredXact->LastReplayedHypotheticalSnapshotSafety;
+		if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN)
+		{
+			/*
+			 * We must put this process into the waitlist while we hold
+			 * the lock or there would be a race condition where we might miss
+			 * a notification.  The caller must wait for
+			 * MyProc->snapshotSafety to be set to a final value.
+			 */
+			MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+			MyProc->waitSnapshotToken = PredXact->LastReplayedHypotheticalSnapshotToken;
+			if (SHMQueueIsDetached(&MyProc->safetyLinks))
+				SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList,
+									 &MyProc->safetyLinks);
+		}
+	}
+
 	/* Get the snapshot, or check that it's safe to use */
 	if (!TransactionIdIsValid(sourcexid))
 		snapshot = GetSnapshotData(snapshot);
@@ -3439,12 +3581,35 @@ ReleasePredicateLocks(bool isCommit)
 
 			/*
 			 * Wake up the process for a waiting DEFERRABLE transaction if we
-			 * now know it's either safe or conflicted.
+			 * now know it's either safe or conflicted.  This releases
+			 * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary.
 			 */
 			if (SxactIsDeferrableWaiting(roXact) &&
 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
 				ProcSendSignal(roXact->pid);
 
+			/*
+			 * If a hypothetical transaction is now known to be safe or
+			 * unsafe, we can report that in the WAL for the benefit of
+			 * standbys and recycle it.  This releases SERIALIZABLE READ ONLY
+			 * DEFERRABLE transactions that are waiting for the status of this
+			 * particular hypothetical transaction on any standby that
+			 * replays it.
+			 */
+			if (SxactIsHypothetical(roXact) &&
+				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+			{
+				SnapshotSafety safety;
+
+				if (SxactIsROSafe(roXact))
+					safety = SNAPSHOT_SAFE;
+				else
+					safety = SNAPSHOT_UNSAFE;
+				XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot,
+											safety);
+				ReleasePredXact(roXact);
+			}
+
 			possibleUnsafeConflict = nextConflict;
 		}
 	}
@@ -4704,6 +4869,78 @@ PreCommit_CheckForSerializationFailure(void)
 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
 	MySerializableXact->flags |= SXACT_FLAG_PREPARED;
 
+	/*
+	 * For the benefit of hot standby servers that want to take a safe
+	 * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a
+	 * hypothetical read-only serializable transaction that starts after this
+	 * transaction commits would be safe.
+	 */
+	if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1))
+	{
+		/*
+		 * There are no concurrent writable SERIALIZABLE transactions.  A
+		 * read-only snapshot taken immediately after this one commits is
+		 * safe.
+		 */
+		MySerializableXact->hypotheticalSnapshotSafety = SNAPSHOT_SAFE;
+	}
+	else
+	{
+		SERIALIZABLEXACT *sxact;
+		SERIALIZABLEXACT *othersxact;
+
+		/*
+		 * We can't yet determine whether a read-only transaction beginning
+		 * now would be safe.  Create a hypothetical SERIALIZABLEXACT and let
+		 * ReleasePredicateLocks report on its safety once that can be
+		 * determined.
+		 */
+		sxact = CreatePredXact();
+		if (sxact == NULL)
+		{
+			/* Out of space.  Don't allow SERIALIZABLE on standbys. */
+			MySerializableXact->hypotheticalSnapshotSafety = SNAPSHOT_UNSAFE;
+		}
+		else
+		{
+			SetInvalidVirtualTransactionId(sxact->vxid);
+			sxact->SeqNo.lastCommitBeforeSnapshot =
+				MySerializableXact->prepareSeqNo;
+			sxact->prepareSeqNo = InvalidSerCommitSeqNo;
+			sxact->commitSeqNo = InvalidSerCommitSeqNo;
+			SHMQueueInit(&(sxact->outConflicts));
+			SHMQueueInit(&(sxact->inConflicts));
+			SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+			sxact->topXid = InvalidTransactionId;
+			sxact->finishedBefore = InvalidTransactionId;
+			sxact->xmin = MySerializableXact->xmin; /* TODO ?! */
+			sxact->pid = InvalidPid;
+			SHMQueueInit(&(sxact->predicateLocks));
+			SHMQueueElemInit(&(sxact->finishedLink));
+			sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL;
+
+			/* Register concurrent r/w transactions as possible conflicts. */
+			for (othersxact = FirstPredXact();
+				 othersxact != NULL;
+				 othersxact = NextPredXact(othersxact))
+			{
+				if (!SxactIsCommitted(othersxact)
+					&& !SxactIsDoomed(othersxact)
+					&& !SxactIsReadOnly(othersxact))
+				{
+					SetPossibleUnsafeConflict(sxact, othersxact);
+				}
+			}
+
+			/*
+			 * The status will be reported in a later WAL record once it has
+			 * been determined.
+			 */
+			MySerializableXact->hypotheticalSnapshotSafety =
+				SNAPSHOT_SAFETY_UNKNOWN;
+		}
+	}
+
 	LWLockRelease(SerializableXactHashLock);
 }
 
@@ -4966,3 +5203,41 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
 	}
 }
+
+/*
+ * Accessor for the hypothetical snapshot safety information needed for commit
+ * records generated on primary servers.  This is used by XactLogCommitRecord
+ * to receive the safety level computed by
+ * PreCommit_CheckForSerializationFailure in a committing SSI transaction.
+ */
+void
+GetHypotheticalSnapshotSafety(uint64 *token, SnapshotSafety *safety)
+{
+	*token = MySerializableXact->prepareSeqNo;
+	*safety = MySerializableXact->hypotheticalSnapshotSafety;
+}
+
+/*
+ * If a commit record contains SSI snapshot safety information then we need to
+ * update that atomically with ProcArray from the point of view of anyone
+ * taking a serializable snapshot.  We achieve that by holding
+ * SerializableXactHashLock, mirroring the way
+ * GetSerializableTransactionSnapshotInt does that when acquiring a snapshot.
+ * The recovery process should wrap its call to
+ * ExpireTreeKnownAssignedTransactionIds in calls to
+ * BeginHypotheticalSnapshotReplay and CompleteHypotheticalSnapshotReplay when
+ * it has safety information.
+ */
+void
+BeginHypotheticalSnapshotReplay(uint64 token, SnapshotSafety safety)
+{
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+	PredXact->LastReplayedHypotheticalSnapshotToken = token;
+	PredXact->LastReplayedHypotheticalSnapshotSafety = safety;
+}
+
+void
+CompleteHypotheticalSnapshotReplay(void)
+{
+	LWLockRelease(SerializableXactHashLock);
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b201631..bf2e2b7 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -394,6 +394,11 @@ InitProcess(void)
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	SHMQueueElemInit(&(MyProc->syncRepLinks));
 
+	/* Initialize fields for SERIALIZABLE on standbys */
+	MyProc->waitSnapshotToken = 0;
+	MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+	SHMQueueElemInit(&(MyProc->safetyLinks));
+
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
 	MyProc->procArrayGroupMemberXid = InvalidTransactionId;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a123d2a..3282dfa 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -74,6 +74,13 @@ extern int	synchronous_commit;
 /* Kluge for 2PC support */
 extern bool MyXactAccessedTempRel;
 
+typedef enum SnapshotSafety
+{
+	SNAPSHOT_SAFE,
+	SNAPSHOT_UNSAFE,
+	SNAPSHOT_SAFETY_UNKNOWN
+} SnapshotSafety;
+
 /*
  *	start- and end-of-transaction callbacks for dynamically loaded modules
  */
@@ -118,7 +125,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_SNAPSHOT_SAFETY   0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -137,6 +144,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_INVALS			(1U << 3)
 #define XACT_XINFO_HAS_TWOPHASE			(1U << 4)
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
+#define XACT_XINFO_HAS_SNAPSHOT_SAFETY	(1U << 6)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -232,6 +240,12 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_snapshot_safety
+{
+	uint64		token;
+	SnapshotSafety safety;
+} xl_xact_snapshot_safety;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -243,6 +257,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_invals follows if XINFO_HAS_INVALS */
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -286,6 +301,9 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	uint64		snapshot_token;
+	SnapshotSafety snapshot_safety;
 } xl_xact_parsed_commit;
 
 typedef struct xl_xact_parsed_abort
@@ -370,6 +388,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
 				   TransactionId twophase_xid);
+
+extern XLogRecPtr XactLogSnapshotSafetyRecord(uint64 token, SnapshotSafety safety);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index a66b5b7..c59e1c6 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -14,6 +14,7 @@
 #ifndef PREDICATE_H
 #define PREDICATE_H
 
+#include "access/xact.h" /* for SnapshotSafety; where else to put that? */
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
@@ -27,7 +28,6 @@ extern int	max_predicate_locks_per_xact;
 /* Number of SLRU buffers to use for predicate locking */
 #define NUM_OLDSERXID_BUFFERS	16
 
-
 /*
  * function prototypes
  */
@@ -70,4 +70,10 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
 extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
 							   void *recdata, uint32 len);
 
+/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */
+extern void GetHypotheticalSnapshotSafety(uint64 *token, SnapshotSafety *safety);
+extern void NotifyHypotheticalSnapshotSafety(uint64 token, SnapshotSafety safety);
+extern void BeginHypotheticalSnapshotReplay(uint64 token, SnapshotSafety safety);
+extern void CompleteHypotheticalSnapshotReplay(void);
+
 #endif   /* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 3175d28..ce97a1b 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -97,6 +97,12 @@ typedef struct SERIALIZABLEXACT
 	 */
 	SHM_QUEUE	possibleUnsafeConflicts;
 
+	/*
+	 * for committing transactions: would a hypothetical read-only snapshot
+	 * taken immediately after this transaction commits be safe?
+	 */
+	SnapshotSafety hypotheticalSnapshotSafety;
+
 	TransactionId topXid;		/* top level xid for the transaction, if one
 								 * exists; else invalid */
 	TransactionId finishedBefore;		/* invalid means still running; else
@@ -123,6 +129,7 @@ typedef struct SERIALIZABLEXACT
 #define SXACT_FLAG_RO_UNSAFE			0x00000100
 #define SXACT_FLAG_SUMMARY_CONFLICT_IN	0x00000200
 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
+#define SXACT_FLAG_HYPOTHETICAL			0x00000800
 
 /*
  * The following types are used to provide an ad hoc list for holding
@@ -173,6 +180,11 @@ typedef struct PredXactListData
 												 * seq no */
 	SERIALIZABLEXACT *OldCommittedSxact;		/* shared copy of dummy sxact */
 
+	/* Tracking of snapshot safety on standby servers. */
+	SHM_QUEUE	snapshotSafetyWaitList;
+	uint64 LastReplayedHypotheticalSnapshotToken;
+	SnapshotSafety LastReplayedHypotheticalSnapshotSafety;
+
 	PredXactListElement element;
 }	PredXactListData;
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 7dc8dac..4df6eb3 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -133,6 +133,11 @@ struct PGPROC
 	int			syncRepState;	/* wait state for sync rep */
 	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
 
+	/* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */
+	uint64		waitSnapshotToken;
+	int			snapshotSafety;	/* space for result */
+	SHM_QUEUE	safetyLinks;	/* list link for GetSafeSnapshot */
+
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
 	 * linked into one of these lists, according to the partition number of
#2 Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Thomas Munro (#1)
1 attachment(s)
Re: SERIALIZABLE on standby servers

On Tue, Nov 8, 2016 at 5:56 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

> Here is an experimental WIP patch to allow SERIALIZABLE READ ONLY
> DEFERRABLE transactions on standby servers without serialisation
> anomalies, based loosely on an old email from Kevin Grittner[1]. I'm
> not sure how far this is from what he had in mind or whether I've
> misunderstood something fundamental here, but I hope this can at least
> serve as a starting point and we can try to get something into
> Postgres 10.

While out walking I realised what was wrong with that. It's going to
take me a while to find the time to get back to this, so I figured I
should share this realisation in case anyone else is interested in the
topic.

The problem is that the patch determines snapshot safety in
PreCommit_CheckForSerializationFailure, and then races other backends
to XactLogCommitRecord. It could determine that a hypothetical
snapshot taken after this commit is safe, but then other activity
resulting in a hypothetical snapshot of unknown safety could happen
and be logged before we record our determination in the log.
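To make the race concrete, here is a toy model of it. This is not PostgreSQL code: CommitSafety and naive_replay are made-up names for illustration, and only the SNAPSHOT_* names are borrowed from the patch (their numeric values here are arbitrary). It assumes a standby that simply believes whichever commit record it replayed last.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Names borrowed from the patch; the numeric values are arbitrary here. */
typedef enum
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/* Hypothetical stand-in for the safety payload carried by one commit record. */
typedef struct
{
	uint64_t	token;			/* order in which safety was determined */
	SnapshotSafety safety;
} CommitSafety;

/*
 * Naive replay: the standby adopts the safety claim of every commit record
 * in WAL order, so the last record replayed wins regardless of its token.
 */
static SnapshotSafety
naive_replay(const CommitSafety *wal, size_t nrecords)
{
	SnapshotSafety newest = SNAPSHOT_SAFE;
	size_t		i;

	assert(wal != NULL || nrecords == 0);

	for (i = 0; i < nrecords; i++)
		newest = wal[i].safety;
	return newest;
}
```

If token 1 (safe) and token 2 (unknown) are logged in determination order, the standby ends up waiting, which is correct. If the two backends race and the WAL order is token 2 then token 1, the same function reports SNAPSHOT_SAFE even though the most recent determination on the primary was "unknown".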

One solution could be to serialise XactLogCommitRecord for SSI
transactions using SerializableXactHashLock, and determine
hypothetical snapshot safety at the same time, so that commit replay
order matches safety determination order. But it would suck to add
another point of lock contention to SSI commits. Another solution
could be to have recovery on the standby detect tokens (CSNs
incremented by PreCommit_CheckForSerializationFailure) arriving out of
order, but I don't know what exactly it should do about that when it
is detected: you shouldn't respect an out-of-order claim of safety,
but then what should you wait for? Perhaps if the last replayed
commit record before that was marked SNAPSHOT_SAFE then it's OK to
leave it that way, and if it was marked SNAPSHOT_SAFETY_UNKNOWN then
you have to wait for that one to be resolved by a follow-up snapshot
safety message and then rinse and repeat (take a new snapshot etc). I
think that might work, but it seems strange to allow random races on
the primary to create extra delays on the standby. Perhaps there is
some much simpler way to do all this that I'm missing.
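A rough sketch of the second idea (detecting out-of-order tokens during replay) might look like the following. It is only one reading of the paragraph above, not something the attached patch does. StandbySafetyState, replay_commit_safety and replay_safety_record are hypothetical names, and the sketch assumes tokens are strictly increasing in determination order and start above zero.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef enum
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/* Hypothetical standby-side state: the newest token seen and its safety. */
typedef struct
{
	uint64_t	newest_token;
	SnapshotSafety newest_safety;
} StandbySafetyState;

/*
 * Replay the safety payload of a commit record.  A token that is not newer
 * than the newest one already replayed arrived out of order, so its claim
 * is not respected and the previous state is left alone.  Returns true if
 * the record was accepted.
 */
static bool
replay_commit_safety(StandbySafetyState *st, uint64_t token,
					 SnapshotSafety safety)
{
	assert(st != NULL);

	if (token <= st->newest_token)
		return false;			/* out of order: keep the previous state */

	st->newest_token = token;
	st->newest_safety = safety;
	return true;
}

/*
 * Replay an independent snapshot safety record.  It only resolves the
 * state if it refers to the newest token we are tracking.
 */
static void
replay_safety_record(StandbySafetyState *st, uint64_t token,
					 SnapshotSafety safety)
{
	assert(st != NULL);

	if (st->newest_token == token)
		st->newest_safety = safety;
}
```

With this rule, a late "safe" claim for token 1 that arrives after token 2 (unknown) is ignored, and the state stays unknown until the follow-up record for token 2 is replayed. That is exactly the extra delay on the standby mentioned above, caused by nothing more than a race on the primary.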

Another detail is that standbys that start up from a checkpoint and
don't see any SSI transactions commit don't yet have any snapshot
safety information, but defaulting to assuming that this point is safe
doesn't seem right, so I suspect it needs to be in checkpoints.
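As a sketch of what carrying this in checkpoints could mean: the standby would seed its state from the checkpoint record rather than from a hard-coded default. CheckPointSafety, StandbySafetyState and standby_init_from_checkpoint are hypothetical names used only for this illustration; the two field names follow the ones that appear in the xlog.c part of the attached diff.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/* Hypothetical subset of a checkpoint record carrying safety information. */
typedef struct
{
	uint64_t	newestSnapshotToken;
	SnapshotSafety newestSnapshotSafety;
} CheckPointSafety;

/* Hypothetical standby-side state, as it would live in shared memory. */
typedef struct
{
	uint64_t	newest_token;
	SnapshotSafety newest_safety;
} StandbySafetyState;

/*
 * Seed the standby's view from the checkpoint it starts from.  If the
 * checkpoint says the newest hypothetical snapshot is still of unknown
 * safety, the standby starts in that state and has to wait for a later
 * snapshot safety record or SSI commit, instead of assuming "safe".
 */
static void
standby_init_from_checkpoint(StandbySafetyState *st,
							 const CheckPointSafety *ckpt)
{
	assert(st != NULL && ckpt != NULL);

	st->newest_token = ckpt->newestSnapshotToken;
	st->newest_safety = ckpt->newestSnapshotSafety;
}
```

The point of the sketch is only that the initial state comes from the primary's knowledge at checkpoint time; what the primary should write there while hypothetical snapshots are still unresolved is a separate question.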

Attached is a tidied up version which doesn't try to address the above
problems yet. When time permits I'll come back to this.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

ssi-standby-v2.patch (application/octet-stream)
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 5bedaf2..60cd641 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -2364,15 +2364,6 @@ LOG:  database system is ready to accept read only connections
      your setting of <varname>max_prepared_transactions</> is 0.
     </para>
    </listitem>
-   <listitem>
-    <para>
-     The Serializable transaction isolation level is not yet available in hot
-     standby.  (See <xref linkend="xact-serializable"> and
-     <xref linkend="serializable-consistency"> for details.)
-     An attempt to set a transaction to the serializable isolation level in
-     hot standby mode will generate an error.
-    </para>
-   </listitem>
   </itemizedlist>
 
    </para>
diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml
index 306def4..f489714 100644
--- a/doc/src/sgml/mvcc.sgml
+++ b/doc/src/sgml/mvcc.sgml
@@ -1596,15 +1596,6 @@ SELECT pg_advisory_lock(q.id) FROM
     <para>
      See <xref linkend="xact-serializable"> for performance suggestions.
     </para>
-
-    <warning>
-     <para>
-      This level of integrity protection using Serializable transactions
-      does not yet extend to hot standby mode (<xref linkend="hot-standby">).
-      Because of that, those using hot standby may want to use Repeatable
-      Read and explicit locking on the master.
-     </para>
-    </warning>
    </sect2>
 
    <sect2 id="non-serializable-consistency">
@@ -1697,18 +1688,6 @@ SELECT pg_advisory_lock(q.id) FROM
     could cause visible inconsistency between the contents of the target
     table and other tables in the database.
    </para>
-
-   <para>
-    Support for the Serializable transaction isolation level has not yet
-    been added to Hot Standby replication targets (described in
-    <xref linkend="hot-standby">).  The strictest isolation level currently
-    supported in hot standby mode is Repeatable Read.  While performing all
-    permanent database writes within Serializable transactions on the
-    master will ensure that all standbys will eventually reach a consistent
-    state, a Repeatable Read transaction run on the standby can sometimes
-    see a transient state that is inconsistent with any serial execution
-    of the transactions on the master.
-   </para>
   </sect1>
 
   <sect1 id="locking-indexes">
diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml
index ca55a5b..0269e21 100644
--- a/doc/src/sgml/ref/set_transaction.sgml
+++ b/doc/src/sgml/ref/set_transaction.sgml
@@ -152,6 +152,18 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
   </para>
 
   <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the master server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
+  <para>
    The <literal>SET TRANSACTION SNAPSHOT</literal> command allows a new
    transaction to run with the same <firstterm>snapshot</> as an existing
    transaction.  The pre-existing transaction must have exported its snapshot
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 91d27d0..e0bc9ad 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -116,6 +116,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xl_snapshot_safety =
+			(xl_xact_snapshot_safety *) data;
+
+		parsed->snapshot_token = xl_snapshot_safety->token;
+		parsed->snapshot_safety = xl_snapshot_safety->safety;
+
+		data += sizeof(xl_xact_snapshot_safety);
+	}
 }
 
 void
@@ -171,6 +182,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 	}
 }
 
+static const char *
+xact_snapshot_safety_to_string(SnapshotSafety snapshot_safety)
+{
+	const char *string = "<unknown>";
+
+	switch(snapshot_safety)
+	{
+	case SNAPSHOT_SAFE:
+		string = "SNAPSHOT_SAFE";
+		break;
+	case SNAPSHOT_UNSAFE:
+		string = "SNAPSHOT_UNSAFE";
+		break;
+	case SNAPSHOT_SAFETY_UNKNOWN:
+		string = "SNAPSHOT_SAFETY_UNKNOWN";
+		break;
+	}
+
+	return string;
+}
+
 static void
 xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
 {
@@ -220,6 +252,13 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 						 (uint32) parsed.origin_lsn,
 						 timestamptz_to_str(parsed.origin_timestamp));
 	}
+
+	if (parsed.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		appendStringInfo(buf, "; snapshot safety: %s, token: %llx",
+						 xact_snapshot_safety_to_string(parsed.snapshot_safety),
+						 (unsigned long long) parsed.snapshot_token);
+	}
 }
 
 static void
@@ -266,6 +305,14 @@ xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
 		appendStringInfo(buf, " %u", xlrec->xsub[i]);
 }
 
+static void
+xact_desc_snapshot_safety(StringInfo buf, xl_xact_snapshot_safety *xlrec)
+{
+	appendStringInfo(buf, "snapshot safety: %s, token: %llx",
+					 xact_snapshot_safety_to_string(xlrec->safety),
+					 (unsigned long long) xlrec->token);
+}
+
 void
 xact_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -297,6 +344,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec = (xl_xact_snapshot_safety *) rec;
+
+		xact_desc_snapshot_safety(buf, xlrec);
+	}
 }
 
 const char *
@@ -324,6 +377,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_SNAPSHOT_SAFETY:
+			id = "SNAPSHOT_SAFETY";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e11b229..0913a28 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1834,13 +1834,14 @@ StartTransaction(void)
 	{
 		s->startedInRecovery = true;
 		XactReadOnly = true;
+		XactDeferrable = true;
 	}
 	else
 	{
 		s->startedInRecovery = false;
 		XactReadOnly = DefaultXactReadOnly;
+		XactDeferrable = DefaultXactDeferrable;
 	}
-	XactDeferrable = DefaultXactDeferrable;
 	XactIsoLevel = DefaultXactIsoLevel;
 	forceSyncCommit = false;
 	MyXactAccessedTempRel = false;
@@ -5114,6 +5115,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_snapshot_safety xl_snapshot_safety;
 
 	uint8		info;
 
@@ -5187,6 +5189,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	if (IsolationIsSerializable())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY;
+		GetSnapshotSafetyAfterThisCommit(&xl_snapshot_safety.token,
+										 &xl_snapshot_safety.safety);
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -5231,6 +5240,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+		XLogRegisterData((char *) (&xl_snapshot_safety),
+						 sizeof(xl_xact_snapshot_safety));
+
 	/* we allow filtering by xacts */
 	XLogIncludeOrigin();
 
@@ -5400,12 +5413,23 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		TransactionIdAsyncCommitTree(
 							  xid, parsed->nsubxacts, parsed->subxacts, lsn);
 
+		/* Coordinate atomic update of snapshot safety and ProcArray. */
+		if (parsed->snapshot_token != 0)
+		{
+			BeginSnapshotSafetyReplay();
+			SetNewestSnapshotSafety(parsed->snapshot_token,
+									parsed->snapshot_safety);
+		}
+
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
 		ExpireTreeKnownAssignedTransactionIds(
 						  xid, parsed->nsubxacts, parsed->subxacts, max_xid);
 
+		if (parsed->snapshot_token != 0)
+			CompleteSnapshotSafetyReplay();
+
 		/*
 		 * Send any cache invalidations attached to the commit. We must
 		 * maintain the same order of invalidation then release locks as
@@ -5575,6 +5599,39 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
 	}
 }
 
+XLogRecPtr
+XactLogSnapshotSafetyRecord(SnapshotToken token, SnapshotSafety safety)
+{
+	XLogRecPtr result;
+	xl_xact_snapshot_safety snapshot_safety;
+
+	snapshot_safety.token = token;
+	snapshot_safety.safety = safety;
+
+	START_CRIT_SECTION();
+	XLogBeginInsert();
+	XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety));
+	result = XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY);
+	END_CRIT_SECTION();
+
+	return result;
+}
+
+static void
+xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety)
+{
+	/*
+	 * An earlier COMMIT record must have carried a snapshot safety message with
+	 * the same token as this record, and had safety ==
+	 * SNAPSHOT_SAFETY_UNKNOWN.  This new independent snapshot safety message
+	 * reports that the safety is now known.  We will wake any backend that is
+	 * waiting to learn if the snapshot is safe.
+	 */
+	if (standbyState >= STANDBY_INITIALIZED)
+		NotifyHypotheticalSnapshotSafety(snapshot_safety->token,
+										 snapshot_safety->safety);
+}
+
 void
 xact_redo(XLogReaderState *record)
 {
@@ -5639,6 +5696,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec =
+			(xl_xact_snapshot_safety *) XLogRecGetData(record);
+
+		xact_redo_snapshot_safety(xlrec);
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6cec027..8f3ba59 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4821,6 +4821,8 @@ BootStrapXLOG(void)
 	checkPoint.newestCommitTsXid = InvalidTransactionId;
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
+	checkPoint.newestSnapshotToken = 0;
+	checkPoint.newestSnapshotSafety = SNAPSHOT_SAFE;
 
 	ShmemVariableCache->nextXid = checkPoint.nextXid;
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -6426,6 +6428,8 @@ StartupXLOG(void)
 					 checkPoint.newestCommitTsXid);
 	XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
 	XLogCtl->ckptXid = checkPoint.nextXid;
+	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+							checkPoint.newestSnapshotSafety);
 
 	/*
 	 * Initialize replication slots, before there's a chance to remove
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index defafa5..9852e5b 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -567,14 +567,6 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source)
 			GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction");
 			return false;
 		}
-		/* Can't go to serializable mode while recovery is still active */
-		if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress())
-		{
-			GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
-			GUC_check_errmsg("cannot use serializable mode in a hot standby");
-			GUC_check_errhint("You can use REPEATABLE READ instead.");
-			return false;
-		}
 	}
 
 	*extra = malloc(sizeof(int));
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 24ed21b..03219b4 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -277,6 +277,7 @@
 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0)
 
 /*
  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -427,7 +428,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize);
 static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
-									  TransactionId sourcexid);
+									  TransactionId sourcexid,
+									  SnapshotSafety *snapshot_safety);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 						  PREDICATELOCKTARGETTAG *parent);
@@ -1207,6 +1209,9 @@ InitPredicateLocks(void)
 		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
 		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
 		PredXact->OldCommittedSxact->pid = 0;
+		SHMQueueInit(&PredXact->snapshotSafetyWaitList);
+		PredXact->NewestSnapshotToken = 0;
+		PredXact->NewestSnapshotSafety = SNAPSHOT_SAFE;
 	}
 	/* This never changes, so let's keep a local copy. */
 	OldCommittedSxact = PredXact->OldCommittedSxact;
@@ -1491,6 +1496,7 @@ static Snapshot
 GetSafeSnapshot(Snapshot origSnapshot)
 {
 	Snapshot	snapshot;
+	SnapshotSafety snapshot_safety;
 
 	Assert(XactReadOnly && XactDeferrable);
 
@@ -1503,10 +1509,43 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * one passed to it, but we avoid assuming that here.
 		 */
 		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-													   InvalidTransactionId);
+														 InvalidTransactionId,
+														 &snapshot_safety);
 
-		if (MySerializableXact == InvalidSerializableXact)
-			return snapshot;	/* no concurrent r/w xacts; it's safe */
+		if (RecoveryInProgress())
+		{
+			/*
+			 * Check if the most recently replayed COMMIT record was either
+			 * known to be safe because it had no concurrent r/w xacts on the
+			 * primary, or has subsequently been declared safe by a snapshot
+			 * safety record.
+			 */
+			if (snapshot_safety == SNAPSHOT_SAFE)
+				return snapshot;
+			else if (snapshot_safety == SNAPSHOT_UNSAFE)
+			{
+				/*
+				 * TODO:  This can only happen if the master ran out of memory
+				 * while trying to create a hypothetical transaction, right?
+				 * Should we wait or error out?
+				 *
+				 * Otherwise, a SNAPSHOT_UNSAFE state can only be
+				 * generated by ReleasePredicateLocks *after* a commit record
+				 * which would establish a new hypothetical snapshot.  So we
+				 * can never take a snapshot here that is already known to be
+				 * unsafe; that is, there can never be a commit record with
+				 * unknown followed by snapshot safety record that immediately
+				 * marks it unsafe, because there must be a new commit in
+				 * between.
+				 */
+				continue;
+			}
+		}
+		else
+		{
+			if (MySerializableXact == InvalidSerializableXact)
+				return snapshot;	/* no concurrent r/w xacts; it's safe */
+		}
 
 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -1514,20 +1553,48 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * Wait for concurrent transactions to finish. Stop early if one of
 		 * them marked us as conflicted.
 		 */
-		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
-				 SxactIsROUnsafe(MySerializableXact)))
+		if (RecoveryInProgress())
 		{
-			LWLockRelease(SerializableXactHashLock);
-			ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
-			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			/*
+			 * Running on a standby.  Wait for the primary to tell us the
+			 * result of testing a hypothetical transaction whose
+			 * serializability matches the snapshot we have.
+			 */
+			Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN);
+			Assert(!SHMQueueIsDetached(&MyProc->safetyLinks));
+			while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			if (MyProc->snapshotSafety == SNAPSHOT_SAFE)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
-		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
-
-		if (!SxactIsROUnsafe(MySerializableXact))
+		else
 		{
-			LWLockRelease(SerializableXactHashLock);
-			break;				/* success */
+			/*
+			 * Running on primary.  Wait for a signal from one of the backends
+			 * that we possibly conflict with.
+			 */
+			MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+			while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+					 SxactIsROUnsafe(MySerializableXact)))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+
+			if (!SxactIsROUnsafe(MySerializableXact))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
 
 		LWLockRelease(SerializableXactHashLock);
@@ -1542,13 +1609,64 @@ GetSafeSnapshot(Snapshot origSnapshot)
 	/*
 	 * Now we have a safe snapshot, so we don't need to do any further checks.
 	 */
-	Assert(SxactIsROSafe(MySerializableXact));
+	Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact));
 	ReleasePredicateLocks(false);
 
 	return snapshot;
 }
 
 /*
+ * When the primary server has determined the safety of a hypothetical
+ * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a
+ * COMMIT record, it emits a WAL record that causes the recovery process on
+ * standbys to call this function.  Here, we will wake up any backend that is
+ * currently waiting in GetSafeSnapshot to learn about the safety of a
+ * snapshot taken after that point in the transaction stream.
+ */
+void
+NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	Assert(AmStartupProcess());
+
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+	/*
+	 * Walk the list of processes that are waiting in GetSafeSnapshot on a
+	 * standby, and find any that are waiting to learn the safety of a
+	 * snapshot taken at a point in time when this token appeared on the most
+	 * recently replayed SSI transaction.  If we find any of those, tell them
+	 * the final status and wake them up.
+	 */
+	proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+								   &PredXact->snapshotSafetyWaitList,
+								   offsetof(PGPROC, safetyLinks));
+	while (proc != NULL)
+	{
+		next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+									   &proc->safetyLinks,
+									   offsetof(PGPROC, safetyLinks));
+		if (proc->waitSnapshotToken == token)
+		{
+			SHMQueueDelete(&proc->safetyLinks);
+			proc->snapshotSafety = safety;
+			ProcSendSignal(proc->pid);
+		}
+		proc = next;
+	}
+
+	/*
+	 * If this happens to be the most recently replayed snapshot token then
+	 * remember this safety value.
+	 */
+	if (PredXact->NewestSnapshotToken == token)
+		PredXact->NewestSnapshotSafety = safety;
+	LWLockRelease(SerializableXactHashLock);
+}
+
+/*
  * Acquire a snapshot that can be used for the current transaction.
  *
  * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
@@ -1566,16 +1684,17 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 
 	/*
 	 * Can't use serializable mode while recovery is still active, as it is,
-	 * for example, on a hot standby.  We could get here despite the check in
-	 * check_XactIsoLevel() if default_transaction_isolation is set to
-	 * serializable, so phrase the hint accordingly.
+	 * for example, on a hot standby, unless DEFERRABLE mode is active.  In
+	 * that case, DEFERRABLE is the default, so this error should only
+	 * be reachable if the user has explicitly asked for NOT DEFERRABLE via
+	 * SET transaction_deferrable or SET/BEGIN TRANSACTION ISOLATION LEVEL.
 	 */
-	if (RecoveryInProgress())
+	if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot use serializable mode in a hot standby"),
-				 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
-				 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
+				 errmsg("cannot use serializable not deferrable mode in a hot standby"),
+				 errdetail("Serializable transactions must be DEFERRABLE when run on hot standby servers."),
+				 errhint("You can use \"SET transaction_deferrable = true\", use DEFERRABLE when specifying the transaction isolation level, or avoid explicitly specifying NOT DEFERRABLE.")));
 
 	/*
 	 * A special optimization is available for SERIALIZABLE READ ONLY
@@ -1586,7 +1705,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 		return GetSafeSnapshot(snapshot);
 
 	return GetSerializableTransactionSnapshotInt(snapshot,
-												 InvalidTransactionId);
+												 InvalidTransactionId,
+												 NULL);
 }
 
 /*
@@ -1616,7 +1736,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
-	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
+	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid, NULL);
 }
 
 /*
@@ -1627,10 +1747,15 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
  * source xact is still running after we acquire SerializableXactHashLock.
  * We do that by calling ProcArrayInstallImportedXmin.
+ *
+ * If snapshot_safety is non-NULL, the safety of this snapshot if used on a
+ * standby is written to it.  SNAPSHOT_SAFE means it may be used right away;
+ * SNAPSHOT_SAFETY_UNKNOWN means the caller must wait for a final value.
  */
 static Snapshot
 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
-									  TransactionId sourcexid)
+									  TransactionId sourcexid,
+									  SnapshotSafety *snapshot_safety)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
@@ -1641,8 +1766,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
 
-	Assert(!RecoveryInProgress());
-
 	/*
 	 * Since all parts of a serializable transaction must use the same
 	 * snapshot, it is too late to establish one after a parallel operation
@@ -1683,6 +1806,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 		}
 	} while (!sxact);
 
+	/*
+	 * Note the snapshot safety information for standbys.  This can be used to
+	 * know if the returned snapshot is already known to be safe/unsafe, or if
+	 * we must wait for notification of the final safety determination.
+	 */
+	if (snapshot_safety != NULL)
+	{
+		*snapshot_safety = PredXact->NewestSnapshotSafety;
+		if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN)
+		{
+			/*
+			 * We must put this process into the waitlist while we hold
+			 * the lock or there would be a race condition where we might miss
+			 * a notification.  The caller must wait for
+			 * MyProc->snapshotSafety to be set to a final value.
+			 */
+			MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+			MyProc->waitSnapshotToken = PredXact->NewestSnapshotToken;
+			if (SHMQueueIsDetached(&MyProc->safetyLinks))
+				SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList,
+									 &MyProc->safetyLinks);
+		}
+	}
+
 	/* Get the snapshot, or check that it's safe to use */
 	if (!TransactionIdIsValid(sourcexid))
 		snapshot = GetSnapshotData(snapshot);
@@ -3439,12 +3586,35 @@ ReleasePredicateLocks(bool isCommit)
 
 			/*
 			 * Wake up the process for a waiting DEFERRABLE transaction if we
-			 * now know it's either safe or conflicted.
+			 * now know it's either safe or conflicted.  This releases
+			 * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary.
 			 */
 			if (SxactIsDeferrableWaiting(roXact) &&
 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
 				ProcSendSignal(roXact->pid);
 
+			/*
+			 * If a hypothetical transaction is now known to be safe or
+			 * unsafe, we can report that in the WAL for the benefit of
+			 * standbys and recycle it.  This releases SERIALIZABLE READ ONLY
+			 * DEFERRABLE transactions that are waiting for the status of this
+			 * particular hypothetical transaction on any standby that
+			 * replays it.
+			 */
+			if (SxactIsHypothetical(roXact) &&
+				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+			{
+				SnapshotSafety safety;
+
+				if (SxactIsROSafe(roXact))
+					safety = SNAPSHOT_SAFE;
+				else
+					safety = SNAPSHOT_UNSAFE;
+				XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot,
+											safety);
+				ReleasePredXact(roXact);
+			}
+
 			possibleUnsafeConflict = nextConflict;
 		}
 	}
@@ -4704,6 +4874,80 @@ PreCommit_CheckForSerializationFailure(void)
 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
 	MySerializableXact->flags |= SXACT_FLAG_PREPARED;
 
+	/*
+	 * For the benefit of hot standby servers that want to take a safe
+	 * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a
+	 * hypothetical read-only serializable transaction that starts after this
+	 * transaction commits would be safe.
+	 */
+	if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1))
+	{
+		/*
+		 * There are no concurrent writable SERIALIZABLE transactions.  A
+		 * read-only snapshot taken immediately after this one commits is
+		 * safe.
+		 */
+		MySerializableXact->snapshotSafetyAfterThisCommit = SNAPSHOT_SAFE;
+	}
+	else
+	{
+		SERIALIZABLEXACT *sxact;
+		SERIALIZABLEXACT *othersxact;
+
+		/*
+		 * We can't yet determine whether a read-only transaction beginning
+		 * now would be safe.  Create a hypothetical SERIALIZABLEXACT and let
+		 * ReleasePredicateLocks report on its safety once that can be
+		 * determined.
+		 */
+		sxact = CreatePredXact();
+		if (sxact == NULL)
+		{
+			/* Out of space.  Don't allow SERIALIZABLE on standbys. */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_UNSAFE;
+		}
+		else
+		{
+			SetInvalidVirtualTransactionId(sxact->vxid);
+			sxact->SeqNo.lastCommitBeforeSnapshot =
+				MySerializableXact->prepareSeqNo;
+			sxact->prepareSeqNo = InvalidSerCommitSeqNo;
+			sxact->commitSeqNo = InvalidSerCommitSeqNo;
+			SHMQueueInit(&(sxact->outConflicts));
+			SHMQueueInit(&(sxact->inConflicts));
+			SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+			sxact->topXid = InvalidTransactionId;
+			sxact->finishedBefore = InvalidTransactionId;
+			sxact->xmin = MySerializableXact->xmin;
+			sxact->pid = InvalidPid;
+			SHMQueueInit(&(sxact->predicateLocks));
+			SHMQueueElemInit(&(sxact->finishedLink));
+			sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL;
+
+			/* Register concurrent r/w transactions as possible conflicts. */
+			for (othersxact = FirstPredXact();
+				 othersxact != NULL;
+				 othersxact = NextPredXact(othersxact))
+			{
+				if (othersxact != MySerializableXact
+					&& !SxactIsCommitted(othersxact)
+					&& !SxactIsDoomed(othersxact)
+					&& !SxactIsReadOnly(othersxact))
+				{
+					SetPossibleUnsafeConflict(sxact, othersxact);
+				}
+			}
+
+			/*
+			 * The status will be reported in a later WAL record once it has
+			 * been determined.
+			 */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_SAFETY_UNKNOWN;
+		}
+	}
+
 	LWLockRelease(SerializableXactHashLock);
 }
 
@@ -4966,3 +5210,41 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
 	}
 }
+
+/*
+ * Accessor for the hypothetical snapshot safety information needed for commit
+ * records generated on primary servers.  This is used by XactLogCommitRecord
+ * to receive the safety level computed by
+ * PreCommit_CheckForSerializationFailure in a committing SSI transaction.
+ */
+void
+GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety)
+{
+	*token = MySerializableXact->prepareSeqNo;
+	*safety = MySerializableXact->snapshotSafetyAfterThisCommit;
+}
+
+void
+BeginSnapshotSafetyReplay(void)
+{
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+}
+
+/*
+ * Used in recovery when replaying commit records.  On a hot standby, these
+ * values must be set atomically with ProcArray updates, mirroring the code
+ * in GetSerializableTransactionSnapshotInt.  This is done by wrapping both
+ * in BeginSnapshotSafetyReplay/CompleteSnapshotSafetyReplay.
+ */
+void
+SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PredXact->NewestSnapshotToken = token;
+	PredXact->NewestSnapshotSafety = safety;
+}
+
+void
+CompleteSnapshotSafetyReplay(void)
+{
+	LWLockRelease(SerializableXactHashLock);
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b201631..bf2e2b7 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -394,6 +394,11 @@ InitProcess(void)
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	SHMQueueElemInit(&(MyProc->syncRepLinks));
 
+	/* Initialize fields for SERIALIZABLE on standbys */
+	MyProc->waitSnapshotToken = 0;
+	MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+	SHMQueueElemInit(&(MyProc->safetyLinks));
+
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
 	MyProc->procArrayGroupMemberXid = InvalidTransactionId;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a123d2a..7ea72d5 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -118,7 +118,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_SNAPSHOT_SAFETY   0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -137,6 +137,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_INVALS			(1U << 3)
 #define XACT_XINFO_HAS_TWOPHASE			(1U << 4)
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
+#define XACT_XINFO_HAS_SNAPSHOT_SAFETY	(1U << 6)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -232,6 +233,12 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_snapshot_safety
+{
+	SnapshotToken token;
+	SnapshotSafety safety;
+} xl_xact_snapshot_safety;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -243,6 +250,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_invals follows if XINFO_HAS_INVALS */
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -286,6 +294,9 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	SnapshotToken snapshot_token;
+	SnapshotSafety snapshot_safety;
 } xl_xact_parsed_commit;
 
 typedef struct xl_xact_parsed_abort
@@ -370,6 +381,10 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
 				   TransactionId twophase_xid);
+
+extern XLogRecPtr XactLogSnapshotSafetyRecord(SnapshotToken token,
+					SnapshotSafety safety);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index c2c6632..13789fa 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -51,6 +51,18 @@ typedef uint32 TimeLineID;
 typedef uint16 RepOriginId;
 
 /*
+ * Snapshot safety information used to control SERIALIZABLE on standby
+ * servers appears in checkpoints, so we define the types used here.
+ */
+typedef uint64 SnapshotToken;
+typedef enum SnapshotSafety
+{
+	SNAPSHOT_SAFE,
+	SNAPSHOT_UNSAFE,
+	SNAPSHOT_SAFETY_UNKNOWN
+} SnapshotSafety;
+
+/*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
  *	it is a win to use it in all cases where we sync on each write().  We could
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index a66b5b7..0fd3aa7 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -27,7 +27,6 @@ extern int	max_predicate_locks_per_xact;
 /* Number of SLRU buffers to use for predicate locking */
 #define NUM_OLDSERXID_BUFFERS	16
 
-
 /*
  * function prototypes
  */
@@ -70,4 +69,11 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
 extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
 							   void *recdata, uint32 len);
 
+/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */
+extern void GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety);
+extern void NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void BeginSnapshotSafetyReplay(void);
+extern void SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void CompleteSnapshotSafetyReplay(void);
+
 #endif   /* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 3175d28..baa2b0f 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -97,6 +97,12 @@ typedef struct SERIALIZABLEXACT
 	 */
 	SHM_QUEUE	possibleUnsafeConflicts;
 
+	/*
+	 * for committing transactions: would a hypothetical read-only snapshot
+	 * taken immediately after this transaction commits be safe?
+	 */
+	SnapshotSafety snapshotSafetyAfterThisCommit;
+
 	TransactionId topXid;		/* top level xid for the transaction, if one
 								 * exists; else invalid */
 	TransactionId finishedBefore;		/* invalid means still running; else
@@ -123,6 +129,7 @@ typedef struct SERIALIZABLEXACT
 #define SXACT_FLAG_RO_UNSAFE			0x00000100
 #define SXACT_FLAG_SUMMARY_CONFLICT_IN	0x00000200
 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
+#define SXACT_FLAG_HYPOTHETICAL			0x00000800
 
 /*
  * The following types are used to provide an ad hoc list for holding
@@ -173,6 +180,11 @@ typedef struct PredXactListData
 												 * seq no */
 	SERIALIZABLEXACT *OldCommittedSxact;		/* shared copy of dummy sxact */
 
+	/* Tracking of snapshot safety on standby servers. */
+	SHM_QUEUE	snapshotSafetyWaitList;
+	SnapshotToken NewestSnapshotToken;
+	SnapshotSafety NewestSnapshotSafety;
+
 	PredXactListElement element;
 }	PredXactListData;
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 7dc8dac..468018e 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -133,6 +133,11 @@ struct PGPROC
 	int			syncRepState;	/* wait state for sync rep */
 	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
 
+	/* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */
+	SnapshotToken waitSnapshotToken;
+	SnapshotSafety snapshotSafety;	/* space for result */
+	SHM_QUEUE	safetyLinks;	/* list link for GetSafeSnapshot */
+
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
 	 * linked into one of these lists, according to the partition number of
#3Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Thomas Munro (#2)
Re: SERIALIZABLE on standby servers

On Wed, Nov 16, 2016 at 9:26 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Tue, Nov 8, 2016 at 5:56 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
[..] Another solution
could be to have recovery on the standby detect tokens (CSNs
incremented by PreCommit_CheckForSerializationFailure) arriving out of
order, but I don't know what exactly it should do about that when it
is detected: you shouldn't respect an out-of-order claim of safety,
but then what should you wait for? Perhaps if the last replayed
commit record before that was marked SNAPSHOT_SAFE then it's OK to
leave it that way, and if it was marked SNAPSHOT_SAFETY_UNKNOWN then
you have to wait for that one to be resolved by a follow-up snapshot
safety message and then rinse-and-repeat (take a new snapshot etc). I
think that might work, but it seems strange to allow random races on
the primary to create extra delays on the standby. Perhaps there is
some much simpler way to do all this that I'm missing.

Another detail is that standbys that start up from a checkpoint and
don't see any SSI transactions commit don't yet have any snapshot
safety information, but defaulting to assuming that this point is safe
doesn't seem right, so I suspect it needs to be in checkpoints.

Attached is a tidied up version which doesn't try to address the above
problems yet. When time permits I'll come back to this.

I haven't looked at this again yet but a nearby thread reminded me of
another problem with this which I wanted to restate explicitly here in
the context of this patch. Even without replication in the picture,
there is a race to reach ProcArrayEndTransaction() after
RecordTransactionCommit() runs, which means that the DO history
(normal primary server) and REDO history (recovery) don't always agree
on the order that transactions become visible. With this patch, this
kind of diverging DO and REDO could allow undetectable read only
serialization anomalies. I think that ProcArrayEndTransaction() and
RecordTransactionCommit() need to be made atomic in the simple case so
that DO and REDO agree. Synchronous replication can make that more
likely and it seems like some other approach is probably needed to
delay visibility of not-yet-durable transactions while keeping the
order that transactions become visible the same on all nodes.

Aside from the problems I mentioned in my earlier message (race
between snapshot safety decision and logging order, and lack of
checkpointing of snapshot safety information), it seems like the two
DO vs REDO problems (race to ProcArrayEndTransaction, and deliberately
delayed visibility in syncrep) also need to be addressed before
SERIALIZABLE DEFERRABLE on standbys could make a watertight
guarantee.

--
Thomas Munro
http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4Simon Riggs
simon@2ndquadrant.com
In reply to: Thomas Munro (#1)
Re: [HACKERS] SERIALIZABLE on standby servers

On 7 November 2016 at 23:56, Thomas Munro <thomas.munro@enterprisedb.com> wrote:

The patch works by teaching the standby how to do something similar
to what SERIALIZABLE READ ONLY DEFERRABLE does on the primary server,
with some help from the primary server in the form of extra
information in the WAL.

This is going in a direction I'm interested in, though I had some
independent thoughts.

The basic idea is: the standby should wait until a point in the
transaction stream where it can take a snapshot and either (1) there
were no concurrent read/write SERIALIZABLE transactions running on the
primary, or (2) the last concurrent read/write SERIALIZABLE
transaction at snapshot time has now ended without creating a
dangerous cycle with our transaction.

In case (1), the primary sometimes adds an extra
xl_xact_snapshot_safety struct to commit messages which says 'a
snapshot taken after this commit and before any future SSI commits is
safe, because there are no concurrent read/write SSI transactions at
this moment'.

That has the problem that we need to decorate every commit. I'd be
looking for a way to make the default case the safe path, so we add
little new data to WAL.

ISTM possible to insert a wait prior to the commit record when we
detect that our commit would cause a pivot failure, so we actively try
to avoid unsafe orderings.

The purpose of that is to 1) ensure that the WAL ordering is a
Serializable ordering in most cases. The wait would have a timeout for
the cases where we can't determine safety. But it also has purpose 2)
reduce the number of serialization failures on primary.

Once we know that the WAL sequence is serializable we can use that
information for both physical and logical.

In case (2), the xl_xact_snapshot_safety struct embedded in a commit
record instead says 'a snapshot taken after this commit and before any
future SSI commits is of unknown safety, because there are concurrent
transactions; I'll tell you when it has been determined; please
remember this token'. The token (which happens to be a CSN but that
is not important) will appear in a future independent snapshot safety
message which says whether a snapshot taken at that time is safe or
unsafe.

Note that xl_xact_snapshot_safety is embedded in the commit messages
(for SSI transactions only), but also appears as its own WAL record to
report the final state of a token from an earlier commit. So if you
do a lot of non-overlapping writable SSI transactions, you'll get just
a few extra bytes in each commit record, but overlapping transactions
will generate a stream of extra snapshot safety messages, one for each
commit involved.

I would want to add the Serializable ordering info, not just "it is safe".

I think we'd need to add a timeout to the deferrable snapshot request,
because it could be hours.

In order to generate follow-up snapshot safety messages, the patch
creates 'hypothetical' transactions on the primary whenever a
writeable SSI transaction commits, so that it can figure out whether
such a transaction would conflict. These phantom transactions are
proxies for any transaction that may be created on a standby at the
same point in the transaction stream (with respect to SSI commits) on
any standby, and survive in memory just until they are found to be
safe or unsafe.

That needs a lot more explanation to determine the soundness of that
theory. README++

I think we need some isolationtester tests that generate various
orderings and then standby tests to check that works.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#5Simon Riggs
simon@2ndquadrant.com
In reply to: Thomas Munro (#3)
Re: [HACKERS] SERIALIZABLE on standby servers

On 19 January 2017 at 16:16, Thomas Munro <thomas.munro@enterprisedb.com> wrote:

On Wed, Nov 16, 2016 at 9:26 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Tue, Nov 8, 2016 at 5:56 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
[..] Another solution
could be to have recovery on the standby detect tokens (CSNs
incremented by PreCommit_CheckForSerializationFailure) arriving out of
order, but I don't know what exactly it should do about that when it
is detected: you shouldn't respect an out-of-order claim of safety,
but then what should you wait for? Perhaps if the last replayed
commit record before that was marked SNAPSHOT_SAFE then it's OK to
leave it that way, and if it was marked SNAPSHOT_SAFETY_UNKNOWN then
you have to wait for that one to be resolved by a follow-up snapshot
safety message and then rinse-and-repeat (take a new snapshot etc). I
think that might work, but it seems strange to allow random races on
the primary to create extra delays on the standby. Perhaps there is
some much simpler way to do all this that I'm missing.

Another detail is that standbys that start up from a checkpoint and
don't see any SSI transactions commit don't yet have any snapshot
safety information, but defaulting to assuming that this point is safe
doesn't seem right, so I suspect it needs to be in checkpoints.

Attached is a tidied up version which doesn't try to address the above
problems yet. When time permits I'll come back to this.

I haven't looked at this again yet but a nearby thread reminded me of
another problem with this which I wanted to restate explicitly here in
the context of this patch. Even without replication in the picture,
there is a race to reach ProcArrayEndTransaction() after
RecordTransactionCommit() runs, which means that the DO history
(normal primary server) and REDO history (recovery) don't always agree
on the order that transactions become visible. With this patch, this
kind of diverging DO and REDO could allow undetectable read only
serialization anomalies. I think that ProcArrayEndTransaction() and
RecordTransactionCommit() need to be made atomic in the simple case so
that DO and REDO agree.

Not atomic, we just need to make ProcArrayEndTransaction() apply
changes in the order of commits.

I think that is more easily possible by reusing the
SyncRepWaitForLSN() code, since that already orders things by LSN.

So make all committers wait and then get WALwriter to wake people
after ProcArrayEndTransaction() has been applied.

Synchronous replication can make that more
likely and it seems like some other approach is probably needed to
delay visibility of not-yet-durable transactions while keeping the
order that transactions become visible the same on all nodes.
Aside from the problems I mentioned in my earlier message (race
between snapshot safety decision and logging order, and lack of
checkpointing of snapshot safety information), it seems like the two
DO vs REDO problems (race to ProcArrayEndTransaction, and deliberately
delayed visibility in syncrep) also need to be addressed before
SERIALIZABLE DEFERRABLE on standbys could make a watertight
guarantee.

While the difference in ordering is there, it would be useful to show
how that allows serializable anomalies iff the WAL ordering is already
known serializable.

Mixing robustness modes with access to the same data is avoidable by
design, but we could have a parameter(s) to prevent that if desirable,
but only to prevent documented problems.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#6Robert Haas
robertmhaas@gmail.com
In reply to: Simon Riggs (#5)
Re: [HACKERS] SERIALIZABLE on standby servers

On Thu, Nov 16, 2017 at 5:52 PM, Simon Riggs <simon@2ndquadrant.com> wrote:

I haven't looked at this again yet but a nearby thread reminded me of
another problem with this which I wanted to restate explicitly here in
the context of this patch. Even without replication in the picture,
there is a race to reach ProcArrayEndTransaction() after
RecordTransactionCommit() runs, which means that the DO history
(normal primary server) and REDO history (recovery) don't always agree
on the order that transactions become visible. With this patch, this
kind of diverging DO and REDO could allow undetectable read only
serialization anomalies. I think that ProcArrayEndTransaction() and
RecordTransactionCommit() need to be made atomic in the simple case so
that DO and REDO agree.

Not atomic, we just need to make ProcArrayEndTransaction() apply
changes in the order of commits.

I think that is more easily possible by reusing the
SyncRepWaitForLSN() code, since that already orders things by LSN.

So make all committers wait and then get WALwriter to wake people
after ProcArrayEndTransaction() has been applied.

That doesn't solve any problem we have. Making committers wait after
ProcArrayEndTransaction() wouldn't keep the effects of those
transactions from being visible to other transactions in the system.
And that's precisely the issue: on the primary, transactions become
visible to other transactions in the order in which they perform
ProcArrayEndTransaction(), but on the standby, they become visible in
the order in which they RecordTransactionCommit(). That difference is
a source of serialization anomalies.
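
To make the difference concrete, here is a toy model in C. It is not
PostgreSQL code, and every name in it (ToyHistory, toy_visible,
toy_has_unreproducible_snapshot) is made up for illustration. A history
is just the order in which transactions become visible; the helper asks
whether one history admits a snapshot that the other can never produce.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model only: none of these names exist in PostgreSQL. */
#define NXACTS 2

typedef struct ToyHistory
{
	int			order[NXACTS];	/* xids (1..NXACTS) in visibility order */
} ToyHistory;

/*
 * Is xid visible to a snapshot taken after the first nsteps entries of
 * the history have taken effect?
 */
static bool
toy_visible(const ToyHistory *h, int nsteps, int xid)
{
	assert(nsteps >= 0 && nsteps <= NXACTS);

	for (int i = 0; i < nsteps; i++)
	{
		if (h->order[i] == xid)
			return true;
	}
	return false;
}

/*
 * Does history a allow a snapshot whose set of visible transactions
 * cannot be observed at any point in history b?
 */
static bool
toy_has_unreproducible_snapshot(const ToyHistory *a, const ToyHistory *b)
{
	for (int na = 0; na <= NXACTS; na++)
	{
		bool		matched = false;

		for (int nb = 0; nb <= NXACTS && !matched; nb++)
		{
			bool		same = true;

			for (int xid = 1; xid <= NXACTS; xid++)
			{
				if (toy_visible(a, na, xid) != toy_visible(b, nb, xid))
					same = false;
			}
			matched = same;
		}

		if (!matched)
			return true;
	}
	return false;
}
```

With the WAL order {1, 2} as the standby's history and {2, 1} as the
order in which the backends reached ProcArrayEndTransaction() on the
primary, the helper returns true: a primary snapshot can see
transaction 2 without transaction 1, and replay never shows the standby
that state.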

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#7Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Robert Haas (#6)
1 attachment(s)
Re: [HACKERS] SERIALIZABLE on standby servers

Hi Kevin, all,

/me pokes ancient thread

I haven't done any more work on the problems mentioned above, but I
ran into Kevin at PostgresOpen in San Francisco and he said he might
have some time to look at this problem. So, here is a long overdue
rebase of the WIP patch. It shows a first order approximation of
DEFERRABLE working on a standby (for example, see the sequence from
the first message in the thread[1]). I'll add it to the next
Commitfest so I know when to rebase it.

[1]: /messages/by-id/CAEepm=2b9TV+vJ4UeSBixDrW7VUiTjxPwWq8K3QwFSWx0pTXHQ@mail.gmail.com
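
For anyone who wants the standby-side protocol without reading the whole
diff, here is a rough sketch as a toy state machine in C. It assumes the
behaviour described earlier in the thread: a commit record carries a
(token, safety) pair, and a later standalone record resolves a token
that was SNAPSHOT_SAFETY_UNKNOWN. All Toy* names are hypothetical. The
patch itself does this under SerializableXactHashLock with a wait list
in shared memory, which the sketch leaves out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-ins for SnapshotToken and SnapshotSafety; names are hypothetical. */
typedef uint64_t ToyToken;

typedef enum ToySafety
{
	TOY_SAFE,
	TOY_UNSAFE,
	TOY_UNKNOWN
} ToySafety;

/* What the standby remembers from the most recently replayed SSI commit. */
typedef struct ToyStandby
{
	ToyToken	newest_token;
	ToySafety	newest_safety;
} ToyStandby;

/* What one backend remembers about the snapshot it has just taken. */
typedef struct ToyWaiter
{
	ToyToken	wait_token;
	ToySafety	result;
} ToyWaiter;

/* Replay of a commit record that carries snapshot safety information. */
static void
toy_replay_commit(ToyStandby *s, ToyToken token, ToySafety safety)
{
	s->newest_token = token;
	s->newest_safety = safety;
}

/* A backend takes a snapshot and records the safety state it saw. */
static void
toy_take_snapshot(const ToyStandby *s, ToyWaiter *w)
{
	w->wait_token = s->newest_token;
	w->result = s->newest_safety;
}

/*
 * Replay of a standalone snapshot safety record.  Returns true if this
 * record resolved the waiter, i.e. the waiter would be woken up.
 */
static bool
toy_replay_safety(ToyWaiter *w, ToyToken token, ToySafety safety)
{
	assert(safety != TOY_UNKNOWN);	/* standalone records are final */

	if (w->result == TOY_UNKNOWN && w->wait_token == token)
	{
		w->result = safety;
		return true;
	}
	return false;
}

/* Only a snapshot known to be safe may be used; otherwise wait or retry. */
static bool
toy_snapshot_usable(const ToyWaiter *w)
{
	return w->result == TOY_SAFE;
}
```

A waiter that ends up with TOY_UNSAFE has to take a new snapshot and go
around again, which is the DEFERRABLE retry loop in miniature.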

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

0001-SERIALIZABLE-READ-ONLY-DEFERRABLE-for-hot-standby-v3.patch (application/octet-stream)
From b0f953e1447bcf020bf2f25c065c0b151af2ee5d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Fri, 21 Sep 2018 01:49:22 +1200
Subject: [PATCH] SERIALIZABLE READ ONLY DEFERRABLE for hot standbys.

Work in progress!

Author: Thomas Munro, based on an idea from Kevin Grittner
Discussion: https://postgr.es/m/CAEepm%3D2b9TV%2BvJ4UeSBixDrW7VUiTjxPwWq8K3QwFSWx0pTXHQ%40mail.gmail.com
---
 doc/src/sgml/high-availability.sgml       |   9 -
 doc/src/sgml/mvcc.sgml                    |  21 --
 doc/src/sgml/ref/set_transaction.sgml     |  36 +++
 src/backend/access/rmgrdesc/xactdesc.c    |  56 ++++
 src/backend/access/transam/xact.c         |  73 ++++-
 src/backend/access/transam/xlog.c         |   7 +
 src/backend/commands/variable.c           |   8 -
 src/backend/storage/lmgr/predicate.c      | 336 ++++++++++++++++++++--
 src/backend/storage/lmgr/proc.c           |   5 +
 src/include/access/xact.h                 |  17 +-
 src/include/access/xlogdefs.h             |  12 +
 src/include/storage/predicate.h           |   8 +-
 src/include/storage/predicate_internals.h |  12 +
 src/include/storage/proc.h                |   5 +
 14 files changed, 534 insertions(+), 71 deletions(-)

diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 8cb77f85ec0..1a26c26a71f 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -2449,15 +2449,6 @@ LOG:  database system is ready to accept read only connections
      your setting of <varname>max_prepared_transactions</varname> is 0.
     </para>
    </listitem>
-   <listitem>
-    <para>
-     The Serializable transaction isolation level is not yet available in hot
-     standby.  (See <xref linkend="xact-serializable"/> and
-     <xref linkend="serializable-consistency"/> for details.)
-     An attempt to set a transaction to the serializable isolation level in
-     hot standby mode will generate an error.
-    </para>
-   </listitem>
   </itemizedlist>
 
    </para>
diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml
index 73934e5cf37..49aa7d51c4a 100644
--- a/doc/src/sgml/mvcc.sgml
+++ b/doc/src/sgml/mvcc.sgml
@@ -1599,15 +1599,6 @@ SELECT pg_advisory_lock(q.id) FROM
     <para>
      See <xref linkend="xact-serializable"/> for performance suggestions.
     </para>
-
-    <warning>
-     <para>
-      This level of integrity protection using Serializable transactions
-      does not yet extend to hot standby mode (<xref linkend="hot-standby"/>).
-      Because of that, those using hot standby may want to use Repeatable
-      Read and explicit locking on the master.
-     </para>
-    </warning>
    </sect2>
 
    <sect2 id="non-serializable-consistency">
@@ -1700,18 +1691,6 @@ SELECT pg_advisory_lock(q.id) FROM
     could cause visible inconsistency between the contents of the target
     table and other tables in the database.
    </para>
-
-   <para>
-    Support for the Serializable transaction isolation level has not yet
-    been added to Hot Standby replication targets (described in
-    <xref linkend="hot-standby"/>).  The strictest isolation level currently
-    supported in hot standby mode is Repeatable Read.  While performing all
-    permanent database writes within Serializable transactions on the
-    master will ensure that all standbys will eventually reach a consistent
-    state, a Repeatable Read transaction run on the standby can sometimes
-    see a transient state that is inconsistent with any serial execution
-    of the transactions on the master.
-   </para>
   </sect1>
 
   <sect1 id="locking-indexes">
diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml
index 43b1c6c892b..0c7d8b22710 100644
--- a/doc/src/sgml/ref/set_transaction.sgml
+++ b/doc/src/sgml/ref/set_transaction.sgml
@@ -151,6 +151,42 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
    is well suited for long-running reports or backups.
   </para>
 
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the master server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the master server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the master server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
   <para>
    The <literal>SET TRANSACTION SNAPSHOT</literal> command allows a new
    transaction to run with the same <firstterm>snapshot</firstterm> as an existing
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 6d5ebd475b4..e4ab28b9eab 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -123,6 +123,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xl_snapshot_safety =
+			(xl_xact_snapshot_safety *) data;
+
+		parsed->snapshot_token = xl_snapshot_safety->token;
+		parsed->snapshot_safety = xl_snapshot_safety->safety;
+
+		data += sizeof(xl_xact_snapshot_safety);
+	}
 }
 
 void
@@ -209,6 +220,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 	}
 }
 
+static const char *
+xact_snapshot_safety_to_string(SnapshotSafety snapshot_safety)
+{
+	const char *string = "<unknown>";
+
+	switch(snapshot_safety)
+	{
+	case SNAPSHOT_SAFE:
+		string = "SNAPSHOT_SAFE";
+		break;
+	case SNAPSHOT_UNSAFE:
+		string = "SNAPSHOT_UNSAFE";
+		break;
+	case SNAPSHOT_SAFETY_UNKNOWN:
+		string = "SNAPSHOT_SAFETY_UNKNOWN";
+		break;
+	}
+
+	return string;
+}
+
 static void
 xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
 {
@@ -258,6 +290,13 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 						 (uint32) parsed.origin_lsn,
 						 timestamptz_to_str(parsed.origin_timestamp));
 	}
+
+	if (parsed.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		appendStringInfo(buf, "; snapshot safety: %s, token: %llx",
+						 xact_snapshot_safety_to_string(parsed.snapshot_safety),
+						 (unsigned long long) parsed.snapshot_token);
+	}
 }
 
 static void
@@ -304,6 +343,14 @@ xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
 		appendStringInfo(buf, " %u", xlrec->xsub[i]);
 }
 
+static void
+xact_desc_snapshot_safety(StringInfo buf, xl_xact_snapshot_safety *xlrec)
+{
+	appendStringInfo(buf, "snapshot safety: %s, token: %llx",
+					 xact_snapshot_safety_to_string(xlrec->safety),
+					 (unsigned long long) xlrec->token);
+}
+
 void
 xact_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -335,6 +382,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec = (xl_xact_snapshot_safety *) rec;
+
+		xact_desc_snapshot_safety(buf, xlrec);
+	}
 }
 
 const char *
@@ -362,6 +415,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_SNAPSHOT_SAFETY:
+			id = "SNAPSHOT_SAFETY";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 875be180fe4..98136e05c6f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1812,13 +1812,14 @@ StartTransaction(void)
 	{
 		s->startedInRecovery = true;
 		XactReadOnly = true;
+		XactDeferrable = true;
 	}
 	else
 	{
 		s->startedInRecovery = false;
 		XactReadOnly = DefaultXactReadOnly;
+		XactDeferrable = DefaultXactDeferrable;
 	}
-	XactDeferrable = DefaultXactDeferrable;
 	XactIsoLevel = DefaultXactIsoLevel;
 	forceSyncCommit = false;
 	MyXactFlags = 0;
@@ -5228,6 +5229,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_snapshot_safety xl_snapshot_safety;
 	uint8		info;
 
 	Assert(CritSectionCount > 0);
@@ -5306,6 +5308,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	if (IsolationIsSerializable())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY;
+		GetSnapshotSafetyAfterThisCommit(&xl_snapshot_safety.token,
+										 &xl_snapshot_safety.safety);
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -5354,6 +5363,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+		XLogRegisterData((char *) (&xl_snapshot_safety),
+						 sizeof(xl_xact_snapshot_safety));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
@@ -5565,12 +5578,23 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		TransactionIdAsyncCommitTree(
 									 xid, parsed->nsubxacts, parsed->subxacts, lsn);
 
+		/* Coordinate atomic update of snapshot safety and ProcArray. */
+		if (parsed->snapshot_token != 0)
+		{
+			BeginSnapshotSafetyReplay();
+			SetNewestSnapshotSafety(parsed->snapshot_token,
+									parsed->snapshot_safety);
+		}
+
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
 		ExpireTreeKnownAssignedTransactionIds(
 											  xid, parsed->nsubxacts, parsed->subxacts, max_xid);
 
+		if (parsed->snapshot_token != 0)
+			CompleteSnapshotSafetyReplay();
+
 		/*
 		 * Send any cache invalidations attached to the commit. We must
 		 * maintain the same order of invalidation then release locks as
@@ -5724,6 +5748,46 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
 	DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 }
 
+XLogRecPtr
+XactLogSnapshotSafetyRecord(SnapshotToken token, SnapshotSafety safety)
+{
+	XLogRecPtr result;
+	xl_xact_snapshot_safety snapshot_safety;
+
+	snapshot_safety.token = token;
+	snapshot_safety.safety = safety;
+
+	START_CRIT_SECTION();
+	XLogBeginInsert();
+	XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety));
+	result = XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY);
+	END_CRIT_SECTION();
+
+	/*
+	 * TODO: How can we avoid having to flush again (after already flushing
+	 * for the commit)?  If we don't have this flush here, then the standby
+	 * has to wait a while to find out whether its snapshot is safe.
+	 */
+	XLogFlush(result);
+
+	return result;
+}
+
+static void
+xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety)
+{
+	/*
+	 * Any earlier COMMIT record must have carried a snapshot safety message
+	 * with the same token as this record, and had safety ==
+	 * SNAPSHOT_SAFETY_UNKNOWN.  This new independent snapshot safety message
+	 * reports that the safety is now known.  We will wake any backend that is
+	 * waiting to learn if the snapshot is safe.
+	 */
+	if (standbyState >= STANDBY_INITIALIZED)
+		NotifyHypotheticalSnapshotSafety(snapshot_safety->token,
+										 snapshot_safety->safety);
+}
+
 void
 xact_redo(XLogReaderState *record)
 {
@@ -5797,6 +5861,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec =
+			(xl_xact_snapshot_safety *) XLogRecGetData(record);
+
+		xact_redo_snapshot_safety(xlrec);
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3025d0badb8..fec37f67e1b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5177,6 +5177,8 @@ BootStrapXLOG(void)
 	checkPoint.newestCommitTsXid = InvalidTransactionId;
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
+//	checkPoint.newestSnapshotToken = 0;
+//	checkPoint.newestSnapshotSafety = SNAPSHOT_SAFE;
 
 	ShmemVariableCache->nextXid = checkPoint.nextXid;
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -5186,6 +5188,8 @@ BootStrapXLOG(void)
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/* Set up the XLOG page header */
 	page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -6813,6 +6817,9 @@ StartupXLOG(void)
 					 checkPoint.newestCommitTsXid);
 	XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
 	XLogCtl->ckptXid = checkPoint.nextXid;
+	/* TODO: figure out checkpoint protocol */
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/*
 	 * Initialize replication slots, before there's a chance to remove
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index 9a754dae3fd..9c6083d0edc 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -564,14 +564,6 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source)
 			GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction");
 			return false;
 		}
-		/* Can't go to serializable mode while recovery is still active */
-		if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress())
-		{
-			GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
-			GUC_check_errmsg("cannot use serializable mode in a hot standby");
-			GUC_check_errhint("You can use REPEATABLE READ instead.");
-			return false;
-		}
 	}
 
 	*extra = malloc(sizeof(int));
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index e8390311d03..8053d23131a 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -279,6 +279,7 @@
 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0)
 
 /*
  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -433,7 +434,8 @@ static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 									  VirtualTransactionId *sourcevxid,
-									  int sourcepid);
+									  int sourcepid,
+									  SnapshotSafety *snapshot_safety);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 						  PREDICATELOCKTARGETTAG *parent);
@@ -1186,6 +1188,9 @@ InitPredicateLocks(void)
 		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
 		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
 		PredXact->OldCommittedSxact->pid = 0;
+		SHMQueueInit(&PredXact->snapshotSafetyWaitList);
+		PredXact->NewestSnapshotToken = 0;
+		PredXact->NewestSnapshotSafety = SNAPSHOT_SAFE;
 	}
 	/* This never changes, so let's keep a local copy. */
 	OldCommittedSxact = PredXact->OldCommittedSxact;
@@ -1468,6 +1473,7 @@ static Snapshot
 GetSafeSnapshot(Snapshot origSnapshot)
 {
 	Snapshot	snapshot;
+	SnapshotSafety snapshot_safety;
 
 	Assert(XactReadOnly && XactDeferrable);
 
@@ -1480,10 +1486,36 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * one passed to it, but we avoid assuming that here.
 		 */
 		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-														 NULL, InvalidPid);
-
-		if (MySerializableXact == InvalidSerializableXact)
-			return snapshot;	/* no concurrent r/w xacts; it's safe */
+														 NULL, InvalidPid,
+														 &snapshot_safety);
+		if (RecoveryInProgress())
+		{
+			/*
+			 * Check if the most recently replayed COMMIT record was either
+			 * known to be safe because it had no concurrent r/w xacts on the
+			 * primary, or has subsequently been declared safe by a snapshot
+			 * safety record.
+			 */
+			if (snapshot_safety == SNAPSHOT_SAFE)
+			{
+				elog(WARNING, "got a safe snapshot!");
+				return snapshot;
+			}
+			else if (snapshot_safety == SNAPSHOT_UNSAFE)
+			{
+				/*
+				 * TODO:  This can only happen if the master ran out of memory
+				 * while trying to create a hypothetical transaction, right?
+				 * Should we wait or error out?
+				 */
+				continue;
+			}
+		}
+		else
+		{
+			if (MySerializableXact == InvalidSerializableXact)
+				return snapshot;	/* no concurrent r/w xacts; it's safe */
+		}
 
 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -1491,20 +1523,48 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * Wait for concurrent transactions to finish. Stop early if one of
 		 * them marked us as conflicted.
 		 */
-		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
-				 SxactIsROUnsafe(MySerializableXact)))
+		if (RecoveryInProgress())
 		{
-			LWLockRelease(SerializableXactHashLock);
-			ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
-			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			/*
+			 * Running on a standby.  Wait for the primary to tell us the
+			 * result of testing a hypothetical transaction whose
+			 * serializability matches the snapshot we have.
+			 */
+			Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN);
+			Assert(!SHMQueueIsDetached(&MyProc->safetyLinks));
+			while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			if (MyProc->snapshotSafety == SNAPSHOT_SAFE)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
-		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
-
-		if (!SxactIsROUnsafe(MySerializableXact))
+		else
 		{
-			LWLockRelease(SerializableXactHashLock);
-			break;				/* success */
+			/*
+			 * Running on primary.  Wait for a signal from one of the backends
+			 * that we possibly conflict with.
+			 */
+			MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+			while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+					 SxactIsROUnsafe(MySerializableXact)))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+
+			if (!SxactIsROUnsafe(MySerializableXact))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
 
 		LWLockRelease(SerializableXactHashLock);
@@ -1519,12 +1579,64 @@ GetSafeSnapshot(Snapshot origSnapshot)
 	/*
 	 * Now we have a safe snapshot, so we don't need to do any further checks.
 	 */
-	Assert(SxactIsROSafe(MySerializableXact));
+	Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact));
 	ReleasePredicateLocks(false);
 
 	return snapshot;
 }
 
+/*
+ * When the primary server has determined the safety of a hypothetical
+ * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a
+ * COMMIT record, it emits a WAL record that causes the recovery process on
+ * standbys to call this function.  Here, we will wake up any backend that is
+ * currently waiting in GetSafeSnapshot to learn about the safety of a
+ * snapshot taken after that point in the transaction stream.
+ */
+void
+NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	Assert(AmStartupProcess());
+	elog(LOG, "NotifyHypotheticalSnapshotSafety token = " UINT64_FORMAT ", safety = %d", token, (int) safety);
+
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+	/*
+	 * Walk the list of processes that are waiting in GetSafeSnapshot on a
+	 * standby, and find any that are waiting to learn the safety of a
+	 * snapshot taken at a point in time when this token appeared on the most
+	 * recently replayed SSI transaction.  If we find any of those, tell them
+	 * the final status and wake them up.
+	 */
+	proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+								   &PredXact->snapshotSafetyWaitList,
+								   offsetof(PGPROC, safetyLinks));
+	while (proc != NULL)
+	{
+		next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+									   &proc->safetyLinks,
+									   offsetof(PGPROC, safetyLinks));
+		if (proc->waitSnapshotToken == token)
+		{
+			SHMQueueDelete(&proc->safetyLinks);
+			proc->snapshotSafety = safety;
+			ProcSendSignal(proc->pid);
+		}
+		proc = next;
+	}
+
+	/*
+	 * If this happens to be the most recently replayed snapshot token then
+	 * remember this safety value.
+	 */
+	if (PredXact->NewestSnapshotToken == token)
+		PredXact->NewestSnapshotSafety = safety;
+	LWLockRelease(SerializableXactHashLock);
+}
+
 /*
  * GetSafeSnapshotBlockingPids
  *		If the specified process is currently blocked in GetSafeSnapshot,
@@ -1593,16 +1705,17 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 
 	/*
 	 * Can't use serializable mode while recovery is still active, as it is,
-	 * for example, on a hot standby.  We could get here despite the check in
-	 * check_XactIsoLevel() if default_transaction_isolation is set to
-	 * serializable, so phrase the hint accordingly.
+	 * for example, on a hot standby, unless DEFERRABLE mode is active.  In
+	 * that case, DEFERRABLE is the default, so this error should only
+	 * be reachable if the user has explicitly asked for NOT DEFERRABLE via
+	 * SET transaction_deferrable or SET/BEGIN TRANSACTION ISOLATION LEVEL.
 	 */
-	if (RecoveryInProgress())
+	if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot use serializable mode in a hot standby"),
-				 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
-				 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
+				 errmsg("cannot use serializable not deferrable mode in a hot standby"),
+				 errdetail("Serializable transactions must be DEFERRABLE when run on hot standby servers."),
+				 errhint("You can use \"SET transaction_deferrable = true\", use DEFERRABLE when specifying the transaction isolation level, or avoid explicitly specifying NOT DEFERRABLE.")));
 
 	/*
 	 * A special optimization is available for SERIALIZABLE READ ONLY
@@ -1613,7 +1726,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 		return GetSafeSnapshot(snapshot);
 
 	return GetSerializableTransactionSnapshotInt(snapshot,
-												 NULL, InvalidPid);
+												 NULL, InvalidPid, NULL);
 }
 
 /*
@@ -1645,7 +1758,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
 	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
-												 sourcepid);
+												 sourcepid, NULL);
 }
 
 /*
@@ -1656,11 +1769,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
  * source xact is still running after we acquire SerializableXactHashLock.
  * We do that by calling ProcArrayInstallImportedXmin.
+ *
+ * If snapshot_safety is non-NULL, then the safety of this snapshot if used
+ * on a standby server is written to it.  If it is SNAPSHOT_SAFE, then the
+ * snapshot may be safely used.  If it is SNAPSHOT_SAFETY_UNKNOWN, then the
+ * caller must wait for the safety to be announced in the WAL.
  */
 static Snapshot
 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 									  VirtualTransactionId *sourcevxid,
-									  int sourcepid)
+									  int sourcepid,
+									  SnapshotSafety *snapshot_safety)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
@@ -1671,8 +1790,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
 
-	Assert(!RecoveryInProgress());
-
 	/*
 	 * Since all parts of a serializable transaction must use the same
 	 * snapshot, it is too late to establish one after a parallel operation
@@ -1713,6 +1830,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 		}
 	} while (!sxact);
 
+	/*
+	 * Note the snapshot safety information for standbys.  This can be used to
+	 * know if the returned snapshot is already known to be safe/unsafe, or if
+	 * we must wait for notification of the final safety determination.
+	 */
+	if (snapshot_safety != NULL)
+	{
+		*snapshot_safety = PredXact->NewestSnapshotSafety;
+		if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN)
+		{
+			/*
+			 * We must put this process into the waitlist while we hold
+			 * the lock or there would be a race condition where we might miss
+			 * a notification.  The caller must wait for
+			 * MyProc->snapshotSafety to be set to a final value.
+			 */
+			MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+			MyProc->waitSnapshotToken = PredXact->NewestSnapshotToken;
+			if (SHMQueueIsDetached(&MyProc->safetyLinks))
+				SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList,
+									 &MyProc->safetyLinks);
+		}
+	}
+
 	/* Get the snapshot, or check that it's safe to use */
 	if (!sourcevxid)
 		snapshot = GetSnapshotData(snapshot);
@@ -3476,12 +3617,35 @@ ReleasePredicateLocks(bool isCommit)
 
 			/*
 			 * Wake up the process for a waiting DEFERRABLE transaction if we
-			 * now know it's either safe or conflicted.
+			 * now know it's either safe or conflicted.  This releases
+			 * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary.
 			 */
 			if (SxactIsDeferrableWaiting(roXact) &&
 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
 				ProcSendSignal(roXact->pid);
 
+			/*
+			 * If a hypothetical transaction is now known to be safe or
+			 * unsafe, we can report that in the WAL for the benefit of
+			 * standbys and recycle it.  This releases SERIALIZABLE READ ONLY
+			 * DEFERRABLE transactions that are waiting for the status of this
+			 * particular hypothetical transaction on any standby that
+			 * replays it.
+			 */
+			if (SxactIsHypothetical(roXact) &&
+				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+			{
+				SnapshotSafety safety;
+
+				if (SxactIsROSafe(roXact))
+					safety = SNAPSHOT_SAFE;
+				else
+					safety = SNAPSHOT_UNSAFE;
+				XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot,
+											safety);
+				ReleasePredXact(roXact);
+			}
+
 			possibleUnsafeConflict = nextConflict;
 		}
 	}
@@ -4741,6 +4905,80 @@ PreCommit_CheckForSerializationFailure(void)
 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
 	MySerializableXact->flags |= SXACT_FLAG_PREPARED;
 
+	/*
+	 * For the benefit of hot standby servers that want to take a safe
+	 * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a
+	 * hypothetical read-only serializable transaction that starts after this
+	 * transaction commits would be safe.
+	 */
+	if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1))
+	{
+		/*
+		 * There are no concurrent writable SERIALIZABLE transactions.  A
+		 * read-only snapshot taken immediately after this one commits is
+		 * safe.
+		 */
+		MySerializableXact->snapshotSafetyAfterThisCommit = SNAPSHOT_SAFE;
+	}
+	else
+	{
+		SERIALIZABLEXACT *sxact;
+		SERIALIZABLEXACT *othersxact;
+
+		/*
+		 * We can't yet determine whether a read-only transaction beginning
+		 * now would be safe.  Create a hypothetical SERIALIZABLEXACT and let
+		 * ReleasePredicateLocks report on its safety once that can be
+		 * determined.
+		 */
+		sxact = CreatePredXact();
+		if (sxact == NULL)
+		{
+			/* Out of space.  Don't allow SERIALIZABLE on standbys. */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_UNSAFE;
+		}
+		else
+		{
+			SetInvalidVirtualTransactionId(sxact->vxid);
+			sxact->SeqNo.lastCommitBeforeSnapshot =
+				MySerializableXact->prepareSeqNo;
+			sxact->prepareSeqNo = InvalidSerCommitSeqNo;
+			sxact->commitSeqNo = InvalidSerCommitSeqNo;
+			SHMQueueInit(&(sxact->outConflicts));
+			SHMQueueInit(&(sxact->inConflicts));
+			SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+			sxact->topXid = InvalidTransactionId;
+			sxact->finishedBefore = InvalidTransactionId;
+			sxact->xmin = MySerializableXact->xmin;
+			sxact->pid = InvalidPid;
+			SHMQueueInit(&(sxact->predicateLocks));
+			SHMQueueElemInit(&(sxact->finishedLink));
+			sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL;
+
+			/* Register concurrent r/w transactions as possible conflicts. */
+			for (othersxact = FirstPredXact();
+				 othersxact != NULL;
+				 othersxact = NextPredXact(othersxact))
+			{
+				if (othersxact != MySerializableXact
+					&& !SxactIsCommitted(othersxact)
+					&& !SxactIsDoomed(othersxact)
+					&& !SxactIsReadOnly(othersxact))
+				{
+					SetPossibleUnsafeConflict(sxact, othersxact);
+				}
+			}
+
+			/*
+			 * The status will be reported in a later WAL record once it has
+			 * been determined.
+			 */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_SAFETY_UNKNOWN;
+		}
+	}
+
 	LWLockRelease(SerializableXactHashLock);
 }
 
@@ -5003,3 +5241,41 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
 	}
 }
+
+/*
+ * Accessor for the hypothetical snapshot safety information needed for commit
+ * records generated on primary servers.  This is used by XactLogCommitRecord
+ * to receive the safety level computed by
+ * PreCommit_CheckForSerializationFailure in a committing SSI transaction.
+ */
+void
+GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety)
+{
+	*token = MySerializableXact->prepareSeqNo;
+	*safety = MySerializableXact->snapshotSafetyAfterThisCommit;
+}
+
+void
+BeginSnapshotSafetyReplay(void)
+{
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+}
+
+/*
+ * Used in recovery when replaying commit records.  On a hot standby, these
+ * values must be set atomically with ProcArray updates, mirroring the code
+ * in GetSerializableTransactionSnapshotInt.  This is done by wrapping both
+ * in BeginSnapshotSafetyReplay/CompleteSnapshotSafetyReplay.
+ */
+void
+SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PredXact->NewestSnapshotToken = token;
+	PredXact->NewestSnapshotSafety = safety;
+}
+
+void
+CompleteSnapshotSafetyReplay(void)
+{
+	LWLockRelease(SerializableXactHashLock);
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f9aaa52faf..33e7ace17bb 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -398,6 +398,11 @@ InitProcess(void)
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	SHMQueueElemInit(&(MyProc->syncRepLinks));
 
+	/* Initialize fields for SERIALIZABLE on standbys */
+	MyProc->waitSnapshotToken = 0;
+	MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+	SHMQueueElemInit(&(MyProc->safetyLinks));
+
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
 	MyProc->procArrayGroupMemberXid = InvalidTransactionId;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 083e879d5c3..da632445c68 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -143,7 +143,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_SNAPSHOT_SAFETY   0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -164,6 +164,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
+#define XACT_XINFO_HAS_SNAPSHOT_SAFETY	(1U << 8)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -259,6 +260,12 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_snapshot_safety
+{
+	SnapshotToken token;
+	SnapshotSafety safety;
+} xl_xact_snapshot_safety;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -271,6 +278,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -318,6 +326,9 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	SnapshotToken snapshot_token;
+	SnapshotSafety snapshot_safety;
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
@@ -416,6 +427,10 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nrels, RelFileNode *rels,
 				   int xactflags, TransactionId twophase_xid,
 				   const char *twophase_gid);
+
+extern XLogRecPtr XactLogSnapshotSafetyRecord(SnapshotToken token,
+											  SnapshotSafety safety);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 0a48d1cfb40..c747a126865 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -50,6 +50,18 @@ typedef uint32 TimeLineID;
  */
 typedef uint16 RepOriginId;
 
+/*
+ * Snapshot safety information used to control SERIALIZABLE on standby
+ * servers appears in checkpoints, so we define the types used here.
+ */
+typedef uint64 SnapshotToken;
+typedef enum SnapshotSafety
+{
+	SNAPSHOT_SAFE,
+	SNAPSHOT_UNSAFE,
+	SNAPSHOT_SAFETY_UNKNOWN
+} SnapshotSafety;
+
 /*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 6a3464daa1e..a84f80d88fd 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -30,7 +30,6 @@ extern int	max_predicate_locks_per_page;
 /* Number of SLRU buffers to use for predicate locking */
 #define NUM_OLDSERXID_BUFFERS	16
 
-
 /*
  * function prototypes
  */
@@ -74,4 +73,11 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
 extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
 							   void *recdata, uint32 len);
 
+/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */
+extern void GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety);
+extern void NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void BeginSnapshotSafetyReplay(void);
+extern void SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void CompleteSnapshotSafetyReplay(void);
+
 #endif							/* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 0f736d37dff..db69fba2925 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -97,6 +97,12 @@ typedef struct SERIALIZABLEXACT
 	 */
 	SHM_QUEUE	possibleUnsafeConflicts;
 
+	/*
+	 * for committing transactions: would a hypothetical read-only snapshot
+	 * taken immediately after this transaction commits be safe?
+	 */
+	SnapshotSafety snapshotSafetyAfterThisCommit;
+
 	TransactionId topXid;		/* top level xid for the transaction, if one
 								 * exists; else invalid */
 	TransactionId finishedBefore;	/* invalid means still running; else the
@@ -123,6 +129,7 @@ typedef struct SERIALIZABLEXACT
 #define SXACT_FLAG_RO_UNSAFE			0x00000100
 #define SXACT_FLAG_SUMMARY_CONFLICT_IN	0x00000200
 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
+#define SXACT_FLAG_HYPOTHETICAL			0x00000800
 
 /*
  * The following types are used to provide an ad hoc list for holding
@@ -172,6 +179,11 @@ typedef struct PredXactListData
 												 * seq no */
 	SERIALIZABLEXACT *OldCommittedSxact;	/* shared copy of dummy sxact */
 
+	/* Tracking of snapshot safety on standby servers. */
+	SHM_QUEUE	snapshotSafetyWaitList;
+	SnapshotToken NewestSnapshotToken;
+	SnapshotSafety NewestSnapshotSafety;
+
 	PredXactListElement element;
 }			PredXactListData;
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index cb613c8076e..1497cdf66c4 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -152,6 +152,11 @@ struct PGPROC
 	int			syncRepState;	/* wait state for sync rep */
 	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
 
+	/* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */
+	SnapshotToken waitSnapshotToken;
+	SnapshotSafety snapshotSafety;	/* space for result */
+	SHM_QUEUE	safetyLinks;	/* list link for GetSafeSnapshot */
+
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
 	 * linked into one of these lists, according to the partition number of
-- 
2.17.0
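
The standby-side protocol in the patch above can be sketched as a small single-process model. This is only an illustration under stated assumptions, not code from the patch: `ToyStandby` and the `toy_*` functions are hypothetical names, the wait list is a fixed array instead of an SHM_QUEUE, and there is no locking or signalling. Only the `SnapshotToken` and `SnapshotSafety` types mirror the definitions added to xlogdefs.h.

```c
#include <assert.h>
#include <stdint.h>

/* Same shape as the types the patch adds to xlogdefs.h. */
typedef uint64_t SnapshotToken;
typedef enum SnapshotSafety
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

#define TOY_MAX_BACKENDS 8

/* Hypothetical stand-in for the new PredXact fields plus the wait list. */
typedef struct ToyStandby
{
	SnapshotToken newest_token;
	SnapshotSafety newest_safety;
	int			waiting[TOY_MAX_BACKENDS];
	SnapshotToken wait_token[TOY_MAX_BACKENDS];
	SnapshotSafety result[TOY_MAX_BACKENDS];
} ToyStandby;

void
toy_init(ToyStandby *s)
{
	s->newest_token = 0;
	s->newest_safety = SNAPSHOT_SAFE;
	for (int i = 0; i < TOY_MAX_BACKENDS; i++)
	{
		s->waiting[i] = 0;
		s->wait_token[i] = 0;
		s->result[i] = SNAPSHOT_SAFETY_UNKNOWN;
	}
}

/* Models replay of a COMMIT record that carries snapshot safety info. */
void
toy_replay_commit(ToyStandby *s, SnapshotToken token, SnapshotSafety safety)
{
	s->newest_token = token;
	s->newest_safety = safety;
}

/*
 * Models a backend taking a snapshot.  If the safety is not yet known,
 * the backend remembers the current token and joins the wait list.
 */
SnapshotSafety
toy_take_snapshot(ToyStandby *s, int backend)
{
	assert(backend >= 0 && backend < TOY_MAX_BACKENDS);
	s->result[backend] = s->newest_safety;
	if (s->newest_safety == SNAPSHOT_SAFETY_UNKNOWN)
	{
		s->waiting[backend] = 1;
		s->wait_token[backend] = s->newest_token;
	}
	return s->newest_safety;
}

/*
 * Models replay of an independent snapshot safety record.  Returns the
 * number of waiting backends that received a final answer.
 */
int
toy_replay_safety_record(ToyStandby *s, SnapshotToken token,
						 SnapshotSafety safety)
{
	int			woken = 0;

	for (int i = 0; i < TOY_MAX_BACKENDS; i++)
	{
		if (s->waiting[i] && s->wait_token[i] == token)
		{
			s->waiting[i] = 0;
			s->result[i] = safety;
			woken++;
		}
	}
	/* Remember the verdict if this is still the newest token. */
	if (s->newest_token == token)
		s->newest_safety = safety;
	return woken;
}
```

In this model a backend that sees SNAPSHOT_SAFE proceeds at once, while a backend that sees SNAPSHOT_SAFETY_UNKNOWN is released only by a later safety record carrying the same token. Records for other tokens leave it waiting.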

#8Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Thomas Munro (#7)
1 attachment(s)
Re: [HACKERS] SERIALIZABLE on standby servers

On Sat, Sep 22, 2018 at 12:28 AM Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

> I'll add it to the next
> Commitfest so I know when to rebase it.

And cfbot immediately showed that this assertion in
OldSerXidSetActiveSerXmin() could fail in the isolation tests:

Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
|| TransactionIdFollows(xid, oldSerXidControl->tailXid));

Not sure how that ever worked or if I screwed something up while
rebasing, but the quick (and possibly wrong?) solution I found was to
exclude hypothetical SERIALIZABLEXACTs when scanning for the new oldest
xid:

@@ -3181,6 +3322,7 @@ SetNewSxactGlobalXmin(void)
        for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
        {
                if (!SxactIsRolledBack(sxact)
+                       && !SxactIsHypothetical(sxact)
                        && !SxactIsCommitted(sxact)
                        && sxact != OldCommittedSxact)

Here's a version like that in the meantime so that the CI passes. The
real solution might be to give them their own xid (right now they
"borrow" one, see PreCommit_CheckForSerializationFailure()... now that
I think about it, that must be wrong), when I have more time for this
project.
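
To make the effect of that extra condition concrete, here is a minimal standalone sketch of a global-xmin scan that skips hypothetical entries. It is a toy model, not the real SetNewSxactGlobalXmin(): `ToySxact`, `ToyXid` and `toy_global_xmin` are hypothetical names, and xids are compared with a plain `<` rather than the wraparound-aware comparison the server uses.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t ToyXid;		/* hypothetical; no wraparound handling */
#define TOY_INVALID_XID 0

/* Hypothetical, heavily reduced stand-in for SERIALIZABLEXACT. */
typedef struct ToySxact
{
	ToyXid		xmin;
	bool		committed;
	bool		rolled_back;
	bool		hypothetical;
} ToySxact;

/*
 * Return the oldest xmin among active, non-hypothetical entries, or
 * TOY_INVALID_XID if there are none.
 */
ToyXid
toy_global_xmin(const ToySxact *sxacts, size_t n)
{
	ToyXid		result = TOY_INVALID_XID;

	for (size_t i = 0; i < n; i++)
	{
		const ToySxact *s = &sxacts[i];

		/* Skip entries that must not hold back the global xmin. */
		if (s->rolled_back || s->hypothetical || s->committed)
			continue;
		if (result == TOY_INVALID_XID || s->xmin < result)
			result = s->xmin;
	}
	return result;
}
```

With this filter, a hypothetical entry carrying an old borrowed xmin no longer influences the computed value. Whether ignoring it is actually correct is the open question raised above.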

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

0001-SERIALIZABLE-READ-ONLY-DEFERRABLE-for-hot-standby-v4.patch (application/octet-stream)
From 19854e30d9122ffd4a948e61d3fd9a5add208d10 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Fri, 21 Sep 2018 01:49:22 +1200
Subject: [PATCH] SERIALIZABLE READ ONLY DEFERRABLE for hot standbys.

Work in progress!

Author: Thomas Munro, based on an idea from Kevin Grittner
Discussion: https://postgr.es/m/CAEepm%3D2b9TV%2BvJ4UeSBixDrW7VUiTjxPwWq8K3QwFSWx0pTXHQ%40mail.gmail.com
---
 doc/src/sgml/high-availability.sgml       |   9 -
 doc/src/sgml/mvcc.sgml                    |  21 --
 doc/src/sgml/ref/set_transaction.sgml     |  36 +++
 src/backend/access/rmgrdesc/xactdesc.c    |  56 ++++
 src/backend/access/transam/xact.c         |  73 ++++-
 src/backend/access/transam/xlog.c         |   7 +
 src/backend/commands/variable.c           |   8 -
 src/backend/storage/lmgr/predicate.c      | 337 ++++++++++++++++++++--
 src/backend/storage/lmgr/proc.c           |   5 +
 src/include/access/xact.h                 |  17 +-
 src/include/access/xlogdefs.h             |  12 +
 src/include/storage/predicate.h           |   8 +-
 src/include/storage/predicate_internals.h |  12 +
 src/include/storage/proc.h                |   5 +
 14 files changed, 535 insertions(+), 71 deletions(-)

diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 8cb77f85ec0..1a26c26a71f 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -2449,15 +2449,6 @@ LOG:  database system is ready to accept read only connections
      your setting of <varname>max_prepared_transactions</varname> is 0.
     </para>
    </listitem>
-   <listitem>
-    <para>
-     The Serializable transaction isolation level is not yet available in hot
-     standby.  (See <xref linkend="xact-serializable"/> and
-     <xref linkend="serializable-consistency"/> for details.)
-     An attempt to set a transaction to the serializable isolation level in
-     hot standby mode will generate an error.
-    </para>
-   </listitem>
   </itemizedlist>
 
    </para>
diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml
index 73934e5cf37..49aa7d51c4a 100644
--- a/doc/src/sgml/mvcc.sgml
+++ b/doc/src/sgml/mvcc.sgml
@@ -1599,15 +1599,6 @@ SELECT pg_advisory_lock(q.id) FROM
     <para>
      See <xref linkend="xact-serializable"/> for performance suggestions.
     </para>
-
-    <warning>
-     <para>
-      This level of integrity protection using Serializable transactions
-      does not yet extend to hot standby mode (<xref linkend="hot-standby"/>).
-      Because of that, those using hot standby may want to use Repeatable
-      Read and explicit locking on the master.
-     </para>
-    </warning>
    </sect2>
 
    <sect2 id="non-serializable-consistency">
@@ -1700,18 +1691,6 @@ SELECT pg_advisory_lock(q.id) FROM
     could cause visible inconsistency between the contents of the target
     table and other tables in the database.
    </para>
-
-   <para>
-    Support for the Serializable transaction isolation level has not yet
-    been added to Hot Standby replication targets (described in
-    <xref linkend="hot-standby"/>).  The strictest isolation level currently
-    supported in hot standby mode is Repeatable Read.  While performing all
-    permanent database writes within Serializable transactions on the
-    master will ensure that all standbys will eventually reach a consistent
-    state, a Repeatable Read transaction run on the standby can sometimes
-    see a transient state that is inconsistent with any serial execution
-    of the transactions on the master.
-   </para>
   </sect1>
 
   <sect1 id="locking-indexes">
diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml
index 43b1c6c892b..0c7d8b22710 100644
--- a/doc/src/sgml/ref/set_transaction.sgml
+++ b/doc/src/sgml/ref/set_transaction.sgml
@@ -151,6 +151,18 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
    is well suited for long-running reports or backups.
   </para>
 
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the master server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
   <para>
    The <literal>SET TRANSACTION SNAPSHOT</literal> command allows a new
    transaction to run with the same <firstterm>snapshot</firstterm> as an existing
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 6d5ebd475b4..e4ab28b9eab 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -123,6 +123,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xl_snapshot_safety =
+			(xl_xact_snapshot_safety *) data;
+
+		parsed->snapshot_token = xl_snapshot_safety->token;
+		parsed->snapshot_safety = xl_snapshot_safety->safety;
+
+		data += sizeof(xl_xact_snapshot_safety);
+	}
 }
 
 void
@@ -209,6 +220,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 	}
 }
 
+static const char *
+xact_snapshot_safety_to_string(SnapshotSafety snapshot_safety)
+{
+	const char *string = "<unknown>";
+
+	switch(snapshot_safety)
+	{
+	case SNAPSHOT_SAFE:
+		string = "SNAPSHOT_SAFE";
+		break;
+	case SNAPSHOT_UNSAFE:
+		string = "SNAPSHOT_UNSAFE";
+		break;
+	case SNAPSHOT_SAFETY_UNKNOWN:
+		string = "SNAPSHOT_SAFETY_UNKNOWN";
+		break;
+	}
+
+	return string;
+}
+
 static void
 xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
 {
@@ -258,6 +290,13 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 						 (uint32) parsed.origin_lsn,
 						 timestamptz_to_str(parsed.origin_timestamp));
 	}
+
+	if (parsed.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		appendStringInfo(buf, "; snapshot safety: %s, token: %lx",
+						 xact_snapshot_safety_to_string(parsed.snapshot_safety),
+						 parsed.snapshot_token);
+	}
 }
 
 static void
@@ -304,6 +343,14 @@ xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
 		appendStringInfo(buf, " %u", xlrec->xsub[i]);
 }
 
+static void
+xact_desc_snapshot_safety(StringInfo buf, xl_xact_snapshot_safety *xlrec)
+{
+	appendStringInfo(buf, "snapshot safety: %s, token: %lx",
+					 xact_snapshot_safety_to_string(xlrec->safety),
+					 xlrec->token);
+}
+
 void
 xact_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -335,6 +382,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec = (xl_xact_snapshot_safety *) rec;
+
+		xact_desc_snapshot_safety(buf, xlrec);
+	}
 }
 
 const char *
@@ -362,6 +415,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_SNAPSHOT_SAFETY:
+			id = "SNAPSHOT_SAFETY";
+			break;
 	}
 
 	return id;
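
For readers following the record layout: the commit record gains one more optional trailing section, present only when the corresponding xinfo bit is set. Below is a minimal standalone sketch of that parsing step. All `demo_`/`DEMO_` names are hypothetical stand-ins rather than the patch's real types, and the token is assumed to be a 64-bit integer. Unlike the hunk above, which casts the data pointer directly, the sketch copies the payload with memcpy so that it does not depend on alignment.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins; the real types are declared in the patch's headers. */
typedef uint64_t DemoSnapshotToken;		/* assumed width */

typedef enum
{
	DEMO_SNAPSHOT_SAFE,
	DEMO_SNAPSHOT_UNSAFE,
	DEMO_SNAPSHOT_SAFETY_UNKNOWN
} DemoSnapshotSafety;

typedef struct
{
	DemoSnapshotToken token;
	DemoSnapshotSafety safety;
} demo_snapshot_safety;

#define DEMO_XINFO_HAS_SNAPSHOT_SAFETY (1U << 8)

typedef struct
{
	uint32_t	xinfo;
	DemoSnapshotToken snapshot_token;	/* stays 0 when the section is absent */
	DemoSnapshotSafety snapshot_safety;
} demo_parsed_commit;

/*
 * Parse the optional snapshot safety section.  "data" points just past the
 * sections that precede it; the return value points past whatever was
 * consumed, so further optional sections could be chained after it.
 */
static const char *
demo_parse_snapshot_safety(uint32_t xinfo, const char *data,
						   demo_parsed_commit *parsed)
{
	memset(parsed, 0, sizeof(*parsed));
	parsed->xinfo = xinfo;

	if (xinfo & DEMO_XINFO_HAS_SNAPSHOT_SAFETY)
	{
		demo_snapshot_safety payload;

		/* Copy rather than cast: the section may not be aligned. */
		memcpy(&payload, data, sizeof(payload));
		parsed->snapshot_token = payload.token;
		parsed->snapshot_safety = payload.safety;
		data += sizeof(payload);
	}

	return data;
}
```

A token of zero after parsing means that no section was present, which is the same test the redo code in this patch uses before touching the snapshot safety state.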
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 875be180fe4..98136e05c6f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1812,13 +1812,14 @@ StartTransaction(void)
 	{
 		s->startedInRecovery = true;
 		XactReadOnly = true;
+		XactDeferrable = true;
 	}
 	else
 	{
 		s->startedInRecovery = false;
 		XactReadOnly = DefaultXactReadOnly;
+		XactDeferrable = DefaultXactDeferrable;
 	}
-	XactDeferrable = DefaultXactDeferrable;
 	XactIsoLevel = DefaultXactIsoLevel;
 	forceSyncCommit = false;
 	MyXactFlags = 0;
@@ -5228,6 +5229,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_snapshot_safety xl_snapshot_safety;
 	uint8		info;
 
 	Assert(CritSectionCount > 0);
@@ -5306,6 +5308,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	if (IsolationIsSerializable())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY;
+		GetSnapshotSafetyAfterThisCommit(&xl_snapshot_safety.token,
+										 &xl_snapshot_safety.safety);
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -5354,6 +5363,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+		XLogRegisterData((char *) (&xl_snapshot_safety),
+						 sizeof(xl_xact_snapshot_safety));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
@@ -5565,12 +5578,23 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		TransactionIdAsyncCommitTree(
 									 xid, parsed->nsubxacts, parsed->subxacts, lsn);
 
+		/* Coordinate atomic update of snapshot safety and ProcArray. */
+		if (parsed->snapshot_token != 0)
+		{
+			BeginSnapshotSafetyReplay();
+			SetNewestSnapshotSafety(parsed->snapshot_token,
+									parsed->snapshot_safety);
+		}
+
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
 		ExpireTreeKnownAssignedTransactionIds(
 											  xid, parsed->nsubxacts, parsed->subxacts, max_xid);
 
+		if (parsed->snapshot_token != 0)
+			CompleteSnapshotSafetyReplay();
+
 		/*
 		 * Send any cache invalidations attached to the commit. We must
 		 * maintain the same order of invalidation then release locks as
@@ -5724,6 +5748,46 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
 	DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 }
 
+XLogRecPtr
+XactLogSnapshotSafetyRecord(SnapshotToken token, SnapshotSafety safety)
+{
+	XLogRecPtr result;
+	xl_xact_snapshot_safety snapshot_safety;
+
+	snapshot_safety.token = token;
+	snapshot_safety.safety = safety;
+
+	START_CRIT_SECTION();
+	XLogBeginInsert();
+	XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety));
+	result = XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY);
+	END_CRIT_SECTION();
+
+	/*
+	 * TODO: How can we avoid having to flush again (after already flushing
+	 * for the commit)?  If we don't have this flush here, then the standby
+	 * has to wait a while to find out whether its snapshot is safe.
+	 */
+	XLogFlush(result);
+
+	return result;
+}
+
+static void
+xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety)
+{
+	/*
+	 * Any earlier COMMIT record must have carried a snapshot safety message
+	 * with the same token as this record, and had safety ==
+	 * SNAPSHOT_SAFETY_UNKNOWN.  This new independent snapshot safety message
+	 * reports that the safety is now known.  We will wake any backend that is
+	 * waiting to learn if the snapshot is safe.
+	 */
+	if (standbyState >= STANDBY_INITIALIZED)
+		NotifyHypotheticalSnapshotSafety(snapshot_safety->token,
+										 snapshot_safety->safety);
+}
+
 void
 xact_redo(XLogReaderState *record)
 {
@@ -5797,6 +5861,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec =
+			(xl_xact_snapshot_safety *) XLogRecGetData(record);
+
+		xact_redo_snapshot_safety(xlrec);
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3025d0badb8..fec37f67e1b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5177,6 +5177,8 @@ BootStrapXLOG(void)
 	checkPoint.newestCommitTsXid = InvalidTransactionId;
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
+//	checkPoint.newestSnapshotToken = 0;
+//	checkPoint.newestSnapshotSafety = SNAPSHOT_SAFE;
 
 	ShmemVariableCache->nextXid = checkPoint.nextXid;
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -5186,6 +5188,8 @@ BootStrapXLOG(void)
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/* Set up the XLOG page header */
 	page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -6813,6 +6817,9 @@ StartupXLOG(void)
 					 checkPoint.newestCommitTsXid);
 	XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
 	XLogCtl->ckptXid = checkPoint.nextXid;
+	/* TODO: figure out checkpoint protocol */
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/*
 	 * Initialize replication slots, before there's a chance to remove
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index 9a754dae3fd..9c6083d0edc 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -564,14 +564,6 @@ check_XactIsoLevel(char **newval, void **extra, GucSource source)
 			GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction");
 			return false;
 		}
-		/* Can't go to serializable mode while recovery is still active */
-		if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress())
-		{
-			GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
-			GUC_check_errmsg("cannot use serializable mode in a hot standby");
-			GUC_check_errhint("You can use REPEATABLE READ instead.");
-			return false;
-		}
 	}
 
 	*extra = malloc(sizeof(int));
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index e8390311d03..87bf45e918a 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -279,6 +279,7 @@
 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0)
 
 /*
  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -433,7 +434,8 @@ static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 									  VirtualTransactionId *sourcevxid,
-									  int sourcepid);
+									  int sourcepid,
+									  SnapshotSafety *snapshot_safety);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 						  PREDICATELOCKTARGETTAG *parent);
@@ -1186,6 +1188,9 @@ InitPredicateLocks(void)
 		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
 		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
 		PredXact->OldCommittedSxact->pid = 0;
+		SHMQueueInit(&PredXact->snapshotSafetyWaitList);
+		PredXact->NewestSnapshotToken = 0;
+		PredXact->NewestSnapshotSafety = SNAPSHOT_SAFE;
 	}
 	/* This never changes, so let's keep a local copy. */
 	OldCommittedSxact = PredXact->OldCommittedSxact;
@@ -1468,6 +1473,7 @@ static Snapshot
 GetSafeSnapshot(Snapshot origSnapshot)
 {
 	Snapshot	snapshot;
+	SnapshotSafety snapshot_safety;
 
 	Assert(XactReadOnly && XactDeferrable);
 
@@ -1480,10 +1486,36 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * one passed to it, but we avoid assuming that here.
 		 */
 		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-														 NULL, InvalidPid);
-
-		if (MySerializableXact == InvalidSerializableXact)
-			return snapshot;	/* no concurrent r/w xacts; it's safe */
+														 NULL, InvalidPid,
+														 &snapshot_safety);
+		if (RecoveryInProgress())
+		{
+			/*
+			 * Check if the most recently replayed COMMIT record was either
+			 * known to be safe because it had no concurrent r/w xacts on the
+			 * primary, or has subsequently been declared safe by a snapshot
+			 * safety record.
+			 */
+			if (snapshot_safety == SNAPSHOT_SAFE)
+			{
+				elog(WARNING, "got a safe snapshot!");
+				return snapshot;
+			}
+			else if (snapshot_safety == SNAPSHOT_UNSAFE)
+			{
+				/*
+				 * TODO:  This can only happen if the master ran out of memory
+				 * while trying to create a hypothetical transaction, right?
+				 * Should we wait or error out?
+				 */
+				continue;
+			}
+		}
+		else
+		{
+			if (MySerializableXact == InvalidSerializableXact)
+				return snapshot;	/* no concurrent r/w xacts; it's safe */
+		}
 
 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -1491,20 +1523,48 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * Wait for concurrent transactions to finish. Stop early if one of
 		 * them marked us as conflicted.
 		 */
-		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
-				 SxactIsROUnsafe(MySerializableXact)))
+		if (RecoveryInProgress())
 		{
-			LWLockRelease(SerializableXactHashLock);
-			ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
-			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			/*
+			 * Running on a standby.  Wait for the primary to tell us the
+			 * result of testing a hypothetical transaction whose
+			 * serializability matches the snapshot we have.
+			 */
+			Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN);
+			Assert(!SHMQueueIsDetached(&MyProc->safetyLinks));
+			while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			if (MyProc->snapshotSafety == SNAPSHOT_SAFE)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
-		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
-
-		if (!SxactIsROUnsafe(MySerializableXact))
+		else
 		{
-			LWLockRelease(SerializableXactHashLock);
-			break;				/* success */
+			/*
+			 * Running on primary.  Wait for a signal from one of the backends
+			 * that we possibly conflict with.
+			 */
+			MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+			while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+					 SxactIsROUnsafe(MySerializableXact)))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+
+			if (!SxactIsROUnsafe(MySerializableXact))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
 
 		LWLockRelease(SerializableXactHashLock);
@@ -1519,12 +1579,64 @@ GetSafeSnapshot(Snapshot origSnapshot)
 	/*
 	 * Now we have a safe snapshot, so we don't need to do any further checks.
 	 */
-	Assert(SxactIsROSafe(MySerializableXact));
+	Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact));
 	ReleasePredicateLocks(false);
 
 	return snapshot;
 }
 
+/*
+ * When the primary server has determined the safety of a hypothetical
+ * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a
+ * COMMIT record, it emits a WAL record that causes the recovery process on
+ * standbys to call this function.  Here, we will wake up any backend that is
+ * currently waiting in GetSafeSnapshot to learn about the safety of a
+ * snapshot taken after that point in the transaction stream.
+ */
+void
+NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	Assert(AmStartupProcess());
+	elog(LOG, "NotifyHypotheticalSnapshotSafety token = %ld, safety = %d", token, safety);
+
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+	/*
+	 * Walk the list of processes that are waiting in GetSafeSnapshot on a
+	 * standby, and find any that are waiting to learn the safety of a
+	 * snapshot taken at a point in time when this token appeared on the most
+	 * recently replayed SSI transaction.  If we find any of those, tell them
+	 * the final status and wake them up.
+	 */
+	proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+								   &PredXact->snapshotSafetyWaitList,
+								   offsetof(PGPROC, safetyLinks));
+	while (proc != NULL)
+	{
+		next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+									   &proc->safetyLinks,
+									   offsetof(PGPROC, safetyLinks));
+		if (proc->waitSnapshotToken == token)
+		{
+			SHMQueueDelete(&proc->safetyLinks);
+			proc->snapshotSafety = safety;
+			ProcSendSignal(proc->pid);
+		}
+		proc = next;
+	}
+
+	/*
+	 * If this happens to be the most recently replayed snapshot token then
+	 * remember this safety value.
+	 */
+	if (PredXact->NewestSnapshotToken == token)
+		PredXact->NewestSnapshotSafety = safety;
+	LWLockRelease(SerializableXactHashLock);
+}
+
 /*
  * GetSafeSnapshotBlockingPids
  *		If the specified process is currently blocked in GetSafeSnapshot,
@@ -1593,16 +1705,17 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 
 	/*
 	 * Can't use serializable mode while recovery is still active, as it is,
-	 * for example, on a hot standby.  We could get here despite the check in
-	 * check_XactIsoLevel() if default_transaction_isolation is set to
-	 * serializable, so phrase the hint accordingly.
+	 * for example, on a hot standby, unless DEFERRABLE mode is active.  In
+	 * that case, DEFERRABLE is the default, so this error should only
+	 * be reachable if the user has explicitly asked for NOT DEFERRABLE via
+	 * SET transaction_deferrable or SET/BEGIN TRANSACTION ISOLATION LEVEL.
 	 */
-	if (RecoveryInProgress())
+	if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot use serializable mode in a hot standby"),
-				 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
-				 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
+				 errmsg("cannot use serializable not deferrable mode in a hot standby"),
+				 errdetail("Serializable transactions must be DEFERRABLE when run on hot standby servers."),
+				 errhint("You can use \"SET transaction_deferrable = true\", use DEFERRABLE when specifying the transaction isolation level, or avoid explicitly specifying NOT DEFERRABLE.")));
 
 	/*
 	 * A special optimization is available for SERIALIZABLE READ ONLY
@@ -1613,7 +1726,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 		return GetSafeSnapshot(snapshot);
 
 	return GetSerializableTransactionSnapshotInt(snapshot,
-												 NULL, InvalidPid);
+												 NULL, InvalidPid, NULL);
 }
 
 /*
@@ -1645,7 +1758,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
 	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
-												 sourcepid);
+												 sourcepid, NULL);
 }
 
 /*
@@ -1656,11 +1769,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
  * source xact is still running after we acquire SerializableXactHashLock.
  * We do that by calling ProcArrayInstallImportedXmin.
+ *
+ * If snapshot_safety is non-NULL, then the safety of this snapshot if used
+ * on a standby server is written to it.  If it is SNAPSHOT_SAFE, then the
+ * snapshot may be safely used.  If it is SNAPSHOT_SAFETY_UNKNOWN, then the
+ * caller must wait for the safety to be announced in the WAL.
  */
 static Snapshot
 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 									  VirtualTransactionId *sourcevxid,
-									  int sourcepid)
+									  int sourcepid,
+									  SnapshotSafety *snapshot_safety)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
@@ -1671,8 +1790,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
 
-	Assert(!RecoveryInProgress());
-
 	/*
 	 * Since all parts of a serializable transaction must use the same
 	 * snapshot, it is too late to establish one after a parallel operation
@@ -1713,6 +1830,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 		}
 	} while (!sxact);
 
+	/*
+	 * Note the snapshot safety information for standbys.  This can be used to
+	 * know if the returned snapshot is already known to be safe/unsafe, or if
+	 * we must wait for notification of the final safety determination.
+	 */
+	if (snapshot_safety != NULL)
+	{
+		*snapshot_safety = PredXact->NewestSnapshotSafety;
+		if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN)
+		{
+			/*
+			 * We must put this process into the waitlist while we hold
+			 * the lock or there would be a race condition where we might miss
+			 * a notification.  The caller must wait for
+			 * MyProc->snapshotSafety to be set to a final value.
+			 */
+			MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+			MyProc->waitSnapshotToken = PredXact->NewestSnapshotToken;
+			if (SHMQueueIsDetached(&MyProc->safetyLinks))
+				SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList,
+									 &MyProc->safetyLinks);
+		}
+	}
+
 	/* Get the snapshot, or check that it's safe to use */
 	if (!sourcevxid)
 		snapshot = GetSnapshotData(snapshot);
@@ -3181,6 +3322,7 @@ SetNewSxactGlobalXmin(void)
 	for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
 	{
 		if (!SxactIsRolledBack(sxact)
+			&& !SxactIsHypothetical(sxact)
 			&& !SxactIsCommitted(sxact)
 			&& sxact != OldCommittedSxact)
 		{
@@ -3476,12 +3618,35 @@ ReleasePredicateLocks(bool isCommit)
 
 			/*
 			 * Wake up the process for a waiting DEFERRABLE transaction if we
-			 * now know it's either safe or conflicted.
+			 * now know it's either safe or conflicted.  This releases
+			 * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary.
 			 */
 			if (SxactIsDeferrableWaiting(roXact) &&
 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
 				ProcSendSignal(roXact->pid);
 
+			/*
+			 * If a hypothetical transaction is now known to be safe or
+			 * unsafe, we can report that in the WAL for the benefit of
+			 * standbys and recycle it.  This releases SERIALIZABLE READ ONLY
+			 * DEFERRABLE transactions that are waiting for the status of this
+			 * particular hypothetical transaction on any standby that
+			 * replays it.
+			 */
+			if (SxactIsHypothetical(roXact) &&
+				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+			{
+				SnapshotSafety safety;
+
+				if (SxactIsROSafe(roXact))
+					safety = SNAPSHOT_SAFE;
+				else
+					safety = SNAPSHOT_UNSAFE;
+				XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot,
+											safety);
+				ReleasePredXact(roXact);
+			}
+
 			possibleUnsafeConflict = nextConflict;
 		}
 	}
@@ -4741,6 +4906,80 @@ PreCommit_CheckForSerializationFailure(void)
 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
 	MySerializableXact->flags |= SXACT_FLAG_PREPARED;
 
+	/*
+	 * For the benefit of hot standby servers that want to take a safe
+	 * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a
+	 * hypothetical read-only serializable transaction that starts after this
+	 * transaction commits would be safe.
+	 */
+	if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1))
+	{
+		/*
+		 * There are no concurrent writable SERIALIZABLE transactions.  A
+		 * read-only snapshot taken immediately after this one commits is
+		 * safe.
+		 */
+		MySerializableXact->snapshotSafetyAfterThisCommit = SNAPSHOT_SAFE;
+	}
+	else
+	{
+		SERIALIZABLEXACT *sxact;
+		SERIALIZABLEXACT *othersxact;
+
+		/*
+		 * We can't yet determine whether a read-only transaction beginning
+		 * now would be safe.  Create a hypothetical SERIALIZABLEXACT and let
+		 * ReleasePredicateLocks report on its safety once that can be
+		 * determined.
+		 */
+		sxact = CreatePredXact();
+		if (sxact == NULL)
+		{
+			/* Out of space.  Don't allow SERIALIZABLE on standbys. */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_UNSAFE;
+		}
+		else
+		{
+			SetInvalidVirtualTransactionId(sxact->vxid);
+			sxact->SeqNo.lastCommitBeforeSnapshot =
+				MySerializableXact->prepareSeqNo;
+			sxact->prepareSeqNo = InvalidSerCommitSeqNo;
+			sxact->commitSeqNo = InvalidSerCommitSeqNo;
+			SHMQueueInit(&(sxact->outConflicts));
+			SHMQueueInit(&(sxact->inConflicts));
+			SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+			sxact->topXid = InvalidTransactionId;
+			sxact->finishedBefore = InvalidTransactionId;
+			sxact->xmin = MySerializableXact->xmin;
+			sxact->pid = InvalidPid;
+			SHMQueueInit(&(sxact->predicateLocks));
+			SHMQueueElemInit(&(sxact->finishedLink));
+			sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL;
+
+			/* Register concurrent r/w transactions as possible conflicts. */
+			for (othersxact = FirstPredXact();
+				 othersxact != NULL;
+				 othersxact = NextPredXact(othersxact))
+			{
+				if (othersxact != MySerializableXact
+					&& !SxactIsCommitted(othersxact)
+					&& !SxactIsDoomed(othersxact)
+					&& !SxactIsReadOnly(othersxact))
+				{
+					SetPossibleUnsafeConflict(sxact, othersxact);
+				}
+			}
+
+			/*
+			 * The status will be reported in a later WAL record once it has
+			 * been determined.
+			 */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_SAFETY_UNKNOWN;
+		}
+	}
+
 	LWLockRelease(SerializableXactHashLock);
 }
 
@@ -5003,3 +5242,41 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
 		CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
 	}
 }
+
+/*
+ * Accessor for the hypothetical snapshot safety information needed for commit
+ * records generated on primary servers.  This is used by XactLogCommitRecord
+ * to receive the safety level computed by
+ * PreCommit_CheckForSerializationFailure in a committing SSI transaction.
+ */
+void
+GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety)
+{
+	*token = MySerializableXact->prepareSeqNo;
+	*safety = MySerializableXact->snapshotSafetyAfterThisCommit;
+}
+
+void
+BeginSnapshotSafetyReplay(void)
+{
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+}
+
+/*
+ * Used in recovery when replaying commit records.  On a hot standby, these
+ * values must be set atomically with ProcArray updates, mirroring the code
+ * in GetSerializableTransactionSnapshotInt.  This is done by wrapping both
+ * in BeginSnapshotSafetyReplay/CompleteSnapshotSafetyReplay.
+ */
+void
+SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PredXact->NewestSnapshotToken = token;
+	PredXact->NewestSnapshotSafety = safety;
+}
+
+void
+CompleteSnapshotSafetyReplay(void)
+{
+	LWLockRelease(SerializableXactHashLock);
+}
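
To make the standby-side protocol easier to review, here is a small single-threaded model of it: replaying a commit record updates the newest token and safety, a backend taking a snapshot either proceeds or registers as a waiter for that token, and a later independent safety record resolves the waiters for its token. This is only a sketch under stated assumptions. Every `demo_`/`Demo` name is hypothetical, the fixed-size waiter array stands in for the shared-memory queue, and the locking that the patch does with SerializableXactHashLock is omitted entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum
{
	DEMO_SAFE,
	DEMO_UNSAFE,
	DEMO_UNKNOWN
} DemoSafety;

#define DEMO_MAX_WAITERS 8

/* Per-backend state, loosely modelled on the fields added to PGPROC. */
typedef struct
{
	bool		waiting;
	uint64_t	wait_token;
	DemoSafety	safety;
} DemoWaiter;

/* Shared state, loosely modelled on the fields added to PredXact. */
typedef struct
{
	uint64_t	newest_token;
	DemoSafety	newest_safety;
	DemoWaiter	waiters[DEMO_MAX_WAITERS];
} DemoStandby;

static void
demo_init(DemoStandby *s)
{
	s->newest_token = 0;
	s->newest_safety = DEMO_SAFE;
	for (int i = 0; i < DEMO_MAX_WAITERS; i++)
	{
		s->waiters[i].waiting = false;
		s->waiters[i].wait_token = 0;
		s->waiters[i].safety = DEMO_UNKNOWN;
	}
}

/* Replay of a commit record that carries snapshot safety information. */
static void
demo_replay_commit(DemoStandby *s, uint64_t token, DemoSafety safety)
{
	s->newest_token = token;
	s->newest_safety = safety;
}

/*
 * A backend takes a snapshot.  If the safety is not yet known, it records
 * the token it must wait for before it may use the snapshot.
 */
static DemoSafety
demo_take_snapshot(DemoStandby *s, int slot)
{
	DemoWaiter *w = &s->waiters[slot];

	w->safety = s->newest_safety;
	if (w->safety == DEMO_UNKNOWN)
	{
		w->waiting = true;
		w->wait_token = s->newest_token;
	}
	return w->safety;
}

/*
 * Replay of an independent snapshot safety record.  Returns the number of
 * waiters that were released.
 */
static int
demo_replay_safety(DemoStandby *s, uint64_t token, DemoSafety safety)
{
	int			woken = 0;

	for (int i = 0; i < DEMO_MAX_WAITERS; i++)
	{
		DemoWaiter *w = &s->waiters[i];

		if (w->waiting && w->wait_token == token)
		{
			w->waiting = false;
			w->safety = safety;
			woken++;
		}
	}

	/* Remember the verdict if no newer commit has been replayed since. */
	if (s->newest_token == token)
		s->newest_safety = safety;

	return woken;
}
```

In this model a waiter released with DEMO_UNSAFE would go back and take a fresh snapshot, which corresponds to the retry loop in GetSafeSnapshot.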
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f9aaa52faf..33e7ace17bb 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -398,6 +398,11 @@ InitProcess(void)
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	SHMQueueElemInit(&(MyProc->syncRepLinks));
 
+	/* Initialize fields for SERIALIZABLE on standbys */
+	MyProc->waitSnapshotToken = 0;
+	MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+	SHMQueueElemInit(&(MyProc->safetyLinks));
+
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
 	MyProc->procArrayGroupMemberXid = InvalidTransactionId;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 083e879d5c3..da632445c68 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -143,7 +143,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_SNAPSHOT_SAFETY   0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -164,6 +164,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
+#define XACT_XINFO_HAS_SNAPSHOT_SAFETY	(1U << 8)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -259,6 +260,12 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_snapshot_safety
+{
+	SnapshotToken token;
+	SnapshotSafety safety;
+} xl_xact_snapshot_safety;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -271,6 +278,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -318,6 +326,9 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	SnapshotToken snapshot_token;
+	SnapshotSafety snapshot_safety;
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
@@ -416,6 +427,10 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nrels, RelFileNode *rels,
 				   int xactflags, TransactionId twophase_xid,
 				   const char *twophase_gid);
+
+extern XLogRecPtr XactLogSnapshotSafetyRecord(SnapshotToken token,
+											  SnapshotSafety safety);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 0a48d1cfb40..c747a126865 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -50,6 +50,18 @@ typedef uint32 TimeLineID;
  */
 typedef uint16 RepOriginId;
 
+/*
+ * Snapshot safety information used to control SERIALIZABLE on standby
+ * servers appears in checkpoints, so we define the types used here.
+ */
+typedef uint64 SnapshotToken;
+typedef enum SnapshotSafety
+{
+	SNAPSHOT_SAFE,
+	SNAPSHOT_UNSAFE,
+	SNAPSHOT_SAFETY_UNKNOWN
+} SnapshotSafety;
+
 /*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 6a3464daa1e..a84f80d88fd 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -30,7 +30,6 @@ extern int	max_predicate_locks_per_page;
 /* Number of SLRU buffers to use for predicate locking */
 #define NUM_OLDSERXID_BUFFERS	16
 
-
 /*
  * function prototypes
  */
@@ -74,4 +73,11 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
 extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
 							   void *recdata, uint32 len);
 
+/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */
+extern void GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety);
+extern void NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void BeginSnapshotSafetyReplay(void);
+extern void SetNewestSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void CompleteSnapshotSafetyReplay(void);
+
 #endif							/* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 0f736d37dff..db69fba2925 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -97,6 +97,12 @@ typedef struct SERIALIZABLEXACT
 	 */
 	SHM_QUEUE	possibleUnsafeConflicts;
 
+	/*
+	 * for committing transactions: would a hypothetical read-only snapshot
+	 * taken immediately after this transaction commits be safe?
+	 */
+	SnapshotSafety snapshotSafetyAfterThisCommit;
+
 	TransactionId topXid;		/* top level xid for the transaction, if one
 								 * exists; else invalid */
 	TransactionId finishedBefore;	/* invalid means still running; else the
@@ -123,6 +129,7 @@ typedef struct SERIALIZABLEXACT
 #define SXACT_FLAG_RO_UNSAFE			0x00000100
 #define SXACT_FLAG_SUMMARY_CONFLICT_IN	0x00000200
 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
+#define SXACT_FLAG_HYPOTHETICAL			0x00000800
 
 /*
  * The following types are used to provide an ad hoc list for holding
@@ -172,6 +179,11 @@ typedef struct PredXactListData
 												 * seq no */
 	SERIALIZABLEXACT *OldCommittedSxact;	/* shared copy of dummy sxact */
 
+	/* Tracking of snapshot safety on standby servers. */
+	SHM_QUEUE	snapshotSafetyWaitList;
+	SnapshotToken NewestSnapshotToken;
+	SnapshotSafety NewestSnapshotSafety;
+
 	PredXactListElement element;
 }			PredXactListData;
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index cb613c8076e..1497cdf66c4 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -152,6 +152,11 @@ struct PGPROC
 	int			syncRepState;	/* wait state for sync rep */
 	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
 
+	/* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */
+	SnapshotToken waitSnapshotToken;
+	SnapshotSafety snapshotSafety;	/* space for result */
+	SHM_QUEUE	safetyLinks;	/* list link for GetSafeSnapshot */
+
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
 	 * linked into one of these lists, according to the partition number of
-- 
2.17.0

#9Kevin Grittner
kgrittn@gmail.com
In reply to: Thomas Munro (#7)
Re: [HACKERS] SERIALIZABLE on standby servers

On Fri, Sep 21, 2018 at 7:29 AM Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

I'll add it to the next Commitfest so I know when to rebase it.

I signed up as reviewer in that CF.

--
Kevin Grittner
VMware vCenter Server
https://www.vmware.com/

#10Dmitry Dolgov
9erthalion6@gmail.com
In reply to: Kevin Grittner (#9)
Re: [HACKERS] SERIALIZABLE on standby servers

On Fri, Sep 21, 2018 at 2:29 PM Thomas Munro <thomas.munro@enterprisedb.com> wrote:

Hi Kevin, all,

/me pokes ancient thread

This amazing feeling of being like Indiana Jones, thinking whether it's worth
it to touch another ancient artifact.

On Tue, Sep 25, 2018 at 4:51 PM Kevin Grittner <kgrittn@gmail.com> wrote:

On Fri, Sep 21, 2018 at 7:29 AM Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

I'll add it to the next Commitfest so I know when to rebase it.

I signed up as reviewer in that CF.

Great! Can you provide a review already?

#11Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Dmitry Dolgov (#10)
Re: [HACKERS] SERIALIZABLE on standby servers

On Fri, Nov 30, 2018 at 3:01 AM Dmitry Dolgov <9erthalion6@gmail.com> wrote:

On Tue, Sep 25, 2018 at 4:51 PM Kevin Grittner <kgrittn@gmail.com> wrote:

On Fri, Sep 21, 2018 at 7:29 AM Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

I'll add it to the next Commitfest so I know when to rebase it.

I signed up as reviewer in that CF.

Great! Can you provide a review already?

Just to be clear, although this patch is registered in the commitfest
and currently applies and tests pass, it is prototype/WIP code with
significant problems that remain to be resolved. I sort of wish there
were a way to indicate that in the CF but since there isn't, I'm
saying that here. What I hope to get from Kevin, Simon or other
reviewers is some feedback on the general approach and problems
discussed upthread (and other problems and better ideas I might have
missed). So it's not seriously proposed for commit in this CF.

(Unlike the nearby "SERIALIZABLE with parallel query" thread, which is
about ready to land by all accounts, pending a round of perf testing
if a decent test can be selected.)

--
Thomas Munro
http://www.enterprisedb.com

#12Michael Paquier
michael@paquier.xyz
In reply to: Thomas Munro (#11)
Re: [HACKERS] SERIALIZABLE on standby servers

On Fri, Dec 28, 2018 at 02:21:44PM +1300, Thomas Munro wrote:

Just to be clear, although this patch is registered in the commitfest
and currently applies and tests pass, it is prototype/WIP code with
significant problems that remain to be resolved. I sort of wish there
were a way to indicate that in the CF but since there isn't, I'm
saying that here. What I hope to get from Kevin, Simon or other
reviewers is some feedback on the general approach and problems
discussed upthread (and other problems and better ideas I might have
missed). So it's not seriously proposed for commit in this CF.

No feedback has actually come, so I have moved it to next CF.
--
Michael

#13Thomas Munro
thomas.munro@gmail.com
In reply to: Michael Paquier (#12)
3 attachment(s)
Re: [HACKERS] SERIALIZABLE on standby servers

On Mon, Jun 15, 2020 at 5:00 PM Michael Paquier <michael@paquier.xyz> wrote:

On Fri, Dec 28, 2018 at 02:21:44PM +1300, Thomas Munro wrote:

Just to be clear, although this patch is registered in the commitfest
and currently applies and tests pass, it is prototype/WIP code with
significant problems that remain to be resolved. I sort of wish there
were a way to indicate that in the CF but since there isn't, I'm
saying that here. What I hope to get from Kevin, Simon or other
reviewers is some feedback on the general approach and problems
discussed upthread (and other problems and better ideas I might have
missed). So it's not seriously proposed for commit in this CF.

No feedback has actually come, so I have moved it to next CF.

Having been nerd-sniped by SSI again, I spent some time this weekend
rebasing this old patch, making a few improvements, and reformulating
the problems to be solved as I see them. It's very roughly based on
Kevin Grittner and Dan Ports' description of how you could give
SERIALIZABLE a useful meaning on hot standbys. The short version of
the theory is that you can make it work like SERIALIZABLE READ ONLY
DEFERRABLE by adding a bit of extra information into the WAL stream.
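
To make the record flow concrete, here is a toy model of the standby side. It is a sketch only, not code from the patch: SnapshotToken and SnapshotSafety mirror the patch's types, but ToyStandby, ToyWaiter and the toy_* functions are names invented for illustration. Locking and the real wait list are left out, and it assumes that an independent safety record also updates the standby's "newest" state when the token still matches.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t SnapshotToken;

typedef enum SnapshotSafety
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/* Toy stand-in for the standby's shared "newest safety" state. */
typedef struct ToyStandby
{
	SnapshotToken newest_token;
	SnapshotSafety newest_safety;
} ToyStandby;

/* Toy stand-in for a backend waiting for a verdict on its snapshot. */
typedef struct ToyWaiter
{
	SnapshotToken wait_token;
	SnapshotSafety safety;
} ToyWaiter;

/* Replay of a commit record that carries snapshot safety information. */
static void
toy_replay_commit(ToyStandby *standby, SnapshotToken token,
				  SnapshotSafety safety)
{
	standby->newest_token = token;
	standby->newest_safety = safety;
}

/*
 * A backend takes a snapshot.  Returns true if the snapshot is known to be
 * safe right away; otherwise the waiter remembers the token whose verdict
 * it needs (or sees SNAPSHOT_UNSAFE and has to retry with a new snapshot).
 */
static bool
toy_take_snapshot(const ToyStandby *standby, ToyWaiter *waiter)
{
	waiter->wait_token = standby->newest_token;
	waiter->safety = standby->newest_safety;
	return waiter->safety == SNAPSHOT_SAFE;
}

/*
 * Replay of an independent snapshot safety record.  Resolves every waiter
 * still waiting on this token and returns how many were resolved.
 */
static int
toy_replay_safety_record(ToyStandby *standby, ToyWaiter *waiters,
						 int nwaiters, SnapshotToken token,
						 SnapshotSafety safety)
{
	int			resolved = 0;

	/* An independent record always carries a final verdict. */
	assert(safety != SNAPSHOT_SAFETY_UNKNOWN);

	/* Assumption: the newest state is updated if the token still matches. */
	if (standby->newest_token == token &&
		standby->newest_safety == SNAPSHOT_SAFETY_UNKNOWN)
		standby->newest_safety = safety;

	for (int i = 0; i < nwaiters; i++)
	{
		if (waiters[i].safety == SNAPSHOT_SAFETY_UNKNOWN &&
			waiters[i].wait_token == token)
		{
			waiters[i].safety = safety;
			resolved++;
		}
	}
	return resolved;
}
```

In this model a backend that calls toy_take_snapshot() after a commit replayed with SNAPSHOT_SAFETY_UNKNOWN stays blocked until toy_replay_safety_record() sees the same token; a later commit with a different token does not resolve it.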

Problems:

1. As a prerequisite, we'd need to teach primary servers to make
transactions visible in the same order that they log commits.
Otherwise, we permit nonsense like seeing TX1 but not TX2 on the
primary, and TX2 but not TX1 on the replica. You can probably argue
that our read replicas don't satisfy the lower isolation levels, let
alone serializable.

2. Similarly, it's probably not OK that
PreCommit_CheckForSerializationFailure() determines
MySerializableXact->snapshotSafetyAfterThisCommit. That may not
happen in exactly the same order as commits are logged. Or maybe
there is some argument for why that is OK, based on what we're doing
with prepareSeqNo, or maybe we can do something with that to detect
disorder.

3. The patch doesn't yet attempt to checkpoint the snapshot safety
state. That's needed to start up in a sane state, without having to
wait for WAL activity.

4. XactLogSnapshotSafetyRecord() flushes the WAL an extra time after
a commit is flushed, which I put in for testing; that's silly...
somehow it needs to be better integrated so we don't generate two sync
I/Os in a row.

5. You probably want a way to turn off the extra WAL records and
SERIALIZABLEXACT consumption if you're using SERIALIZABLE on a primary
but not on the standby. Or maybe there is some way to make it come on
automatically.
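
Problem 1 can be illustrated with a small check. A replica can only ever expose a prefix of the commit order found in the WAL, so any snapshot on the primary whose visible set is not such a prefix shows a state that no replica snapshot can match. The sketch below is illustrative only; visible_set_is_log_prefix is a hypothetical helper, not something in the patch, and it models transactions simply by their position in the commit log.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * visible[i] says whether the transaction at position i of the commit log
 * is visible to some snapshot.  Returns true if the visible transactions
 * are exactly the first k entries of the log for some k, which is the only
 * shape a replica snapshot can have.
 */
static bool
visible_set_is_log_prefix(const bool *visible, size_t nlog)
{
	bool		seen_invisible = false;

	for (size_t i = 0; i < nlog; i++)
	{
		if (!visible[i])
			seen_invisible = true;
		else if (seen_invisible)
			return false;		/* a later commit is visible, an earlier one is not */
	}
	return true;
}
```

With the log order TX1, TX2, a replica that has replayed only TX1 has the visible set {true, false}, which is a prefix. A primary snapshot that sees TX2 but not TX1 has {false, true}, which is not, and that is the mismatch described in problem 1.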

I think I have cleared up the matter of xmin tracking for
"hypothetical" SERIALIZABLEXACTs mentioned earlier. It's not needed,
so should be set to InvalidTransactionId, and I added a comment to
explain why.

I also wrote a TAP test to exercise this thing. It is the same
schedule as src/test/isolation/specs/read-only-anomaly-3.spec, except
that transaction 3 runs on a streaming replica.

One thing to point out is that this patch only aims to make it so that
streaming replicas can't observe a state that would have caused a
transaction to abort if it had been observed on the primary. The TAP
test still has to insert its own wait-for-LSN loop to make sure step
"s1c" is replayed before "s3r" runs. We could use
synchronous_commit=remote_apply, and that'd probably work just as well
for this particular test, but I'm not sure how to square that with
fixing problem #1 above.
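
For reference, the shape of such a wait-for-LSN loop is sketched below in C rather than the Perl used by the TAP test. This is an assumption-laden illustration: parse_lsn, wait_for_replay, CannedReplica and canned_replay_lsn are invented names, the callback stands in for however the test asks the standby for its replay position, and a real loop would sleep between polls. The one firm detail is the text format of an LSN, two hexadecimal numbers separated by a slash, which has to be compared numerically rather than as a string.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Parse an LSN in "hi/lo" hexadecimal text form into a 64-bit value. */
static bool
parse_lsn(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;
	char		extra;

	/* Exactly two fields must match; trailing junk makes it three. */
	if (sscanf(text, "%X/%X%c", &hi, &lo, &extra) != 2)
		return false;
	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}

/* Hypothetical callback: returns the standby's current replay LSN as text. */
typedef const char *(*replay_lsn_fn) (void *arg);

/* Poll until the standby has replayed at least target_text, or give up. */
static bool
wait_for_replay(const char *target_text, replay_lsn_fn get_replay_lsn,
				void *arg, int max_polls)
{
	uint64_t	target;
	uint64_t	current;

	if (!parse_lsn(target_text, &target))
		return false;

	for (int i = 0; i < max_polls; i++)
	{
		const char *text = get_replay_lsn(arg);

		if (text != NULL && parse_lsn(text, &current) && current >= target)
			return true;
		/* a real test would sleep briefly here before polling again */
	}
	return false;
}

/* Canned stand-in for a standby: hands out a fixed series of positions. */
typedef struct CannedReplica
{
	const char **values;
	int			n;				/* must be at least 1 */
	int			next;
} CannedReplica;

static const char *
canned_replay_lsn(void *arg)
{
	CannedReplica *replica = arg;

	if (replica->next < replica->n)
		return replica->values[replica->next++];
	return replica->values[replica->n - 1];	/* stuck at the last position */
}
```

The caller would record the primary's LSN right after step "s1c" commits and pass it as target_text before running "s3r" on the replica.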

The perl hackery I used to do overlapping transactions in a TAP test
is pretty crufty. I guess we'd ideally have the isolation tester
support per-session connection strings, and somehow get some perl code
to orchestrate the cluster setup but then run the real isolation
tester. Or something like that.

Attachments:

v5-0001-SERIALIZABLE-READ-ONLY-DEFERRABLE-on-streaming-re.patch (text/x-patch; charset=US-ASCII)
From 30b25251964b494e75a8ad2d48cd49583f32ebc5 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sun, 14 Jun 2020 21:46:02 +1200
Subject: [PATCH v5 1/3] SERIALIZABLE READ ONLY DEFERRABLE on streaming
 replicas.

Allow streaming replicas to wait for "safe" snapshots.  Do this by
injecting extra information into the WAL, so that replica servers can
determine when a safe snapshot can be taken.

WORK IN PROGRESS -- see thread for list of problems

Discussion: https://postgr.es/m/CAEepm%3D2b9TV%2BvJ4UeSBixDrW7VUiTjxPwWq8K3QwFSWx0pTXHQ%40mail.gmail.com
Discussion: https://postgr.es/m/4D3735E30200002500039869@gw.wicourts.gov
---
 doc/src/sgml/high-availability.sgml       |   9 -
 doc/src/sgml/mvcc.sgml                    |  21 --
 doc/src/sgml/ref/set_transaction.sgml     |  12 +
 src/backend/access/rmgrdesc/xactdesc.c    |  56 ++++
 src/backend/access/transam/xact.c         |  69 ++++-
 src/backend/access/transam/xlog.c         |   7 +
 src/backend/commands/variable.c           |   8 -
 src/backend/storage/lmgr/predicate.c      | 337 ++++++++++++++++++++--
 src/backend/storage/lmgr/proc.c           |   5 +
 src/include/access/xact.h                 |  15 +-
 src/include/access/xlogdefs.h             |  12 +
 src/include/storage/predicate.h           |   7 +-
 src/include/storage/predicate_internals.h |  12 +
 src/include/storage/proc.h                |   5 +
 14 files changed, 504 insertions(+), 71 deletions(-)

diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 65c3fc62a9..ebd50d940d 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -2463,15 +2463,6 @@ LOG:  database system is ready to accept read only connections
      your setting of <varname>max_prepared_transactions</varname> is 0.
     </para>
    </listitem>
-   <listitem>
-    <para>
-     The Serializable transaction isolation level is not yet available in hot
-     standby.  (See <xref linkend="xact-serializable"/> and
-     <xref linkend="serializable-consistency"/> for details.)
-     An attempt to set a transaction to the serializable isolation level in
-     hot standby mode will generate an error.
-    </para>
-   </listitem>
   </itemizedlist>
 
    </para>
diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml
index dda6f1f2ad..744d000498 100644
--- a/doc/src/sgml/mvcc.sgml
+++ b/doc/src/sgml/mvcc.sgml
@@ -1636,15 +1636,6 @@ SELECT pg_advisory_lock(q.id) FROM
     <para>
      See <xref linkend="xact-serializable"/> for performance suggestions.
     </para>
-
-    <warning>
-     <para>
-      This level of integrity protection using Serializable transactions
-      does not yet extend to hot standby mode (<xref linkend="hot-standby"/>).
-      Because of that, those using hot standby may want to use Repeatable
-      Read and explicit locking on the master.
-     </para>
-    </warning>
    </sect2>
 
    <sect2 id="non-serializable-consistency">
@@ -1738,18 +1729,6 @@ SELECT pg_advisory_lock(q.id) FROM
     table and other tables in the database.
    </para>
 
-   <para>
-    Support for the Serializable transaction isolation level has not yet
-    been added to Hot Standby replication targets (described in
-    <xref linkend="hot-standby"/>).  The strictest isolation level currently
-    supported in hot standby mode is Repeatable Read.  While performing all
-    permanent database writes within Serializable transactions on the
-    master will ensure that all standbys will eventually reach a consistent
-    state, a Repeatable Read transaction run on the standby can sometimes
-    see a transient state that is inconsistent with any serial execution
-    of the transactions on the master.
-   </para>
-
    <para>
     Internal access to the system catalogs is not done using the isolation
     level of the current transaction.  This means that newly created database
diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml
index ec436b2d16..6eaadd47ac 100644
--- a/doc/src/sgml/ref/set_transaction.sgml
+++ b/doc/src/sgml/ref/set_transaction.sgml
@@ -155,6 +155,18 @@ SET SESSION CHARACTERISTICS AS TRANSACTION <replaceable class="parameter">transa
    is well suited for long-running reports or backups.
   </para>
 
+  <para>
+    Serializable transactions cannot use the <literal>NOT DEFERRABLE</literal>
+    option when run on Hot Standby servers.  <literal>READ ONLY</literal>
+    and <literal>DEFERRABLE</literal> are the default settings for Hot Standby
+    servers, but may also be specified explicitly.  The first statement run in
+    a Serializable transaction on a Hot Standby server may need to wait until a
+    point in the transaction stream where it is not possible to see a transient
+    state that is inconsistent with any serial execution of the transactions on
+    the primary server.  This may introduce a pause, but avoids the need to
+    coordinate predicate locking across multiple servers.
+  </para>
+
   <para>
    The <literal>SET TRANSACTION SNAPSHOT</literal> command allows a new
    transaction to run with the same <firstterm>snapshot</firstterm> as an existing
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 9fce75565f..440241fe00 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -123,6 +123,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xl_snapshot_safety =
+			(xl_xact_snapshot_safety *) data;
+
+		parsed->snapshot_token = xl_snapshot_safety->token;
+		parsed->snapshot_safety = xl_snapshot_safety->safety;
+
+		data += sizeof(xl_xact_snapshot_safety);
+	}
 }
 
 void
@@ -279,6 +290,27 @@ xact_desc_subxacts(StringInfo buf, int nsubxacts, TransactionId *subxacts)
 	}
 }
 
+static const char *
+xact_snapshot_safety_to_string(SnapshotSafety snapshot_safety)
+{
+	const char *string = "<unknown>";
+
+	switch(snapshot_safety)
+	{
+	case SNAPSHOT_SAFE:
+		string = "SNAPSHOT_SAFE";
+		break;
+	case SNAPSHOT_UNSAFE:
+		string = "SNAPSHOT_UNSAFE";
+		break;
+	case SNAPSHOT_SAFETY_UNKNOWN:
+		string = "SNAPSHOT_SAFETY_UNKNOWN";
+		break;
+	}
+
+	return string;
+}
+
 static void
 xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
 {
@@ -310,6 +342,13 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 						 (uint32) parsed.origin_lsn,
 						 timestamptz_to_str(parsed.origin_timestamp));
 	}
+
+	if (parsed.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+	{
+		appendStringInfo(buf, "; snapshot safety: %s, token: %llx",
+						 xact_snapshot_safety_to_string(parsed.snapshot_safety),
+						 (unsigned long long) parsed.snapshot_token);
+	}
 }
 
 static void
@@ -359,6 +398,14 @@ xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
 		appendStringInfo(buf, " %u", xlrec->xsub[i]);
 }
 
+static void
+xact_desc_snapshot_safety(StringInfo buf, xl_xact_snapshot_safety *xlrec)
+{
+	appendStringInfo(buf, "snapshot safety: %s, token: %llx",
+					 xact_snapshot_safety_to_string(xlrec->safety),
+					 (unsigned long long) xlrec->token);
+}
+
 void
 xact_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -396,6 +443,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec = (xl_xact_snapshot_safety *) rec;
+
+		xact_desc_snapshot_safety(buf, xlrec);
+	}
 }
 
 const char *
@@ -423,6 +476,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_SNAPSHOT_SAFETY:
+			id = "SNAPSHOT_SAFETY";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index cd30b62d36..0a1ee18fb8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1946,13 +1946,14 @@ StartTransaction(void)
 	{
 		s->startedInRecovery = true;
 		XactReadOnly = true;
+		XactDeferrable = true;
 	}
 	else
 	{
 		s->startedInRecovery = false;
 		XactReadOnly = DefaultXactReadOnly;
+		XactDeferrable = DefaultXactDeferrable;
 	}
-	XactDeferrable = DefaultXactDeferrable;
 	XactIsoLevel = DefaultXactIsoLevel;
 	forceSyncCommit = false;
 	MyXactFlags = 0;
@@ -5480,6 +5481,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_snapshot_safety xl_snapshot_safety;
 	uint8		info;
 
 	Assert(CritSectionCount > 0);
@@ -5558,6 +5560,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	if (IsolationIsSerializable())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_SNAPSHOT_SAFETY;
+		GetSnapshotSafetyAfterThisCommit(&xl_snapshot_safety.token,
+										 &xl_snapshot_safety.safety);
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -5606,6 +5615,10 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+		XLogRegisterData((char *) (&xl_snapshot_safety),
+						 sizeof(xl_xact_snapshot_safety));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
@@ -5792,6 +5805,10 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		 */
 		RecordKnownAssignedTransactionIds(max_xid);
 
+		if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+			BeginSnapshotSafetyReplay(parsed->snapshot_token,
+									  parsed->snapshot_safety);
+
 		/*
 		 * Mark the transaction committed in pg_xact. We use async commit
 		 * protocol during recovery to provide information on database
@@ -5803,6 +5820,9 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		 */
 		TransactionIdAsyncCommitTree(xid, parsed->nsubxacts, parsed->subxacts, lsn);
 
+		if (parsed->xinfo & XACT_XINFO_HAS_SNAPSHOT_SAFETY)
+			CompleteSnapshotSafetyReplay();
+
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
@@ -5944,6 +5964,46 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
 	DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 }
 
+XLogRecPtr
+XactLogSnapshotSafetyRecord(SnapshotToken token, SnapshotSafety safety)
+{
+	XLogRecPtr result;
+	xl_xact_snapshot_safety snapshot_safety;
+
+	snapshot_safety.token = token;
+	snapshot_safety.safety = safety;
+
+	START_CRIT_SECTION();
+	XLogBeginInsert();
+	XLogRegisterData((char *) &snapshot_safety, sizeof(snapshot_safety));
+	result = XLogInsert(RM_XACT_ID, XLOG_XACT_SNAPSHOT_SAFETY);
+	END_CRIT_SECTION();
+
+	/*
+	 * TODO: How can we avoid having to flush again (after already flushing
+	 * for the commit)?  If we don't have this flush here, then the standby
+	 * has to wait a while to find out whether its snapshot is safe.
+	 */
+	XLogFlush(result);
+
+	return result;
+}
+
+static void
+xact_redo_snapshot_safety(xl_xact_snapshot_safety *snapshot_safety)
+{
+	/*
+	 * An earlier COMMIT record must have carried a snapshot safety message
+	 * with the same token as this record, and with safety ==
+	 * SNAPSHOT_SAFETY_UNKNOWN.  This new independent snapshot safety message
+	 * reports that the safety is now known.  We will wake any backend that is
+	 * waiting to learn if the snapshot is safe.
+	 */
+	if (standbyState >= STANDBY_INITIALIZED)
+		NotifyHypotheticalSnapshotSafety(snapshot_safety->token,
+										 snapshot_safety->safety);
+}
+
 void
 xact_redo(XLogReaderState *record)
 {
@@ -6017,6 +6077,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_SNAPSHOT_SAFETY)
+	{
+		xl_xact_snapshot_safety *xlrec =
+			(xl_xact_snapshot_safety *) XLogRecGetData(record);
+
+		xact_redo_snapshot_safety(xlrec);
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 55cac186dc..7ca629945b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5252,6 +5252,8 @@ BootStrapXLOG(void)
 	checkPoint.newestCommitTsXid = InvalidTransactionId;
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
+//	checkPoint.newestSnapshotToken = 0;
+//	checkPoint.newestSnapshotSafety = SNAPSHOT_SAFE;
 
 	ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -5261,6 +5263,8 @@ BootStrapXLOG(void)
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/* Set up the XLOG page header */
 	page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -6771,6 +6775,9 @@ StartupXLOG(void)
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
 	XLogCtl->ckptFullXid = checkPoint.nextFullXid;
+// TODO: figure out correct checkpointing protocol for safe snapshots
+//	SetNewestSnapshotSafety(checkPoint.newestSnapshotToken,
+//							checkPoint.newestSnapshotSafety);
 
 	/*
 	 * Initialize replication slots, before there's a chance to remove
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index 484f7ea2c0..7e5dabbab2 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -541,14 +541,6 @@ check_XactIsoLevel(int *newval, void **extra, GucSource source)
 			GUC_check_errmsg("SET TRANSACTION ISOLATION LEVEL must not be called in a subtransaction");
 			return false;
 		}
-		/* Can't go to serializable mode while recovery is still active */
-		if (newXactIsoLevel == XACT_SERIALIZABLE && RecoveryInProgress())
-		{
-			GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
-			GUC_check_errmsg("cannot use serializable mode in a hot standby");
-			GUC_check_errhint("You can use REPEATABLE READ instead.");
-			return false;
-		}
 	}
 
 	return true;
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index d24919f76b..5e7b8fc43b 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -286,6 +286,7 @@
 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
 #define SxactIsPartiallyReleased(sxact) (((sxact)->flags & SXACT_FLAG_PARTIALLY_RELEASED) != 0)
+#define SxactIsHypothetical(sxact) (((sxact)->flags & SXACT_FLAG_HYPOTHETICAL) != 0)
 
 /*
  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -449,7 +450,8 @@ static void SummarizeOldestCommittedSxact(void);
 static Snapshot GetSafeSnapshot(Snapshot snapshot);
 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 													  VirtualTransactionId *sourcevxid,
-													  int sourcepid);
+													  int sourcepid,
+													  SnapshotSafety *snapshot_safety);
 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
 									  PREDICATELOCKTARGETTAG *parent);
@@ -1206,6 +1208,9 @@ InitPredicateLocks(void)
 		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
 		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
 		PredXact->OldCommittedSxact->pid = 0;
+		SHMQueueInit(&PredXact->snapshotSafetyWaitList);
+		PredXact->NewestSnapshotToken = 0;
+		PredXact->NewestSnapshotSafety = SNAPSHOT_SAFE;
 	}
 	/* This never changes, so let's keep a local copy. */
 	OldCommittedSxact = PredXact->OldCommittedSxact;
@@ -1488,6 +1493,7 @@ static Snapshot
 GetSafeSnapshot(Snapshot origSnapshot)
 {
 	Snapshot	snapshot;
+	SnapshotSafety snapshot_safety;
 
 	Assert(XactReadOnly && XactDeferrable);
 
@@ -1500,10 +1506,33 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * one passed to it, but we avoid assuming that here.
 		 */
 		snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
-														 NULL, InvalidPid);
-
-		if (MySerializableXact == InvalidSerializableXact)
-			return snapshot;	/* no concurrent r/w xacts; it's safe */
+														 NULL, InvalidPid,
+														 &snapshot_safety);
+		if (RecoveryInProgress())
+		{
+			/*
+			 * Check if the most recently replayed COMMIT record was either
+			 * known to be safe because it had no concurrent r/w xacts on the
+			 * primary, or has subsequently been declared safe by a snapshot
+			 * safety record.
+			 */
+			if (snapshot_safety == SNAPSHOT_SAFE)
+				return snapshot;
+			else if (snapshot_safety == SNAPSHOT_UNSAFE)
+			{
+				/*
+				 * TODO:  This can only happen if the master ran out of memory
+				 * while trying to create a hypothetical transaction, right?
+				 * Should we wait or error out?
+				 */
+				continue;
+			}
+		}
+		else
+		{
+			if (MySerializableXact == InvalidSerializableXact)
+				return snapshot;	/* no concurrent r/w xacts; it's safe */
+		}
 
 		LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -1511,20 +1540,48 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * Wait for concurrent transactions to finish. Stop early if one of
 		 * them marked us as conflicted.
 		 */
-		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
-				 SxactIsROUnsafe(MySerializableXact)))
+		if (RecoveryInProgress())
 		{
-			LWLockRelease(SerializableXactHashLock);
-			ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
-			LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			/*
+			 * Running on a standby.  Wait for the primary to tell us the
+			 * result of testing a hypothetical transaction whose
+			 * serializability matches the snapshot we have.
+			 */
+			Assert(snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN);
+			Assert(!SHMQueueIsDetached(&MyProc->safetyLinks));
+			while (MyProc->snapshotSafety == SNAPSHOT_SAFETY_UNKNOWN)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			if (MyProc->snapshotSafety == SNAPSHOT_SAFE)
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
-		MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
-
-		if (!SxactIsROUnsafe(MySerializableXact))
+		else
 		{
-			LWLockRelease(SerializableXactHashLock);
-			break;				/* success */
+			/*
+			 * Running on primary.  Wait for a signal from one of the backends
+			 * that we possibly conflict with.
+			 */
+			MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+			while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+					 SxactIsROUnsafe(MySerializableXact)))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				ProcWaitForSignal(WAIT_EVENT_SAFE_SNAPSHOT);
+				LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+			}
+			MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+
+			if (!SxactIsROUnsafe(MySerializableXact))
+			{
+				LWLockRelease(SerializableXactHashLock);
+				break;				/* success */
+			}
 		}
 
 		LWLockRelease(SerializableXactHashLock);
@@ -1539,12 +1596,64 @@ GetSafeSnapshot(Snapshot origSnapshot)
 	/*
 	 * Now we have a safe snapshot, so we don't need to do any further checks.
 	 */
-	Assert(SxactIsROSafe(MySerializableXact));
+	Assert(RecoveryInProgress() || SxactIsROSafe(MySerializableXact));
 	ReleasePredicateLocks(false, true);
 
 	return snapshot;
 }
 
+/*
+ * When the primary server has determined the safety of a hypothetical
+ * snapshot which was previously reported as SNAPSHOT_SAFETY_UNKNOWN in a
+ * COMMIT record, it emits a WAL record that causes the recovery process on
+ * standbys to call this function.  Here, we will wake up any backend that is
+ * currently waiting in GetSafeSnapshot to learn about the safety of a
+ * snapshot taken after that point in the transaction stream.
+ */
+void
+NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety)
+{
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	Assert(AmStartupProcess());
+	elog(LOG, "NotifyHypotheticalSnapshotSafety token = %llu, safety = %d", (unsigned long long) token, (int) safety);
+
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+	/*
+	 * Walk the list of processes that are waiting in GetSafeSnapshot on a
+	 * standby, and find any that are waiting to learn the safety of a
+	 * snapshot taken at a point in time when this token appeared on the most
+	 * recently replayed SSI transaction.  If we find any of those, tell them
+	 * the final status and wake them up.
+	 */
+	proc = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+								   &PredXact->snapshotSafetyWaitList,
+								   offsetof(PGPROC, safetyLinks));
+	while (proc != NULL)
+	{
+		next = (PGPROC *) SHMQueueNext(&PredXact->snapshotSafetyWaitList,
+									   &proc->safetyLinks,
+									   offsetof(PGPROC, safetyLinks));
+		if (proc->waitSnapshotToken == token)
+		{
+			SHMQueueDelete(&proc->safetyLinks);
+			proc->snapshotSafety = safety;
+			ProcSendSignal(proc->pid);
+		}
+		proc = next;
+	}
+
+	/*
+	 * If this happens to be the most recently replayed snapshot token then
+	 * remember this safety value.
+	 */
+	if (PredXact->NewestSnapshotToken == token)
+		PredXact->NewestSnapshotSafety = safety;
+	LWLockRelease(SerializableXactHashLock);
+}
+
 /*
  * GetSafeSnapshotBlockingPids
  *		If the specified process is currently blocked in GetSafeSnapshot,
@@ -1613,16 +1722,17 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 
 	/*
 	 * Can't use serializable mode while recovery is still active, as it is,
-	 * for example, on a hot standby.  We could get here despite the check in
-	 * check_XactIsoLevel() if default_transaction_isolation is set to
-	 * serializable, so phrase the hint accordingly.
+	 * for example, on a hot standby, unless DEFERRABLE mode is active.  In
+	 * that case, DEFERRABLE is the default, so this error should only
+	 * be reachable if the user has explicitly asked for NOT DEFERRABLE via
+	 * SET transaction_deferrable or SET/BEGIN TRANSACTION ISOLATION LEVEL.
 	 */
-	if (RecoveryInProgress())
+	if (RecoveryInProgress() && (!XactReadOnly || !XactDeferrable))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot use serializable mode in a hot standby"),
-				 errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
-				 errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
+				 errmsg("cannot use serializable not deferrable mode in a hot standby"),
+				 errdetail("Serializable transactions must be DEFERRABLE when run on hot standby servers."),
+				 errhint("You can use \"SET transaction_deferrable = true\", use DEFERRABLE when specifying the transaction isolation level, or avoid explicitly specifying NOT DEFERRABLE.")));
 
 	/*
 	 * A special optimization is available for SERIALIZABLE READ ONLY
@@ -1633,7 +1743,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
 		return GetSafeSnapshot(snapshot);
 
 	return GetSerializableTransactionSnapshotInt(snapshot,
-												 NULL, InvalidPid);
+												 NULL, InvalidPid, NULL);
 }
 
 /*
@@ -1676,7 +1786,7 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
 
 	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
-												 sourcepid);
+												 sourcepid, NULL);
 }
 
 /*
@@ -1687,11 +1797,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
  * source xact is still running after we acquire SerializableXactHashLock.
  * We do that by calling ProcArrayInstallImportedXmin.
+ *
+ * If snapshot_safety is non-NULL, then the safety of this snapshot if used
+ * on a standby server is written to it.  If it is SNAPSHOT_SAFE, then the
+ * snapshot may be safely used.  If it is SNAPSHOT_SAFETY_UNKNOWN, then the
+ * caller must wait for the safety to be announced in the WAL.
  */
 static Snapshot
 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 									  VirtualTransactionId *sourcevxid,
-									  int sourcepid)
+									  int sourcepid,
+									  SnapshotSafety *snapshot_safety)
 {
 	PGPROC	   *proc;
 	VirtualTransactionId vxid;
@@ -1701,8 +1817,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	/* We only do this for serializable transactions.  Once. */
 	Assert(MySerializableXact == InvalidSerializableXact);
 
-	Assert(!RecoveryInProgress());
-
 	/*
 	 * Since all parts of a serializable transaction must use the same
 	 * snapshot, it is too late to establish one after a parallel operation
@@ -1743,6 +1857,30 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 		}
 	} while (!sxact);
 
+	/*
+	 * Note the snapshot safety information for standbys.  This can be used to
+	 * know if the returned snapshot is already known to be safe/unsafe, or if
+	 * we must wait for notification of the final safety determination.
+	 */
+	if (snapshot_safety != NULL)
+	{
+		*snapshot_safety = PredXact->NewestSnapshotSafety;
+		if (*snapshot_safety == SNAPSHOT_SAFETY_UNKNOWN)
+		{
+			/*
+			 * We must put this process into the waitlist while we hold
+			 * the lock or there would be a race condition where we might miss
+			 * a notification.  The caller must wait for
+			 * MyProc->snapshotSafety to be set to a final value.
+			 */
+			MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+			MyProc->waitSnapshotToken = PredXact->NewestSnapshotToken;
+			if (SHMQueueIsDetached(&MyProc->safetyLinks))
+				SHMQueueInsertBefore(&PredXact->snapshotSafetyWaitList,
+									 &MyProc->safetyLinks);
+		}
+	}
+
 	/* Get the snapshot, or check that it's safe to use */
 	if (!sourcevxid)
 		snapshot = GetSnapshotData(snapshot);
@@ -3214,6 +3352,7 @@ SetNewSxactGlobalXmin(void)
 	for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
 	{
 		if (!SxactIsRolledBack(sxact)
+			&& !SxactIsHypothetical(sxact)
 			&& !SxactIsCommitted(sxact)
 			&& sxact != OldCommittedSxact)
 		{
@@ -3596,12 +3735,35 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
 
 			/*
 			 * Wake up the process for a waiting DEFERRABLE transaction if we
-			 * now know it's either safe or conflicted.
+			 * now know it's either safe or conflicted.  This releases
+			 * SERIALIZABLE READ ONLY DEFERRABLE transactions on the primary.
 			 */
 			if (SxactIsDeferrableWaiting(roXact) &&
 				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
 				ProcSendSignal(roXact->pid);
 
+			/*
+			 * If a hypothetical transaction is now known to be safe or
+			 * unsafe, we can report that in the WAL for the benefit of
+			 * standbys and recycle it.  This releases SERIALIZABLE READ ONLY
+			 * DEFERRABLE transactions that are waiting for the status of this
+			 * particular hypothetical transaction on any standby that
+			 * replays it.
+			 */
+			if (SxactIsHypothetical(roXact) &&
+				(SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
+			{
+				SnapshotSafety safety;
+
+				if (SxactIsROSafe(roXact))
+					safety = SNAPSHOT_SAFE;
+				else
+					safety = SNAPSHOT_UNSAFE;
+				XactLogSnapshotSafetyRecord(roXact->SeqNo.lastCommitBeforeSnapshot,
+											safety);
+				ReleasePredXact(roXact);
+			}
+
 			possibleUnsafeConflict = nextConflict;
 		}
 	}
@@ -4838,6 +5000,88 @@ PreCommit_CheckForSerializationFailure(void)
 	MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
 	MySerializableXact->flags |= SXACT_FLAG_PREPARED;
 
+	/*
+	 * For the benefit of hot standby servers that want to take a safe
+	 * SERIALIZABLE READ ONLY DEFERRABLE snapshot, we will check whether a
+	 * hypothetical read-only serializable transaction that starts after this
+	 * transaction commits would be safe.
+	 */
+	if (PredXact->WritableSxactCount == (XactReadOnly ? 0 : 1))
+	{
+		/*
+		 * There are no concurrent writable SERIALIZABLE transactions.  A
+		 * read-only snapshot taken immediately after this one commits is
+		 * safe.
+		 */
+		MySerializableXact->snapshotSafetyAfterThisCommit = SNAPSHOT_SAFE;
+	}
+	else
+	{
+		SERIALIZABLEXACT *sxact;
+		SERIALIZABLEXACT *othersxact;
+
+		/*
+		 * We can't yet determine whether a read-only transaction beginning
+		 * now would be safe.  Create a hypothetical SERIALIZABLEXACT and let
+		 * ReleasePredicateLocks report on its safety once that can be
+		 * determined.
+		 */
+		sxact = CreatePredXact();
+		if (sxact == NULL)
+		{
+			/* Out of space.  Don't allow SERIALIZABLE on standbys. */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_UNSAFE;
+		}
+		else
+		{
+			SetInvalidVirtualTransactionId(sxact->vxid);
+			sxact->SeqNo.lastCommitBeforeSnapshot =
+				MySerializableXact->prepareSeqNo;
+			sxact->prepareSeqNo = InvalidSerCommitSeqNo;
+			sxact->commitSeqNo = InvalidSerCommitSeqNo;
+			SHMQueueInit(&(sxact->outConflicts));
+			SHMQueueInit(&(sxact->inConflicts));
+			SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+			sxact->topXid = InvalidTransactionId;
+			sxact->finishedBefore = InvalidTransactionId;
+			sxact->pid = InvalidPid;
+			SHMQueueInit(&(sxact->predicateLocks));
+			SHMQueueElemInit(&(sxact->finishedLink));
+			sxact->flags = SXACT_FLAG_READ_ONLY | SXACT_FLAG_HYPOTHETICAL;
+
+			/*
+			 * Hypothetical sxacts are not garbage-collected by
+			 * ClearOldPredicateLocks() and are not tracked by
+			 * SetNewSxactGlobalXmin().  Instead they are freed by
+			 * ReleasePredicateLocks() as soon as they are determined to be
+			 * safe or unsafe.  Therefore, they don't need an xmin.
+			 */
+			sxact->xmin = InvalidTransactionId;
+
+			/* Register concurrent r/w transactions as possible conflicts. */
+			for (othersxact = FirstPredXact();
+				 othersxact != NULL;
+				 othersxact = NextPredXact(othersxact))
+			{
+				if (othersxact != MySerializableXact
+					&& !SxactIsCommitted(othersxact)
+					&& !SxactIsDoomed(othersxact)
+					&& !SxactIsReadOnly(othersxact))
+				{
+					SetPossibleUnsafeConflict(sxact, othersxact);
+				}
+			}
+
+			/*
+			 * The status will be reported in a later WAL record once it has
+			 * been determined.
+			 */
+			MySerializableXact->snapshotSafetyAfterThisCommit =
+				SNAPSHOT_SAFETY_UNKNOWN;
+		}
+	}
+
 	LWLockRelease(SerializableXactHashLock);
 }
 
@@ -5132,3 +5376,36 @@ AttachSerializableXact(SerializableXactHandle handle)
 	if (MySerializableXact != InvalidSerializableXact)
 		CreateLocalPredicateLockHash();
 }
+
+/*
+ * Accessor for the hypothetical snapshot safety information needed for commit
+ * records generated on primary servers.  This is used by XactLogCommitRecord
+ * to receive the safety level computed by
+ * PreCommit_CheckForSerializationFailure in a committing SSI transaction.
+ */
+void
+GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety)
+{
+	*token = MySerializableXact->prepareSeqNo;
+	*safety = MySerializableXact->snapshotSafetyAfterThisCommit;
+}
+
+/*
+ * Used in recovery when replaying commit records.  On a hot standby, these
+ * values must be set atomically with ProcArray updates, mirroring the code in
+ * GetSerializableTransactionSnapshotInt.  CompleteSnapshotSafetyReplay() must
+ * be called after the transaction is marked committed.
+ */
+void
+BeginSnapshotSafetyReplay(SnapshotToken token, SnapshotSafety safety)
+{
+	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+	PredXact->NewestSnapshotToken = token;
+	PredXact->NewestSnapshotSafety = safety;
+}
+
+void
+CompleteSnapshotSafetyReplay(void)
+{
+	LWLockRelease(SerializableXactHashLock);
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index f5eef6fa4e..78263eb986 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -421,6 +421,11 @@ InitProcess(void)
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	SHMQueueElemInit(&(MyProc->syncRepLinks));
 
+	/* Initialize fields for SERIALIZABLE on standbys */
+	MyProc->waitSnapshotToken = 0;
+	MyProc->snapshotSafety = SNAPSHOT_SAFETY_UNKNOWN;
+	SHMQueueElemInit(&(MyProc->safetyLinks));
+
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
 	MyProc->procArrayGroupMemberXid = InvalidTransactionId;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 88025b1cc2..52007675ba 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_SNAPSHOT_SAFETY   0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -167,6 +167,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
+#define XACT_XINFO_HAS_SNAPSHOT_SAFETY	(1U << 8)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -262,6 +263,12 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_snapshot_safety
+{
+	SnapshotToken token;
+	SnapshotSafety safety;
+} xl_xact_snapshot_safety;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -274,6 +281,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_snapshot_safety follows if XINFO_HAS_SNAPSHOT_SAFETY */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -339,6 +347,9 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	SnapshotToken snapshot_token;
+	SnapshotSafety snapshot_safety;
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
@@ -444,6 +455,8 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 									 int nrels, RelFileNode *rels,
 									 int xactflags, TransactionId twophase_xid,
 									 const char *twophase_gid);
+extern XLogRecPtr XactLogSnapshotSafetyRecord(SnapshotToken token,
+											  SnapshotSafety safety);
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index e1f5812213..a425160f54 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -57,6 +57,18 @@ typedef uint32 TimeLineID;
  */
 typedef uint16 RepOriginId;
 
+/*
+ * Snapshot safety information used to control SERIALIZABLE on standby
+ * servers appears in checkpoints, so we define the types used here.
+ */
+typedef uint64 SnapshotToken;
+typedef enum SnapshotSafety
+{
+	SNAPSHOT_SAFE,
+	SNAPSHOT_UNSAFE,
+	SNAPSHOT_SAFETY_UNKNOWN
+} SnapshotSafety;
+
 /*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 86e756d5fb..7015aab167 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -18,7 +18,6 @@
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
-
 /*
  * GUC variables
  */
@@ -84,4 +83,10 @@ extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
 extern SerializableXactHandle ShareSerializableXact(void);
 extern void AttachSerializableXact(SerializableXactHandle handle);
 
+/* hypothetical snapshot safety support, allowing SERIALIZABLE on standbys */
+extern void GetSnapshotSafetyAfterThisCommit(SnapshotToken *token, SnapshotSafety *safety);
+extern void NotifyHypotheticalSnapshotSafety(SnapshotToken token, SnapshotSafety safety);
+extern void BeginSnapshotSafetyReplay(SnapshotToken token, SnapshotSafety safety);
+extern void CompleteSnapshotSafetyReplay(void);
+
 #endif							/* PREDICATE_H */
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index cf9694d65e..055c8d0ea8 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -105,6 +105,12 @@ typedef struct SERIALIZABLEXACT
 	 */
 	SHM_QUEUE	possibleUnsafeConflicts;
 
+	/*
+	 * for committing transactions: would a hypothetical read-only snapshot
+	 * taken immediately after this transaction commits be safe?
+	 */
+	SnapshotSafety snapshotSafetyAfterThisCommit;
+
 	TransactionId topXid;		/* top level xid for the transaction, if one
 								 * exists; else invalid */
 	TransactionId finishedBefore;	/* invalid means still running; else the
@@ -137,6 +143,7 @@ typedef struct SERIALIZABLEXACT
  * reference to it.  It'll be recycled by the leader at end-of-transaction.
  */
 #define SXACT_FLAG_PARTIALLY_RELEASED	0x00000800
+#define SXACT_FLAG_HYPOTHETICAL			0x00001000
 
 /*
  * The following types are used to provide an ad hoc list for holding
@@ -186,6 +193,11 @@ typedef struct PredXactListData
 												 * seq no */
 	SERIALIZABLEXACT *OldCommittedSxact;	/* shared copy of dummy sxact */
 
+	/* Tracking of snapshot safety on standby servers. */
+	SHM_QUEUE	snapshotSafetyWaitList;
+	SnapshotToken NewestSnapshotToken;
+	SnapshotSafety NewestSnapshotSafety;
+
 	PredXactListElement element;
 }			PredXactListData;
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1ee9000b2b..eeb36e0657 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -154,6 +154,11 @@ struct PGPROC
 	int			syncRepState;	/* wait state for sync rep */
 	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
 
+	/* Info to allow standbys to wait for a safe SERIALIZABLE snapshot */
+	SnapshotToken waitSnapshotToken;
+	SnapshotSafety snapshotSafety;	/* space for result */
+	SHM_QUEUE	safetyLinks;	/* list link for GetSafeSnapshot */
+
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
 	 * linked into one of these lists, according to the partition number of
-- 
2.20.1
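
For anyone trying to follow the standby side of 0001 without reading the whole diff, here is a minimal single-process sketch of the token/wait-list handshake between commit replay, GetSafeSnapshot and NotifyHypotheticalSnapshotSafety. It is not code from the patch: StandbyState, Waiter, MAX_WAITERS and the four functions are hypothetical names, a fixed array stands in for the SHM_QUEUE wait list, there is no locking or signalling, and the initial state set by standby_init is an assumption of the sketch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SnapshotToken;
typedef enum
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

#define MAX_WAITERS 8			/* hypothetical capacity, sketch only */

typedef struct Waiter
{
	int			in_use;			/* is this slot on the wait list? */
	SnapshotToken token;		/* token this waiter wants a verdict for */
	SnapshotSafety safety;		/* space for the result */
} Waiter;

typedef struct StandbyState
{
	SnapshotToken newest_token; /* from the last replayed SSI commit */
	SnapshotSafety newest_safety;
	Waiter		waiters[MAX_WAITERS];
} StandbyState;

/* Assumed initial state: nothing replayed yet, nobody waiting. */
void
standby_init(StandbyState *s)
{
	assert(s != NULL);
	s->newest_token = 0;
	s->newest_safety = SNAPSHOT_SAFETY_UNKNOWN;
	for (int i = 0; i < MAX_WAITERS; i++)
		s->waiters[i].in_use = 0;
}

/* Replay of a commit record that carries snapshot safety information. */
void
replay_commit(StandbyState *s, SnapshotToken token, SnapshotSafety safety)
{
	assert(s != NULL);
	s->newest_token = token;
	s->newest_safety = safety;
}

/*
 * Take a snapshot.  Returns -1 if the safety is already known, the wait
 * slot index if the caller must wait, or -2 if the sketch's table is full.
 */
int
take_snapshot(StandbyState *s, SnapshotSafety *safety)
{
	assert(s != NULL && safety != NULL);
	*safety = s->newest_safety;
	if (*safety != SNAPSHOT_SAFETY_UNKNOWN)
		return -1;
	for (int i = 0; i < MAX_WAITERS; i++)
	{
		if (!s->waiters[i].in_use)
		{
			s->waiters[i].in_use = 1;
			s->waiters[i].token = s->newest_token;
			s->waiters[i].safety = SNAPSHOT_SAFETY_UNKNOWN;
			return i;
		}
	}
	return -2;
}

/* Replay of a standalone snapshot safety record; returns waiters woken. */
int
notify_safety(StandbyState *s, SnapshotToken token, SnapshotSafety safety)
{
	int			woken = 0;

	assert(s != NULL);
	for (int i = 0; i < MAX_WAITERS; i++)
	{
		if (s->waiters[i].in_use && s->waiters[i].token == token)
		{
			s->waiters[i].in_use = 0;	/* remove from the wait list */
			s->waiters[i].safety = safety;
			woken++;
		}
	}
	/* Remember the verdict if this is still the newest token. */
	if (s->newest_token == token)
		s->newest_safety = safety;
	return woken;
}
```

In this model a backend that gets a slot back from take_snapshot would sleep until its slot's safety is no longer SNAPSHOT_SAFETY_UNKNOWN. Registering the waiter in the same step that reads newest_safety mirrors the patch's point about joining the wait list while still holding SerializableXactHashLock, so that a notification cannot slip in between.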

v5-0002-Add-perl-module-for-isolation-like-testing.patch (text/x-patch; charset=US-ASCII)
From 9a2d668d15526daa6f47101c2eafcf4dbb36aabb Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sun, 14 Jun 2020 22:30:59 +1200
Subject: [PATCH v5 2/3] Add perl module for isolation-like testing.

To allow TAP tests to send interleaved statements to one or more
backends, like the isolation tester, define a PsqlSession class.
---
 src/test/perl/PsqlSession.pm | 139 +++++++++++++++++++++++++++++++++++
 1 file changed, 139 insertions(+)
 create mode 100644 src/test/perl/PsqlSession.pm

diff --git a/src/test/perl/PsqlSession.pm b/src/test/perl/PsqlSession.pm
new file mode 100644
index 0000000000..20ff923756
--- /dev/null
+++ b/src/test/perl/PsqlSession.pm
@@ -0,0 +1,139 @@
+=pod
+
+=head1 NAME
+
+PsqlSession - class representing a psql connection
+
+=head1 SYNOPSIS
+
+  use PsqlSession;
+
+  my $node = PostgresNode->get_new_node('mynode');
+  my $session = PsqlSession->new($node, "dbname");
+
+  # send simple query and wait for one line response
+  my $result = $session->send("SELECT 42;", 1);
+
+  # close connection
+  $session->close();
+
+=head1 DESCRIPTION
+
+PsqlSession allows for tests of interleaved operations, similar to
+isolation tests.
+
+=cut
+
+package PsqlSession;
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use IPC::Run qw(pump finish timer);
+
+our @EXPORT = qw(
+  new
+  send
+  close
+);
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item PsqlSession::new($class, $node, $dbname)
+
+Create a new PsqlSession instance, connected to a database.
+
+=cut
+
+sub new
+{
+	my ($class, $node, $dbname) = @_;
+	my $timer = timer(5);
+	my $stdin = '';
+	my $stdout = '';
+	my $harness = $node->interactive_psql($dbname, \$stdin, \$stdout, $timer);
+	my $self = {
+		_harness => $harness,
+		_stdin => \$stdin,
+		_stdout => \$stdout,
+		_timer => $timer
+	};
+	bless $self, $class;
+	return $self;
+}
+
+=pod
+
+=item $session->send($input, $lines)
+
+Send the given input to psql, and then wait for the given number of lines
+of output, or a timeout.
+
+=cut
+
+sub count_lines
+{
+	my ($s) = @_;
+	return $s =~ tr/\n//;
+}
+
+sub send
+{
+	my ($self, $statement, $lines) = @_;
+	${$self->{_stdout}} = '';
+	${$self->{_stdin}} .= $statement;
+	$self->{_timer}->start(5);
+	pump $self->{_harness} until count_lines(${$self->{_stdout}}) == $lines || $self->{_timer}->is_expired;
+	die "expected ${lines} lines but after timeout, received only: ${$self->{_stdout}}" if $self->{_timer}->is_expired;
+	my @result = split /\n/, ${$self->{_stdout}};
+	chop(@result);
+	return @result;
+}
+
+=pod
+
+=item $session->check_is_blocked()
+
+Wait for a timeout to expire, and complain if any output is received before that.
+
+=cut
+
+sub check_is_blocked
+{
+	my ($self) = @_;
+	${$self->{_stdout}} = '';
+	$self->{_timer}->start(5);
+	pump $self->{_harness} until (${$self->{_stdout}} ne '') || $self->{_timer}->is_expired;
+	die "expected to be blocked, but received: ${$self->{_stdout}}" if !$self->{_timer}->is_expired;
+}
+
+=pod
+
+=item $session->close()
+
+Close a PsqlSession connection.
+
+=cut
+
+sub close
+{
+	my ($self) = @_;
+	$self->{_timer}->start(5);
+	${$self->{_stdin}} .= "\\q\n";
+	finish $self->{_harness} or die "psql returned $?";
+	$self->{_timer}->reset;
+}
+
+=pod
+
+=back
+
+=cut
+
+1;
-- 
2.20.1

v5-0003-Add-test-for-SERIALIZABLE-on-streaming-replicas.patch (text/x-patch; charset=US-ASCII)
From b203630ec3e8e57d2dece0c76f1578377ecbc897 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sun, 14 Jun 2020 23:26:58 +1200
Subject: [PATCH v5 3/3] Add test for SERIALIZABLE on streaming replicas.

This is a (slightly) distributed version of the isolation test in
src/test/isolation/specs/read-only-anomaly-3.spec.
---
 src/test/recovery/t/021_serializable.pl | 107 ++++++++++++++++++++++++
 1 file changed, 107 insertions(+)
 create mode 100644 src/test/recovery/t/021_serializable.pl

diff --git a/src/test/recovery/t/021_serializable.pl b/src/test/recovery/t/021_serializable.pl
new file mode 100644
index 0000000000..6ab20b8b42
--- /dev/null
+++ b/src/test/recovery/t/021_serializable.pl
@@ -0,0 +1,107 @@
+# Like src/test/isolation/specs/read-only-anomaly-3.spec, except that
+# s3 runs on a streaming replica server.
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use PsqlSession;
+use TestLib;
+use Test::More tests => 13;
+
+my $node_primary = get_new_node('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE bank_account (id TEXT PRIMARY KEY, balance DECIMAL NOT NULL)");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO bank_account (id, balance) VALUES ('X', 0), ('Y', 0)");
+
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_replica->start;
+
+# We need three sessions.  s1 and s2 are on the primary, s3 is on the replica.
+my $s1 = PsqlSession->new($node_primary, "postgres");
+$s1->send("\\set PROMPT1 ''\n", 2);
+$s1->send("\\set PROMPT2 ''\n", 1);
+my $s2 = PsqlSession->new($node_primary, "postgres");
+$s2->send("\\set PROMPT1 ''\n", 2);
+$s2->send("\\set PROMPT2 ''\n", 1);
+my $s3 = PsqlSession->new($node_replica, "postgres");
+$s3->send("\\set PROMPT1 ''\n", 2);
+$s3->send("\\set PROMPT2 ''\n", 1);
+
+my @lines;
+my $result;
+@lines = $s1->send("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n", 2);
+shift @lines;
+is(shift @lines, "BEGIN");
+@lines = $s2->send("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n", 2);
+shift @lines;
+is(shift @lines, "BEGIN");
+@lines = $s3->send("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;\n", 2);
+shift @lines;
+is(shift @lines, "BEGIN");
+
+# s2rx
+@lines = $s2->send("SELECT balance FROM bank_account WHERE id = 'X';\n", 2);
+shift @lines;
+is(shift @lines, "0");
+
+# s2ry
+@lines = $s2->send("SELECT balance FROM bank_account WHERE id = 'Y';\n", 2);
+shift @lines;
+is(shift @lines, "0");
+
+# s1ry
+@lines = $s1->send("SELECT balance FROM bank_account WHERE id = 'Y';\n", 2);
+shift @lines;
+is(shift @lines, "0");
+
+# s1wy
+@lines = $s1->send("UPDATE bank_account SET balance = 20 WHERE id = 'Y';\n", 2);
+shift @lines;
+is(shift @lines, "UPDATE 1");
+
+# s1c
+@lines = $s1->send("COMMIT;\n", 2);
+shift @lines;
+is(shift @lines, "COMMIT");
+
+# now, we want to wait until the replica has replayed s1c
+my $until_lsn =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_replica->poll_query_until('postgres',
+    "SELECT (pg_last_wal_replay_lsn() - '$until_lsn'::pg_lsn) >= 0")
+  or die "standby never caught up";
+
+# s3r begins...
+@lines = $s3->send("SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id;\n", 1);
+$s3->check_is_blocked();
+
+# s2wx
+@lines = $s2->send("UPDATE bank_account SET balance = -11 WHERE id = 'X';\n", 2);
+shift @lines;
+is(shift @lines, "UPDATE 1");
+
+# s2c
+@lines = $s2->send("COMMIT;\n", 2);
+shift @lines;
+is(shift @lines, "COMMIT");
+
+# ... s3r completes
+@lines = $s3->send("", 2);
+is(shift @lines, "X|-11");
+is(shift @lines, "Y|20");
+
+# s3c
+@lines = $s3->send("COMMIT;\n", 2);
+shift @lines;
+is(shift @lines, "COMMIT");
+
-- 
2.20.1
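
To make it easier to see why s3 blocks in the test above, here is a small sketch of the commit-time decision that 0001 adds to PreCommit_CheckForSerializationFailure. It only models the branch structure visible in the patch. The function safety_after_commit and its parameters are hypothetical names, and have_free_sxact stands in for CreatePredXact() returning non-NULL.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
	SNAPSHOT_SAFE,
	SNAPSHOT_UNSAFE,
	SNAPSHOT_SAFETY_UNKNOWN
} SnapshotSafety;

/*
 * Decide what a committing SSI transaction reports about a read-only
 * snapshot taken immediately after its commit.  writable_sxact_count
 * includes the committer itself unless it is read-only.
 */
SnapshotSafety
safety_after_commit(int writable_sxact_count,
					bool committer_read_only,
					bool have_free_sxact)
{
	int			self = committer_read_only ? 0 : 1;

	assert(writable_sxact_count >= self);

	/* No other writable SERIALIZABLE transactions: safe right away. */
	if (writable_sxact_count == self)
		return SNAPSHOT_SAFE;

	/* No room for a hypothetical sxact: give up and report unsafe. */
	if (!have_free_sxact)
		return SNAPSHOT_UNSAFE;

	/* Verdict arrives later in a separate WAL record. */
	return SNAPSHOT_SAFETY_UNKNOWN;
}
```

If I read the test right, at s1c both s1 and s2 are writable, so the call would be safety_after_commit(2, false, true) and the commit record carries SNAPSHOT_SAFETY_UNKNOWN, which is why s3r has to wait. At s2c only s2 is left, so safety_after_commit(1, false, true) gives SNAPSHOT_SAFE.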