From bfe33b34dd6368c5c874d2777c045899272e7ff8 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sun, 24 Oct 2021 21:48:26 +1300
Subject: [PATCH 2/2] Add futex-based semaphore replacement.

Provide a drop-in replacement for POSIX unnamed semaphores, implemented
with futexes.  Select it with PREFERRED_SEMAPHORES=FUTEX.  This may be
useful on OSes that lack unnamed semaphores usable in shared memory but
do have a futex facility.

XXX POC code only!
---
 configure                     |  16 ++++-
 configure.ac                  |  16 ++++-
 src/backend/port/posix_sema.c | 106 +++++++++++++++++++++++++++++++++-
 src/include/pg_config.h.in    |   3 +
 4 files changed, 134 insertions(+), 7 deletions(-)

diff --git a/configure b/configure
index 23f1cbe9d0..4e57c6436b 100755
--- a/configure
+++ b/configure
@@ -18486,6 +18486,10 @@ if test "$ac_res" != no; then :
 fi
 
   fi
+  if test x"$PREFERRED_SEMAPHORES" = x"FUTEX" ; then
+    # Need futex implementation for this
+    USE_FUTEX_SEMAPHORES=1
+  fi
   { $as_echo "$as_me:${as_lineno-$LINENO}: checking which semaphore API to use" >&5
 $as_echo_n "checking which semaphore API to use... " >&6; }
   if test x"$USE_NAMED_POSIX_SEMAPHORES" = x"1" ; then
@@ -18502,11 +18506,19 @@ $as_echo "#define USE_UNNAMED_POSIX_SEMAPHORES 1" >>confdefs.h
       SEMA_IMPLEMENTATION="src/backend/port/posix_sema.c"
       sematype="unnamed POSIX"
     else
+      if test x"$USE_FUTEX_SEMAPHORES" = x"1" ; then
+
+$as_echo "#define USE_FUTEX_SEMAPHORES 1" >>confdefs.h
+
+        SEMA_IMPLEMENTATION="src/backend/port/posix_sema.c"
+        sematype="futex"
+      else
 
 $as_echo "#define USE_SYSV_SEMAPHORES 1" >>confdefs.h
 
-      SEMA_IMPLEMENTATION="src/backend/port/sysv_sema.c"
-      sematype="System V"
+        SEMA_IMPLEMENTATION="src/backend/port/sysv_sema.c"
+        sematype="System V"
+      fi
     fi
   fi
   { $as_echo "$as_me:${as_lineno-$LINENO}: result: $sematype" >&5
diff --git a/configure.ac b/configure.ac
index 542b46437e..149ee401de 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2217,6 +2217,10 @@ if test "$PORTNAME" != "win32"; then
     # Need sem_init for this
     AC_SEARCH_LIBS(sem_init, [rt pthread], [USE_UNNAMED_POSIX_SEMAPHORES=1])
   fi
+  if test x"$PREFERRED_SEMAPHORES" = x"FUTEX" ; then
+    # Need futex implementation for this
+    USE_FUTEX_SEMAPHORES=1
+  fi
   AC_MSG_CHECKING([which semaphore API to use])
   if test x"$USE_NAMED_POSIX_SEMAPHORES" = x"1" ; then
     AC_DEFINE(USE_NAMED_POSIX_SEMAPHORES, 1, [Define to select named POSIX semaphores.])
@@ -2228,9 +2232,15 @@ if test "$PORTNAME" != "win32"; then
       SEMA_IMPLEMENTATION="src/backend/port/posix_sema.c"
       sematype="unnamed POSIX"
     else
-      AC_DEFINE(USE_SYSV_SEMAPHORES, 1, [Define to select SysV-style semaphores.])
-      SEMA_IMPLEMENTATION="src/backend/port/sysv_sema.c"
-      sematype="System V"
+      if test x"$USE_FUTEX_SEMAPHORES" = x"1" ; then
+        AC_DEFINE(USE_FUTEX_SEMAPHORES, 1, [Define to select futex semaphores.])
+        SEMA_IMPLEMENTATION="src/backend/port/posix_sema.c"
+        sematype="futex"
+      else
+        AC_DEFINE(USE_SYSV_SEMAPHORES, 1, [Define to select SysV-style semaphores.])
+        SEMA_IMPLEMENTATION="src/backend/port/sysv_sema.c"
+        sematype="System V"
+      fi
     fi
   fi
   AC_MSG_RESULT([$sematype])
diff --git a/src/backend/port/posix_sema.c b/src/backend/port/posix_sema.c
index 114da3b30c..b0c65d8716 100644
--- a/src/backend/port/posix_sema.c
+++ b/src/backend/port/posix_sema.c
@@ -36,6 +36,10 @@
 #include "storage/pg_sema.h"
 #include "storage/shmem.h"
 
+#if defined(USE_FUTEX_SEMAPHORES)
+#include "port/atomics.h"
+#include "port/pg_futex.h"
+#endif
 
 /* see file header comment */
 #if defined(USE_NAMED_POSIX_SEMAPHORES) && defined(EXEC_BACKEND)
@@ -45,6 +49,9 @@
 typedef union SemTPadded
 {
 	sem_t		pgsem;
+#if defined(USE_FUTEX_SEMAPHORES)
+	pg_atomic_uint32 futexsem;
+#endif
 	char		pad[PG_CACHE_LINE_SIZE];
 } SemTPadded;
 
@@ -70,6 +77,78 @@ static int	nextSemKey;			/* next name to try */
 
 static void ReleaseSemaphores(int status, Datum arg);
 
+#ifdef USE_FUTEX_SEMAPHORES
+/*
+ * An implementation of POSIX unnamed semaphores in shared memory, for OSes
+ * that lack them but have futexes.
+ */
+
+/* Set the starting count of the semaphore stored at "fut" to "value". */
+static void
+pg_futex_sem_init(pg_atomic_uint32 *fut, uint32 value)
+{
+	pg_atomic_init_u32(fut, value);
+}
+
+/* Decrement "fut", sleeping while it is 0.  Returns 0, or -1 on error. */
+static int
+pg_futex_sem_wait(pg_atomic_uint32 *fut)
+{
+	/*
+	 * The kernel compares the word at this address with the expected value,
+	 * so the atomic type must be a bare 32-bit integer, not an emulation.
+	 */
+	StaticAssertStmt(sizeof(*fut) == sizeof(uint32),
+					 "cannot use emulated atomics with futexes");
+
+	for (;;)
+	{
+		uint32		seen = pg_atomic_read_u32(fut);
+
+		if (seen != 0)
+		{
+			/* Units are available: take one unless we lose the race. */
+			if (pg_atomic_compare_exchange_u32(fut, &seen, seen - 1))
+				return 0;
+			continue;
+		}
+
+		/*
+		 * Nothing to take: wait for the count to rise.  EAGAIN means the
+		 * word was not 0 after all, so retry; other errors are returned.
+		 */
+		if (pg_futex_wait_u32(fut, 0, NULL) < 0 && errno != EAGAIN)
+			return -1;
+	}
+
+	pg_unreachable();
+}
+
+/*
+ * Add one to the count at "fut" and wake waiters.  Returns 0, or -1 if the
+ * wake call fails (the count stays incremented in that case).
+ */
+static int
+pg_futex_sem_post(pg_atomic_uint32 *fut)
+{
+	/* Make the new unit visible before waking anybody. */
+	pg_atomic_fetch_add_u32(fut, 1);
+
+	/*
+	 * XXX TODO: encode nwaiters into value, so we can suppress useless wake
+	 * calls.
+	 *
+	 * The increment is not undone if the wake fails: a concurrent waiter
+	 * may already have consumed it, so subtracting again could take a unit
+	 * that was never ours.
+	 */
+	if (pg_futex_wake(fut, INT_MAX) < 0)
+		return -1;
+
+	return 0;
+}
+
+#endif
 
 #ifdef USE_NAMED_POSIX_SEMAPHORES
 
@@ -124,7 +203,7 @@ PosixSemaphoreCreate(void)
 
 	return mySem;
 }
-#else							/* !USE_NAMED_POSIX_SEMAPHORES */
+#elif defined(USE_UNNAMED_POSIX_SEMAPHORES)
 
 /*
  * PosixSemaphoreCreate
@@ -139,6 +218,7 @@ PosixSemaphoreCreate(sem_t *sem)
 }
 #endif							/* USE_NAMED_POSIX_SEMAPHORES */
 
+#ifndef USE_FUTEX_SEMAPHORES
 
 /*
  * PosixSemaphoreKill	- removes a semaphore
@@ -156,6 +236,7 @@ PosixSemaphoreKill(sem_t *sem)
 		elog(LOG, "sem_destroy failed: %m");
 #endif
 }
+#endif
 
 
 /*
@@ -239,18 +320,22 @@ PGReserveSemaphores(int maxSemas)
 static void
 ReleaseSemaphores(int status, Datum arg)
 {
+#ifdef USE_NAMED_POSIX_SEMAPHORES
 	int			i;
 
-#ifdef USE_NAMED_POSIX_SEMAPHORES
 	for (i = 0; i < numSems; i++)
 		PosixSemaphoreKill(mySemPointers[i]);
 	free(mySemPointers);
 #endif
 
 #ifdef USE_UNNAMED_POSIX_SEMAPHORES
+	int			i;
+
 	for (i = 0; i < numSems; i++)
 		PosixSemaphoreKill(PG_SEM_REF(sharedSemas + i));
 #endif
+
+	/* Futex-based semaphores have no kernel resource to clean up. */
 }
 
 /*
@@ -262,7 +347,9 @@ PGSemaphore
 PGSemaphoreCreate(void)
 {
 	PGSemaphore sema;
+#ifndef USE_FUTEX_SEMAPHORES
 	sem_t	   *newsem;
+#endif
 
 	/* Can't do this in a backend, because static state is postmaster's */
 	Assert(!IsUnderPostmaster);
@@ -275,6 +362,9 @@ PGSemaphoreCreate(void)
 	/* Remember new sema for ReleaseSemaphores */
 	mySemPointers[numSems] = newsem;
 	sema = (PGSemaphore) newsem;
+#elif defined(USE_FUTEX_SEMAPHORES)
+	sema = &sharedSemas[numSems];
+	pg_futex_sem_init(&sema->sem_padded.futexsem, 1);
 #else
 	sema = &sharedSemas[numSems];
 	newsem = PG_SEM_REF(sema);
@@ -294,6 +384,9 @@ PGSemaphoreCreate(void)
 void
 PGSemaphoreReset(PGSemaphore sema)
 {
+#ifdef USE_FUTEX_SEMAPHORES
+	pg_atomic_write_u32(&sema->sem_padded.futexsem, 0);
+#else
 	/*
 	 * There's no direct API for this in POSIX, so we have to ratchet the
 	 * semaphore down to 0 with repeated trywait's.
@@ -309,6 +402,7 @@ PGSemaphoreReset(PGSemaphore sema)
 			elog(FATAL, "sem_trywait failed: %m");
 		}
 	}
+#endif
 }
 
 /*
@@ -324,7 +418,11 @@ PGSemaphoreLock(PGSemaphore sema)
 	/* See notes in sysv_sema.c's implementation of PGSemaphoreLock. */
 	do
 	{
+#if defined(USE_FUTEX_SEMAPHORES)
+		errStatus = pg_futex_sem_wait(&sema->sem_padded.futexsem);
+#else
 		errStatus = sem_wait(PG_SEM_REF(sema));
+#endif
 	} while (errStatus < 0 && errno == EINTR);
 
 	if (errStatus < 0)
@@ -349,7 +447,11 @@ PGSemaphoreUnlock(PGSemaphore sema)
 	 */
 	do
 	{
+#if defined(USE_FUTEX_SEMAPHORES)
+		errStatus = pg_futex_sem_post(&sema->sem_padded.futexsem);
+#else
 		errStatus = sem_post(PG_SEM_REF(sema));
+#endif
 	} while (errStatus < 0 && errno == EINTR);
 
 	if (errStatus < 0)
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 6bd2f7b5d8..b365d6ced3 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -916,6 +916,9 @@
 /* Define to 1 to build with BSD Authentication support. (--with-bsd-auth) */
 #undef USE_BSD_AUTH
 
+/* Define to select futex semaphores. */
+#undef USE_FUTEX_SEMAPHORES
+
 /* Define to build with ICU support. (--with-icu) */
 #undef USE_ICU
 
-- 
2.30.2

