From f791c7ee9815871a1843116febffc0077d29a9a3 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 3 Apr 2024 11:48:27 +0200
Subject: [PATCH v16 1/2] Make XLogCtl->log{Write,Flush}Result accessible with
 atomics

This removes the need to hold both the info_lck spinlock and
WALWriteLock to update XLogCtl->logWriteResult and ->logFlushResult.
---
 src/backend/access/transam/xlog.c | 91 +++++++++++++++++--------------
 1 file changed, 49 insertions(+), 42 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f26533ec01..9b68afd400 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -292,12 +292,7 @@ static bool doPageWrites;
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
  * The positions already written/fsynced are maintained in logWriteResult
- * and logFlushResult.
- *
- * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
- * info_lck or WALWriteLock.  To update them, you need to hold both locks.
- * The point of this arrangement is that the value can be examined by code
- * that already holds WALWriteLock without needing to grab info_lck as well.
+ * and logFlushResult using atomic access.
  * In addition to the shared variable, each backend has a private copy of
  * both in LogwrtResult, which is updated when convenient.
  *
@@ -473,12 +468,9 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogRecPtr	logWriteResult; /* last byte + 1 written out */
-	XLogRecPtr	logFlushResult; /* last byte + 1 flushed */
+	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
+	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
@@ -616,11 +608,15 @@ static XLogwrtResult LogwrtResult = {0, 0};
 
 /*
  * Update local copy of shared XLogCtl->log{Write,Flush}Result
+ *
+ * It's critical that Flush always trails Write, so the order of the reads is
+ * important, as is the barrier.
  */
 #define RefreshXLogWriteResult(_target) \
 	do { \
-		_target.Write = XLogCtl->logWriteResult; \
-		_target.Flush = XLogCtl->logFlushResult; \
+		_target.Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); \
+		pg_read_barrier(); \
+		_target.Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); \
 	} while (0)
 
 /*
@@ -968,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		RefreshXLogWriteResult(LogwrtResult);
 		SpinLockRelease(&XLogCtl->info_lck);
+		RefreshXLogWriteResult(LogwrtResult);
 	}
 
 	/*
@@ -1989,17 +1984,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			RefreshXLogWriteResult(LogwrtResult);
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Acquire an up-to-date LogwrtResult value and see if we still
+			 * need to write it or if someone else already did.
 			 */
+			RefreshXLogWriteResult(LogwrtResult);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -2559,14 +2554,31 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->logWriteResult = LogwrtResult.Write;
-		XLogCtl->logFlushResult = LogwrtResult.Flush;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
 			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
 		SpinLockRelease(&XLogCtl->info_lck);
+
+		pg_atomic_write_u64(&XLogCtl->logWriteResult, LogwrtResult.Write);
+		pg_write_barrier();
+		pg_atomic_write_u64(&XLogCtl->logFlushResult, LogwrtResult.Flush);
 	}
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		/* See RefreshXLogWriteResult about ordering */
+		XLogRecPtr	Flush;
+		XLogRecPtr	Write;
+
+		Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
+		pg_read_barrier();
+		Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+
+		/* WAL written to disk is always ahead of WAL flushed */
+		Assert(Write >= Flush);
+	}
+#endif
 }
 
 /*
@@ -2583,7 +2595,6 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	prevAsyncXactLSN;
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	RefreshXLogWriteResult(LogwrtResult);
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2609,6 +2620,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	{
 		int			flushblocks;
 
+		RefreshXLogWriteResult(LogwrtResult);
+
 		flushblocks =
 			WriteRqstPtr / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
 
@@ -2791,14 +2804,8 @@ XLogFlush(XLogRecPtr record)
 	{
 		XLogRecPtr	insertpos;
 
-		/* read LogwrtResult and update local state */
-		SpinLockAcquire(&XLogCtl->info_lck);
-		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
-			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		RefreshXLogWriteResult(LogwrtResult);
-		SpinLockRelease(&XLogCtl->info_lck);
-
 		/* done already? */
+		RefreshXLogWriteResult(LogwrtResult);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2806,6 +2813,10 @@ XLogFlush(XLogRecPtr record)
 		 * Before actually performing the write, wait for all in-flight
 		 * insertions to the pages we're about to write to finish.
 		 */
+		SpinLockAcquire(&XLogCtl->info_lck);
+		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
+			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+		SpinLockRelease(&XLogCtl->info_lck);
 		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
 
 		/*
@@ -2948,9 +2959,8 @@ XLogBackgroundFlush(void)
 	 */
 	insertTLI = XLogCtl->InsertTimeLineID;
 
-	/* read LogwrtResult and update local state */
+	/* read updated LogwrtRqst */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	RefreshXLogWriteResult(LogwrtResult);
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -2958,6 +2968,7 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	RefreshXLogWriteResult(LogwrtResult);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
@@ -3126,9 +3137,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -5962,11 +5971,13 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status.  This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
-
-	XLogCtl->logWriteResult = LogwrtResult.Write;
-	XLogCtl->logFlushResult = LogwrtResult.Flush;
-
+	pg_atomic_init_u64(&XLogCtl->logWriteResult, EndOfLog);
+	pg_atomic_init_u64(&XLogCtl->logFlushResult, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
@@ -6411,9 +6422,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9327,9 +9336,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	return LogwrtResult.Write;
 }
-- 
2.39.2

