From a8c9055ecd252166f009c9b94120deb636d1c4e0 Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Sun, 28 Sep 2025 18:44:54 +0800 Subject: [PATCH v5] Improve read_local_xlog_page_guts by replacing polling with latch-based waiting Replace inefficient polling loops in read_local_xlog_page_guts with latch-based waiting when WAL data is not yet available. This eliminates CPU-intensive busy waiting and improves responsiveness by waking processes immediately when their target LSN becomes available. --- src/backend/access/transam/xlog.c | 20 +- src/backend/access/transam/xlogrecovery.c | 4 +- src/backend/access/transam/xlogutils.c | 48 ++- src/backend/access/transam/xlogwait.c | 329 +++++++++++++----- src/backend/replication/walsender.c | 4 - .../utils/activity/wait_event_names.txt | 1 + src/include/access/xlogwait.h | 58 ++- 7 files changed, 347 insertions(+), 117 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 36b8ac6b855..76c5ad7ae26 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2913,6 +2913,15 @@ XLogFlush(XLogRecPtr record) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSNState && + (LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN))) + WaitLSNWakeupFlush(LogwrtResult.Flush); + /* * If we still haven't flushed to the request point then we have a * problem; most likely, the requested flush point is past end of XLOG. 
@@ -3095,6 +3104,15 @@ XLogBackgroundFlush(void) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSNState && + (LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN))) + WaitLSNWakeupFlush(LogwrtResult.Flush); + /* * Great, done. To take some work off the critical path, try to initialize * as many of the no-longer-needed WAL buffers for future use as we can. @@ -6227,7 +6245,7 @@ StartupXLOG(void) * Wake up all waiters for replay LSN. They need to report an error that * recovery was ended before reaching the target LSN. */ - WaitLSNWakeup(InvalidXLogRecPtr); + WaitLSNWakeupReplay(InvalidXLogRecPtr); /* * Shutdown the recovery environment. This must occur after diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 824b0942b34..1859d2084e8 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1846,8 +1846,8 @@ PerformWalRecovery(void) */ if (waitLSNState && (XLogRecoveryCtl->lastReplayedEndRecPtr >= - pg_atomic_read_u64(&waitLSNState->minWaitedLSN))) - WaitLSNWakeup(XLogRecoveryCtl->lastReplayedEndRecPtr); + pg_atomic_read_u64(&waitLSNState->minWaitedReplayLSN))) + WaitLSNWakeupReplay(XLogRecoveryCtl->lastReplayedEndRecPtr); /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 38176d9688e..0ea02a45c6b 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -23,6 +23,7 @@ #include "access/xlogrecovery.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include 
"miscadmin.h" #include "storage/fd.h" #include "storage/smgr.h" @@ -880,12 +881,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, loc = targetPagePtr + reqLen; /* - * Loop waiting for xlog to be available if necessary - * - * TODO: The walsender has its own version of this function, which uses a - * condition variable to wake up whenever WAL is flushed. We could use the - * same infrastructure here, instead of the check/sleep/repeat style of - * loop. + * Waiting for xlog to be available if necessary. */ while (1) { @@ -927,7 +923,6 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, if (state->currTLI == currTLI) { - if (loc <= read_upto) break; @@ -947,7 +942,44 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, } CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + + /* + * Wait for LSN using appropriate method based on server state. + */ + if (!RecoveryInProgress()) + { + /* Primary: wait for flush */ + WaitForLSNFlush(loc); + } + else + { + /* Standby: wait for replay */ + WaitLSNResult result = WaitForLSNReplay(loc, 0); + + switch (result) + { + case WAIT_LSN_RESULT_SUCCESS: + /* LSN was replayed, loop back to recheck timeline */ + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + /* + * Promoted while waiting. This is the tricky case. + * We're now a primary, so loop back and use flush + * logic instead of replay logic. + */ + break; + + default: + /* Shouldn't happen without timeout */ + elog(ERROR, "unexpected wait result"); + } + } + + /* + * Loop back to recheck everything. + * Timeline might have changed during our wait. 
+ */ } else { diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c index 4d831fbfa74..3916a9163d5 100644 --- a/src/backend/access/transam/xlogwait.c +++ b/src/backend/access/transam/xlogwait.c @@ -1,8 +1,8 @@ /*------------------------------------------------------------------------- * * xlogwait.c - * Implements waiting for the given replay LSN, which is used in - * WAIT FOR lsn '...' + * Implements waiting for WAL operations to reach specific LSNs. + * Used by WAIT FOR lsn '...' and internal WAL reading operations. * * Copyright (c) 2025, PostgreSQL Global Development Group * @@ -10,10 +10,11 @@ * src/backend/access/transam/xlogwait.c * * NOTES - * This file implements waiting for the replay of the given LSN on a - * physical standby. The core idea is very small: every backend that - * wants to wait publishes the LSN it needs to the shared memory, and - * the startup process wakes it once that LSN has been replayed. + * This file implements waiting for WAL operations to reach specific LSNs + * on both physical standby and primary servers. The core idea is simple: + * every process that wants to wait publishes the LSN it needs to the + * shared memory, and the appropriate process (startup on standby, or + * WAL writer/backend on primary) wakes it once that LSN has been reached. * * The shared memory used by this module comprises a procInfos * per-backend array with the information of the awaited LSN for each @@ -23,14 +24,18 @@ * * In addition, the least-awaited LSN is cached as minWaitedLSN. The * waiter process publishes information about itself to the shared - * memory and waits on the latch before it wakens up by a startup + * memory and waits on the latch before it wakens up by the appropriate * process, timeout is reached, standby is promoted, or the postmaster * dies. Then, it cleans information about itself in the shared memory. 
* - * After replaying a WAL record, the startup process first performs a - * fast path check minWaitedLSN > replayLSN. If this check is negative, - * it checks waitersHeap and wakes up the backend whose awaited LSNs - * are reached. + * On standby servers: After replaying a WAL record, the startup process + * first performs a fast path check minWaitedReplayLSN > replayLSN. If this + * check is negative, it checks replayWaitersHeap and wakes up the backends + * whose awaited LSNs are reached. + * + * On primary servers: After flushing WAL, the WAL writer or backend + * process performs a similar check against the flush LSN and wakes up + * waiters whose target flush LSNs have been reached. * *------------------------------------------------------------------------- */ @@ -53,8 +58,10 @@ #include "utils/snapmgr.h" +static int waitlsn_replay_cmp(const pairingheap_node *a, const pairingheap_node *b, + void *arg); -static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, +static int waitlsn_flush_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg); struct WaitLSNState *waitLSNState = NULL; @@ -81,22 +88,29 @@ WaitLSNShmemInit(void) &found); if (!found) { - pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX); - pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL); + /* Initialize replay heap and tracking */ + pg_atomic_init_u64(&waitLSNState->minWaitedReplayLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSNState->replayWaitersHeap, waitlsn_replay_cmp, (void *)(uintptr_t)WAIT_LSN_REPLAY); + + /* Initialize flush heap and tracking */ + pg_atomic_init_u64(&waitLSNState->minWaitedFlushLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSNState->flushWaitersHeap, waitlsn_flush_cmp, (void *)(uintptr_t)WAIT_LSN_FLUSH); + + /* Initialize process info array */ memset(&waitLSNState->procInfos, 0, (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo)); } } /* - * Comparison function for waitReplayLSN->waitersHeap heap. 
Waiting processes are - * ordered by lsn, so that the waiter with smallest lsn is at the top. + * Comparison function for replay waiters heaps. Waiting processes are + * ordered by LSN, so that the waiter with smallest LSN is at the top. */ static int -waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) +waitlsn_replay_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) { - const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); - const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, replayHeapNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, replayHeapNode, b); if (aproc->waitLSN < bproc->waitLSN) return 1; @@ -107,65 +121,106 @@ waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) } /* - * Update waitReplayLSN->minWaitedLSN according to the current state of - * waitReplayLSN->waitersHeap. + * Comparison function for flush waiters heaps. Waiting processes are + * ordered by LSN, so that the waiter with smallest LSN is at the top. 
+ */ +static int +waitlsn_flush_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) +{ + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, flushHeapNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, flushHeapNode, b); + + if (aproc->waitLSN < bproc->waitLSN) + return 1; + else if (aproc->waitLSN > bproc->waitLSN) + return -1; + else + return 0; +} + +/* + * Update minimum waited LSN for the specified operation type */ static void -updateMinWaitedLSN(void) +updateMinWaitedLSN(WaitLSNOperation operation) { - XLogRecPtr minWaitedLSN = PG_UINT64_MAX; + XLogRecPtr minWaitedLSN = PG_UINT64_MAX; - if (!pairingheap_is_empty(&waitLSNState->waitersHeap)) + if (operation == WAIT_LSN_REPLAY) { - pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); - - minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; + if (!pairingheap_is_empty(&waitLSNState->replayWaitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSNState->replayWaitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, replayHeapNode, node); + minWaitedLSN = procInfo->waitLSN; + } + pg_atomic_write_u64(&waitLSNState->minWaitedReplayLSN, minWaitedLSN); + } + else /* WAIT_LSN_FLUSH */ + { + if (!pairingheap_is_empty(&waitLSNState->flushWaitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSNState->flushWaitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, flushHeapNode, node); + minWaitedLSN = procInfo->waitLSN; + } + pg_atomic_write_u64(&waitLSNState->minWaitedFlushLSN, minWaitedLSN); } - - pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN); } /* - * Put the current process into the heap of LSN waiters. 
+ * Add current process to appropriate waiters heap based on operation type */ static void -addLSNWaiter(XLogRecPtr lsn) +addLSNWaiter(XLogRecPtr lsn, WaitLSNOperation operation) { WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - Assert(!procInfo->inHeap); - procInfo->procno = MyProcNumber; procInfo->waitLSN = lsn; - pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode); - procInfo->inHeap = true; - updateMinWaitedLSN(); + if (operation == WAIT_LSN_REPLAY) + { + Assert(!procInfo->inReplayHeap); + pairingheap_add(&waitLSNState->replayWaitersHeap, &procInfo->replayHeapNode); + procInfo->inReplayHeap = true; + updateMinWaitedLSN(WAIT_LSN_REPLAY); + } + else /* WAIT_LSN_FLUSH */ + { + Assert(!procInfo->inFlushHeap); + pairingheap_add(&waitLSNState->flushWaitersHeap, &procInfo->flushHeapNode); + procInfo->inFlushHeap = true; + updateMinWaitedLSN(WAIT_LSN_FLUSH); + } LWLockRelease(WaitLSNLock); } /* - * Remove the current process from the heap of LSN waiters if it's there. 
+ * Remove current process from appropriate waiters heap based on operation type */ static void -deleteLSNWaiter(void) +deleteLSNWaiter(WaitLSNOperation operation) { WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - if (!procInfo->inHeap) + if (operation == WAIT_LSN_REPLAY && procInfo->inReplayHeap) { - LWLockRelease(WaitLSNLock); - return; + pairingheap_remove(&waitLSNState->replayWaitersHeap, &procInfo->replayHeapNode); + procInfo->inReplayHeap = false; + updateMinWaitedLSN(WAIT_LSN_REPLAY); + } + else if (operation == WAIT_LSN_FLUSH && procInfo->inFlushHeap) + { + pairingheap_remove(&waitLSNState->flushWaitersHeap, &procInfo->flushHeapNode); + procInfo->inFlushHeap = false; + updateMinWaitedLSN(WAIT_LSN_FLUSH); } - - pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode); - procInfo->inHeap = false; - updateMinWaitedLSN(); LWLockRelease(WaitLSNLock); } @@ -177,7 +232,7 @@ deleteLSNWaiter(void) #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16) /* - * Remove waiters whose LSN has been replayed from the heap and set their + * Remove waiters whose LSN has been reached from the heap and set their * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap * and set latches for all waiters. * @@ -188,12 +243,18 @@ deleteLSNWaiter(void) * if there are more waiters, this function will loop to process them in * multiple chunks. */ -void -WaitLSNWakeup(XLogRecPtr currentLSN) +static void +wakeupWaiters(WaitLSNOperation operation, XLogRecPtr currentLSN) { - int i; - ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE]; - int numWakeUpProcs; + ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE]; + int numWakeUpProcs; + int i; + pairingheap *heap; + + /* Select appropriate heap */ + heap = (operation == WAIT_LSN_REPLAY) ? 
+ &waitLSNState->replayWaitersHeap : + &waitLSNState->flushWaitersHeap; do { @@ -201,35 +262,42 @@ WaitLSNWakeup(XLogRecPtr currentLSN) LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); /* - * Iterate the pairing heap of waiting processes till we find LSN not - * yet replayed. Record the process numbers to wake up, but to avoid - * holding the lock for too long, send the wakeups only after - * releasing the lock. + * Iterate the waiters heap until we find LSN not yet reached. + * Record process numbers to wake up, but send wakeups after releasing lock. */ - while (!pairingheap_is_empty(&waitLSNState->waitersHeap)) + while (!pairingheap_is_empty(heap)) { - pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); - WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); + pairingheap_node *node = pairingheap_first(heap); + WaitLSNProcInfo *procInfo; + + /* Get procInfo using appropriate heap node */ + if (operation == WAIT_LSN_REPLAY) + procInfo = pairingheap_container(WaitLSNProcInfo, replayHeapNode, node); + else + procInfo = pairingheap_container(WaitLSNProcInfo, flushHeapNode, node); - if (!XLogRecPtrIsInvalid(currentLSN) && - procInfo->waitLSN > currentLSN) + if (!XLogRecPtrIsInvalid(currentLSN) && procInfo->waitLSN > currentLSN) break; Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE); wakeUpProcs[numWakeUpProcs++] = procInfo->procno; - (void) pairingheap_remove_first(&waitLSNState->waitersHeap); - procInfo->inHeap = false; + (void) pairingheap_remove_first(heap); + + /* Update appropriate flag */ + if (operation == WAIT_LSN_REPLAY) + procInfo->inReplayHeap = false; + else + procInfo->inFlushHeap = false; if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE) break; } - updateMinWaitedLSN(); - + updateMinWaitedLSN(operation); LWLockRelease(WaitLSNLock); /* - * Set latches for processes, whose waited LSNs are already replayed. + * Set latches for processes, whose waited LSNs are already reached. 
* As the time consuming operations, we do this outside of * WaitLSNLock. This is actually fine because procLatch isn't ever * freed, so we just can potentially set the wrong process' (or no @@ -238,25 +306,54 @@ WaitLSNWakeup(XLogRecPtr currentLSN) for (i = 0; i < numWakeUpProcs; i++) SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch); - /* Need to recheck if there were more waiters than static array size. */ - } - while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE); + } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE); +} + +/* + * Wake up replay waiters whose LSN is <= currentLSN (all, if InvalidXLogRecPtr) + */ +void +WaitLSNWakeupReplay(XLogRecPtr currentLSN) +{ + /* Fast path; InvalidXLogRecPtr means "wake all waiters", so never skip it */ + if (!XLogRecPtrIsInvalid(currentLSN) && + pg_atomic_read_u64(&waitLSNState->minWaitedReplayLSN) > currentLSN) + return; + wakeupWaiters(WAIT_LSN_REPLAY, currentLSN); +} + +/* + * Wake up flush waiters whose LSN is <= currentLSN (all, if InvalidXLogRecPtr) + */ +void +WaitLSNWakeupFlush(XLogRecPtr currentLSN) +{ + /* Fast path; InvalidXLogRecPtr means "wake all waiters", so never skip it */ + if (!XLogRecPtrIsInvalid(currentLSN) && + pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN) > currentLSN) + return; + wakeupWaiters(WAIT_LSN_FLUSH, currentLSN); } /* - * Delete our item from shmem array if any. + * Clean up LSN waiters for exiting process */ void WaitLSNCleanup(void) { - /* - * We do a fast-path check of the 'inHeap' flag without the lock. This - * flag is set to true only by the process itself. So, it's only possible - * to get a false positive. But that will be eliminated by a recheck - * inside deleteLSNWaiter(). - */ - if (waitLSNState->procInfos[MyProcNumber].inHeap) - deleteLSNWaiter(); + if (waitLSNState) + { + /* + * We do a fast-path check of the heap flags without the lock. These + * flags are set to true only by the process itself. So, it's only possible + * to get a false positive. But that will be eliminated by a recheck + * inside deleteLSNWaiter(). 
+ */ + if (waitLSNState->procInfos[MyProcNumber].inReplayHeap) + deleteLSNWaiter(WAIT_LSN_REPLAY); + if (waitLSNState->procInfos[MyProcNumber].inFlushHeap) + deleteLSNWaiter(WAIT_LSN_FLUSH); + } } /* @@ -308,11 +405,11 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) } /* - * Add our process to the pairing heap of waiters. It might happen that + * Add our process to the replay waiters heap. It might happen that * target LSN gets replayed before we do. Another check at the beginning * of the loop below prevents the race condition. */ - addLSNWaiter(targetLSN); + addLSNWaiter(targetLSN, WAIT_LSN_REPLAY); for (;;) { @@ -326,7 +423,7 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) * Recovery was ended, but recheck if target LSN was already * replayed. See the comment regarding deleteLSNWaiter() below. */ - deleteLSNWaiter(); + deleteLSNWaiter(WAIT_LSN_REPLAY); currentLSN = GetXLogReplayRecPtr(NULL); if (PromoteIsTriggered() && targetLSN <= currentLSN) return WAIT_LSN_RESULT_SUCCESS; @@ -372,11 +469,11 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) } /* - * Delete our process from the shared memory pairing heap. We might - * already be deleted by the startup process. The 'inHeap' flag prevents + * Delete our process from the shared memory replay heap. We might + * already be deleted by the startup process. The 'inReplayHeap' flag prevents * us from the double deletion. */ - deleteLSNWaiter(); + deleteLSNWaiter(WAIT_LSN_REPLAY); /* * If we didn't reach the target LSN, we must be exited by timeout. @@ -386,3 +483,69 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) return WAIT_LSN_RESULT_SUCCESS; } + +/* + * Wait until targetLSN has been flushed on a primary server. + * Returns only after the condition is satisfied or on FATAL exit. 
+ */ +void +WaitForLSNFlush(XLogRecPtr targetLSN) +{ + XLogRecPtr currentLSN; + int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; + + /* Shouldn't be called when shmem isn't initialized */ + Assert(waitLSNState); + + /* Should have a valid proc number */ + Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS); + + /* We can only wait for flush when we are not in recovery */ + Assert(!RecoveryInProgress()); + + /* Quick exit if already flushed */ + currentLSN = GetFlushRecPtr(NULL); + if (targetLSN <= currentLSN) + return; + + /* Add to flush waiters */ + addLSNWaiter(targetLSN, WAIT_LSN_FLUSH); + + /* Wait loop */ + for (;;) + { + int rc; + + /* Check if the waited LSN has been flushed */ + currentLSN = GetFlushRecPtr(NULL); + if (targetLSN <= currentLSN) + break; + + CHECK_FOR_INTERRUPTS(); + + rc = WaitLatch(MyLatch, wake_events, -1, + WAIT_EVENT_WAIT_FOR_WAL_FLUSH); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (rc & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"), + errcontext("while waiting for LSN flush"))); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * Delete our process from the shared memory flush heap. We might + * already be deleted by the waker process. The 'inFlushHeap' flag prevents + * us from the double deletion. + */ + deleteLSNWaiter(WAIT_LSN_FLUSH); + + return; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f22b8d..9955e829190 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1022,10 +1022,6 @@ StartReplication(StartReplicationCmd *cmd) /* * XLogReaderRoutine->page_read callback for logical decoding contexts, as a * walsender process. 
- * - * Inside the walsender we can do better than read_local_xlog_page, - * which has to do a plain sleep/busy loop, because the walsender's latch gets - * set every time WAL is flushed. */ static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index eb77924c4be..c1ac71ff7f2 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -89,6 +89,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." +WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary." WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h index df8202528b9..f9c303a8c7f 100644 --- a/src/include/access/xlogwait.h +++ b/src/include/access/xlogwait.h @@ -30,49 +30,67 @@ typedef enum * wait */ } WaitLSNResult; +/* + * Wait operation types for LSN waiting facility. + */ +typedef enum WaitLSNOperation +{ + WAIT_LSN_REPLAY, /* Waiting for replay on standby */ + WAIT_LSN_FLUSH /* Waiting for flush on primary */ +} WaitLSNOperation; + /* * WaitLSNProcInfo - the shared memory structure representing information - * about the single process, which may wait for LSN replay. An item of - * waitLSN->procInfos array. + * about the single process, which may wait for LSN operations. 
An item of + * waitLSNState->procInfos array. */ typedef struct WaitLSNProcInfo { /* LSN, which this process is waiting for */ XLogRecPtr waitLSN; - /* Process to wake up once the waitLSN is replayed */ + /* Process to wake up once the waitLSN is reached */ ProcNumber procno; - /* - * A flag indicating that this item is present in - * waitReplayLSNState->waitersHeap - */ - bool inHeap; + /* Type-safe heap membership flags */ + bool inReplayHeap; /* In replay waiters heap */ + bool inFlushHeap; /* In flush waiters heap */ - /* - * A pairing heap node for participation in - * waitReplayLSNState->waitersHeap - */ - pairingheap_node phNode; + /* Separate heap nodes for type safety */ + pairingheap_node replayHeapNode; + pairingheap_node flushHeapNode; } WaitLSNProcInfo; /* - * WaitLSNState - the shared memory state for the replay LSN waiting facility. + * WaitLSNState - the shared memory state for the LSN waiting facility. */ typedef struct WaitLSNState { /* - * The minimum LSN value some process is waiting for. Used for the + * The minimum replay LSN value some process is waiting for. Used for the * fast-path checking if we need to wake up any waiters after replaying a * WAL record. Could be read lock-less. Update protected by WaitLSNLock. */ - pg_atomic_uint64 minWaitedLSN; + pg_atomic_uint64 minWaitedReplayLSN; + + /* + * A pairing heap of replay waiting processes ordered by LSN values (least LSN is + * on top). Protected by WaitLSNLock. + */ + pairingheap replayWaitersHeap; + + /* + * The minimum flush LSN value some process is waiting for. Used for the + * fast-path checking if we need to wake up any waiters after flushing + * WAL. Could be read lock-less. Update protected by WaitLSNLock. + */ + pg_atomic_uint64 minWaitedFlushLSN; /* - * A pairing heap of waiting processes order by LSN values (least LSN is + * A pairing heap of flush waiting processes ordered by LSN values (least LSN is * on top). Protected by WaitLSNLock. 
*/ - pairingheap waitersHeap; + pairingheap flushWaitersHeap; /* * An array with per-process information, indexed by the process number. @@ -86,8 +104,10 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState; extern Size WaitLSNShmemSize(void); extern void WaitLSNShmemInit(void); -extern void WaitLSNWakeup(XLogRecPtr currentLSN); +extern void WaitLSNWakeupReplay(XLogRecPtr currentLSN); +extern void WaitLSNWakeupFlush(XLogRecPtr currentLSN); extern void WaitLSNCleanup(void); extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout); +extern void WaitForLSNFlush(XLogRecPtr targetLSN); #endif /* XLOG_WAIT_H */ -- 2.51.0