From d185cf50a47a0f9e346e49ccda038bb016ce246b Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Tue, 2 Feb 2021 14:03:43 -0300
Subject: [PATCH 2/2] make LogwrtResult atomic

---
 src/backend/access/transam/xlog.c | 66 +++++++++++++++----------------
 1 file changed, 32 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f03bd473e2..10802bd56f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -405,12 +405,9 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * These structs are identical but are declared separately to indicate their
  * slightly different functions.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * XLogCtl->LogwrtResult is read and written using atomic operations.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, each member of which is separately updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -433,6 +430,12 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;			/* last byte + 1 of write position */
+	pg_atomic_uint64 Flush;			/* last byte + 1 of flush position */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
@@ -596,6 +599,7 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult;
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -613,12 +617,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -1172,9 +1170,10 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
+		/* update local result copy */
+		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	}
 
 	/*
@@ -2170,13 +2169,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
 			 * still need to write it or if someone else already did.
 			 */
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -2191,7 +2190,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2438,7 +2437,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2636,7 +2635,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 */
 	if (LogwrtResult.Flush < WriteRqst.Flush &&
 		LogwrtResult.Flush < LogwrtResult.Write)
-
 	{
 		/*
 		 * Could get here without iterating above loop, in which case we might
@@ -2675,8 +2673,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 * code in a couple of places.
 	 */
 	{
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
@@ -2696,8 +2696,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2908,10 +2908,10 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2939,7 +2939,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -3056,7 +3056,6 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -3064,8 +3063,10 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
 		WriteRqst.Write = XLogCtl->asyncXactLSN;
 		SpinLockRelease(&XLogCtl->info_lck);
@@ -3081,6 +3082,7 @@ XLogBackgroundFlush(void)
 	{
 		if (openLogFile >= 0)
 		{
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
 								 wal_segment_size))
 			{
@@ -3140,7 +3142,8 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3228,9 +3231,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -7779,7 +7780,8 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -8498,9 +8500,7 @@ GetInsertRecPtr(void)
 XLogRecPtr
 GetFlushRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	return LogwrtResult.Flush;
 }
@@ -11649,9 +11649,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	return LogwrtResult.Write;
 }
-- 
2.20.1

