From 397b57824507ce7f7281dce03a8e83cb1d2ca2e2 Mon Sep 17 00:00:00 2001 From: "Chao Li (Evan)" Date: Tue, 31 Mar 2026 08:25:12 +0800 Subject: [PATCH v4] Add condition variable support to WaitEventSetWait() WalSndWait() currently combines WaitEventSetWait() with manual ConditionVariablePrepareToSleep() / ConditionVariableCancelSleep() calls. This patch teaches WaitEventSetWait() to cooperate with condition variables directly. A wait-event set can now include a condition variable event, and WaitEventSetWait() takes care of preparing to sleep on the condition variable before blocking and canceling the sleep afterwards. With that in place, WalSndWait() no longer needs to manage the condition-variable sleep state manually, and can express the wakeup source through the wait-event set itself. Author: Chao Li --- contrib/postgres_fdw/postgres_fdw.c | 2 +- src/backend/executor/execAsync.c | 2 +- src/backend/executor/nodeAppend.c | 4 +- src/backend/libpq/be-secure.c | 4 +- src/backend/libpq/pqcomm.c | 12 +- src/backend/postmaster/postmaster.c | 4 +- src/backend/postmaster/syslogger.c | 4 +- src/backend/replication/walsender.c | 17 +-- src/backend/storage/ipc/latch.c | 16 +-- src/backend/storage/ipc/waiteventset.c | 132 +++++++++++++++++- src/backend/storage/lmgr/condition_variable.c | 33 ++++- src/backend/utils/init/miscinit.c | 4 +- src/include/libpq/libpq.h | 3 +- src/include/storage/condition_variable.h | 1 + src/include/storage/waiteventset.h | 7 +- 15 files changed, 197 insertions(+), 48 deletions(-) diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index cc8ec24c30e..6ee750e9e21 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -7277,7 +7277,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) Assert(pendingAreq == areq); AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), - NULL, areq); + NULL, NULL, areq); } /* diff --git a/src/backend/executor/execAsync.c 
b/src/backend/executor/execAsync.c index cf7ddbb01f4..e048328f583 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -57,7 +57,7 @@ ExecAsyncRequest(AsyncRequest *areq) * for which it wishes to wait. We expect the node-type specific callback to * make a single call of the following form: * - * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq); + * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, NULL, areq); */ void ExecAsyncConfigureWait(AsyncRequest *areq) diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 85c85569b5e..27895b96b50 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -1045,7 +1045,7 @@ ExecAppendAsyncEventWait(AppendState *node) Assert(node->as_eventset == NULL); node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents); AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); + NULL, NULL, NULL); /* Give each waiting subplan a chance to add an event. */ i = -1; @@ -1081,7 +1081,7 @@ ExecAppendAsyncEventWait(AppendState *node) * extensions too. */ AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); + MyLatch, NULL, NULL); /* Return at most EVENT_BUFFER_SIZE events in one call. 
*/ if (nevents > EVENT_BUFFER_SIZE) diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 617704bb993..0a406a050d9 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -217,7 +217,7 @@ retry: Assert(waitfor); - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL); + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL); WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, WAIT_EVENT_CLIENT_READ); @@ -342,7 +342,7 @@ retry: Assert(waitfor); - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL); + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL); WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, WAIT_EVENT_CLIENT_WRITE); diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 4a442f22df6..5d3a5c22db0 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -177,6 +177,7 @@ pq_init(ClientSocket *client_sock) Port *port; int socket_pos PG_USED_FOR_ASSERTS_ONLY; int latch_pos PG_USED_FOR_ASSERTS_ONLY; + int cv_pos PG_USED_FOR_ASSERTS_ONLY; /* allocate the Port struct and copy the ClientSocket contents to it */ port = palloc0_object(Port); @@ -307,11 +308,13 @@ pq_init(ClientSocket *client_sock) FeBeWaitSet = CreateWaitEventSet(NULL, FeBeWaitSetNEvents); socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, - port->sock, NULL, NULL); + port->sock, NULL, NULL, NULL); latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); + MyLatch, NULL, NULL); + cv_pos = AddWaitEventToSet(FeBeWaitSet, WL_CONDITION_VARIABLE, PGINVALID_SOCKET, + NULL, NULL, NULL); AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, - NULL, NULL); + NULL, NULL, NULL); /* * The event positions match the order we added them, but let's sanity @@ -319,6 +322,7 @@ pq_init(ClientSocket *client_sock) */ Assert(socket_pos == FeBeWaitSetSocketPos); Assert(latch_pos == 
FeBeWaitSetLatchPos); + Assert(cv_pos == FeBeWaitSetCVPos); return port; } @@ -2063,7 +2067,7 @@ pq_check_connection(void) * It's OK to modify the socket event filter without restoring, because * all FeBeWaitSet socket wait sites do the same. */ - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL); + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL, NULL); retry: rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0); diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index ae829747004..28f7c5cd602 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1655,13 +1655,13 @@ ConfigurePostmasterWaitSet(bool accept_connections) pm_wait_set = CreateWaitEventSet(NULL, accept_connections ? (1 + NumListenSockets) : 1); AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, - NULL); + NULL, NULL); if (accept_connections) { for (int i = 0; i < NumListenSockets; i++) AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT, ListenSockets[i], - NULL, NULL); + NULL, NULL, NULL); } } diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index 0c2a7bc8578..4db7df8e4a5 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -338,9 +338,9 @@ SysLoggerMain(const void *startup_data, size_t startup_data_len) * (including the postmaster). 
*/ wes = CreateWaitEventSet(NULL, 2); - AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); + AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL, NULL); #ifndef WIN32 - AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL); + AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL, NULL); #endif /* main worker loop */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bad45adb004..ff73fba15f8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3979,8 +3979,9 @@ static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) { WaitEvent event; + ConditionVariable *cv = NULL; - ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL); + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL, NULL); /* * We use a condition variable to efficiently wake up walsenders in @@ -4000,9 +4001,6 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * release for every iteration, just to wake up only the waiting * walsenders. It makes WalSndWakeup() callers' life easy. * - * XXX: A desirable future improvement would be to add support for CVs - * into WaitEventSetWait(). - * * And, we use separate shared memory CVs for physical and logical * walsenders for selective wake ups, see WalSndWakeup() for more details. * @@ -4011,20 +4009,19 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * the receipt of the LSN. 
*/ if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) - ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + cv = &WalSndCtl->wal_confirm_rcv_cv; else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) - ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); + cv = &WalSndCtl->wal_flush_cv; else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) - ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); + cv = &WalSndCtl->wal_replay_cv; + + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetCVPos, WL_CONDITION_VARIABLE, NULL, cv); if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 && (event.events & WL_POSTMASTER_DEATH)) { - ConditionVariableCancelSleep(); proc_exit(1); } - - ConditionVariableCancelSleep(); } /* diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 7d4f4cf32bb..391b5218f51 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -41,7 +41,7 @@ InitializeLatchWaitSet(void) /* Set up the WaitEventSet used by WaitLatch(). 
*/ LatchWaitSet = CreateWaitEventSet(NULL, 2); latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); + MyLatch, NULL, NULL); Assert(latch_pos == LatchWaitSetLatchPos); /* @@ -51,7 +51,7 @@ InitializeLatchWaitSet(void) if (IsUnderPostmaster) { latch_pos = AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH, - PGINVALID_SOCKET, NULL, NULL); + PGINVALID_SOCKET, NULL, NULL, NULL); Assert(latch_pos == LatchWaitSetPostmasterDeathPos); } } @@ -186,12 +186,12 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout, */ if (!(wakeEvents & WL_LATCH_SET)) latch = NULL; - ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch); + ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch, NULL); if (IsUnderPostmaster) ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos, (wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)), - NULL); + NULL, NULL); if (WaitEventSetWait(LatchWaitSet, (wakeEvents & WL_TIMEOUT) ? timeout : -1, @@ -235,7 +235,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, if (wakeEvents & WL_LATCH_SET) AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET, - latch, NULL); + latch, NULL, NULL); /* Postmaster-managed callers must handle postmaster death somehow. 
*/ Assert(!IsUnderPostmaster || @@ -244,18 +244,18 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, if ((wakeEvents & WL_POSTMASTER_DEATH) && IsUnderPostmaster) AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, - NULL, NULL); + NULL, NULL, NULL); if ((wakeEvents & WL_EXIT_ON_PM_DEATH) && IsUnderPostmaster) AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); + NULL, NULL, NULL); if (wakeEvents & WL_SOCKET_MASK) { int ev; ev = wakeEvents & WL_SOCKET_MASK; - AddWaitEventToSet(set, ev, sock, NULL, NULL); + AddWaitEventToSet(set, ev, sock, NULL, NULL, NULL); } rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info); diff --git a/src/backend/storage/ipc/waiteventset.c b/src/backend/storage/ipc/waiteventset.c index 0f228e1e7b8..d9849611966 100644 --- a/src/backend/storage/ipc/waiteventset.c +++ b/src/backend/storage/ipc/waiteventset.c @@ -73,6 +73,7 @@ #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pmsignal.h" +#include "storage/condition_variable.h" #include "storage/latch.h" #include "storage/waiteventset.h" #include "utils/memutils.h" @@ -137,6 +138,14 @@ struct WaitEventSet Latch *latch; int latch_pos; + /* + * If WL_CONDITION_VARIABLE is specified in any wait event, cv is a + * pointer to said condition variable, and cv_pos the offset in the + * ->events array. 
+ */ + ConditionVariable *cv; + int cv_pos; + /* * WL_EXIT_ON_PM_DEATH is converted to WL_POSTMASTER_DEATH, but this flag * is set so that we'll exit immediately if postmaster death is detected, @@ -414,6 +423,7 @@ CreateWaitEventSet(ResourceOwner resowner, int nevents) #endif set->latch = NULL; + set->cv = NULL; set->nevents_space = nevents; set->exit_on_postmaster_death = false; @@ -501,6 +511,11 @@ FreeWaitEventSet(WaitEventSet *set) { /* uses the latch's HANDLE */ } + else if (cur_event->events == WL_CONDITION_VARIABLE) + { + /* uses a dummy HANDLE */ + CloseHandle(set->handles[cur_event->pos + 1]); + } else if (cur_event->events & WL_POSTMASTER_DEATH) { /* uses PostmasterHandle */ @@ -550,6 +565,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set) * platforms, this is the same as WL_SOCKET_READABLE) * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer. * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies + * - WL_CONDITION_VARIABLE: Wait for a condition variable to be signaled. * * Returns the offset in WaitEventSet->events (starting from 0), which can be * used to modify previously added wait events using ModifyWaitEvent(). @@ -568,7 +584,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set) */ int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, - void *user_data) + ConditionVariable *cv, void *user_data) { WaitEvent *event; @@ -596,6 +612,18 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, elog(ERROR, "cannot wait on latch without a specified latch"); } + /* + * When adding a CV event, a NULL cv is allowed, because the CV can be set + * later with ModifyWaitEvent().
+ */ + if (cv) + { + if (set->cv) + elog(ERROR, "cannot wait on more than one condition variable"); + if ((events & WL_CONDITION_VARIABLE) != WL_CONDITION_VARIABLE) + elog(ERROR, "condition variable events only support being set"); + } + /* waiting for socket readiness without a socket indicates a bug */ if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK)) elog(ERROR, "cannot wait on socket event without a socket"); @@ -624,6 +652,16 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, #endif #endif } + else if (events == WL_CONDITION_VARIABLE) + { + set->cv = cv; + set->cv_pos = event->pos; + event->fd = PGINVALID_SOCKET; +#ifdef WAIT_USE_WIN32 + WaitEventAdjustWin32(set, event); +#endif + return event->pos; + } else if (events == WL_POSTMASTER_DEATH) { #ifndef WIN32 @@ -653,14 +691,15 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, * 'pos' is the id returned by AddWaitEventToSet. */ void -ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) +ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch, + ConditionVariable *cv) { WaitEvent *event; #if defined(WAIT_USE_KQUEUE) int old_events; #endif - Assert(pos < set->nevents); + Assert(pos >= 0 && pos < set->nevents); event = &set->events[pos]; #if defined(WAIT_USE_KQUEUE) @@ -682,6 +721,21 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) return; } + /* + * Change the condition variable associated with the event. CV waits do + * not involve any kernel object, so take a fast path and only update the + * CV pointer. + */ + if (event->events == WL_CONDITION_VARIABLE) + { + if (events != WL_CONDITION_VARIABLE) + elog(ERROR, "cannot change event type of a condition variable event"); + if (cv == NULL) + elog(ERROR, "cannot set condition variable to NULL"); + set->cv = cv; + return; + } + /* * If neither the event mask nor the associated latch changes, return * early.
That's an important optimization for some sockets, where @@ -991,6 +1045,18 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) Assert(set->latch != NULL); *handle = set->latch->event; } + else if (event->events == WL_CONDITION_VARIABLE) + { + /* + * Condition-variable waits are handled in userspace, but + * WaitForMultipleObjects() still requires a valid HANDLE in every + * slot. + */ + *handle = CreateEvent(NULL, TRUE, FALSE, NULL); + if (*handle == NULL) + elog(ERROR, "failed to create event for condition variable: error code %lu", + GetLastError()); + } else if (event->events == WL_POSTMASTER_DEATH) { *handle = PostmasterHandle; @@ -1061,6 +1127,9 @@ WaitEventSetWait(WaitEventSet *set, long timeout, else INSTR_TIME_SET_ZERO(start_time); + if (set->cv != NULL) + ConditionVariablePrepareToSleep(set->cv); + pgstat_report_wait_start(wait_event_info); #ifndef WIN32 @@ -1072,6 +1141,8 @@ WaitEventSetWait(WaitEventSet *set, long timeout, while (returned_events == 0) { int rc; + bool cv_signaled = false; + bool cv_maybe_signaled = false; /* * Check if the latch is set already first. 
If so, we either exit @@ -1133,6 +1204,39 @@ WaitEventSetWait(WaitEventSet *set, long timeout, timeout = 0; } + if (set->cv) + cv_signaled = ConditionVariableSignaled(set->cv); + + if (set->cv && !cv_signaled) + cv_maybe_signaled = true; + + if (set->cv && cv_signaled) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->pos = set->cv_pos; + occurred_events->user_data = + set->events[set->cv_pos].user_data; + occurred_events->events = WL_CONDITION_VARIABLE; + occurred_events++; + returned_events++; + + if (returned_events == nevents) + { + /* could have been set above */ + if (set->latch && set->latch->maybe_sleeping) + set->latch->maybe_sleeping = false; + break; /* output buffer full already */ + } + + /* + * Even though we already have an event, we'll poll just once with + * zero timeout to see what non-latch events we can fit into the + * output buffer at the same time. + */ + cur_timeout = 0; + timeout = 0; + } + /* * Wait for events using the readiness primitive chosen at the top of * this file. 
If -1 is returned, a timeout has occurred, if 0 we have @@ -1148,8 +1252,26 @@ WaitEventSetWait(WaitEventSet *set, long timeout, if (rc == -1) break; /* timeout occurred */ else + { returned_events += rc; + /* Check CV again after waiting if not done before waiting */ + if (set->cv && cv_maybe_signaled && returned_events < nevents && + ConditionVariableSignaled(set->cv)) + { + occurred_events->fd = PGINVALID_SOCKET; + occurred_events->pos = set->cv_pos; + occurred_events->user_data = + set->events[set->cv_pos].user_data; + occurred_events->events = WL_CONDITION_VARIABLE; + occurred_events++; + returned_events++; + + if (returned_events == nevents) + break; /* output buffer full already */ + } + } + /* If we're not done, update cur_timeout for next iteration */ if (returned_events == 0 && timeout >= 0) { @@ -1166,10 +1288,12 @@ WaitEventSetWait(WaitEventSet *set, long timeout, pgstat_report_wait_end(); + if (set->cv != NULL) + ConditionVariableCancelSleep(); + return returned_events; } - #if defined(WAIT_USE_EPOLL) /* diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c index 1f16b3f7475..c04bc420bcf 100644 --- a/src/backend/storage/lmgr/condition_variable.c +++ b/src/backend/storage/lmgr/condition_variable.c @@ -182,13 +182,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, * by something other than ConditionVariableSignal; though we don't * guarantee not to return spuriously, we'll avoid this obvious case. 
*/ - SpinLockAcquire(&cv->mutex); - if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink)) - { - done = true; - proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink); - } - SpinLockRelease(&cv->mutex); + done = ConditionVariableSignaled(cv); /* * Check for interrupts, and return spuriously if that caused the @@ -217,6 +211,31 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, } } +/* + * Check whether this process was removed from cv's wait list by a CV + * signal/broadcast. If so, re-add it to preserve wakeups while the caller + * checks the predicate. + */ +bool +ConditionVariableSignaled(ConditionVariable *cv) +{ + bool signaled = false; + + /* Ignore probes for CVs we are not currently prepared to sleep on. */ + if (cv_sleep_target != cv) + return false; + + SpinLockAcquire(&cv->mutex); + if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink)) + { + signaled = true; + proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink); + } + SpinLockRelease(&cv->mutex); + + return signaled; +} + /* * Cancel any pending sleep operation. * diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 7ffc808073a..4bac31ec0d9 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -222,7 +222,7 @@ SwitchToSharedLatch(void) if (FeBeWaitSet) ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, - MyLatch); + MyLatch, NULL); /* * Set the shared latch as the local one might have been set. 
This @@ -249,7 +249,7 @@ SwitchBackToLocalLatch(void) if (FeBeWaitSet) ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, - MyLatch); + MyLatch, NULL); SetLatch(MyLatch); } diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index c9b934d2321..8d13eaf8393 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -65,7 +65,8 @@ extern PGDLLIMPORT WaitEventSet *FeBeWaitSet; #define FeBeWaitSetSocketPos 0 #define FeBeWaitSetLatchPos 1 -#define FeBeWaitSetNEvents 3 +#define FeBeWaitSetCVPos 2 +#define FeBeWaitSetNEvents 4 extern int ListenServerPort(int family, const char *hostName, unsigned short portNumber, const char *unixSocketDir, diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h index 14bd6dd55c0..fb7e4bb67f2 100644 --- a/src/include/storage/condition_variable.h +++ b/src/include/storage/condition_variable.h @@ -57,6 +57,7 @@ extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info); extern bool ConditionVariableCancelSleep(void); +extern bool ConditionVariableSignaled(ConditionVariable *cv); /* * Optionally, ConditionVariablePrepareToSleep can be called before entering diff --git a/src/include/storage/waiteventset.h b/src/include/storage/waiteventset.h index 5341267f0a0..f3d569a6aa0 100644 --- a/src/include/storage/waiteventset.h +++ b/src/include/storage/waiteventset.h @@ -25,6 +25,7 @@ #ifndef WAITEVENTSET_H #define WAITEVENTSET_H +#include "storage/condition_variable.h" #include "utils/resowner.h" /* @@ -50,6 +51,7 @@ /* avoid having to deal with case on platforms not requiring it */ #define WL_SOCKET_ACCEPT WL_SOCKET_READABLE #endif +#define WL_CONDITION_VARIABLE (1 << 9) #define WL_SOCKET_MASK (WL_SOCKET_READABLE | \ WL_SOCKET_WRITEABLE | \ WL_SOCKET_CONNECTED | \ @@ -81,9 +83,10 @@ extern WaitEventSet *CreateWaitEventSet(ResourceOwner resowner, 
int nevents); extern void FreeWaitEventSet(WaitEventSet *set); extern void FreeWaitEventSetAfterFork(WaitEventSet *set); extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, - struct Latch *latch, void *user_data); + struct Latch *latch, ConditionVariable *cv, + void *user_data); extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, - struct Latch *latch); + struct Latch *latch, ConditionVariable *cv); extern int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info); -- 2.50.1 (Apple Git-155)