condition variables
Hi,
Some of my EnterpriseDB colleagues and I have been working on various
parallel query projects, all of which have been previously disclosed
here:
https://wiki.postgresql.org/wiki/EnterpriseDB_database_server_roadmap
One issue we've encountered is that it's not very easy for one process
in a group of cooperating parallel processes to wait for another
process in that same group. One idea is to have one process grab an
LWLock and other processes try to acquire it, but that actually
doesn't work very well. A pretty obvious problem is that it holds of
interrupts for the entire time that you are holding the lock, which is
pretty undesirable. A more subtle problem is that it's easy to
conceive of situations where the LWLock paradigm is just a very poor
fit for what you actually want to do. For example, suppose you have a
computation which proceeds in two phases: each backend that finishes
phase 1 must wait until all backends finish phase 1, and once all have
finished, all can begin phase 2. You could handle this case by having
an LWLock which everyone holds during phase 1 in shared mode, and then
everyone must briefly acquire it in exclusive mode before starting
phase 2, but that's an awful hack. It also has race conditions: what
if someone finishes phase 1 before everyone has started phase 1? And
what if there are 10 phases instead of 2?
Another approach to the problem is to use a latch wait loop. That
almost works. Interrupts can be serviced, and you can recheck shared
memory to see whether the condition for proceeding is satisfied after
each iteration of the loop. There's only one problem: when you do
something that might cause the condition to be satisfied for other
waiting backends, you need to set their latch - but you don't have an
easy way to know exactly which processes are waiting, so how do you
call SetLatch? I originally thought of adding a function like
SetAllLatches(ParallelContext *) and maybe that can work, but then I
had what I think is a better idea, which is to introduce a notion of
condition variables. Condition variables, of course, are a standard
synchronization primitive:
https://en.wikipedia.org/wiki/Monitor_(synchronization)#Condition_variables_2
Basically, a condition variable has three operations: you can wait for
the condition variable; you can signal the condition variable to wake
up one waiter; or you can broadcast on the condition variable to wake
up all waiters. Atomically with entering the wait, you must be able
to check whether the condition is satisfied. So, in my
implementation, a condition variable wait loop looks like this:
for (;;)
{
ConditionVariablePrepareToSleep(cv);
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep();
}
ConditionVariableCancelSleep();
To wake up one waiter, another backend can call
ConditionVariableSignal(cv); to wake up all waiters,
ConditionVariableBroadcast(cv).
I am cautiously optimistic that this design will serve a wide variety
of needs for parallel query development - basically anything that
needs to wait for another process to reach a certain point in the
computation that can be detected through changes in shared memory
state. The attached patch condition-variable-v1.patch implements this
API. I originally open-coded the wait queue for this, but I've just
finished rebasing it on top of Thomas Munro's proclist stuff, so
before applying this patch you need the one from here:
/messages/by-id/CAEepm=0Vvr9zgwHt67RwuTfwMEby1GiGptBk3xFPDbbgEtZgMg@mail.gmail.com
At some point while hacking on this I realized that we could actually
replace the io_in_progress locks with condition variables; the
attached patch buffer-io-cv-v1.patch does this (it must be applied on
top of the proclist patch from the above email and also on top of
condition-variable-v1.patch). Using condition variables here seems to
have a couple of advantages. First, it means that a backend waiting
for buffer I/O to complete is interruptible. Second, it fixes a
long-running bit of nastiness in AbortBufferIO: right now, if a
backend that is doing buffer I/O aborts, the abort causes it to
release all of its LWLocks, including the buffer I/O lock. Everyone
waiting for that buffer busy-loops until the aborting process gets
around to reacquiring the lock and updating the buffer state in
AbortBufferIO. But if we replace the io_in_progress locks with
condition variables, then that doesn't happen any more. Nobody is
"holding" the condition variable, so it doesn't get "released" when
the process doing I/O aborts. Instead, they just keep sleeping until
the aborting process reaches AbortBufferIO, and then it broadcasts on
the condition variable and wakes everybody up, which seems a good deal
nicer.
I'm very curious to know whether other people like this abstraction
and whether they think it will be useful for things they want to do
with parallel query (or otherwise). Comments welcome. Review
appreciated. Other suggestions for how to handle this are cool, too.
Credit: These patches were written by me; an earlier version of the
condition-variable-v1.patch was reviewed and tested by Rahila Syed.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Attachments:
condition-variable-v1.patchapplication/x-download; name=condition-variable-v1.patchDownload
From e577260dce26ac501c9ad00b899469a1cc028e1a Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 11 Aug 2016 17:11:32 -0400
Subject: [PATCH 2/4] Condition variable code.
Reviewed in an earlier version by Rahila Syed.
---
src/backend/access/transam/xact.c | 4 +
src/backend/bootstrap/bootstrap.c | 2 +
src/backend/postmaster/bgwriter.c | 2 +
src/backend/postmaster/checkpointer.c | 2 +
src/backend/postmaster/walwriter.c | 2 +
src/backend/replication/walsender.c | 2 +
src/backend/storage/lmgr/Makefile | 2 +-
src/backend/storage/lmgr/condition_variable.c | 157 ++++++++++++++++++++++++++
src/backend/storage/lmgr/proc.c | 7 ++
src/include/storage/condition_variable.h | 58 ++++++++++
src/include/storage/proc.h | 4 +
src/include/storage/proclist.h | 16 +++
12 files changed, 257 insertions(+), 1 deletion(-)
create mode 100644 src/backend/storage/lmgr/condition_variable.c
create mode 100644 src/include/storage/condition_variable.h
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 23f36ea..b40b2e0 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -45,6 +45,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -2476,6 +2477,9 @@ AbortTransaction(void)
/* Reset WAL record construction state */
XLogResetInsertion();
+ /* Cancel condition variable sleep */
+ ConditionVariableCancelSleep();
+
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index e518e17..9eeb49c 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -33,6 +33,7 @@
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
@@ -535,6 +536,7 @@ static void
ShutdownAuxiliaryProcess(int code, Datum arg)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
}
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 00f03d8..40f3f80 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -46,6 +46,7 @@
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -189,6 +190,7 @@ BackgroundWriterMain(void)
* about in bgwriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
AbortBufferIO();
UnlockBuffers();
/* buffer pins are released here: */
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 8d4b353..0c072f3 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -49,6 +49,7 @@
#include "postmaster/bgwriter.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -273,6 +274,7 @@ CheckpointerMain(void)
* files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 228190a..e5de019 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -50,6 +50,7 @@
#include "pgstat.h"
#include "postmaster/walwriter.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -169,6 +170,7 @@ WalWriterMain(void)
* about in walwriter, but we do have LWLocks, and perhaps buffers?
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a0dba19..44143d7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -66,6 +66,7 @@
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@@ -253,6 +254,7 @@ void
WalSndErrorCleanup(void)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
if (sendFile >= 0)
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index cd6ec73..e1b787e 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
- s_lock.o predicate.o
+ s_lock.o predicate.o condition_variable.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
new file mode 100644
index 0000000..0639689
--- /dev/null
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -0,0 +1,157 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ * Implementation of condition variables. Condition variables provide
+ * a way for one process to wait until a specific condition occurs,
+ * without needing to know the specific identity of the process for
+ * which they are waiting. Waits for condition variables can be
+ * interrupted, unlike LWLock waits. Condition variables are safe
+ * to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+
+/* Initially, we are not prepared to sleep on any condition variable. */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/*
+ * Initialize a condition variable.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+ SpinLockInit(&cv->mutex);
+ proclist_init(&cv->wakeup);
+}
+
+/*
+ * Add ourselves to the wait queue for a condition variable and mark
+ * ourselves as sleeping.
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+ int pgprocno = MyProc->pgprocno;
+
+ /*
+ * It's not legal to prepare a sleep until the previous sleep has been
+ * completed or cancelled.
+ */
+ Assert(cv_sleep_target == NULL);
+
+ /* Record the condition variable on which we will sleep. */
+ cv_sleep_target = cv;
+
+ /* Mark myself as sleeping. */
+ MyProc->cvSleeping = true;
+
+ /* Add myself to the wait queue. */
+ SpinLockAcquire(&cv->mutex);
+ proclist_push_head(&cv->wakeup, pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+}
+
+/*
+ * Sleeping on a condition variable is extremely simple. We just repeatedly
+ * wait on our latch until someone clears our cvSleeping flag. This may
+ * even happen immediately, since a signal or broadcast operation could have
+ * happened after we prepared to sleep and before we reach this function.
+ */
+void
+ConditionVariableSleep(void)
+{
+ Assert(cv_sleep_target != NULL);
+
+ while (MyProc->cvSleeping)
+ {
+ CHECK_FOR_INTERRUPTS();
+ WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+ ResetLatch(&MyProc->procLatch);
+ }
+
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Cancel any pending sleep operation. We just need to remove ourselves
+ * from the wait queue of any condition variable for which we have previously
+ * prepared a sleep.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+ ConditionVariable *cv = cv_sleep_target;
+
+ if (cv_sleep_target == NULL)
+ return;
+
+ SpinLockAcquire(&cv->mutex);
+ proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ MyProc->cvSleeping = false;
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.
+ *
+ * The return value indicates whether or not we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+ PGPROC *proc = NULL;
+
+ /* Remove the first process from the wakeup queue (if any). */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_is_empty(&cv->wakeup))
+ proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ /* If we found someone sleeping, set their latch to wake them up. */
+ if (proc != NULL)
+ {
+ SetLatch(&proc->procLatch);
+ return true;
+ }
+
+ /* No sleeping processes. */
+ return false;
+}
+
+/*
+ * Wake up all sleeping processes.
+ *
+ * The return value indicates the number of processes we woke.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+ int nwoken = 0;
+
+ /*
+ * Let's just do this the dumbest way possible. We could try to dequeue
+ * all the sleepers at once to save spinlock cycles, but it's a bit hard
+ * to get that right in the face of possible sleep cancellations, and
+ * we don't want to loop holding the mutex.
+ */
+ while (ConditionVariableSignal(cv))
+ ++nwoken;
+
+ return nwoken;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 9a758bd..ec08091 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -42,6 +42,7 @@
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
+#include "storage/condition_variable.h"
#include "storage/standby.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -805,6 +806,9 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
@@ -910,6 +914,9 @@ AuxiliaryProcKill(int code, Datum arg)
/* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/*
* Reset MyLatch to the process local one. This is so that signal
* handlers et al can continue using the latch after the shared latch
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
new file mode 100644
index 0000000..54b7fba
--- /dev/null
+++ b/src/include/storage/condition_variable.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ * Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true. Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable. In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+typedef struct
+{
+ slock_t mutex;
+ proclist_head wakeup;
+} ConditionVariable;
+
+/* Initialize a condition variable. */
+extern void ConditionVariableInit(ConditionVariable *);
+
+/*
+ * Sleep on a condition variable. In order to avoid race conditions, a
+ * process should first prepare to sleep, then recheck whether the desired
+ * condition has been met. If not, the process should then sleep. If so,
+ * it should cancel the sleep. A non-local exit via ERROR or FATAL will
+ * automatically cancel a pending sleep.
+ *
+ * After sleeping, a process may or may not need to recheck the condition
+ * and possibly sleep again. If the condition variable is never signalled
+ * or broadcast except when the condition is guaranteed to hold, then
+ * there is no need to recheck the condition. Otherwise, it must be
+ * rechecked.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *);
+extern void ConditionVariableSleep(void);
+extern void ConditionVariableCancelSleep(void);
+
+/* Wake up a single waiter (via signal) or all waiters (via broadcast). */
+extern bool ConditionVariableSignal(ConditionVariable *);
+extern int ConditionVariableBroadcast(ConditionVariable *);
+
+#endif /* CONDITION_VARIABLE_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index f576f05..812008a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -115,6 +115,10 @@ struct PGPROC
uint8 lwWaitMode; /* lwlock mode being waited for */
proclist_node lwWaitLink; /* position in LW lock wait list */
+ /* Support for condition variables. */
+ bool cvSleeping; /* true if sleeping on a condition variable */
+ proclist_node cvWaitLink; /* position in CV wait list */
+
/* Info about lock the process is currently waiting for, if any. */
/* waitLock and waitProcLock are NULL if not currently waiting. */
LOCK *waitLock; /* Lock object we're sleeping on ... */
diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h
index 2013a40..0d7935c 100644
--- a/src/include/storage/proclist.h
+++ b/src/include/storage/proclist.h
@@ -120,6 +120,20 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
}
/*
+ * Remove and return the first node from a list (there must be one).
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+ PGPROC *proc;
+
+ Assert(!proclist_is_empty(list));
+ proc = GetPGProcByNumber(list->head);
+ proclist_delete_offset(list, list->head, node_offset);
+ return proc;
+}
+
+/*
* Helper macros to avoid repetition of offsetof(PGPROC, <member>).
* 'link_member' is the name of a proclist_node member in PGPROC.
*/
@@ -129,6 +143,8 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_push_tail(list, procno, link_member) \
proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+#define proclist_pop_head_node(list, link_member) \
+ proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
/*
* Iterate through the list pointed at by 'lhead', storing the current
--
2.5.4 (Apple Git-61)
buffer-io-cv-v1.patchapplication/x-download; name=buffer-io-cv-v1.patchDownload
From 923f55423200d3034380068ff1357af32deedf8f Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 11 Aug 2016 17:11:34 -0400
Subject: [PATCH 3/4] Replace buffer I/O locks with condition variables.
---
src/backend/storage/buffer/buf_init.c | 25 +++++---------
src/backend/storage/buffer/bufmgr.c | 59 ++++++++------------------------
src/include/storage/buf_internals.h | 7 ++--
src/include/storage/condition_variable.h | 11 ++++++
4 files changed, 38 insertions(+), 64 deletions(-)
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index a4163cf..37ebd37 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -20,7 +20,7 @@
BufferDescPadded *BufferDescriptors;
char *BufferBlocks;
-LWLockMinimallyPadded *BufferIOLWLockArray = NULL;
+ConditionVariableMinimallyPadded *BufferIOCVArray = NULL;
LWLockTranche BufferIOLWLockTranche;
LWLockTranche BufferContentLWLockTranche;
WritebackContext BackendWritebackContext;
@@ -71,7 +71,7 @@ InitBufferPool(void)
{
bool foundBufs,
foundDescs,
- foundIOLocks,
+ foundIOCV,
foundBufCkpt;
/* Align descriptors to a cacheline boundary. */
@@ -85,16 +85,10 @@ InitBufferPool(void)
NBuffers * (Size) BLCKSZ, &foundBufs);
/* Align lwlocks to cacheline boundary */
- BufferIOLWLockArray = (LWLockMinimallyPadded *)
- ShmemInitStruct("Buffer IO Locks",
- NBuffers * (Size) sizeof(LWLockMinimallyPadded),
- &foundIOLocks);
-
- BufferIOLWLockTranche.name = "buffer_io";
- BufferIOLWLockTranche.array_base = BufferIOLWLockArray;
- BufferIOLWLockTranche.array_stride = sizeof(LWLockMinimallyPadded);
- LWLockRegisterTranche(LWTRANCHE_BUFFER_IO_IN_PROGRESS,
- &BufferIOLWLockTranche);
+ BufferIOCVArray = (ConditionVariableMinimallyPadded *)
+ ShmemInitStruct("Buffer IO Condition Variables",
+ NBuffers * (Size) sizeof(ConditionVariableMinimallyPadded),
+ &foundIOCV);
BufferContentLWLockTranche.name = "buffer_content";
BufferContentLWLockTranche.array_base =
@@ -114,10 +108,10 @@ InitBufferPool(void)
ShmemInitStruct("Checkpoint BufferIds",
NBuffers * sizeof(CkptSortItem), &foundBufCkpt);
- if (foundDescs || foundBufs || foundIOLocks || foundBufCkpt)
+ if (foundDescs || foundBufs || foundIOCV || foundBufCkpt)
{
/* should find all of these, or none of them */
- Assert(foundDescs && foundBufs && foundIOLocks && foundBufCkpt);
+ Assert(foundDescs && foundBufs && foundIOCV && foundBufCkpt);
/* note: this path is only taken in EXEC_BACKEND case */
}
else
@@ -147,8 +141,7 @@ InitBufferPool(void)
LWLockInitialize(BufferDescriptorGetContentLock(buf),
LWTRANCHE_BUFFER_CONTENT);
- LWLockInitialize(BufferDescriptorGetIOLock(buf),
- LWTRANCHE_BUFFER_IO_IN_PROGRESS);
+ ConditionVariableInit(BufferDescriptorGetIOCV(buf));
}
/* Correct last entry of linked list */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 76ade37..403db6e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1314,8 +1314,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
LWLockRelease(newPartitionLock);
/*
- * Buffer contents are currently invalid. Try to get the io_in_progress
- * lock. If StartBufferIO returns false, then someone else managed to
+ * Buffer contents are currently invalid. Try to obtain the right to start
+ * I/O. If StartBufferIO returns false, then someone else managed to
* read it before we did, so there's nothing left for BufferAlloc() to do.
*/
if (StartBufferIO(buf, true))
@@ -1693,9 +1693,8 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
uint32 buf_state;
uint32 old_buf_state;
- /* I'd better not still hold any locks on the buffer */
+ /* I'd better not still hold the buffer content lock */
Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
- Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
/*
* Decrement the shared reference count.
@@ -2656,9 +2655,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
uint32 buf_state;
/*
- * Acquire the buffer's io_in_progress lock. If StartBufferIO returns
- * false, then someone else flushed the buffer before we could, so we need
- * not do anything.
+ * Try to start an I/O operation. If StartBufferIO returns false, then
+ * someone else flushed the buffer before we could, so we need not do
+ * anything.
*/
if (!StartBufferIO(buf, false))
return;
@@ -2714,7 +2713,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
/*
* Now it's safe to write buffer to disk. Note that no one else should
* have been able to write it while we were busy with log flushing because
- * we have the io_in_progress lock.
+ * only one process at a time can set the BM_IO_IN_PROGRESS bit.
*/
bufBlock = BufHdrGetBlock(buf);
@@ -2749,7 +2748,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
/*
* Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
- * end the io_in_progress state.
+ * end the BM_IO_IN_PROGRESS state.
*/
TerminateBufferIO(buf, true, 0);
@@ -3755,7 +3754,7 @@ ConditionalLockBufferForCleanup(Buffer buffer)
* Functions for buffer I/O handling
*
* Note: We assume that nested buffer I/O never occurs.
- * i.e at most one io_in_progress lock is held per proc.
+ * i.e at most one BM_IO_IN_PROGRESS bit is set per proc.
*
* Also note that these are used only for shared buffers, not local ones.
*/
@@ -3766,13 +3765,6 @@ ConditionalLockBufferForCleanup(Buffer buffer)
static void
WaitIO(BufferDesc *buf)
{
- /*
- * Changed to wait until there's no IO - Inoue 01/13/2000
- *
- * Note this is *necessary* because an error abort in the process doing
- * I/O could release the io_in_progress_lock prematurely. See
- * AbortBufferIO.
- */
for (;;)
{
uint32 buf_state;
@@ -3782,14 +3774,15 @@ WaitIO(BufferDesc *buf)
* here, but since this test is essential for correctness, we'd better
* play it safe.
*/
+ ConditionVariablePrepareToSleep(BufferDescriptorGetIOCV(buf));
buf_state = LockBufHdr(buf);
UnlockBufHdr(buf, buf_state);
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
- LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
- LWLockRelease(BufferDescriptorGetIOLock(buf));
+ ConditionVariableSleep();
}
+ ConditionVariableCancelSleep();
}
/*
@@ -3801,7 +3794,7 @@ WaitIO(BufferDesc *buf)
* In some scenarios there are race conditions in which multiple backends
* could attempt the same I/O operation concurrently. If someone else
* has already started I/O on this buffer then we will block on the
- * io_in_progress lock until he's done.
+ * I/O condition variable until he's done.
*
* Input operations are only attempted on buffers that are not BM_VALID,
* and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -3819,25 +3812,11 @@ StartBufferIO(BufferDesc *buf, bool forInput)
for (;;)
{
- /*
- * Grab the io_in_progress lock so that other processes can wait for
- * me to finish the I/O.
- */
- LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
buf_state = LockBufHdr(buf);
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
-
- /*
- * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
- * lock isn't held is if the process doing the I/O is recovering from
- * an error (see AbortBufferIO). If that's the case, we must wait for
- * him to get unwedged.
- */
UnlockBufHdr(buf, buf_state);
- LWLockRelease(BufferDescriptorGetIOLock(buf));
WaitIO(buf);
}
@@ -3847,7 +3826,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
{
/* someone else already did the I/O */
UnlockBufHdr(buf, buf_state);
- LWLockRelease(BufferDescriptorGetIOLock(buf));
return false;
}
@@ -3865,7 +3843,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
* (Assumptions)
* My process is executing IO for the buffer
* BM_IO_IN_PROGRESS bit is set for the buffer
- * We hold the buffer's io_in_progress lock
* The buffer is Pinned
*
* If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
@@ -3897,7 +3874,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
InProgressBuf = NULL;
- LWLockRelease(BufferDescriptorGetIOLock(buf));
+ ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
}
/*
@@ -3918,14 +3895,6 @@ AbortBufferIO(void)
{
uint32 buf_state;
- /*
- * Since LWLockReleaseAll has already been called, we're not holding
- * the buffer's io_in_progress_lock. We have to re-acquire it so that
- * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
- * buffer will be in a busy spin until we succeed in doing this.
- */
- LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
buf_state = LockBufHdr(buf);
Assert(buf_state & BM_IO_IN_PROGRESS);
if (IsForInput)
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index e0dfb2f..18ef400 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -17,6 +17,7 @@
#include "storage/buf.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
@@ -221,12 +222,12 @@ typedef union BufferDescPadded
#define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)
-#define BufferDescriptorGetIOLock(bdesc) \
- (&(BufferIOLWLockArray[(bdesc)->buf_id]).lock)
+#define BufferDescriptorGetIOCV(bdesc) \
+ (&(BufferIOCVArray[(bdesc)->buf_id]).cv)
#define BufferDescriptorGetContentLock(bdesc) \
((LWLock*) (&(bdesc)->content_lock))
-extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
+extern PGDLLIMPORT ConditionVariableMinimallyPadded *BufferIOCVArray;
/*
* The freeNext field is either the index of the next freelist entry,
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 54b7fba..38626d5 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -31,6 +31,17 @@ typedef struct
proclist_head wakeup;
} ConditionVariable;
+/*
+ * Pad a condition variable to a power-of-two size so that an array of
+ * condition variables does not cross a cache line boundary.
+ */
+#define CV_MINIMAL_SIZE (sizeof(ConditionVariable) <= 16 ? 16 : 32)
+typedef union ConditionVariableMinimallyPadded
+{
+ ConditionVariable cv;
+ char pad[CV_MINIMAL_SIZE];
+} ConditionVariableMinimallyPadded;
+
/* Initialize a condition variable. */
extern void ConditionVariableInit(ConditionVariable *);
--
2.5.4 (Apple Git-61)
Robert Haas <robertmhaas@gmail.com> writes:
I had what I think is a better idea, which is to introduce a notion of
condition variables.
Interesting proposal.
... Using condition variables here seems to
have a couple of advantages. First, it means that a backend waiting
for buffer I/O to complete is interruptible. Second, it fixes a
long-running bit of nastiness in AbortBufferIO: right now, if a
backend that is doing buffer I/O aborts, the abort causes it to
release all of its LWLocks, including the buffer I/O lock. Everyone
waiting for that buffer busy-loops until the aborting process gets
around to reacquiring the lock and updating the buffer state in
AbortBufferIO. But if we replace the io_in_progress locks with
condition variables, then that doesn't happen any more. Nobody is
"holding" the condition variable, so it doesn't get "released" when
the process doing I/O aborts. Instead, they just keep sleeping until
the aborting process reaches AbortBufferIO, and then it broadcasts on
the condition variable and wakes everybody up, which seems a good deal
nicer.
Hmm. I fear the only reason you see an advantage there is that you don't
(yet) have any general-purpose mechanism for an aborting transaction to
satisfy its responsibilities vis-a-vis waiters on condition variables.
Instead, this wins specifically because you stuck some bespoke logic into
AbortBufferIO. OK ... but that sounds like we're going to end up with
every single condition variable that ever exists in the system needing to
be catered for separately and explicitly during transaction abort cleanup.
Which does not sound promising from a reliability standpoint. On the
other hand, I don't know what the equivalent rule to "release all LWLocks
during abort" might look like for condition variables, so I don't know
if it's even possible to avoid that.
I encourage you to pursue this, because indeed LWLocks aren't always
an ideal solution, but I think it requires some careful thought about
what transaction aborts will do with condition variables.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Aug 11, 2016 at 2:47 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Another approach to the problem is to use a latch wait loop. That
almost works. Interrupts can be serviced, and you can recheck shared
memory to see whether the condition for proceeding is satisfied after
each iteration of the loop. There's only one problem: when you do
something that might cause the condition to be satisfied for other
waiting backends, you need to set their latch - but you don't have an
easy way to know exactly which processes are waiting, so how do you
call SetLatch?
That's what I ended up doing with parallel CREATE INDEX. It worked,
but it would be nice to have a general purpose facility for waiting
for conditions to change.
https://en.wikipedia.org/wiki/Monitor_(synchronization)#Condition_variables_2
Basically, a condition variable has three operations: you can wait for
the condition variable; you can signal the condition variable to wake
up one waiter; or you can broadcast on the condition variable to wake
up all waiters. Atomically with entering the wait, you must be able
to check whether the condition is satisfied. So, in my
implementation, a condition variable wait loop looks like this:for (;;)
{
ConditionVariablePrepareToSleep(cv);
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep();
}
ConditionVariableCancelSleep();To wake up one waiter, another backend can call
ConditionVariableSignal(cv); to wake up all waiters,
ConditionVariableBroadcast(cv).
This seems convenient.
I notice that you acquire a spinlock within the implementation of
condition variables. Is it worth any effort to consolidate the number
of spinlock acquisitions? In other words, maybe the most common idioms
should be baked into the ConditionVariable interface, which could save
callers from having to use their own mutex variable.
--
Peter Geoghegan
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Aug 12, 2016 at 9:47 AM, Robert Haas <robertmhaas@gmail.com> wrote:
https://en.wikipedia.org/wiki/Monitor_(synchronization)#Condition_variables_2
Basically, a condition variable has three operations: you can wait for
the condition variable; you can signal the condition variable to wake
up one waiter; or you can broadcast on the condition variable to wake
up all waiters. Atomically with entering the wait, you must be able
to check whether the condition is satisfied. So, in my
implementation, a condition variable wait loop looks like this:for (;;)
{
ConditionVariablePrepareToSleep(cv);
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep();
}
ConditionVariableCancelSleep();To wake up one waiter, another backend can call
ConditionVariableSignal(cv); to wake up all waiters,
ConditionVariableBroadcast(cv).
It is interesting to compare this interface with Wikipedia's
description, POSIX's pthread_cond_t and C++'s std::condition_variable.
In those interfaces, the wait operation takes a mutex which must
already be held by the caller. It unlocks the mutex and begins
waiting atomically. Then when it returns, the mutex is automatically
reacquired. This approach avoids race conditions as long as the
shared state change you are awaiting is protected by that mutex. If
you check that state before waiting and while still holding the lock,
you can be sure not to miss any change signals, and then when it
returns you can check the state again and be sure that no one can be
concurrently changing it.
In contrast, this proposal leaves it up to client code to get that
right, similarly to the way you need to do things in a certain order
when waiting for state changes with latches. You could say that it's
more error prone: I think there have been a few cases of incorrectly
coded latch/state-change wait loops in the past. On the other hand,
it places no requirements on the synchronisation mechanism the client
code uses for the related shared state. pthread_cond_wait requires
you to pass in a pointer to the related pthread_mutex_t, whereas with
this proposal client code is free to use atomic ops, lwlocks,
spinlocks or any other mutual exclusion mechanism to coordinate state
changes and deal with cache coherency.
Then there is the question of what happens when the backend that is
supposed to be doing the signalling dies or aborts, which Tom Lane
referred to in his reply. In those other libraries there is no such
concern: it's understood that these are low level thread
synchronisation primitives and if you're waiting for something that
never happens, you'll be waiting forever. I don't know what the
answer is in general for Postgres condition variables, but...
The thing that I personally am working on currently that is very
closely related and could use this has a more specific set of
circumstances: I want "join points" AKA barriers. Something like
pthread_barrier_t. (I'm saying "join point" rather than "barrier" to
avoid confusion with compiler and memory barriers, barrier.h etc.)
Join points let you wait for all workers in a known set to reach a
given point, possibly with a phase number or at least sense (one bit
phase counter) to detect synchronisation bugs. They also select one
worker arbitrarily to receive a different return value when releasing
workers from a join point, for cases where a particular phase of
parallel work needs to be done by exactly one worker while the others
sit on the bench: for example initialisation, cleanup or merging (CF
PTHREAD_BARRIER_SERIAL_THREAD). Clearly a join point could be not
much more than a condition variable and some state tracking arrivals
and departures, but I think that this higher level synchronisation
primitive might have an advantage over raw condition variables in the
abort case: it can know the total set of workers that its waiting for,
if they are somehow registered with it first, and registration can
include arranging for cleanup hooks to do the right thing. It's
already a requirement for a join point to know which workers exist (or
at least how many). Then the deal would then be that when you call
joinpoint_join(&some_joinpoint, phase), it will return only when all
peers have joined or detached, where the latter happens automatically
if they abort or die. Not at all sure of the details yet... but I
suspect join points are useful for a bunch of things like parallel
sort, parallel hash join (my project), and anything else involving
phases or some form of "fork/join" parallelism.
Or perhaps that type of thinking about error handling should be pushed
down to the condition variable. How would that look: all potential
signallers would have to register to deliver a goodbye signal in their
abort and shmem exit paths? Then what happens if you die before
registering? I think even if you find a way to do that I'd still need
to do similar extra work on top for my join points concept, because
although I do need waiters to be poked at the time worker aborts or
dies, one goodbye prod isn't enough: I'd also need to adjust the join
point's set of workers, or put it into error state.
--
Thomas Munro
http://www.enterprisedb.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Aug 11, 2016 at 6:12 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
But if we replace the io_in_progress locks with
condition variables, then that doesn't happen any more. Nobody is
"holding" the condition variable, so it doesn't get "released" when
the process doing I/O aborts. Instead, they just keep sleeping until
the aborting process reaches AbortBufferIO, and then it broadcasts on
the condition variable and wakes everybody up, which seems a good deal
nicer.Hmm. I fear the only reason you see an advantage there is that you don't
(yet) have any general-purpose mechanism for an aborting transaction to
satisfy its responsibilities vis-a-vis waiters on condition variables.
In the specific case of parallel query, this problem doesn't really
arise: if an error happens, it will be propagated back to the leader
(unless it occurs in the leader as an initial matter) and the leader
will then kill of all of the other workers. Since waiting for a
condition variable doesn't block interrupts, the workers will all stop
waiting and die like the obedient lemmings they are.
Instead, this wins specifically because you stuck some bespoke logic into
AbortBufferIO. OK ... but that sounds like we're going to end up with
every single condition variable that ever exists in the system needing to
be catered for separately and explicitly during transaction abort cleanup.
Which does not sound promising from a reliability standpoint. On the
other hand, I don't know what the equivalent rule to "release all LWLocks
during abort" might look like for condition variables, so I don't know
if it's even possible to avoid that.
I don't think it is possible to avoid that. Certainly the fact that
LWLocks are automatically released is in most scenarios a big
advantage, but for content locks it isn't. I note that I didn't so
much insert bespoke logic there as replace the existing bespoke logic
with less-ugly bespoke logic.
One way to conceptualize a condition variable is that it is the wait
queue for an LWLock without the lock itself. The "lock modes" are
defined by the interaction between the condition checked by waiters
before sleeping and the shared memory updates performed before
signalling or broadcasting. This separation of concerns allows for
enormous flexibility - you could use CVs to implement an LWLock-like
construct with an arbitrary set of lock modes and an arbitrary
conflict matrix, for example. But I think that in most cases it will
be best to leave mutual exclusion to LWLocks, and use CVs when what we
need is not mutual exclusion but rather waiting for an operation
performed by some other backend to complete (successfully or
otherwise). The IO-in-progress example is just such a case: the
current spinning behavior arises from the fact that the buffer's
I/O-in-progress flag does not get cleared until AFTER processes has
been dropped off the wait queue. The normal pattern with a CV will be
(1) perform a shared memory update that will, if seen by other
processes, confirm that the condition has been met and then (2)
broadcast on the CV. There's no way for transaction abort to know
what (1) looks like, even if somehow were made aware of the CV so that
it could do (2).
I encourage you to pursue this, because indeed LWLocks aren't always
an ideal solution, but I think it requires some careful thought about
what transaction aborts will do with condition variables.
Thanks. As stated above, I believe the right answer is in fact
"nothing", because this facility is too low-level to permit a
categorical answer.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Aug 11, 2016 at 6:37 PM, Peter Geoghegan <pg@heroku.com> wrote:
I notice that you acquire a spinlock within the implementation of
condition variables. Is it worth any effort to consolidate the number
of spinlock acquisitions? In other words, maybe the most common idioms
should be baked into the ConditionVariable interface, which could save
callers from having to use their own mutex variable.
One thing to keep in mind is that spinlocks are extremely fast as long
as you don't have too many processes contending for them. With
parallel groups (or I/O-in-progress wait queues) of single digit
number of processes, I doubt that consolidating the spinlock
acquisitions will produce any measurable benefit. If we get to the
point of having parallel groups containing scores of processes, that
could change.
Also, right now we don't have enough users of the CV interface to know
what the most common idioms will be. We could speculate, but if we
do, I'll start of the speculation by guessing that there will be a lot
of diversity, and not too much that keeps getting repeated. If that
proves to be wrong, of course, we can always go back and change it
later. We're not writing this on stone tablets.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Aug 11, 2016 at 8:44 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
In contrast, this proposal leaves it up to client code to get that
right, similarly to the way you need to do things in a certain order
when waiting for state changes with latches. You could say that it's
more error prone: I think there have been a few cases of incorrectly
coded latch/state-change wait loops in the past. On the other hand,
it places no requirements on the synchronisation mechanism the client
code uses for the related shared state. pthread_cond_wait requires
you to pass in a pointer to the related pthread_mutex_t, whereas with
this proposal client code is free to use atomic ops, lwlocks,
spinlocks or any other mutual exclusion mechanism to coordinate state
changes and deal with cache coherency.
I think you have accurately stated the pros and cons of this approach.
On the whole I think it's a good trade-off. In particular, not being
locked into a specific synchronization method seems like a good idea
from here. If you had to pick just one, it would be hard to decide
between spinlocks and LWLocks, and "atomics" isn't a sufficiently
specific thing to code to.
Then there is the question of what happens when the backend that is
supposed to be doing the signalling dies or aborts, which Tom Lane
referred to in his reply. In those other libraries there is no such
concern: it's understood that these are low level thread
synchronisation primitives and if you're waiting for something that
never happens, you'll be waiting forever. I don't know what the
answer is in general for Postgres condition variables, but...
As I noted in my reply to Tom, for parallel query, we're going to kill
all the workers at the same time, so the problem doesn't arise. When
we use this mechanism outside that context, we just have to do it
correctly. I don't think that's especially hard, but could somebody
mess it up? Sure.
The thing that I personally am working on currently that is very
closely related and could use this has a more specific set of
circumstances: I want "join points" AKA barriers. Something like
pthread_barrier_t. (I'm saying "join point" rather than "barrier" to
avoid confusion with compiler and memory barriers, barrier.h etc.)
Join points let you wait for all workers in a known set to reach a
given point, possibly with a phase number or at least sense (one bit
phase counter) to detect synchronisation bugs. They also select one
worker arbitrarily to receive a different return value when releasing
workers from a join point, for cases where a particular phase of
parallel work needs to be done by exactly one worker while the others
sit on the bench: for example initialisation, cleanup or merging (CF
PTHREAD_BARRIER_SERIAL_THREAD). Clearly a join point could be not
much more than a condition variable and some state tracking arrivals
and departures, but I think that this higher level synchronisation
primitive might have an advantage over raw condition variables in the
abort case: it can know the total set of workers that its waiting for,
if they are somehow registered with it first, and registration can
include arranging for cleanup hooks to do the right thing. It's
already a requirement for a join point to know which workers exist (or
at least how many). Then the deal would then be that when you call
joinpoint_join(&some_joinpoint, phase), it will return only when all
peers have joined or detached, where the latter happens automatically
if they abort or die. Not at all sure of the details yet... but I
suspect join points are useful for a bunch of things like parallel
sort, parallel hash join (my project), and anything else involving
phases or some form of "fork/join" parallelism.
If I'm right that the abort/die case doesn't really need any special
handling here, then I think this gets a lot simpler.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Aug 12, 2016 at 9:47 AM, Robert Haas <robertmhaas@gmail.com> wrote:
[condition-variable-v1.patch]
Don't you need to set proc->cvSleeping = false in ConditionVariableSignal?
--
Thomas Munro
http://www.enterprisedb.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Sun, Aug 14, 2016 at 9:04 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
On Fri, Aug 12, 2016 at 9:47 AM, Robert Haas <robertmhaas@gmail.com> wrote:
[condition-variable-v1.patch]
Don't you need to set proc->cvSleeping = false in ConditionVariableSignal?
I poked at this a bit... OK, a lot... and have some feedback:
1. As above, we need to clear cvSleeping before setting the latch.
2. The following schedule corrupts the waitlist by trying to delete
something from it that isn't in it:
P1: ConditionVariablePrepareToSleep: push self onto waitlist
P2: ConditionVariableSignal: pop P1 from waitlist
P1: <check user condition, decide to break without ever sleeping>
P3: ConditionVariablePrepareToSleep: push self onto waitlist
P1: ConditionVariableCancelSleep: delete self from waitlist (!)
Without P3 coming along you probably wouldn't notice because the
waitlist will be happily cleared and look valid, but P3's entry gets
lost and then it hangs forever.
One solution is to teach ConditionVariableCancelSleep to check if
we're still actually there first. That can be done in O(1) time by
clearing nodes' next and prev pointers when deleting, so that we can
rely on that in a new proclist_contains() function. See attached.
3. The following schedule corrupts the waitlist by trying to insert
something into it that is already in it:
P1: ConditionVariablePrepareToSleep: push self onto waitlist
P1: <check user condition, decide to sleep>
P1: ConditionVariableSleep
P1: ConditionVariablePrepareToSleep: push self onto waitlist (!)
Nodes before and after self's pre-existing position can be forgotten
when self's node is pushed to the front of the list. That can be
fixed by making ConditionVariablePrepareToSleep also check if we're
already in the list.
4. The waitlist is handled LIFO-ly. Would it be better for the first
guy to start waiting to be woken up first, like we do for locks? The
Pthreads equivalent says that it depends on "scheduling policy". I
don't know if it matters much, just an observation.
5. The new proclist function you added is the first to work in terms
of PGPROC* rather than procno. Maybe the whole interface should work
with either PGPROC pointers or procnos? No strong view.
Please find attached a -v2 of your patch which includes suggestions
1-3 above. Like the -v1, it applies on top of
lwlocks-in-dsm-v3.patch. Also, I have attached a v2->v3 diff to show
just my proposed changes.
--
Thomas Munro
http://www.enterprisedb.com
Attachments:
diff.patchapplication/octet-stream; name=diff.patchDownload
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 0639689..1acda3c 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -61,7 +61,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
/* Add myself to the wait queue. */
SpinLockAcquire(&cv->mutex);
- proclist_push_head(&cv->wakeup, pgprocno, cvWaitLink);
+ if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
+ proclist_push_head(&cv->wakeup, pgprocno, cvWaitLink);
SpinLockRelease(&cv->mutex);
}
@@ -100,7 +101,8 @@ ConditionVariableCancelSleep(void)
return;
SpinLockAcquire(&cv->mutex);
- proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+ proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
SpinLockRelease(&cv->mutex);
MyProc->cvSleeping = false;
@@ -126,6 +128,7 @@ ConditionVariableSignal(ConditionVariable *cv)
/* If we found someone sleeping, set their latch to wake them up. */
if (proc != NULL)
{
+ proc->cvSleeping = false;
SetLatch(&proc->procLatch);
return true;
}
diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h
index 0d7935c..b14e8f8 100644
--- a/src/include/storage/proclist.h
+++ b/src/include/storage/proclist.h
@@ -69,6 +69,8 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->tail != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->next = list->head;
proclist_node_get(node->next, node_offset)->prev = procno;
node->prev = INVALID_PGPROCNO;
@@ -77,7 +79,7 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
}
/*
- * Insert a node a the end of a list.
+ * Insert a node at the end of a list.
*/
static inline void
proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
@@ -93,6 +95,8 @@ proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->head != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->prev = list->tail;
proclist_node_get(node->prev, node_offset)->next = procno;
node->next = INVALID_PGPROCNO;
@@ -117,6 +121,38 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
list->tail = node->prev;
else
proclist_node_get(node->next, node_offset)->prev = node->prev;
+
+ node->next = node->prev = INVALID_PGPROCNO;
+}
+
+/*
+ * Check if a node is currently in a list. It must be known that the node is
+ * not in any _other_ proclist that uses the same proclist_node, so that the
+ * only possibilities are that it is in this list or none.
+ */
+static inline bool
+proclist_contains_offset(proclist_head *list, int procno,
+ size_t node_offset)
+{
+ proclist_node *node = proclist_node_get(procno, node_offset);
+
+ /*
+ * If this node has never been a member of a cv list, then it will contain
+ * zero before and after us in the list. Circular lists are not allowed
+ * so this condition is not confusable with a real pgprocno 0.
+ */
+ if (node->prev == 0 && node->next == 0)
+ return false;
+
+ /* If there is a previous node, then this node must be in the list. */
+ if (node->prev != INVALID_PGPROCNO)
+ return true;
+
+ /*
+ * There is no previous node, so the only way this node can be in the list
+ * is if it's the head node.
+ */
+ return list->head == procno;
}
/*
@@ -145,6 +181,8 @@ proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_pop_head_node(list, link_member) \
proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
+#define proclist_contains(list, procno, link_member) \
+ proclist_contains_offset((list), (procno), offsetof(PGPROC, link_member))
/*
* Iterate through the list pointed at by 'lhead', storing the current
condition-variable-v2.patchapplication/octet-stream; name=condition-variable-v2.patchDownload
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 23f36ea..b40b2e0 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -45,6 +45,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -2476,6 +2477,9 @@ AbortTransaction(void)
/* Reset WAL record construction state */
XLogResetInsertion();
+ /* Cancel condition variable sleep */
+ ConditionVariableCancelSleep();
+
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index e518e17..9eeb49c 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -33,6 +33,7 @@
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
@@ -535,6 +536,7 @@ static void
ShutdownAuxiliaryProcess(int code, Datum arg)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
}
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 00f03d8..40f3f80 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -46,6 +46,7 @@
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -189,6 +190,7 @@ BackgroundWriterMain(void)
* about in bgwriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
AbortBufferIO();
UnlockBuffers();
/* buffer pins are released here: */
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 8d4b353..0c072f3 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -49,6 +49,7 @@
#include "postmaster/bgwriter.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -273,6 +274,7 @@ CheckpointerMain(void)
* files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 228190a..e5de019 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -50,6 +50,7 @@
#include "pgstat.h"
#include "postmaster/walwriter.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -169,6 +170,7 @@ WalWriterMain(void)
* about in walwriter, but we do have LWLocks, and perhaps buffers?
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a0dba19..44143d7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -66,6 +66,7 @@
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@@ -253,6 +254,7 @@ void
WalSndErrorCleanup(void)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
if (sendFile >= 0)
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index cd6ec73..e1b787e 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
- s_lock.o predicate.o
+ s_lock.o predicate.o condition_variable.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
new file mode 100644
index 0000000..1acda3c
--- /dev/null
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -0,0 +1,160 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ * Implementation of condition variables. Condition variables provide
+ * a way for one process to wait until a specific condition occurs,
+ * without needing to know the specific identity of the process for
+ * which they are waiting. Waits for condition variables can be
+ * interrupted, unlike LWLock waits. Condition variables are safe
+ * to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+
+/* Initially, we are not prepared to sleep on any condition variable. */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/*
+ * Initialize a condition variable.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+ SpinLockInit(&cv->mutex);
+ proclist_init(&cv->wakeup);
+}
+
+/*
+ * Add ourselves to the wait queue for a condition variable and mark
+ * ourselves as sleeping.
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+ int pgprocno = MyProc->pgprocno;
+
+ /*
+ * It's not legal to prepare a sleep until the previous sleep has been
+ * completed or cancelled.
+ */
+ Assert(cv_sleep_target == NULL);
+
+ /* Record the condition variable on which we will sleep. */
+ cv_sleep_target = cv;
+
+ /* Mark myself as sleeping. */
+ MyProc->cvSleeping = true;
+
+ /* Add myself to the wait queue. */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
+ proclist_push_head(&cv->wakeup, pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+}
+
+/*
+ * Sleeping on a condition variable is extremely simple. We just repeatedly
+ * wait on our latch until someone clears our cvSleeping flag. This may
+ * even happen immediately, since a signal or broadcast operation could have
+ * happened after we prepared to sleep and before we reach this function.
+ */
+void
+ConditionVariableSleep(void)
+{
+ Assert(cv_sleep_target != NULL);
+
+ while (MyProc->cvSleeping)
+ {
+ CHECK_FOR_INTERRUPTS();
+ WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+ ResetLatch(&MyProc->procLatch);
+ }
+
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Cancel any pending sleep operation. We just need to remove ourselves
+ * from the wait queue of any condition variable for which we have previously
+ * prepared a sleep.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+ ConditionVariable *cv = cv_sleep_target;
+
+ if (cv_sleep_target == NULL)
+ return;
+
+ SpinLockAcquire(&cv->mutex);
+ if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+ proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ MyProc->cvSleeping = false;
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.
+ *
+ * The return value indicates whether or not we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+ PGPROC *proc = NULL;
+
+ /* Remove the first process from the wakeup queue (if any). */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_is_empty(&cv->wakeup))
+ proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ /* If we found someone sleeping, set their latch to wake them up. */
+ if (proc != NULL)
+ {
+ proc->cvSleeping = false;
+ SetLatch(&proc->procLatch);
+ return true;
+ }
+
+ /* No sleeping processes. */
+ return false;
+}
+
+/*
+ * Wake up all sleeping processes.
+ *
+ * The return value indicates the number of processes we woke.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+ int nwoken = 0;
+
+ /*
+ * Let's just do this the dumbest way possible. We could try to dequeue
+ * all the sleepers at once to save spinlock cycles, but it's a bit hard
+ * to get that right in the face of possible sleep cancellations, and
+ * we don't want to loop holding the mutex.
+ */
+ while (ConditionVariableSignal(cv))
+ ++nwoken;
+
+ return nwoken;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 9a758bd..ec08091 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -42,6 +42,7 @@
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
+#include "storage/condition_variable.h"
#include "storage/standby.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -805,6 +806,9 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
@@ -910,6 +914,9 @@ AuxiliaryProcKill(int code, Datum arg)
/* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/*
* Reset MyLatch to the process local one. This is so that signal
* handlers et al can continue using the latch after the shared latch
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
new file mode 100644
index 0000000..54b7fba
--- /dev/null
+++ b/src/include/storage/condition_variable.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ * Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true. Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable. In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+typedef struct
+{
+ slock_t mutex;
+ proclist_head wakeup;
+} ConditionVariable;
+
+/* Initialize a condition variable. */
+extern void ConditionVariableInit(ConditionVariable *);
+
+/*
+ * Sleep on a condition variable. In order to avoid race conditions, a
+ * process should first prepare to sleep, then recheck whether the desired
+ * condition has been met. If not, the process should then sleep. If so,
+ * it should cancel the sleep. A non-local exit via ERROR or FATAL will
+ * automatically cancel a pending sleep.
+ *
+ * After sleeping, a process may or may not need to recheck the condition
+ * and possibly sleep again. If the condition variable is never signalled
+ * or broadcast except when the condition is guaranteed to hold, then
+ * there is no need to recheck the condition. Otherwise, it must be
+ * rechecked.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *);
+extern void ConditionVariableSleep(void);
+extern void ConditionVariableCancelSleep(void);
+
+/* Wake up a single waiter (via signal) or all waiters (via broadcast). */
+extern bool ConditionVariableSignal(ConditionVariable *);
+extern int ConditionVariableBroadcast(ConditionVariable *);
+
+#endif /* CONDITION_VARIABLE_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index f576f05..812008a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -115,6 +115,10 @@ struct PGPROC
uint8 lwWaitMode; /* lwlock mode being waited for */
proclist_node lwWaitLink; /* position in LW lock wait list */
+ /* Support for condition variables. */
+ bool cvSleeping; /* true if sleeping on a condition variable */
+ proclist_node cvWaitLink; /* position in CV wait list */
+
/* Info about lock the process is currently waiting for, if any. */
/* waitLock and waitProcLock are NULL if not currently waiting. */
LOCK *waitLock; /* Lock object we're sleeping on ... */
diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h
index 2013a40..b14e8f8 100644
--- a/src/include/storage/proclist.h
+++ b/src/include/storage/proclist.h
@@ -69,6 +69,8 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->tail != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->next = list->head;
proclist_node_get(node->next, node_offset)->prev = procno;
node->prev = INVALID_PGPROCNO;
@@ -77,7 +79,7 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
}
/*
- * Insert a node a the end of a list.
+ * Insert a node at the end of a list.
*/
static inline void
proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
@@ -93,6 +95,8 @@ proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->head != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->prev = list->tail;
proclist_node_get(node->prev, node_offset)->next = procno;
node->next = INVALID_PGPROCNO;
@@ -117,6 +121,52 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
list->tail = node->prev;
else
proclist_node_get(node->next, node_offset)->prev = node->prev;
+
+ node->next = node->prev = INVALID_PGPROCNO;
+}
+
+/*
+ * Check if a node is currently in a list. It must be known that the node is
+ * not in any _other_ proclist that uses the same proclist_node, so that the
+ * only possibilities are that it is in this list or none.
+ */
+static inline bool
+proclist_contains_offset(proclist_head *list, int procno,
+ size_t node_offset)
+{
+ proclist_node *node = proclist_node_get(procno, node_offset);
+
+ /*
+ * If this node has never been a member of a cv list, then it will contain
+ * zero before and after us in the list. Circular lists are not allowed
+ * so this condition is not confusable with a real pgprocno 0.
+ */
+ if (node->prev == 0 && node->next == 0)
+ return false;
+
+ /* If there is a previous node, then this node must be in the list. */
+ if (node->prev != INVALID_PGPROCNO)
+ return true;
+
+ /*
+ * There is no previous node, so the only way this node can be in the list
+ * is if it's the head node.
+ */
+ return list->head == procno;
+}
+
+/*
+ * Remove and return the first node from a list (there must be one).
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+ PGPROC *proc;
+
+ Assert(!proclist_is_empty(list));
+ proc = GetPGProcByNumber(list->head);
+ proclist_delete_offset(list, list->head, node_offset);
+ return proc;
}
/*
@@ -129,6 +179,10 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_push_tail(list, procno, link_member) \
proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+#define proclist_pop_head_node(list, link_member) \
+ proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
+#define proclist_contains(list, procno, link_member) \
+ proclist_contains_offset((list), (procno), offsetof(PGPROC, link_member))
/*
* Iterate through the list pointed at by 'lhead', storing the current
On Mon, Aug 15, 2016 at 5:58 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
Also, I have attached a v2->v3 diff ...
Ugh. I meant a v1->v2 diff.
--
Thomas Munro
http://www.enterprisedb.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, Aug 15, 2016 at 1:58 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
On Sun, Aug 14, 2016 at 9:04 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:On Fri, Aug 12, 2016 at 9:47 AM, Robert Haas <robertmhaas@gmail.com> wrote:
[condition-variable-v1.patch]
Don't you need to set proc->cvSleeping = false in ConditionVariableSignal?
I poked at this a bit... OK, a lot... and have some feedback:
1. As above, we need to clear cvSleeping before setting the latch.
Right, OK.
2. The following schedule corrupts the waitlist by trying to delete
something from it that isn't in it:P1: ConditionVariablePrepareToSleep: push self onto waitlist
P2: ConditionVariableSignal: pop P1 from waitlist
P1: <check user condition, decide to break without ever sleeping>
P3: ConditionVariablePrepareToSleep: push self onto waitlist
P1: ConditionVariableCancelSleep: delete self from waitlist (!)Without P3 coming along you probably wouldn't notice because the
waitlist will be happily cleared and look valid, but P3's entry gets
lost and then it hangs forever.One solution is to teach ConditionVariableCancelSleep to check if
we're still actually there first. That can be done in O(1) time by
clearing nodes' next and prev pointers when deleting, so that we can
rely on that in a new proclist_contains() function. See attached.
How about instead using cvSleeping to test this? Suppose we make a
rule that cvSleeping can be changed from false to true only by the
process whose PGPROC it is, and thus no lock is needed, but changing
it from true to false always requires the spinlock.
3. The following schedule corrupts the waitlist by trying to insert
something into it that is already in it:P1: ConditionVariablePrepareToSleep: push self onto waitlist
P1: <check user condition, decide to sleep>
P1: ConditionVariableSleep
P1: ConditionVariablePrepareToSleep: push self onto waitlist (!)Nodes before and after self's pre-existing position can be forgotten
when self's node is pushed to the front of the list. That can be
fixed by making ConditionVariablePrepareToSleep also check if we're
already in the list.
OK.
4. The waitlist is handled LIFO-ly. Would it be better for the first
guy to start waiting to be woken up first, like we do for locks? The
Pthreads equivalent says that it depends on "scheduling policy". I
don't know if it matters much, just an observation.
I don't know whether this matters. It's possible that FIFO is a
better policy; I don't really care.
5. The new proclist function you added is the first to work in terms
of PGPROC* rather than procno. Maybe the whole interface should work
with either PGPROC pointers or procnos? No strong view.
Hmm, maybe so. But wouldn't any caller translate to a PGPROC * straight off?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2016-08-11 21:27:45 -0400, Robert Haas wrote:
On Thu, Aug 11, 2016 at 6:37 PM, Peter Geoghegan <pg@heroku.com> wrote:
I notice that you acquire a spinlock within the implementation of
condition variables. Is it worth any effort to consolidate the number
of spinlock acquisitions? In other words, maybe the most common idioms
should be baked into the ConditionVariable interface, which could save
callers from having to use their own mutex variable.One thing to keep in mind is that spinlocks are extremely fast as long
as you don't have too many processes contending for them.
That's one of the conditions. The other is that the system as a whole is
not overcommitted. Because then the chance of processes being put to
sleep inside a spinlock increases.
With
parallel groups (or I/O-in-progress wait queues) of single digit
number of processes, I doubt that consolidating the spinlock
acquisitions will produce any measurable benefit. If we get to the
point of having parallel groups containing scores of processes, that
could change.
And we have no measures to manage systemwide load with paralellism yet,
I think the issue is a bit more general than the quoted paragraph.
But I also think we shouldn't yet worry about it. It seems likely that
the actual critical bottleneck is elsewhere for now.
Andres Freund
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, Aug 15, 2016 at 10:35 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Don't you need to set proc->cvSleeping = false in ConditionVariableSignal?
I poked at this a bit... OK, a lot... and have some feedback:
1. As above, we need to clear cvSleeping before setting the latch.
Right, OK.
I have independently faced this problem while using your patch and for
now I have updated my local copy. If possible, please send an updated
patch as this patch could be used for development of various
parallelism projects.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, Sep 5, 2016 at 3:17 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Aug 15, 2016 at 10:35 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Don't you need to set proc->cvSleeping = false in ConditionVariableSignal?
I poked at this a bit... OK, a lot... and have some feedback:
1. As above, we need to clear cvSleeping before setting the latch.
Right, OK.
I have independently faced this problem while using your patch and for
now I have updated my local copy. If possible, please send an updated
patch as this patch could be used for development of various
parallelism projects.
Thomas already posted an updated patch in the same message where he
reported the problem.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Sep 6, 2016 at 5:29 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Mon, Sep 5, 2016 at 3:17 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Aug 15, 2016 at 10:35 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Don't you need to set proc->cvSleeping = false in ConditionVariableSignal?
I poked at this a bit... OK, a lot... and have some feedback:
1. As above, we need to clear cvSleeping before setting the latch.
Right, OK.
I have independently faced this problem while using your patch and for
now I have updated my local copy. If possible, please send an updated
patch as this patch could be used for development of various
parallelism projects.Thomas already posted an updated patch in the same message where he
reported the problem.
Oops, I missed that, will use the same.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Aug 11, 2016 at 2:47 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Another approach to the problem is to use a latch wait loop. That
almost works. Interrupts can be serviced, and you can recheck shared
memory to see whether the condition for proceeding is satisfied after
each iteration of the loop. There's only one problem: when you do
something that might cause the condition to be satisfied for other
waiting backends, you need to set their latch - but you don't have an
easy way to know exactly which processes are waiting, so how do you
call SetLatch? I originally thought of adding a function like
SetAllLatches(ParallelContext *) and maybe that can work, but then I
had what I think is a better idea, which is to introduce a notion of
condition variables.
I don't see a CF entry for this. Are you planning to work on this
again soon, Robert?
I have an eye on this patch due to my work on parallel CREATE INDEX.
It would be nice to have some rough idea of when you intend to commit
this.
--
Peter Geoghegan
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Sep 13, 2016 at 10:55 PM, Peter Geoghegan <pg@heroku.com> wrote:
On Thu, Aug 11, 2016 at 2:47 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Another approach to the problem is to use a latch wait loop. That
almost works. Interrupts can be serviced, and you can recheck shared
memory to see whether the condition for proceeding is satisfied after
each iteration of the loop. There's only one problem: when you do
something that might cause the condition to be satisfied for other
waiting backends, you need to set their latch - but you don't have an
easy way to know exactly which processes are waiting, so how do you
call SetLatch? I originally thought of adding a function like
SetAllLatches(ParallelContext *) and maybe that can work, but then I
had what I think is a better idea, which is to introduce a notion of
condition variables.I don't see a CF entry for this. Are you planning to work on this
again soon, Robert?I have an eye on this patch due to my work on parallel CREATE INDEX.
It would be nice to have some rough idea of when you intend to commit
this.
I basically figured I would commit it when and if it became clear that
it'd get good use in some other patch which was on the road to being
committed. I don't think it needs much work, just the assurance that
it will get some use.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Maybe I can leave it up to you to determine if that applies in the context
of my parallel create index patch. You are the obvious candidate to review
that patch anyway, of course.
--
Peter Geoghegan
On Mon, Aug 15, 2016 at 5:58 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
Please find attached a -v2 of your patch which includes suggestions
1-3 above.
Here's a rebased patch. ConditionVariableSleep now takes
wait_event_info. Anyone using this in patches for core probably needs
to add enumerators to the WaitEventXXX enums in pgstat.h to describe
their wait points.
--
Thomas Munro
http://www.enterprisedb.com
Attachments:
condition-variable-v3.patchapplication/octet-stream; name=condition-variable-v3.patchDownload
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e11b229..0402fc3 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -45,6 +45,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -2472,6 +2473,9 @@ AbortTransaction(void)
/* Reset WAL record construction state */
XLogResetInsertion();
+ /* Cancel condition variable sleep */
+ ConditionVariableCancelSleep();
+
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 3870a4d..5c5ba7b 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -33,6 +33,7 @@
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
@@ -536,6 +537,7 @@ static void
ShutdownAuxiliaryProcess(int code, Datum arg)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
}
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index c3f3356..a31d44e 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -46,6 +46,7 @@
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -187,6 +188,7 @@ BackgroundWriterMain(void)
* about in bgwriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
AbortBufferIO();
UnlockBuffers();
/* buffer pins are released here: */
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 397267c..92b0a94 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -49,6 +49,7 @@
#include "postmaster/bgwriter.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -271,6 +272,7 @@ CheckpointerMain(void)
* files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 67dcff6..7803af4 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -50,6 +50,7 @@
#include "pgstat.h"
#include "postmaster/walwriter.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -167,6 +168,7 @@ WalWriterMain(void)
* about in walwriter, but we do have LWLocks, and perhaps buffers?
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0f3ced2..52411b5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -66,6 +66,7 @@
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@@ -253,6 +254,7 @@ void
WalSndErrorCleanup(void)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
if (sendFile >= 0)
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index cd6ec73..e1b787e 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
- s_lock.o predicate.o
+ s_lock.o predicate.o condition_variable.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
new file mode 100644
index 0000000..2a29e01
--- /dev/null
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -0,0 +1,164 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ * Implementation of condition variables. Condition variables provide
+ * a way for one process to wait until a specific condition occurs,
+ * without needing to know the specific identity of the process for
+ * which they are waiting. Waits for condition variables can be
+ * interrupted, unlike LWLock waits. Condition variables are safe
+ * to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+
+/* Initially, we are not prepared to sleep on any condition variable. */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/*
+ * Initialize a condition variable.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+ SpinLockInit(&cv->mutex);
+ proclist_init(&cv->wakeup);
+}
+
+/*
+ * Add ourselves to the wait queue for a condition variable and mark
+ * ourselves as sleeping.
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+ int pgprocno = MyProc->pgprocno;
+
+ /*
+ * It's not legal to prepare a sleep until the previous sleep has been
+ * completed or cancelled.
+ */
+ Assert(cv_sleep_target == NULL);
+
+ /* Record the condition variable on which we will sleep. */
+ cv_sleep_target = cv;
+
+ /* Mark myself as sleeping. */
+ MyProc->cvSleeping = true;
+
+ /* Add myself to the wait queue. */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
+ proclist_push_head(&cv->wakeup, pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+}
+
+/*
+ * Sleeping on a condition variable is extremely simple. We just repeatedly
+ * wait on our latch until someone clears our cvSleeping flag. This may
+ * even happen immediately, since a signal or broadcast operation could have
+ * happened after we prepared to sleep and before we reach this function.
+ *
+ * Supply a value from one of the WaitEventXXX enums defined in pgstat.h to
+ * control the contents of pg_stat_activity's wait_event_type and wait_event
+ * columns while waiting.
+ */
+void
+ConditionVariableSleep(uint32 wait_event_info)
+{
+ Assert(cv_sleep_target != NULL);
+
+ while (MyProc->cvSleeping)
+ {
+ CHECK_FOR_INTERRUPTS();
+ WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1, wait_event_info);
+ ResetLatch(&MyProc->procLatch);
+ }
+
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Cancel any pending sleep operation. We just need to remove ourselves
+ * from the wait queue of any condition variable for which we have previously
+ * prepared a sleep.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+ ConditionVariable *cv = cv_sleep_target;
+
+ if (cv_sleep_target == NULL)
+ return;
+
+ SpinLockAcquire(&cv->mutex);
+ if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+ proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ MyProc->cvSleeping = false;
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.
+ *
+ * The return value indicates whether or not we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+ PGPROC *proc = NULL;
+
+ /* Remove the first process from the wakeup queue (if any). */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_is_empty(&cv->wakeup))
+ proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ /* If we found someone sleeping, set their latch to wake them up. */
+ if (proc != NULL)
+ {
+ proc->cvSleeping = false;
+ SetLatch(&proc->procLatch);
+ return true;
+ }
+
+ /* No sleeping processes. */
+ return false;
+}
+
+/*
+ * Wake up all sleeping processes.
+ *
+ * The return value indicates the number of processes we woke.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+ int nwoken = 0;
+
+ /*
+ * Let's just do this the dumbest way possible. We could try to dequeue
+ * all the sleepers at once to save spinlock cycles, but it's a bit hard
+ * to get that right in the face of possible sleep cancellations, and
+ * we don't want to loop holding the mutex.
+ */
+ while (ConditionVariableSignal(cv))
+ ++nwoken;
+
+ return nwoken;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index dd76094..0f3930b 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -43,6 +43,7 @@
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
+#include "storage/condition_variable.h"
#include "storage/standby.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -802,6 +803,9 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
@@ -907,6 +911,9 @@ AuxiliaryProcKill(int code, Datum arg)
/* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/*
* Reset MyLatch to the process local one. This is so that signal
* handlers et al can continue using the latch after the shared latch
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
new file mode 100644
index 0000000..f88cfb1
--- /dev/null
+++ b/src/include/storage/condition_variable.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ * Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true. Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable. In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+typedef struct
+{
+ slock_t mutex;
+ proclist_head wakeup;
+} ConditionVariable;
+
+/* Initialize a condition variable. */
+extern void ConditionVariableInit(ConditionVariable *);
+
+/*
+ * Sleep on a condition variable. In order to avoid race conditions, a
+ * process should first prepare to sleep, then recheck whether the desired
+ * condition has been met. If not, the process should then sleep. If so,
+ * it should cancel the sleep. A non-local exit via ERROR or FATAL will
+ * automatically cancel a pending sleep.
+ *
+ * After sleeping, a process may or may not need to recheck the condition
+ * and possibly sleep again. If the condition variable is never signalled
+ * or broadcast except when the condition is guaranteed to hold, then
+ * there is no need to recheck the condition. Otherwise, it must be
+ * rechecked.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *);
+extern void ConditionVariableSleep(uint32 wait_event_info);
+extern void ConditionVariableCancelSleep(void);
+
+/* Wake up a single waiter (via signal) or all waiters (via broadcast). */
+extern bool ConditionVariableSignal(ConditionVariable *);
+extern int ConditionVariableBroadcast(ConditionVariable *);
+
+#endif /* CONDITION_VARIABLE_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 7dc8dac..c9c31e9 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -115,6 +115,10 @@ struct PGPROC
uint8 lwWaitMode; /* lwlock mode being waited for */
proclist_node lwWaitLink; /* position in LW lock wait list */
+ /* Support for condition variables. */
+ bool cvSleeping; /* true if sleeping on a condition variable */
+ proclist_node cvWaitLink; /* position in CV wait list */
+
/* Info about lock the process is currently waiting for, if any. */
/* waitLock and waitProcLock are NULL if not currently waiting. */
LOCK *waitLock; /* Lock object we're sleeping on ... */
diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h
index 2013a40..b14e8f8 100644
--- a/src/include/storage/proclist.h
+++ b/src/include/storage/proclist.h
@@ -69,6 +69,8 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->tail != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->next = list->head;
proclist_node_get(node->next, node_offset)->prev = procno;
node->prev = INVALID_PGPROCNO;
@@ -77,7 +79,7 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
}
/*
- * Insert a node a the end of a list.
+ * Insert a node at the end of a list.
*/
static inline void
proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
@@ -93,6 +95,8 @@ proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->head != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->prev = list->tail;
proclist_node_get(node->prev, node_offset)->next = procno;
node->next = INVALID_PGPROCNO;
@@ -117,6 +121,52 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
list->tail = node->prev;
else
proclist_node_get(node->next, node_offset)->prev = node->prev;
+
+ node->next = node->prev = INVALID_PGPROCNO;
+}
+
+/*
+ * Check if a node is currently in a list. It must be known that the node is
+ * not in any _other_ proclist that uses the same proclist_node, so that the
+ * only possibilities are that it is in this list or none.
+ */
+static inline bool
+proclist_contains_offset(proclist_head *list, int procno,
+ size_t node_offset)
+{
+ proclist_node *node = proclist_node_get(procno, node_offset);
+
+ /*
+ * If this node has never been a member of a cv list, then it will contain
+ * zero before and after us in the list. Circular lists are not allowed
+ * so this condition is not confusable with a real pgprocno 0.
+ */
+ if (node->prev == 0 && node->next == 0)
+ return false;
+
+ /* If there is a previous node, then this node must be in the list. */
+ if (node->prev != INVALID_PGPROCNO)
+ return true;
+
+ /*
+ * There is no previous node, so the only way this node can be in the list
+ * is if it's the head node.
+ */
+ return list->head == procno;
+}
+
+/*
+ * Remove and return the first node from a list (there must be one).
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+ PGPROC *proc;
+
+ Assert(!proclist_is_empty(list));
+ proc = GetPGProcByNumber(list->head);
+ proclist_delete_offset(list, list->head, node_offset);
+ return proc;
}
/*
@@ -129,6 +179,10 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_push_tail(list, procno, link_member) \
proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+#define proclist_pop_head_node(list, link_member) \
+ proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
+#define proclist_contains(list, procno, link_member) \
+ proclist_contains_offset((list), (procno), offsetof(PGPROC, link_member))
/*
* Iterate through the list pointed at by 'lhead', storing the current
On Tue, Oct 4, 2016 at 12:12 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
Here's a rebased patch. ConditionVariableSleep now takes
wait_event_info. Anyone using this in patches for core probably needs
to add enumerators to the WaitEventXXX enums in pgstat.h to describe
their wait points.
I think that there are at least 2 patches from EDB people that are
already dependent on this one. I myself could easily adopt parallel
CREATE INDEX to use it, too. Why the continued delay in committing it?
I think it's fairly clear that this mechanism is widely useful.
--
Peter Geoghegan
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Oct 4, 2016 at 3:12 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
On Mon, Aug 15, 2016 at 5:58 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:Please find attached a -v2 of your patch which includes suggestions
1-3 above.Here's a rebased patch. ConditionVariableSleep now takes
wait_event_info. Anyone using this in patches for core probably needs
to add enumerators to the WaitEventXXX enums in pgstat.h to describe
their wait points.
So, in my original patch, it was intended that cvSleeping was the
definitive source of truth as to whether a particular backend is
committed to sleeping or not. That had some bugs, I guess, but in
your version, there are now two sources of truth. On the one hand,
there's still cvSleeping. On the other hand, there's now also whether
proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink) returns
true.
I'm not sure whether that causes any serious problem. It seems
possible, for example, that ConditionVariableSignal() could clear the
cvSleeping flag for a backend that's now waiting for some OTHER
condition variable, because once it pops the victim off the list and
releases the spinlock, the other backend could meanwhile
ConditionVariableCancelSleep() and then do anything it likes,
including sleep on some other condition variable. Now I think that
the worst thing that will happen is that the other backend will
receive a spurious wakeup, but I'm not quite sure. The whole point of
having cvSleeping in the first place is so that we don't get spurious
wakeups just because somebody happens to set your process latch, so it
seems a bit unfortunate that it doesn't actually prevent that from
happening.
I wonder if we shouldn't try to create the invariant that when the CV
mutex is not help, the state of cvSleeping has to be true if we're in
the proclist and false if we're not. So ConditionVariableSignal()
would clear the flag before releasing the spinlock, for example. Then
I think we wouldn't need proclist_contains().
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Oct 28, 2016 at 9:38 AM, Robert Haas <robertmhaas@gmail.com> wrote:
On Tue, Oct 4, 2016 at 3:12 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:On Mon, Aug 15, 2016 at 5:58 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:Please find attached a -v2 of your patch which includes suggestions
1-3 above.Here's a rebased patch. ConditionVariableSleep now takes
wait_event_info. Anyone using this in patches for core probably needs
to add enumerators to the WaitEventXXX enums in pgstat.h to describe
their wait points.So, in my original patch, it was intended that cvSleeping was the
definitive source of truth as to whether a particular backend is
committed to sleeping or not. That had some bugs, I guess, but in
your version, there are now two sources of truth. On the one hand,
there's still cvSleeping. On the other hand, there's now also whether
proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink) returns
true.I'm not sure whether that causes any serious problem. It seems
possible, for example, that ConditionVariableSignal() could clear the
cvSleeping flag for a backend that's now waiting for some OTHER
condition variable, because once it pops the victim off the list and
releases the spinlock, the other backend could meanwhile
ConditionVariableCancelSleep() and then do anything it likes,
including sleep on some other condition variable. Now I think that
the worst thing that will happen is that the other backend will
receive a spurious wakeup, but I'm not quite sure. The whole point of
having cvSleeping in the first place is so that we don't get spurious
wakeups just because somebody happens to set your process latch, so it
seems a bit unfortunate that it doesn't actually prevent that from
happening.I wonder if we shouldn't try to create the invariant that when the CV
mutex is not help, the state of cvSleeping has to be true if we're in
the proclist and false if we're not. So ConditionVariableSignal()
would clear the flag before releasing the spinlock, for example. Then
I think we wouldn't need proclist_contains().
Yeah, that'd work. ConditionVariableSleep would need to hold the
spinlock while checking cvSleeping (otherwise there is race case where
another process sets it to false but this process doesn't see that yet
and then waits for a latch-set which never arrives). It's not the end
of the world but it seems unfortunate to have to acquire and release
the spinlock in ConditionVariablePrepareToSleep and then immediately
again in ConditionVariableSleep.
I was going to code that up but I read this and had another idea:
http://stackoverflow.com/questions/8594591/why-does-pthread-cond-wait-have-spurious-wakeups
I realise that you didn't actually say you wanted to guarantee no
spurious wakeups. But since the client code already has to check its
own exit condition, why bother adding a another layer of looping?
Sprurious latch sets must be super rare, but even if they aren't, what
do you save by filtering them here? In this situation you already got
woken from a deep slumbler and scheduled back onto the CPU, so it
hardly matters whether you go around again in that loop or the
client's loop. We could make things really simple instead: get rid of
cvSleeping, have ConditionVariablePrepareToSleep reset the latch, then
have ConditionVariableSleep wait for the latch to be set just once (no
loop). Then we'd document that spurious wakeups are possible so the
caller must write a robust predicate loop, exactly as you already
showed in your first message. We'd need to keep that
proclist_contains stuff to avoid corrupting the list.
proclist_contains would be the one source of truth for whether you're
in the waitlist, and the client code's predicate loop would contain
the one source of truth for whether the condition it is waiting for
has been reached.
Thoughts?
--
Thomas Munro
http://www.enterprisedb.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, Nov 21, 2016 at 7:10 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
I wonder if we shouldn't try to create the invariant that when the CV
mutex is not help, the state of cvSleeping has to be true if we're in
the proclist and false if we're not. So ConditionVariableSignal()
would clear the flag before releasing the spinlock, for example. Then
I think we wouldn't need proclist_contains().Yeah, that'd work. ConditionVariableSleep would need to hold the
spinlock while checking cvSleeping (otherwise there is race case where
another process sets it to false but this process doesn't see that yet
and then waits for a latch-set which never arrives). It's not the end
of the world but it seems unfortunate to have to acquire and release
the spinlock in ConditionVariablePrepareToSleep and then immediately
again in ConditionVariableSleep.I was going to code that up but I read this and had another idea:
http://stackoverflow.com/questions/8594591/why-does-pthread-cond-wait-have-spurious-wakeups
I realise that you didn't actually say you wanted to guarantee no
spurious wakeups. But since the client code already has to check its
own exit condition, why bother adding a another layer of looping?
Sprurious latch sets must be super rare, but even if they aren't, what
do you save by filtering them here? In this situation you already got
woken from a deep slumbler and scheduled back onto the CPU, so it
hardly matters whether you go around again in that loop or the
client's loop. We could make things really simple instead: get rid of
cvSleeping, have ConditionVariablePrepareToSleep reset the latch, then
have ConditionVariableSleep wait for the latch to be set just once (no
loop). Then we'd document that spurious wakeups are possible so the
caller must write a robust predicate loop, exactly as you already
showed in your first message. We'd need to keep that
proclist_contains stuff to avoid corrupting the list.
proclist_contains would be the one source of truth for whether you're
in the waitlist, and the client code's predicate loop would contain
the one source of truth for whether the condition it is waiting for
has been reached.
I don't think we can rely on spurious latch set events being rare.
There are an increasing number of things that set the process latch,
and there will very likely be more in the future. For instance, the
arrival of tuples from a parallel worker associated with our session
will set the process latch. Workers starting or dying will set the
process latch. So my inclination was to try to guarantee no spurious
wakeups at all, but possibly a softer guarantee that makes them
unlikely would be sufficient.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Aug 11, 2016 at 5:47 PM, Robert Haas <robertmhaas@gmail.com> wrote:
So, in my
implementation, a condition variable wait loop looks like this:for (;;)
{
ConditionVariablePrepareToSleep(cv);
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep();
}
ConditionVariableCancelSleep();
I have what I think is a better idea. Let's get rid of
ConditionVariablePrepareToSleep(cv) and instead tell users of this
facility to write the loop this way:
for (;;)
{
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep(cv);
}
ConditionVariableCancelSleep();
ConditionVariableSleep(cv) will check whether the current process is
already on the condition variable's waitlist. If so, it will sleep;
if not, it will add the process and return without sleeping.
It may seem odd that ConditionVariableSleep(cv) doesn't necessary
sleep, but this design has a significant advantage: we avoid
manipulating the wait-list altogether in the case where the condition
is already satisfied when we enter the loop. That's more like what we
already do in lwlock.c: we try to grab the lock first; if we can't, we
add ourselves to the wait-list and retry; if we then get the lock
after all we have to recheck whether we can get the lock and remove
ourselves from the wait-list if so. Of course, there is some cost: if
we do have to wait, we'll end up checking the condition twice before
actually going to sleep. However, it's probably smart to bet that
actually needing to sleep is fairly infrequent, just as in lwlock.c.
Thoughts?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hello,
At Mon, 21 Nov 2016 15:57:47 -0500, Robert Haas <robertmhaas@gmail.com> wrote in <CA+TgmobFjwcFEiq8j+fvH5CdXHdVJffmemNLq8MqFesg2+4Gwg@mail.gmail.com>
On Thu, Aug 11, 2016 at 5:47 PM, Robert Haas <robertmhaas@gmail.com> wrote:
So, in my
implementation, a condition variable wait loop looks like this:for (;;)
{
ConditionVariablePrepareToSleep(cv);
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep();
}
ConditionVariableCancelSleep();I have what I think is a better idea. Let's get rid of
ConditionVariablePrepareToSleep(cv) and instead tell users of this
facility to write the loop this way:for (;;)
{
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep(cv);
}
ConditionVariableCancelSleep();
It seems rather a common way to wait on a condition variable, in
shorter,
| while (condition for which we are waiting is *not* satisfied)
| ConditionVariableSleep(cv);
| ConditionVariableCancelSleep();
ConditionVariableSleep(cv) will check whether the current process is
already on the condition variable's waitlist. If so, it will sleep;
if not, it will add the process and return without sleeping.It may seem odd that ConditionVariableSleep(cv) doesn't necessary
sleep, but this design has a significant advantage: we avoid
manipulating the wait-list altogether in the case where the condition
is already satisfied when we enter the loop. That's more like what we
The condition check is done far faster than maintaining the
wait-list for most cases, I believe.
already do in lwlock.c: we try to grab the lock first; if we can't, we
add ourselves to the wait-list and retry; if we then get the lock
after all we have to recheck whether we can get the lock and remove
ourselves from the wait-list if so. Of course, there is some cost: if
we do have to wait, we'll end up checking the condition twice before
actually going to sleep. However, it's probably smart to bet that
actually needing to sleep is fairly infrequent, just as in lwlock.c.Thoughts?
FWIW, I agree to the assumption.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Nov 22, 2016 at 3:05 PM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
Hello,
At Mon, 21 Nov 2016 15:57:47 -0500, Robert Haas <robertmhaas@gmail.com> wrote in <CA+TgmobFjwcFEiq8j+fvH5CdXHdVJffmemNLq8MqFesg2+4Gwg@mail.gmail.com>
On Thu, Aug 11, 2016 at 5:47 PM, Robert Haas <robertmhaas@gmail.com> wrote:
So, in my
implementation, a condition variable wait loop looks like this:for (;;)
{
ConditionVariablePrepareToSleep(cv);
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep();
}
ConditionVariableCancelSleep();I have what I think is a better idea. Let's get rid of
ConditionVariablePrepareToSleep(cv) and instead tell users of this
facility to write the loop this way:for (;;)
{
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep(cv);
}
ConditionVariableCancelSleep();It seems rather a common way to wait on a condition variable, in
shorter,| while (condition for which we are waiting is *not* satisfied)
| ConditionVariableSleep(cv);
| ConditionVariableCancelSleep();
Ok, let's show it that way.
ConditionVariableSleep(cv) will check whether the current process is
already on the condition variable's waitlist. If so, it will sleep;
if not, it will add the process and return without sleeping.It may seem odd that ConditionVariableSleep(cv) doesn't necessary
sleep, but this design has a significant advantage: we avoid
manipulating the wait-list altogether in the case where the condition
is already satisfied when we enter the loop. That's more like what weThe condition check is done far faster than maintaining the
wait-list for most cases, I believe.already do in lwlock.c: we try to grab the lock first; if we can't, we
add ourselves to the wait-list and retry; if we then get the lock
after all we have to recheck whether we can get the lock and remove
ourselves from the wait-list if so. Of course, there is some cost: if
we do have to wait, we'll end up checking the condition twice before
actually going to sleep. However, it's probably smart to bet that
actually needing to sleep is fairly infrequent, just as in lwlock.c.Thoughts?
FWIW, I agree to the assumption.
Here's a version that works that way, though it allows you to call
ConditionVariablePrepareToSleep *optionally* before you enter your
loop, in case you expect to have to wait and would rather avoid the
extra loop. Maybe there isn't much point in exposing that though,
since your condition test should be fast and waiting is the slow path,
but we don't really really know what your condition test is. I
thought about that because my use case (barrier.c) does in fact expect
to hit the wait case more often than not. If that seems pointless
then perhaps ConditionVariablePrepareToSleep should become static and
implicit. This version does attempt to suppress spurious returns, a
bit, using proclist_contains. No more cvSleeping.
It's possible that future users will want a version with a timeout, or
multiplexed with IO, in which case there would be some interesting
questions about how this should interact with WaitEventSet. It also
seems like someone might eventually want to handle postmaster death.
Perhaps there shoul eventually be a way to tell WaitEventSet that
you're waiting for a CV so these things can be multiplexed without
exposing the fact that it's done internally with latches.
--
Thomas Munro
http://www.enterprisedb.com
Attachments:
condition-variable-v4.patchapplication/octet-stream; name=condition-variable-v4.patchDownload
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9580596..d643216 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -45,6 +45,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -2472,6 +2473,9 @@ AbortTransaction(void)
/* Reset WAL record construction state */
XLogResetInsertion();
+ /* Cancel condition variable sleep */
+ ConditionVariableCancelSleep();
+
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 3870a4d..5c5ba7b 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -33,6 +33,7 @@
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
@@ -536,6 +537,7 @@ static void
ShutdownAuxiliaryProcess(int code, Datum arg)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
}
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index c3f3356..a31d44e 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -46,6 +46,7 @@
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -187,6 +188,7 @@ BackgroundWriterMain(void)
* about in bgwriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
AbortBufferIO();
UnlockBuffers();
/* buffer pins are released here: */
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 397267c..92b0a94 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -49,6 +49,7 @@
#include "postmaster/bgwriter.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -271,6 +272,7 @@ CheckpointerMain(void)
* files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 67dcff6..7803af4 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -50,6 +50,7 @@
#include "pgstat.h"
#include "postmaster/walwriter.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -167,6 +168,7 @@ WalWriterMain(void)
* about in walwriter, but we do have LWLocks, and perhaps buffers?
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5e508..aa42d59 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -66,6 +66,7 @@
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@@ -253,6 +254,7 @@ void
WalSndErrorCleanup(void)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
if (sendFile >= 0)
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index cd6ec73..e1b787e 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
- s_lock.o predicate.o
+ s_lock.o predicate.o condition_variable.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
new file mode 100644
index 0000000..2710b0b
--- /dev/null
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -0,0 +1,225 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ * Implementation of condition variables. Condition variables provide
+ * a way for one process to wait until a specific condition occurs,
+ * without needing to know the specific identity of the process for
+ * which they are waiting. Waits for condition variables can be
+ * interrupted, unlike LWLock waits. Condition variables are safe
+ * to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+#include "utils/memutils.h"
+
+/* Initially, we are not prepared to sleep on any condition variable. */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/* Reusable WaitEventSet. */
+static WaitEventSet *cv_wait_event_set = NULL;
+
+/*
+ * Initialize a condition variable.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+ SpinLockInit(&cv->mutex);
+ proclist_init(&cv->wakeup);
+}
+
+/*
+ * Prepare to wait on a given condition variable. This can optionally be
+ * called before entering a test/sleep loop. Alternatively, the call to
+ * ConditionVariablePrepareToSleep can be omitted. The only advantage of
+ * calling ConditionVariablePrepareToSleep is that it avoids an initial
+ * double-test of the user's predicate in the case that we need to wait.
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+ int pgprocno = MyProc->pgprocno;
+
+ /*
+ * It's not legal to prepare a sleep until the previous sleep has been
+ * completed or canceled.
+ */
+ Assert(cv_sleep_target == NULL);
+
+ /* Record the condition variable on which we will sleep. */
+ cv_sleep_target = cv;
+
+ /* Create a reusable WaitEventSet. */
+ if (cv_wait_event_set == NULL)
+ {
+ cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
+ AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
+ &MyProc->procLatch, NULL);
+ }
+
+ /* Add myself to the wait queue. */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
+ proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ /* Reset my latch before entering the caller's predicate loop. */
+ ResetLatch(&MyProc->procLatch);
+}
+
+/*--------------------------------------------------------------------------
+ * Wait for the given condition variable to be signaled. This should be
+ * called in a predicate loop that tests for a specfic exit condition and
+ * otherwise sleeps, like so:
+ *
+ * ConditionVariablePrepareToSleep(cv); [optional]
+ * while (condition for which we are waiting is not true)
+ * ConditionVariableSleep(cv, wait_event_info);
+ * ConditionVariableCancelSleep();
+ *
+ * Supply a value from one of the WaitEventXXX enums defined in pgstat.h to
+ * control the contents of pg_stat_activity's wait_event_type and wait_event
+ * columns while waiting.
+ *-------------------------------------------------------------------------*/
+void
+ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
+{
+ WaitEvent event;
+ bool done = false;
+
+ /*
+ * If the caller didn't prepare to sleep explicitly, then do so now and
+ * return immediately. The caller's predicate loop should immediately
+ * call again if its exit condition is not yet met. This initial spurious
+ * return can be avoided by calling ConditionVariablePrepareToSleep(cv)
+ * first. Whether it's worth doing that depends on whether you expect the
+ * condition to be met initially, in which case skipping the prepare
+ * allows you to skip manipulation of the wait list, or not met intiailly,
+ * in which case preparing first allows you to skip a spurious test of the
+ * caller's exit condition.
+ */
+ if (cv_sleep_target == NULL)
+ {
+ ConditionVariablePrepareToSleep(cv);
+ return;
+ }
+
+ /* Any earlier condition variable sleep must have been canceled. */
+ Assert(cv_sleep_target == cv);
+
+ while (!done)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Wait for latch to be set. We don't care about the result because
+ * our contract permits spurious returns.
+ */
+ WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info);
+
+ /* Reset latch before testing whether we can return. */
+ ResetLatch(&MyProc->procLatch);
+
+ /*
+ * If this process has been taken out of the wait list, then we know
+ * that is has been signaled by ConditionVariableSignal. We put it
+ * back into the wait list, so we don't miss any further signals while
+ * the caller's loop checks its condition. If it hasn't been taken
+ * out of the wait list, then the latch must have been set by
+ * something other than ConditionVariableSignal; though we don't
+ * guarantee not to return spuriously, we'll avoid these obvious
+ * cases.
+ */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+ {
+ done = true;
+ proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ }
+ SpinLockRelease(&cv->mutex);
+ }
+}
+
+/*
+ * Cancel any pending sleep operation. We just need to remove ourselves
+ * from the wait queue of any condition variable for which we have previously
+ * prepared a sleep.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+ ConditionVariable *cv = cv_sleep_target;
+
+ if (cv == NULL)
+ return;
+
+ SpinLockAcquire(&cv->mutex);
+ if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+ proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.
+ *
+ * The return value indicates whether or not we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+ PGPROC *proc = NULL;
+
+ /* Remove the first process from the wakeup queue (if any). */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_is_empty(&cv->wakeup))
+ proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ /* If we found someone sleeping, set their latch to wake them up. */
+ if (proc != NULL)
+ {
+ SetLatch(&proc->procLatch);
+ return true;
+ }
+
+ /* No sleeping processes. */
+ return false;
+}
+
+/*
+ * Wake up all sleeping processes.
+ *
+ * The return value indicates the number of processes we woke.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+ int nwoken = 0;
+
+ /*
+ * Let's just do this the dumbest way possible. We could try to dequeue
+ * all the sleepers at once to save spinlock cycles, but it's a bit hard
+ * to get that right in the face of possible sleep cancelations, and
+ * we don't want to loop holding the mutex.
+ */
+ while (ConditionVariableSignal(cv))
+ ++nwoken;
+
+ return nwoken;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b201631..83e9ca1 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -43,6 +43,7 @@
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
+#include "storage/condition_variable.h"
#include "storage/standby.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -802,6 +803,9 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
@@ -907,6 +911,9 @@ AuxiliaryProcKill(int code, Datum arg)
/* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/*
* Reset MyLatch to the process local one. This is so that signal
* handlers et al can continue using the latch after the shared latch
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
new file mode 100644
index 0000000..2d4429b
--- /dev/null
+++ b/src/include/storage/condition_variable.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ * Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true. Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable. In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+typedef struct
+{
+ slock_t mutex;
+ proclist_head wakeup;
+} ConditionVariable;
+
+/* Initialize a condition variable. */
+extern void ConditionVariableInit(ConditionVariable *);
+
+/*
+ * Sleep on a condition variable. In order to avoid race conditions, a
+ * process should first prepare to sleep, then recheck whether the desired
+ * condition has been met. If not, the process should then sleep. If so,
+ * it should cancel the sleep. A non-local exit via ERROR or FATAL will
+ * automatically cancel a pending sleep.
+ *
+ * After sleeping, a process may or may not need to recheck the condition
+ * and possibly sleep again. If the condition variable is never signalled
+ * or broadcast except when the condition is guaranteed to hold, then
+ * there is no need to recheck the condition. Otherwise, it must be
+ * rechecked.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *);
+extern void ConditionVariableSleep(ConditionVariable *, uint32 wait_event_info);
+extern void ConditionVariableCancelSleep(void);
+
+/* Wake up a single waiter (via signal) or all waiters (via broadcast). */
+extern bool ConditionVariableSignal(ConditionVariable *);
+extern int ConditionVariableBroadcast(ConditionVariable *);
+
+#endif /* CONDITION_VARIABLE_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 7dc8dac..6fa7125 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -115,6 +115,9 @@ struct PGPROC
uint8 lwWaitMode; /* lwlock mode being waited for */
proclist_node lwWaitLink; /* position in LW lock wait list */
+ /* Support for condition variables. */
+ proclist_node cvWaitLink; /* position in CV wait list */
+
/* Info about lock the process is currently waiting for, if any. */
/* waitLock and waitProcLock are NULL if not currently waiting. */
LOCK *waitLock; /* Lock object we're sleeping on ... */
diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h
index 2013a40..b14e8f8 100644
--- a/src/include/storage/proclist.h
+++ b/src/include/storage/proclist.h
@@ -69,6 +69,8 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->tail != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->next = list->head;
proclist_node_get(node->next, node_offset)->prev = procno;
node->prev = INVALID_PGPROCNO;
@@ -77,7 +79,7 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
}
/*
- * Insert a node a the end of a list.
+ * Insert a node at the end of a list.
*/
static inline void
proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
@@ -93,6 +95,8 @@ proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->head != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->prev = list->tail;
proclist_node_get(node->prev, node_offset)->next = procno;
node->next = INVALID_PGPROCNO;
@@ -117,6 +121,52 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
list->tail = node->prev;
else
proclist_node_get(node->next, node_offset)->prev = node->prev;
+
+ node->next = node->prev = INVALID_PGPROCNO;
+}
+
+/*
+ * Check if a node is currently in a list. It must be known that the node is
+ * not in any _other_ proclist that uses the same proclist_node, so that the
+ * only possibilities are that it is in this list or none.
+ */
+static inline bool
+proclist_contains_offset(proclist_head *list, int procno,
+ size_t node_offset)
+{
+ proclist_node *node = proclist_node_get(procno, node_offset);
+
+ /*
+ * If this node has never been a member of a cv list, then it will contain
+ * zero before and after us in the list. Circular lists are not allowed
+ * so this condition is not confusable with a real pgprocno 0.
+ */
+ if (node->prev == 0 && node->next == 0)
+ return false;
+
+ /* If there is a previous node, then this node must be in the list. */
+ if (node->prev != INVALID_PGPROCNO)
+ return true;
+
+ /*
+ * There is no previous node, so the only way this node can be in the list
+ * is if it's the head node.
+ */
+ return list->head == procno;
+}
+
+/*
+ * Remove and return the first node from a list (there must be one).
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+ PGPROC *proc;
+
+ Assert(!proclist_is_empty(list));
+ proc = GetPGProcByNumber(list->head);
+ proclist_delete_offset(list, list->head, node_offset);
+ return proc;
}
/*
@@ -129,6 +179,10 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_push_tail(list, procno, link_member) \
proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+#define proclist_pop_head_node(list, link_member) \
+ proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
+#define proclist_contains(list, procno, link_member) \
+ proclist_contains_offset((list), (procno), offsetof(PGPROC, link_member))
/*
* Iterate through the list pointed at by 'lhead', storing the current
At Tue, 22 Nov 2016 17:48:07 +1300, Thomas Munro <thomas.munro@enterprisedb.com> wrote in <CAEepm=2VNfOq3spjSRGgM8WB+-PhfPXFB_adjizUUef9=cVDWQ@mail.gmail.com>
On Tue, Nov 22, 2016 at 3:05 PM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:Hello,
At Mon, 21 Nov 2016 15:57:47 -0500, Robert Haas <robertmhaas@gmail.com> wrote in <CA+TgmobFjwcFEiq8j+fvH5CdXHdVJffmemNLq8MqFesg2+4Gwg@mail.gmail.com>
On Thu, Aug 11, 2016 at 5:47 PM, Robert Haas <robertmhaas@gmail.com> wrote:
So, in my
implementation, a condition variable wait loop looks like this:for (;;)
{
ConditionVariablePrepareToSleep(cv);
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep();
}
ConditionVariableCancelSleep();I have what I think is a better idea. Let's get rid of
ConditionVariablePrepareToSleep(cv) and instead tell users of this
facility to write the loop this way:for (;;)
{
if (condition for which we are waiting is satisfied)
break;
ConditionVariableSleep(cv);
}
ConditionVariableCancelSleep();It seems rather a common way to wait on a condition variable, in
shorter,| while (condition for which we are waiting is *not* satisfied)
| ConditionVariableSleep(cv);
| ConditionVariableCancelSleep();Ok, let's show it that way.
ConditionVariableSleep(cv) will check whether the current process is
already on the condition variable's waitlist. If so, it will sleep;
if not, it will add the process and return without sleeping.It may seem odd that ConditionVariableSleep(cv) doesn't necessary
sleep, but this design has a significant advantage: we avoid
manipulating the wait-list altogether in the case where the condition
is already satisfied when we enter the loop. That's more like what weThe condition check is done far faster than maintaining the
wait-list for most cases, I believe.already do in lwlock.c: we try to grab the lock first; if we can't, we
add ourselves to the wait-list and retry; if we then get the lock
after all we have to recheck whether we can get the lock and remove
ourselves from the wait-list if so. Of course, there is some cost: if
we do have to wait, we'll end up checking the condition twice before
actually going to sleep. However, it's probably smart to bet that
actually needing to sleep is fairly infrequent, just as in lwlock.c.Thoughts?
FWIW, I agree to the assumption.
Here's a version that works that way, though it allows you to call
ConditionVariablePrepareToSleep *optionally* before you enter your
loop, in case you expect to have to wait and would rather avoid the
extra loop. Maybe there isn't much point in exposing that though,
since your condition test should be fast and waiting is the slow path,
but we don't really really know what your condition test is. I
thought about that because my use case (barrier.c) does in fact expect
to hit the wait case more often than not. If that seems pointless
then perhaps ConditionVariablePrepareToSleep should become static and
implicit. This version does attempt to suppress spurious returns, a
bit, using proclist_contains. No more cvSleeping.
Nice!
It's possible that future users will want a version with a timeout, or
multiplexed with IO, in which case there would be some interesting
questions about how this should interact with WaitEventSet. It also
seems like someone might eventually want to handle postmaster death.
Perhaps there shoul eventually be a way to tell WaitEventSet that
you're waiting for a CV so these things can be multiplexed without
exposing the fact that it's done internally with latches.
Interesting. IMHO, returning on POSTMASTER_DEATH doesn't seem to
harm ordinary use and might be useful right now. CVSleepTimeout()
would be made sooner or later if someone needs. Multiplexed IO is
apparently a matter of WaitEventSet. Waiting CV by WaitEventSet
would be a matter of future.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, Nov 21, 2016 at 11:48 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
Here's a version that works that way, though it allows you to call
ConditionVariablePrepareToSleep *optionally* before you enter your
loop, in case you expect to have to wait and would rather avoid the
extra loop. Maybe there isn't much point in exposing that though,
since your condition test should be fast and waiting is the slow path,
but we don't really really know what your condition test is. I
thought about that because my use case (barrier.c) does in fact expect
to hit the wait case more often than not. If that seems pointless
then perhaps ConditionVariablePrepareToSleep should become static and
implicit. This version does attempt to suppress spurious returns, a
bit, using proclist_contains. No more cvSleeping.
This version looks good to me and I have committed it after doing a
bit more work on the comments.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers