commit 5a59754ea9d576a1dae39564d11a1d0f28936d17 Author: Thomas Munro Date: Thu Aug 18 09:58:09 2016 +1200 barrier-v2.patch diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile index 8a55392..9dbdc26 100644 --- a/src/backend/storage/ipc/Makefile +++ b/src/backend/storage/ipc/Makefile @@ -8,7 +8,7 @@ subdir = src/backend/storage/ipc top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \ +OBJS = barrier.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \ procsignal.o shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o \ sinvaladt.o standby.o diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c new file mode 100644 index 0000000..e9abe8d --- /dev/null +++ b/src/backend/storage/ipc/barrier.c @@ -0,0 +1,161 @@ +/*------------------------------------------------------------------------- + * + * barrier.c + * Barriers for synchronizing workers. + * + * Portions Copyright (c) 2016, PostgreSQL Global Development Group + * + * This simple mechanism allows for simple cases of phase-based + * fork/join-style cooperation by a static set of workers. + * + * IDENTIFICATION + * src/backend/storage/ipc/barrier.c + * + *------------------------------------------------------------------------- + */ + +#include "storage/barrier.h" + +/* + * Initialize this barrier, setting a static number of workers that we will + * wait for at each computation phase. To use a dynamic number of workers, + * this number should be zero, and BarrierAttach and BarrierDetach should be + * used to register and deregister workers. + */ +void +BarrierInit(Barrier *barrier, int num_workers) +{ + SpinLockInit(&barrier->mutex); + barrier->num_workers = num_workers; + barrier->num_arrived = 0; + barrier->phase = 0; + ConditionVariableInit(&barrier->condition_variable); +} + +/* + * Wait for all workers to arrive at this barrier, and then return in all + * workers. + * + * Returns true in one arbitrarily selected worker (currently the first one to + * arrive). Returns false in all others. The differing return code can be + * used to coordinate a phase of work that must be done in only one worker + * while the others wait. + * + * TODO: When Michael Paquier's patch at + * https://commitfest.postgresql.org/10/629/ lands, then this function should + * take an event ID and it should be passed through the conditional variable + * wait and thence to the WaitLatch so that we can see joinpoint names show up + * in all their glory in pg_stat_activity! + * + * TODO: How should conditions like postmaster death, or being killed by the a + * parallel context owner/leader (if there is one), be handled? Should this + * component be usable in contexts other than parallel workers? + */ +bool +BarrierWait(Barrier *barrier) +{ + bool first; + bool phase; + bool release; + + SpinLockAcquire(&barrier->mutex); + ++barrier->num_arrived; + first = barrier->num_arrived == 1; + if (barrier->num_arrived == barrier->num_workers) + { + release = true; + barrier->num_arrived = 0; + barrier->phase++; + } + else + { + release = false; + phase = barrier->phase; + } + SpinLockRelease(&barrier->mutex); + + /* Check if we can release our peers and return. */ + if (release) + { + ConditionVariableBroadcast(&barrier->condition_variable); + return first; + } + + /* Wait for phase to advance. */ + for (;;) + { + ConditionVariablePrepareToSleep(&barrier->condition_variable); + SpinLockAcquire(&barrier->mutex); + release = barrier->phase != phase; + SpinLockRelease(&barrier->mutex); + if (release) + break; + ConditionVariableSleep(); + } + ConditionVariableCancelSleep(); + + return first; +} + +/* + * Attach to a barrier. All participants will now wait for this worker to + * call BarrierWait or BarrierDetach. Returns the current phase. + */ +int +BarrierAttach(Barrier *barrier) +{ + int phase; + + SpinLockAcquire(&barrier->mutex); + ++barrier->num_workers; + phase = barrier->phase; + SpinLockRelease(&barrier->mutex); + + return phase; +} + +/* + * Detach from a barrier. This may release other waiters from BarrierWait and + * advance the phase, if they were only waiting for this backend. + */ +void +BarrierDetach(Barrier *barrier) +{ + bool release; + + SpinLockAcquire(&barrier->mutex); + Assert(barrier->num_workers > 0); + --barrier->num_workers; + if (barrier->num_workers > 0 && + barrier->num_arrived == barrier->num_workers) + { + release = true; + barrier->num_arrived = 0; + barrier->phase++; + } + else + release = false; + SpinLockRelease(&barrier->mutex); + + if (release) + ConditionVariableBroadcast(&barrier->condition_variable); +} + +/* + * Return the current phase of a barrier. If the caller is neither currently + * attached to the barrier, nor one of the workers accounted for in the party + * size at initialization, then the return value is only an instantaneous + * snapshot. Otherwise, it is guaranteed to remain valid until this worker + * calls BarrierWait. + */ +int +BarrierPhase(Barrier *barrier) +{ + int phase; + + SpinLockAcquire(&barrier->mutex); + phase = barrier->phase; + SpinLockRelease(&barrier->mutex); + + return phase; +} diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h new file mode 100644 index 0000000..2d7fea2 --- /dev/null +++ b/src/include/storage/barrier.h @@ -0,0 +1,42 @@ +/*------------------------------------------------------------------------- + * + * barrier.h + * Barriers for synchronizing workers. + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/barrier.h + * + *------------------------------------------------------------------------- + */ +#ifndef BARRIER_H +#define BARRIER_H + +/* + * For the header previously known as "barrier.h", please include + * "port/atomics.h", which deals with atomics, compiler barriers and memory + * barriers. + */ + +#include "postgres.h" + +#include "storage/condition_variable.h" +#include "storage/spin.h" + +typedef struct Barrier +{ + slock_t mutex; + int phase; + int num_workers; + int num_arrived; + ConditionVariable condition_variable; +} Barrier; + +extern void BarrierInit(Barrier *barrier, int num_workers); +extern bool BarrierWait(Barrier *barrier); +extern int BarrierAttach(Barrier *barrier); +extern void BarrierDetach(Barrier *barrier); +extern int BarrierPhase(Barrier *barrier); + +#endif /* BARRIER_H */