From ca74df4ff11ce0fd1e51786eccaeca810921fc6d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 10 Jun 2023 09:14:07 +1200
Subject: [PATCH 4/5] Add port/pg_threads.h for a common threading API.

Loosely based on C11's <threads.h>, but with pg_ prefixes, this will
allow us to clean up many places that have to cope with POSIX and
Windows threads.
---
 src/include/port/pg_threads.h    | 252 +++++++++++++++++++++++++++++++
 src/port/Makefile                |   1 +
 src/port/meson.build             |   1 +
 src/port/pg_threads.c            | 117 ++++++++++++++
 src/tools/pgindent/typedefs.list |   7 +
 5 files changed, 378 insertions(+)
 create mode 100644 src/include/port/pg_threads.h
 create mode 100644 src/port/pg_threads.c

diff --git a/src/include/port/pg_threads.h b/src/include/port/pg_threads.h
new file mode 100644
index 0000000000..1706709994
--- /dev/null
+++ b/src/include/port/pg_threads.h
@@ -0,0 +1,252 @@
+/*
+ * A multi-threading API abstraction loosely based on the C11 standard's
+ * <threads.h> header.  The identifiers have a pg_ prefix.  Perhaps one day
+ * we'll use standard C threads directly, and we'll drop the prefixes.
+ *
+ * Exceptions:
+ *  - pg_thrd_barrier_t is not based on C11
+ */
+
+#ifndef PG_THREADS_H
+#define PG_THREADS_H
+
+#ifdef WIN32
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>
+#include <processthreadsapi.h>
+#include <fibersapi.h>
+#include <synchapi.h>
+#else
+#include <errno.h>
+#include "port/pg_pthread.h"
+#endif
+
+#include <stdint.h>
+
+#ifdef WIN32
+typedef HANDLE pg_thrd_t;
+typedef CRITICAL_SECTION pg_mtx_t;
+typedef CONDITION_VARIABLE pg_cnd_t;
+typedef SYNCHRONIZATION_BARRIER pg_thrd_barrier_t;
+typedef DWORD pg_tss_t;
+typedef INIT_ONCE pg_once_flag;
+#define PG_ONCE_FLAG_INIT INIT_ONCE_STATIC_INIT
+#else
+typedef pthread_t pg_thrd_t;
+typedef pthread_mutex_t pg_mtx_t;
+typedef pthread_cond_t pg_cnd_t;
+typedef pthread_barrier_t pg_thrd_barrier_t;
+typedef pthread_key_t pg_tss_t;
+typedef pthread_once_t pg_once_flag;
+#define PG_ONCE_FLAG_INIT PTHREAD_ONCE_INIT
+#endif
+
+typedef int (*pg_thrd_start_t) (void *);
+typedef void (*pg_tss_dtor_t) (void *);
+typedef void (*pg_call_once_function_t) (void);
+
+enum
+{
+	pg_thrd_success = 0,
+	pg_thrd_nomem = 1,
+	pg_thrd_timedout = 2,
+	pg_thrd_busy = 3,
+	pg_thrd_error = 4
+};
+
+enum
+{
+	pg_mtx_plain = 0
+};
+
+extern int	pg_thrd_create(pg_thrd_t *thread, pg_thrd_start_t function, void *argument);
+extern int	pg_thrd_join(pg_thrd_t thread, int *result);
+extern void pg_thrd_exit(int result);
+
+#ifndef WIN32
+static inline int
+pg_thrd_maperror(int error)
+{
+	if (error == 0)
+		return pg_thrd_success;
+	if (error == ENOMEM)
+		return pg_thrd_nomem;
+	return pg_thrd_error;
+}
+#endif
+
+#ifdef WIN32
+BOOL		pg_call_once_trampoline(pg_once_flag *flag, void *parameter, void **context);
+#endif
+
+static inline void
+pg_call_once(pg_once_flag *flag, pg_call_once_function_t function)
+{
+#ifdef WIN32
+	InitOnceExecuteOnce(flag, pg_call_once_trampoline, (void *) function, NULL);
+#else
+	pthread_once(flag, function);
+#endif
+}
+
+static inline int
+pg_thrd_equal(pg_thrd_t lhs, pg_thrd_t rhs)
+{
+#ifdef WIN32
+	return lhs == rhs;
+#else
+	return pthread_equal(lhs, rhs);
+#endif
+}
+
+static inline int
+pg_tss_create(pg_tss_t *key, pg_tss_dtor_t destructor)
+{
+#ifdef WIN32
+	//*key = FlsAlloc(destructor);
+	*key = FlsAlloc(NULL);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_key_create(key, destructor));
+#endif
+}
+
+static inline void *
+pg_tss_get(pg_tss_t key)
+{
+#ifdef WIN32
+	return FlsGetValue(key);
+#else
+	return pthread_getspecific(key);
+#endif
+}
+
+static inline int
+pg_tss_set(pg_tss_t key, void *value)
+{
+#ifdef WIN32
+	return FlsSetValue(key, value) ? pg_thrd_success : pg_thrd_error;
+#else
+	return pg_thrd_maperror(pthread_setspecific(key, value));
+#endif
+}
+
+static inline int
+pg_mtx_init(pg_mtx_t *mutex, int type)
+{
+#ifdef WIN32
+	InitializeCriticalSection(mutex);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_mutex_init(mutex, NULL));
+#endif
+}
+
+static inline int
+pg_mtx_lock(pg_mtx_t *mutex)
+{
+#ifdef WIN32
+	EnterCriticalSection(mutex);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_mutex_lock(mutex));
+#endif
+}
+
+static inline int
+pg_mtx_unlock(pg_mtx_t *mutex)
+{
+#ifdef WIN32
+	LeaveCriticalSection(mutex);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_mutex_unlock(mutex));
+#endif
+}
+
+static inline int
+pg_mtx_destroy(pg_mtx_t *mutex)
+{
+#ifdef WIN32
+	DeleteCriticalSection(mutex);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_mutex_destroy(mutex));
+#endif
+}
+
+static inline int
+pg_cnd_init(pg_cnd_t *condvar)
+{
+#ifdef WIN32
+	InitializeConditionVariable(condvar);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_cond_init(condvar, NULL));
+#endif
+}
+
+static inline int
+pg_cnd_broadcast(pg_cnd_t *condvar)
+{
+#ifdef WIN32
+	WakeAllConditionVariable(condvar);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_cond_broadcast(condvar));
+#endif
+}
+
+static inline int
+pg_cnd_wait(pg_cnd_t *condvar, pg_mtx_t *mutex)
+{
+#ifdef WIN32
+	SleepConditionVariableCS(condvar, mutex, INFINITE);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_cond_wait(condvar, mutex));
+#endif
+}
+
+static inline int
+pg_cnd_destroy(pg_cnd_t *condvar)
+{
+#ifdef WIN32
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_cond_destroy(condvar));
+#endif
+}
+
+static inline int
+pg_thrd_barrier_init(pg_thrd_barrier_t *barrier, int count)
+{
+#ifdef WIN32
+	return InitializeSynchronizationBarrier(barrier, count, 0) ? pg_thrd_success : pg_thrd_error;
+#else
+	return pg_thrd_maperror(pthread_barrier_init(barrier, NULL, count));
+#endif
+}
+
+static inline int
+pg_thrd_barrier_wait(pg_thrd_barrier_t *barrier)
+{
+#ifdef WIN32
+	EnterSynchronizationBarrier(barrier, SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY);
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_barrier_wait(barrier));
+#endif
+}
+
+static inline int
+pg_thrd_barrier_destroy(pg_thrd_barrier_t *barrier)
+{
+#ifdef WIN32
+	return pg_thrd_success;
+#else
+	return pg_thrd_maperror(pthread_barrier_destroy(barrier));
+#endif
+}
+
+#endif
diff --git a/src/port/Makefile b/src/port/Makefile
index f205c2c9c5..6e9ec3b35f 100644
--- a/src/port/Makefile
+++ b/src/port/Makefile
@@ -47,6 +47,7 @@ OBJS = \
 	path.o \
 	pg_bitutils.o \
 	pg_strong_random.o \
+	pg_threads.o \
 	pgcheckdir.o \
 	pgmkdirp.o \
 	pgsleep.o \
diff --git a/src/port/meson.build b/src/port/meson.build
index 9d0cd93c43..3d2277d151 100644
--- a/src/port/meson.build
+++ b/src/port/meson.build
@@ -8,6 +8,7 @@ pgport_sources = [
   'path.c',
   'pg_bitutils.c',
   'pg_strong_random.c',
+  'pg_threads.c',
   'pgcheckdir.c',
   'pgmkdirp.c',
   'pgsleep.c',
diff --git a/src/port/pg_threads.c b/src/port/pg_threads.c
new file mode 100644
index 0000000000..ded62b669d
--- /dev/null
+++ b/src/port/pg_threads.c
@@ -0,0 +1,117 @@
+#include "c.h"
+#include "port/pg_threads.h"
+
+#include <errno.h>
+#include <stdlib.h>
+
+/*
+ * There are small differences between the function types in C11, POSIX (return
+ * type) and Windows (return type signedness, calling convention).  The
+ * int return value will survive casting to/from void * and DWORD respectively,
+ * but we still need a small trampoline function to deal with the different
+ * function pointer type.
+ */
+typedef struct pg_thrd_thunk
+{
+	pg_thrd_start_t function;
+	void	   *argument;
+} pg_thrd_thunk;
+
+#ifdef WIN32
+BOOL
+pg_call_once_trampoline(pg_once_flag *flag, void *parameter, void **context)
+{
+	pg_call_once_function_t function = (pg_call_once_function_t) parameter;
+
+	function();
+	return TRUE;
+}
+#endif
+
+#ifdef WIN32
+static DWORD __stdcall
+pg_thrd_trampoline(void *vthunk)
+#else
+static void *
+pg_thrd_trampoline(void *vthunk)
+#endif
+{
+	pg_thrd_thunk *thunk = (pg_thrd_thunk *) vthunk;
+	void	   *argument = thunk->argument;
+	pg_thrd_start_t function = thunk->function;
+	int			result;
+
+	free(vthunk);
+
+	result = function(argument);
+
+#ifdef WIN32
+	return (DWORD) result;
+#else
+	return (void *) (intptr_t) result;
+#endif
+}
+
+int
+pg_thrd_create(pg_thrd_t *thread, pg_thrd_start_t function, void *argument)
+{
+	pg_thrd_thunk *thunk;
+
+	thunk = malloc(sizeof(*thunk));
+	if (thunk == NULL)
+		return pg_thrd_nomem;
+	thunk->function = function;
+	thunk->argument = argument;
+
+#ifdef WIN32
+	*thread = CreateThread(NULL, 0, pg_thrd_trampoline, thunk, 0, 0);
+	if (*thread != NULL)
+		return pg_thrd_success;
+#else
+	if (pthread_create(thread, NULL, pg_thrd_trampoline, thunk) == 0)
+		return pg_thrd_success;
+#endif
+
+	free(thunk);
+	return pg_thrd_error;
+}
+
+int
+pg_thrd_join(pg_thrd_t thread, int *result)
+{
+#ifdef WIN32
+	DWORD	   dword_result;
+
+	if (WaitForSingleObject(thread, INFINITE) == WAIT_OBJECT_0)
+	{
+		if (result)
+		{
+			if (!GetExitCodeThread(thread, &dword_result))
+				return pg_thrd_error;
+			*result = (int) dword_result;
+		}
+		CloseHandle(thread);
+		return pg_thrd_success;
+	}
+#else
+	void	   *void_star_result;
+
+	if (pthread_join(thread, &void_star_result) == 0)
+	{
+		if (result)
+			*result = (int) (intptr_t) void_star_result;
+		return pg_thrd_success;
+	}
+#endif
+	return pg_thrd_error;
+}
+
+void
+pg_thrd_exit(int result)
+{
+#ifdef WIN32
+	ExitThread((DWORD) result);
+#else
+	pthread_exit((void *) (intptr_t) result);
+#endif
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 260854747b..b4a22e678f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3515,6 +3515,7 @@ pg_be_sasl_mech
 pg_checksum_context
 pg_checksum_raw_context
 pg_checksum_type
+pg_cnd_t
 pg_compress_algorithm
 pg_compress_specification
 pg_conn_host
@@ -3540,7 +3541,9 @@ pg_local_to_utf_combined
 pg_locale_t
 pg_mb_radix_tree
 pg_md5_ctx
+pg_mtx_t
 pg_on_exit_callback
+pg_once_flag
 pg_prng_state
 pg_re_flags
 pg_saslprep_rc
@@ -3551,8 +3554,12 @@ pg_sha384_ctx
 pg_sha512_ctx
 pg_snapshot
 pg_stack_base_t
+pg_thrd_t
+pg_thrd_thunk
+pg_thrd_barrier_t
 pg_time_t
 pg_time_usec_t
+pg_tss_t
 pg_tz
 pg_tz_cache
 pg_tzenum
-- 
2.39.2

