From 370d4d740eb011f69cbd9a656dee53c7cdad3211 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 29 Oct 2018 22:16:02 -0700
Subject: [PATCH 3/3] WIP: global barriers.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/postmaster/autovacuum.c   |   3 +-
 src/backend/postmaster/bgwriter.c     |   4 +
 src/backend/postmaster/checkpointer.c |   4 +
 src/backend/postmaster/startup.c      |   3 +
 src/backend/postmaster/walwriter.c    |   2 +
 src/backend/replication/walreceiver.c |   5 +-
 src/backend/storage/buffer/bufmgr.c   |   4 +
 src/backend/storage/ipc/procsignal.c  | 138 ++++++++++++++++++++++++++
 src/backend/storage/lmgr/proc.c       |  20 ++++
 src/backend/tcop/postgres.c           |   7 ++
 src/include/storage/proc.h            |   9 ++
 src/include/storage/procsignal.h      |  23 ++++-
 12 files changed, 217 insertions(+), 5 deletions(-)

diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 978089575b8..e821adcb4f3 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -651,8 +651,9 @@ AutoVacLauncherMain(int argc, char *argv[])
 
 		ResetLatch(MyLatch);
 
-		/* Process sinval catchup interrupts that happened while sleeping */
+		/* Process pending interrupts. */
 		ProcessCatchupInterrupt();
+		ProcessGlobalBarrierIntterupt();
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 87157b543fa..cd82cce4938 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -262,6 +262,10 @@ BackgroundWriterMain(void)
 			proc_exit(0);		/* done */
 		}
 
+		/* Process all pending interrupts. */
+		if (GlobalBarrierInterruptPending)
+			ProcessGlobalBarrierIntterupt();
+
 		/*
 		 * Do one cycle of dirty-buffer writing.
 		 */
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 31c9644759a..25e741d9869 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -349,6 +349,10 @@ CheckpointerMain(void)
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
 
+		/* Process all pending interrupts. */
+		if (GlobalBarrierInterruptPending)
+			ProcessGlobalBarrierIntterupt();
+
 		/*
 		 * Process any requests or signals received recently.
 		 */
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index b8d5f5a2073..2e4b0d55056 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -151,6 +151,9 @@ HandleStartupProcInterrupts(void)
 	 */
 	if (IsUnderPostmaster && !PostmasterIsAlive())
 		exit(1);
+
+	if (GlobalBarrierInterruptPending)
+		ProcessGlobalBarrierIntterupt();
 }
 
 
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 62c239e4a2c..e36198ce0d4 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -260,6 +260,8 @@ WalWriterMain(void)
 			/* Normal exit from the walwriter is here */
 			proc_exit(0);		/* done */
 		}
+		if (GlobalBarrierInterruptPending)
+			ProcessGlobalBarrierIntterupt();
 
 		/*
 		 * Do what we're here for; then, if XLogBackgroundFlush() found useful
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 5ec471259f9..4f6838b2a5f 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -155,9 +155,8 @@ static void
 ProcessWalRcvInterrupts(void)
 {
 	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32.
+	 * The CHECK_FOR_INTERRUPTS() call ensures global barriers are handled,
+	 * and incoming signals on Win32 are received.
 	 */
 	CHECK_FOR_INTERRUPTS();
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 01eabe57063..02e3b9ac396 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1877,6 +1877,10 @@ BufferSync(int flags)
 
 		cur_tsid = CkptBufferIds[i].tsId;
 
+		/* XXX: need a more principled approach here */
+		if (GlobalBarrierInterruptPending)
+			ProcessGlobalBarrierIntterupt();
+
 		/*
 		 * Grow array of per-tablespace status structs, every time a new
 		 * tablespace is found.
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index b0dd7d1b377..c12a6e85cb0 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -18,6 +18,7 @@
 #include <unistd.h>
 
 #include "access/parallel.h"
+#include "access/twophase.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "replication/walsender.h"
@@ -62,9 +63,11 @@ typedef struct
 
 static ProcSignalSlot *ProcSignalSlots = NULL;
 static volatile ProcSignalSlot *MyProcSignalSlot = NULL;
+volatile sig_atomic_t GlobalBarrierInterruptPending = false;
 
 static bool CheckProcSignal(ProcSignalReason reason);
 static void CleanupProcSignalState(int status, Datum arg);
+static void HandleGlobalBarrierSignal(void);
 
 /*
  * ProcSignalShmemSize
@@ -262,6 +265,8 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 {
 	int			save_errno = errno;
 
+	pg_read_barrier();
+
 	if (CheckProcSignal(PROCSIG_CATCHUP_INTERRUPT))
 		HandleCatchupInterrupt();
 
@@ -292,9 +297,142 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+	if (CheckProcSignal(PROCSIG_GLOBAL_BARRIER))
+		HandleGlobalBarrierSignal();
+
 	SetLatch(MyLatch);
 
 	latch_sigusr1_handler();
 
 	errno = save_errno;
 }
+
+/*
+ * Ask every running process to absorb a global barrier of the given kind.
+ * Returns the new generation, for use with WaitForGlobalBarrier().
+ */
+uint64
+EmitGlobalBarrier(GlobalBarrierKind kind)
+{
+	uint64 generation;
+
+	/*
+	 * Broadcast flag, without incrementing generation. This ensures that
+	 * every process can see the request before the generation changes.
+	 *
+	 * It's OK if the to-be-signalled backend enters after our check here. A
+	 * new backend should have current settings.
+	 *
+	 * allProcCount spans regular backends and auxiliary processes, but not
+	 * prepared transactions' dummy PGPROCs (see InitProcGlobal()).
+	 */
+	for (uint32 i = 0; i < ProcGlobal->allProcCount; i++)
+	{
+		PGPROC *proc = &ProcGlobal->allProcs[i];
+
+		if (proc->pid == 0)
+			continue;
+
+		pg_atomic_fetch_or_u32(&proc->barrierFlags, (uint32) kind);
+		elog(LOG, "setting flags for %d", proc->pid);
+	}
+
+	/*
+	 * Broadcast flag generation. If any backend joins after this, it's either
+	 * going to be signalled below, or has read a new enough generation that
+	 * WaitForGlobalBarrier() will not wait for it.
+	 */
+	generation = pg_atomic_add_fetch_u64(&ProcGlobal->globalBarrierGen, 1);
+
+	/* Wake up each backend (including ours) */
+	for (int i = 0; i < NumProcSignalSlots; i++)
+	{
+		ProcSignalSlot *slot = &ProcSignalSlots[i];
+
+		if (slot->pss_pid == 0)
+			continue;
+
+		/* Set the flag and make it visible before sending the signal */
+		slot->pss_signalFlags[PROCSIG_GLOBAL_BARRIER] = true;
+		pg_write_barrier();
+		kill(slot->pss_pid, SIGUSR1);
+	}
+
+	return generation;
+}
+
+/*
+ * Wait until every process has absorbed barrier "generation", as returned
+ * by EmitGlobalBarrier(); afterwards the requested changes are in effect.
+ */
+void
+WaitForGlobalBarrier(uint64 generation)
+{
+	/* allProcCount covers backends and auxiliary processes alike */
+	for (uint32 i = 0; i < ProcGlobal->allProcCount; i++)
+	{
+		PGPROC *proc = &ProcGlobal->allProcs[i];
+		uint64 oldval;
+
+		pg_memory_barrier();
+		oldval = pg_atomic_read_u64(&proc->barrierGen);
+
+		/*
+		 * Unused proc slots get their barrierGen set to UINT64_MAX, so we
+		 * need not care about that.
+		 */
+		while (oldval < generation)
+		{
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(10000);
+			pg_memory_barrier();
+			oldval = pg_atomic_read_u64(&proc->barrierGen);
+		}
+	}
+}
+
+/*
+ * Signal-handler side of a global barrier: only record that one is pending.
+ */
+static void
+HandleGlobalBarrierSignal(void)
+{
+	InterruptPending = true;
+	GlobalBarrierInterruptPending = true;
+	SetLatch(MyLatch);
+}
+
+/*
+ * Absorb a pending global barrier and publish that we have done so.  If
+ * CHECK_FOR_INTERRUPTS() is used it will call this; a process type that
+ * doesn't use it has to call this explicitly.
+ */
+void
+ProcessGlobalBarrierIntterupt(void)
+{
+	if (GlobalBarrierInterruptPending)
+	{
+		uint64 generation;
+		uint32 flags;
+
+		/* Reset first, so a barrier signalled from here on is seen again. */
+		GlobalBarrierInterruptPending = false;
+		generation = pg_atomic_read_u64(&ProcGlobal->globalBarrierGen);
+		pg_memory_barrier();
+		flags = pg_atomic_exchange_u32(&MyProc->barrierFlags, 0);
+		pg_memory_barrier();
+
+		if (flags & GLOBBAR_CHECKSUM)
+		{
+			/*
+			 * By virtue of getting here (i.e. interrupts being processed), we
+			 * know that this backend won't have any in-progress writes (which
+			 * might have missed the checksum change).
+			 */
+		}
+
+		/* Acknowledge: WaitForGlobalBarrier() polls this value. */
+		pg_atomic_write_u64(&MyProc->barrierGen, generation);
+		elog(LOG, "processed interrupts for %d", MyProcPid);
+	}
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f9aaa52faf..354574ef4a5 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -187,6 +187,7 @@ InitProcGlobal(void)
 	ProcGlobal->checkpointerLatch = NULL;
 	pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO);
 	pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO);
+	pg_atomic_init_u64(&ProcGlobal->globalBarrierGen, 1);
 
 	/*
 	 * Create and initialize all the PGPROC structures we'll need.  There are
@@ -267,6 +268,9 @@ InitProcGlobal(void)
 
 		/* Initialize lockGroupMembers list. */
 		dlist_init(&procs[i].lockGroupMembers);
+
+		pg_atomic_init_u32(&procs[i].barrierFlags, 0);
+		pg_atomic_init_u64(&procs[i].barrierGen, PG_UINT64_MAX);
 	}
 
 	/*
@@ -418,6 +422,12 @@ InitProcess(void)
 	MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
 	pg_atomic_init_u32(&MyProc->clogGroupNext, INVALID_PGPROCNO);
 
+	/* pairs with globalBarrierGen increase */
+	pg_memory_barrier();
+	pg_atomic_write_u32(&MyProc->barrierFlags, 0);
+	pg_atomic_write_u64(&MyProc->barrierGen,
+						pg_atomic_read_u64(&ProcGlobal->globalBarrierGen));
+
 	/*
 	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
 	 * on it.  That allows us to repoint the process latch, which so far
@@ -561,6 +571,13 @@ InitAuxiliaryProcess(void)
 	MyProc->lwWaitMode = 0;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
+
+	/* pairs with globalBarrierGen increase */
+	pg_memory_barrier();
+	pg_atomic_write_u32(&MyProc->barrierFlags, 0);
+	pg_atomic_write_u64(&MyProc->barrierGen,
+						pg_atomic_read_u64(&ProcGlobal->globalBarrierGen));
+
 #ifdef USE_ASSERT_CHECKING
 	{
 		int			i;
@@ -859,6 +876,9 @@ ProcKill(int code, Datum arg)
 		LWLockRelease(leader_lwlock);
 	}
 
+	pg_atomic_write_u32(&MyProc->barrierFlags, 0);
+	pg_atomic_write_u64(&MyProc->barrierGen, PG_UINT64_MAX);
+
 	/*
 	 * Reset MyLatch to the process local one.  This is so that signal
 	 * handlers et al can continue using the latch after the shared latch
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 6e13d14fcd0..806c66bc320 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -589,6 +589,10 @@ ProcessClientWriteInterrupt(bool blocked)
 		CHECK_FOR_INTERRUPTS();
 	}
 
+	/* safe to handle during client communication */
+	if (GlobalBarrierInterruptPending)
+		ProcessGlobalBarrierIntterupt();
+
 	errno = save_errno;
 }
 
@@ -3123,6 +3127,9 @@ ProcessInterrupts(void)
 
 	if (ParallelMessagePending)
 		HandleParallelMessages();
+
+	if (GlobalBarrierInterruptPending)
+		ProcessGlobalBarrierIntterupt();
 }
 
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index cb613c8076e..1280c4f41f1 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -203,6 +203,13 @@ struct PGPROC
 	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a member */
 	dlist_head	lockGroupMembers;	/* list of members, if I'm a leader */
 	dlist_node	lockGroupLink;	/* my member link, if I'm a member */
+
+	/*
+	 * Support for "global barriers". These can be used to e.g. make sure that
+	 * all backends have acknowledged a configuration change.
+	 */
+	pg_atomic_uint64 barrierGen;
+	pg_atomic_uint32 barrierFlags;
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
@@ -270,6 +277,8 @@ typedef struct PROC_HDR
 	int			startupProcPid;
 	/* Buffer id of the buffer that Startup process waits for pin on, or -1 */
 	int			startupBufferPinWaitBufId;
+
+	pg_atomic_uint64 globalBarrierGen;
 } PROC_HDR;
 
 extern PGDLLIMPORT PROC_HDR *ProcGlobal;
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 6db0d69b71f..388eba83807 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -14,8 +14,9 @@
 #ifndef PROCSIGNAL_H
 #define PROCSIGNAL_H
 
-#include "storage/backendid.h"
+#include <signal.h>
 
+#include "storage/backendid.h"
 
 /*
  * Reasons for signalling a Postgres child process (a backend or an auxiliary
@@ -42,6 +43,8 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
+	PROCSIG_GLOBAL_BARRIER,
+
 	NUM_PROCSIGNALS				/* Must be last! */
 } ProcSignalReason;
 
@@ -57,4 +60,22 @@ extern int SendProcSignal(pid_t pid, ProcSignalReason reason,
 
 extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
 
+/*
+ * Pending requests of one kind collapse; kinds are OR'ed, so use distinct bits.
+ */
+typedef enum GlobalBarrierKind
+{
+	/*
+	 * Guarantee that all processes have the correct view of whether checksums
+	 * are enabled, and that no writes are in progress with a previous value.
+	 */
+	GLOBBAR_CHECKSUM = 1 << 0
+} GlobalBarrierKind;
+
+extern uint64 EmitGlobalBarrier(GlobalBarrierKind kind);
+extern void WaitForGlobalBarrier(uint64 generation);
+extern void ProcessGlobalBarrierIntterupt(void);
+
+extern PGDLLIMPORT volatile sig_atomic_t GlobalBarrierInterruptPending;
+
 #endif							/* PROCSIGNAL_H */
-- 
2.18.0.rc2.dirty

