From 94cd2fbf37b5f0b824e0f9a9bc23f762a8bb19b5 Mon Sep 17 00:00:00 2001
From: Michail Nikolaev <michail.nikolaev@gmail.com>
Date: Sun, 21 Nov 2021 21:23:02 +0300
Subject: [PATCH v3 1/2] memory barrier instead of spinlock

---
 src/backend/storage/ipc/procarray.c | 42 +++++++----------------------
 1 file changed, 9 insertions(+), 33 deletions(-)

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index a9945c80eb..da0c4eaa00 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -60,7 +60,6 @@
 #include "pgstat.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
-#include "storage/spin.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/rel.h"
@@ -81,7 +80,6 @@ typedef struct ProcArrayStruct
 	int			numKnownAssignedXids;	/* current # of valid entries */
 	int			tailKnownAssignedXids;	/* index of oldest valid element */
 	int			headKnownAssignedXids;	/* index of newest element, + 1 */
-	slock_t		known_assigned_xids_lck;	/* protects head/tail pointers */
 
 	/*
 	 * Highest subxid that has been removed from KnownAssignedXids array to
@@ -425,7 +423,6 @@ CreateSharedProcArray(void)
 		procArray->numKnownAssignedXids = 0;
 		procArray->tailKnownAssignedXids = 0;
 		procArray->headKnownAssignedXids = 0;
-		SpinLockInit(&procArray->known_assigned_xids_lck);
 		procArray->lastOverflowedXid = InvalidTransactionId;
 		procArray->replication_slot_xmin = InvalidTransactionId;
 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
@@ -4553,10 +4550,9 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
  * pointer.  This wouldn't require any lock at all, except that on machines
  * with weak memory ordering we need to be careful that other processors
  * see the array element changes before they see the head pointer change.
- * We handle this by using a spinlock to protect reads and writes of the
- * head/tail pointers.  (We could dispense with the spinlock if we were to
- * create suitable memory access barrier primitives and use those instead.)
- * The spinlock must be taken to read or write the head/tail pointers unless
+ * We handle this by using a memory barrier to order the array element
+ * writes before the write of the head pointer.
+ * The memory barrier is issued before writing the head pointer unless
  * the caller holds ProcArrayLock exclusively.
  *
  * Algorithmic analysis:
@@ -4600,7 +4596,6 @@ KnownAssignedXidsCompress(bool force)
 	int			compress_index;
 	int			i;
 
-	/* no spinlock required since we hold ProcArrayLock exclusively */
 	head = pArray->headKnownAssignedXids;
 	tail = pArray->tailKnownAssignedXids;
 
@@ -4686,7 +4681,7 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
 
 	/*
 	 * Since only the startup process modifies the head/tail pointers, we
-	 * don't need a lock to read them here.
+	 * can safely read them here.
 	 */
 	head = pArray->headKnownAssignedXids;
 	tail = pArray->tailKnownAssignedXids;
@@ -4744,21 +4739,20 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
 	pArray->numKnownAssignedXids += nxids;
 
 	/*
-	 * Now update the head pointer.  We use a spinlock to protect this
+	 * Now update the head pointer.  We use a memory barrier to protect this
 	 * pointer, not because the update is likely to be non-atomic, but to
 	 * ensure that other processors see the above array updates before they
 	 * see the head pointer change.
 	 *
 	 * If we're holding ProcArrayLock exclusively, there's no need to take the
-	 * spinlock.
+	 * barrier.
 	 */
 	if (exclusive_lock)
 		pArray->headKnownAssignedXids = head;
 	else
 	{
-		SpinLockAcquire(&pArray->known_assigned_xids_lck);
+		pg_write_barrier();
 		pArray->headKnownAssignedXids = head;
-		SpinLockRelease(&pArray->known_assigned_xids_lck);
 	}
 }
 
@@ -4781,20 +4775,8 @@ KnownAssignedXidsSearch(TransactionId xid, bool remove)
 	int			tail;
 	int			result_index = -1;
 
-	if (remove)
-	{
-		/* we hold ProcArrayLock exclusively, so no need for spinlock */
-		tail = pArray->tailKnownAssignedXids;
-		head = pArray->headKnownAssignedXids;
-	}
-	else
-	{
-		/* take spinlock to ensure we see up-to-date array contents */
-		SpinLockAcquire(&pArray->known_assigned_xids_lck);
-		tail = pArray->tailKnownAssignedXids;
-		head = pArray->headKnownAssignedXids;
-		SpinLockRelease(&pArray->known_assigned_xids_lck);
-	}
+	tail = pArray->tailKnownAssignedXids;
+	head = pArray->headKnownAssignedXids;
 
 	/*
 	 * Standard binary search.  Note we can ignore the KnownAssignedXidsValid
@@ -5032,13 +5014,9 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 	 * cannot enter and then leave the array while we hold ProcArrayLock.  We
 	 * might miss newly-added xids, but they should be >= xmax so irrelevant
 	 * anyway.
-	 *
-	 * Must take spinlock to ensure we see up-to-date array contents.
 	 */
-	SpinLockAcquire(&procArray->known_assigned_xids_lck);
 	tail = procArray->tailKnownAssignedXids;
 	head = procArray->headKnownAssignedXids;
-	SpinLockRelease(&procArray->known_assigned_xids_lck);
 
 	for (i = tail; i < head; i++)
 	{
@@ -5085,10 +5063,8 @@ KnownAssignedXidsGetOldestXmin(void)
 	/*
 	 * Fetch head just once, since it may change while we loop.
 	 */
-	SpinLockAcquire(&procArray->known_assigned_xids_lck);
 	tail = procArray->tailKnownAssignedXids;
 	head = procArray->headKnownAssignedXids;
-	SpinLockRelease(&procArray->known_assigned_xids_lck);
 
 	for (i = tail; i < head; i++)
 	{
-- 
2.25.1

