From f9845f70b90859f1816df53cfd0a692896cf842c Mon Sep 17 00:00:00 2001 From: Daniel Gustafsson Date: Tue, 3 Dec 2019 19:00:40 +0100 Subject: [PATCH 1/2] Global Barriers --- src/backend/postmaster/autovacuum.c | 3 +- src/backend/postmaster/bgworker.c | 31 ++++-- src/backend/postmaster/bgwriter.c | 3 + src/backend/postmaster/checkpointer.c | 3 + src/backend/postmaster/pgstat.c | 3 + src/backend/postmaster/startup.c | 3 + src/backend/postmaster/walwriter.c | 2 + src/backend/replication/walreceiver.c | 9 +- src/backend/storage/buffer/bufmgr.c | 4 + src/backend/storage/ipc/procsignal.c | 141 ++++++++++++++++++++++++++ src/backend/storage/lmgr/proc.c | 20 ++++ src/backend/tcop/postgres.c | 7 ++ src/include/pgstat.h | 1 + src/include/storage/proc.h | 9 ++ src/include/storage/procsignal.h | 23 ++++- 15 files changed, 244 insertions(+), 18 deletions(-) diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index c1dd8168ca..623934e084 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -649,8 +649,9 @@ AutoVacLauncherMain(int argc, char *argv[]) ResetLatch(MyLatch); - /* Process sinval catchup interrupts that happened while sleeping */ + /* Process pending interrupts. */ ProcessCatchupInterrupt(); + ProcessGlobalBarrierIntterupt(); /* the normal shutdown case */ if (got_SIGTERM) diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 5f8a007e73..51612257c3 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -734,23 +734,32 @@ StartBackgroundWorker(void) /* * Set up signal handlers. */ + + + /* + * SIGINT is used to signal canceling the current action for processes + * able to run queries. 
+ */ if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) - { - /* - * SIGINT is used to signal canceling the current action - */ pqsignal(SIGINT, StatementCancelHandler); - pqsignal(SIGUSR1, procsignal_sigusr1_handler); - pqsignal(SIGFPE, FloatExceptionHandler); - - /* XXX Any other handlers needed here? */ - } else - { pqsignal(SIGINT, SIG_IGN); + + /* + * Everything with a PGPROC should be able to receive procsignal.h style + * signals. + */ + if (worker->bgw_flags & (BGWORKER_BACKEND_DATABASE_CONNECTION | + BGWORKER_SHMEM_ACCESS)) + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + else pqsignal(SIGUSR1, bgworker_sigusr1_handler); + + if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) + pqsignal(SIGFPE, FloatExceptionHandler); + else pqsignal(SIGFPE, SIG_IGN); - } + pqsignal(SIGTERM, bgworker_die); pqsignal(SIGHUP, SIG_IGN); diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index 2fa631ea7a..39e2a5bb21 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -256,6 +256,9 @@ BackgroundWriterMain(void) /* Normal exit from the bgwriter is here */ proc_exit(0); /* done */ } + /* Process all pending interrupts. */ + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); /* * Do one cycle of dirty-buffer writing. diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index d93c941871..a78facce50 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -346,6 +346,9 @@ CheckpointerMain(void) /* Clear any already-pending wakeups */ ResetLatch(MyLatch); + /* Process all pending interrupts. */ + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); /* * Process any requests or signals received recently. 
*/ diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index fabcf31de8..8ff66e0c13 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3763,6 +3763,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_EXECUTE_GATHER: event_name = "ExecuteGather"; break; + case WAIT_EVENT_GLOBAL_BARRIER: + event_name = "GlobalBarrier"; + break; case WAIT_EVENT_HASH_BATCH_ALLOCATING: event_name = "Hash/Batch/Allocating"; break; diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c index f43e57dadb..da0a670bdf 100644 --- a/src/backend/postmaster/startup.c +++ b/src/backend/postmaster/startup.c @@ -151,6 +151,9 @@ HandleStartupProcInterrupts(void) */ if (IsUnderPostmaster && !PostmasterIsAlive()) exit(1); + + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); } diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c index cce9713408..19120aa6e1 100644 --- a/src/backend/postmaster/walwriter.c +++ b/src/backend/postmaster/walwriter.c @@ -255,6 +255,8 @@ WalWriterMain(void) /* Normal exit from the walwriter is here */ proc_exit(0); /* done */ } + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); /* * Do what we're here for; then, if XLogBackgroundFlush() found useful diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index c1e439adb4..2a6617876a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -146,11 +146,10 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS); void ProcessWalRcvInterrupts(void) { - /* - * Although walreceiver interrupt handling doesn't use the same scheme as - * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive - * any incoming signals on Win32. - */ + /* + * The CHECK_FOR_INTERRUPTS() call ensures global barriers are handled, + * and incoming signals on Win32 are received. 
+ */ CHECK_FOR_INTERRUPTS(); if (got_SIGTERM) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7ad10736d5..92b5bbfcf3 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1885,6 +1885,10 @@ BufferSync(int flags) cur_tsid = CkptBufferIds[i].tsId; + /* XXX: need a more principled approach here */ + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); + /* * Grow array of per-tablespace status structs, every time a new * tablespace is found. diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index fde97a1036..434fb17c33 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -18,8 +18,10 @@ #include <unistd.h> #include "access/parallel.h" +#include "access/twophase.h" #include "commands/async.h" #include "miscadmin.h" +#include "pgstat.h" #include "replication/walsender.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -61,9 +63,11 @@ typedef struct static ProcSignalSlot *ProcSignalSlots = NULL; static volatile ProcSignalSlot *MyProcSignalSlot = NULL; +volatile sig_atomic_t GlobalBarrierInterruptPending = false; static bool CheckProcSignal(ProcSignalReason reason); static void CleanupProcSignalState(int status, Datum arg); +static void HandleGlobalBarrierSignal(void); /* * ProcSignalShmemSize @@ -261,6 +265,8 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) { int save_errno = errno; + pg_read_barrier(); + if (CheckProcSignal(PROCSIG_CATCHUP_INTERRUPT)) HandleCatchupInterrupt(); @@ -291,9 +297,144 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_GLOBAL_BARRIER)) + HandleGlobalBarrierSignal(); + SetLatch(MyLatch); latch_sigusr1_handler(); errno = save_errno; } + +/* + * Request a barrier of the given kind from every process; returns its generation. + */ +uint64 +EmitGlobalBarrier(GlobalBarrierKind kind) +{ + uint64 generation; + + /* + 
* Broadcast flag, without incrementing generation. This ensures that all + * backends can find out about the request. + * + * It's OK if the to-be-signalled backend enters after our check here. A + * new backend should have current settings. + */ + for (int i = 0; i < (MaxBackends + max_prepared_xacts); i++) + { + PGPROC *proc = &ProcGlobal->allProcs[i]; + + if (proc->pid == 0) + continue; + + pg_atomic_fetch_or_u32(&proc->barrierFlags, (uint32) kind); + + elog(LOG, "setting flags for %u", proc->pid); + } + + /* + * Broadcast flag generation. If any backend joins after this, it's either + * going to be signalled below, or has read a new enough generation that + * WaitForGlobalBarrier() will not wait for it. + */ + generation = pg_atomic_add_fetch_u64(&ProcGlobal->globalBarrierGen, 1); + + /* Wake up each backend (including ours) */ + for (int i = 0; i < NumProcSignalSlots; i++) + { + ProcSignalSlot *slot = &ProcSignalSlots[i]; + + if (slot->pss_pid == 0) + continue; + + /* Atomically set the proper flag */ + slot->pss_signalFlags[PROCSIG_GLOBAL_BARRIER] = true; + + pg_write_barrier(); + + /* Send signal */ + kill(slot->pss_pid, SIGUSR1); + } + + return generation; +} + +/* + * Wait for all barriers to be absorbed. This guarantees that all changes + * requested by a specific EmitGlobalBarrier() have taken effect. + */ +void +WaitForGlobalBarrier(uint64 generation) +{ + pgstat_report_wait_start(WAIT_EVENT_GLOBAL_BARRIER); + for (int i = 0; i < (MaxBackends + max_prepared_xacts); i++) + { + PGPROC *proc = &ProcGlobal->allProcs[i]; + uint64 oldval; + + pg_memory_barrier(); + oldval = pg_atomic_read_u64(&proc->barrierGen); + + /* + * Unused proc slots get their barrierGen set to UINT64_MAX, so we + * never wait for them. + */ + while (oldval < generation) + { + CHECK_FOR_INTERRUPTS(); + pg_usleep(10000); + + pg_memory_barrier(); + oldval = pg_atomic_read_u64(&proc->barrierGen); + } + } + pgstat_report_wait_end(); +} + +/* + * Absorb the global barrier procsignal. 
+ */ +static void +HandleGlobalBarrierSignal(void) +{ + InterruptPending = true; + GlobalBarrierInterruptPending = true; + SetLatch(MyLatch); +} + +/* + * Perform global barrier related interrupt checking. Processes that use + * CHECK_FOR_INTERRUPTS get this called from there; process types that do + * not use it must call this function explicitly. + */ +void +ProcessGlobalBarrierIntterupt(void) +{ + if (GlobalBarrierInterruptPending) + { + uint64 generation; + uint32 flags; + + GlobalBarrierInterruptPending = false; + + generation = pg_atomic_read_u64(&ProcGlobal->globalBarrierGen); + pg_memory_barrier(); + flags = pg_atomic_exchange_u32(&MyProc->barrierFlags, 0); + pg_memory_barrier(); + + if (flags & GLOBBAR_CHECKSUM) + { + /* + * By virtue of getting here (i.e. interrupts being processed), we + * know that this backend won't have any in-progress writes (which + * might have missed the checksum change). + */ + } + + pg_atomic_write_u64(&MyProc->barrierGen, generation); + + elog(LOG, "processed interrupts for %u", MyProcPid); + } +} diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index fff0628e58..27d8a20fca 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -189,6 +189,7 @@ InitProcGlobal(void) ProcGlobal->checkpointerLatch = NULL; pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO); pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO); + pg_atomic_init_u64(&ProcGlobal->globalBarrierGen, 1); /* * Create and initialize all the PGPROC structures we'll need. 
There are @@ -283,6 +284,9 @@ InitProcGlobal(void) */ pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO); pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO); + + pg_atomic_init_u32(&procs[i].barrierFlags, 0); + pg_atomic_init_u64(&procs[i].barrierGen, PG_UINT64_MAX); } /* @@ -441,6 +445,12 @@ InitProcess(void) MyProc->clogGroupMemberLsn = InvalidXLogRecPtr; Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO); + /* pairs with globalBarrierGen increase */ + pg_memory_barrier(); + pg_atomic_write_u32(&MyProc->barrierFlags, 0); + pg_atomic_write_u64(&MyProc->barrierGen, + pg_atomic_read_u64(&ProcGlobal->globalBarrierGen)); + /* * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch * on it. That allows us to repoint the process latch, which so far @@ -584,6 +594,13 @@ InitAuxiliaryProcess(void) MyProc->lwWaitMode = 0; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + + /* pairs with globalBarrierGen increase */ + pg_memory_barrier(); + pg_atomic_write_u32(&MyProc->barrierFlags, 0); + pg_atomic_write_u64(&MyProc->barrierGen, + pg_atomic_read_u64(&ProcGlobal->globalBarrierGen)); + #ifdef USE_ASSERT_CHECKING { int i; @@ -882,6 +899,9 @@ ProcKill(int code, Datum arg) LWLockRelease(leader_lwlock); } + pg_atomic_write_u32(&MyProc->barrierFlags, 0); + pg_atomic_write_u64(&MyProc->barrierGen, PG_UINT64_MAX); + /* * Reset MyLatch to the process local one. 
This is so that signal * handlers et al can continue using the latch after the shared latch diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 3b85e48333..b721b5a929 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -606,6 +606,10 @@ ProcessClientWriteInterrupt(bool blocked) SetLatch(MyLatch); } + /* safe to handle during client communication */ + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); + errno = save_errno; } @@ -3181,6 +3185,9 @@ ProcessInterrupts(void) if (ParallelMessagePending) HandleParallelMessages(); + + if (GlobalBarrierInterruptPending) + ProcessGlobalBarrierIntterupt(); } diff --git a/src/include/pgstat.h b/src/include/pgstat.h index fe076d823d..c997add881 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -824,6 +824,7 @@ typedef enum WAIT_EVENT_CHECKPOINT_DONE, WAIT_EVENT_CHECKPOINT_START, WAIT_EVENT_EXECUTE_GATHER, + WAIT_EVENT_GLOBAL_BARRIER, WAIT_EVENT_HASH_BATCH_ALLOCATING, WAIT_EVENT_HASH_BATCH_ELECTING, WAIT_EVENT_HASH_BATCH_LOADING, diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 281e1db725..f108ac52c6 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -203,6 +203,13 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * Support for global barriers. These can be used to e.g. make sure that + * all backends have acknowledged a configuration change. + */ + pg_atomic_uint64 barrierGen; + pg_atomic_uint32 barrierFlags; }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. 
*/ @@ -272,6 +279,8 @@ typedef struct PROC_HDR int startupProcPid; /* Buffer id of the buffer that Startup process waits for pin on, or -1 */ int startupBufferPinWaitBufId; + + pg_atomic_uint64 globalBarrierGen; } PROC_HDR; extern PGDLLIMPORT PROC_HDR *ProcGlobal; diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 05b186a05c..a978db9b24 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -14,8 +14,9 @@ #ifndef PROCSIGNAL_H #define PROCSIGNAL_H -#include "storage/backendid.h" +#include <signal.h> +#include "storage/backendid.h" /* * Reasons for signalling a Postgres child process (a backend or an auxiliary @@ -42,6 +43,8 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + PROCSIG_GLOBAL_BARRIER, + NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; @@ -57,4 +60,22 @@ extern int SendProcSignal(pid_t pid, ProcSignalReason reason, extern void procsignal_sigusr1_handler(SIGNAL_ARGS); +/* + * Pending requests collapse into one bitmask, so the flag values must be distinct bits. + */ +typedef enum GlobalBarrierKind +{ + /* + * Guarantee that all processes have the correct view of whether checksums + * are enabled/disabled, and no writes are in-progress with previous value(s). + */ + GLOBBAR_CHECKSUM = 1 << 0 +} GlobalBarrierKind; + +extern uint64 EmitGlobalBarrier(GlobalBarrierKind kind); +extern void WaitForGlobalBarrier(uint64 generation); +extern void ProcessGlobalBarrierIntterupt(void); + +extern PGDLLIMPORT volatile sig_atomic_t GlobalBarrierInterruptPending; + #endif /* PROCSIGNAL_H */ -- 2.21.0 (Apple Git-122.2)