From ccdf02bfdcca1807d9fe6bd1e39b0b185f81e5e6 Mon Sep 17 00:00:00 2001 From: alterego665 <824662526@qq.com> Date: Thu, 28 Aug 2025 15:38:47 +0800 Subject: [PATCH v3 2/2] Improve read_local_xlog_page_guts by replacing polling with latch-based waiting Replace inefficient polling loops in read_local_xlog_page_guts with latch-based waiting when WAL data is not yet available. This eliminates CPU-intensive busy waiting and improves responsiveness by waking processes immediately when their target LSN becomes available. --- src/backend/access/transam/xlog.c | 18 ++++ src/backend/access/transam/xlogutils.c | 48 ++++++++-- src/backend/access/transam/xlogwait.c | 93 ++++++++++++++++--- src/backend/replication/walsender.c | 4 - .../utils/activity/wait_event_names.txt | 1 + src/include/access/xlogwait.h | 1 + 6 files changed, 142 insertions(+), 23 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f5257dfa689..4af8f22f166 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2913,6 +2913,15 @@ XLogFlush(XLogRecPtr record) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSNState && + (LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedLSN))) + WaitLSNWakeup(LogwrtResult.Flush); + /* * If we still haven't flushed to the request point then we have a * problem; most likely, the requested flush point is past end of XLOG. @@ -3088,6 +3097,15 @@ XLogBackgroundFlush(void) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. 
+ */ + if (waitLSNState && + (LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedLSN))) + WaitLSNWakeup(LogwrtResult.Flush); + /* * Great, done. To take some work off the critical path, try to initialize * as many of the no-longer-needed WAL buffers for future use as we can. diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 27ea52fdfee..b7ba3f6a737 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -23,6 +23,7 @@ #include "access/xlogrecovery.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "storage/fd.h" #include "storage/smgr.h" @@ -880,12 +881,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, loc = targetPagePtr + reqLen; /* - * Loop waiting for xlog to be available if necessary - * - * TODO: The walsender has its own version of this function, which uses a - * condition variable to wake up whenever WAL is flushed. We could use the - * same infrastructure here, instead of the check/sleep/repeat style of - * loop. + * Waiting for xlog to be available if necessary. */ while (1) { @@ -927,7 +923,6 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, if (state->currTLI == currTLI) { - if (loc <= read_upto) break; @@ -947,7 +942,44 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, } CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + + /* + * Wait for LSN using appropriate method based on server state. + */ + if (!RecoveryInProgress()) + { + /* Primary: wait for flush */ + WaitForLSNFlush(loc); + } + else + { + /* Standby: wait for replay */ + WaitLSNResult result = WaitForLSNReplay(loc, 0); + + switch (result) + { + case WAIT_LSN_RESULT_SUCCESS: + /* LSN was replayed, loop back to recheck timeline */ + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + /* + * Promoted while waiting. 
This is the tricky case. + * We're now a primary, so loop back and use flush + * logic instead of replay logic. + */ + break; + + default: + /* Shouldn't happen without timeout */ + elog(ERROR, "unexpected wait result"); + } + } + + /* + * Loop back to recheck everything. + * Timeline might have changed during our wait. + */ } else { diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c index 2cc9312e836..70241bd384f 100644 --- a/src/backend/access/transam/xlogwait.c +++ b/src/backend/access/transam/xlogwait.c @@ -1,8 +1,8 @@ /*------------------------------------------------------------------------- * * xlogwait.c - * Implements waiting for the given replay LSN, which is used in - * WAIT FOR lsn '...' + * Implements waiting for WAL operations to reach specific LSNs. + * Used by WAIT FOR lsn '...' and internal WAL reading operations. * * Copyright (c) 2025, PostgreSQL Global Development Group * @@ -10,10 +10,11 @@ * src/backend/access/transam/xlogwait.c * * NOTES - * This file implements waiting for the replay of the given LSN on a - * physical standby. The core idea is very small: every backend that - * wants to wait publishes the LSN it needs to the shared memory, and - * the startup process wakes it once that LSN has been replayed. + * This file implements waiting for WAL operations to reach specific LSNs + * on both physical standby and primary servers. The core idea is simple: + * every process that wants to wait publishes the LSN it needs to the + * shared memory, and the appropriate process (startup on standby, or + * WAL writer/backend on primary) wakes it once that LSN has been reached. * * The shared memory used by this module comprises a procInfos * per-backend array with the information of the awaited LSN for each @@ -23,14 +24,18 @@ * * In addition, the least-awaited LSN is cached as minWaitedLSN. 
The * waiter process publishes information about itself to the shared - * memory and waits on the latch before it wakens up by a startup + * memory and waits on the latch until it is woken up by the appropriate * process, timeout is reached, standby is promoted, or the postmaster * dies. Then, it cleans information about itself in the shared memory. * - * After replaying a WAL record, the startup process first performs a - * fast path check minWaitedLSN > replayLSN. If this check is negative, - * it checks waitersHeap and wakes up the backend whose awaited LSNs - * are reached. + * On standby servers: After replaying a WAL record, the startup process + * first performs a fast path check minWaitedLSN > replayLSN. If this + * check is negative, it checks waitersHeap and wakes up the backends + * whose awaited LSNs have been reached. + * + * On primary servers: after XLogFlush() or XLogBackgroundFlush() flushes + * WAL, the flushing process performs a similar check against the flush + * LSN and wakes up waiters whose target flush LSNs have been reached. * *------------------------------------------------------------------------- */ @@ -386,3 +391,69 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) return WAIT_LSN_RESULT_SUCCESS; } + +/* + * Wait for LSN to be flushed on primary server.
+ * Returns only after targetLSN has been flushed; there is no timeout. + */ +void +WaitForLSNFlush(XLogRecPtr targetLSN) +{ + XLogRecPtr currentLSN; + int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; + + /* Shouldn't be called when shmem isn't initialized */ + Assert(waitLSNState); + + /* Should have a valid proc number */ + Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS); + + /* We can only wait for flush when we are not in recovery */ + Assert(!RecoveryInProgress()); + + /* Quick exit if already flushed */ + currentLSN = GetFlushRecPtr(NULL); + if (targetLSN <= currentLSN) + return; + + /* Add to waiters */ + addLSNWaiter(targetLSN); + + /* Wait loop */ + for (;;) + { + int rc; + + /* Check if the waited LSN has been flushed */ + currentLSN = GetFlushRecPtr(NULL); + if (targetLSN <= currentLSN) + break; + + CHECK_FOR_INTERRUPTS(); + + rc = WaitLatch(MyLatch, wake_events, -1, + WAIT_EVENT_WAIT_FOR_WAL_FLUSH); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (rc & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"), + errcontext("while waiting for LSN flush"))); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * Delete our process from the shared memory pairing heap. We might + * already be deleted by the waker process. The 'inHeap' flag prevents + * us from the double deletion. + */ + deleteLSNWaiter(); + + return; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0855bae3535..5fb74088fcd 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1021,10 +1021,6 @@ StartReplication(StartReplicationCmd *cmd) /* * XLogReaderRoutine->page_read callback for logical decoding contexts, as a * walsender process.
- * - * Inside the walsender we can do better than read_local_xlog_page, - * which has to do a plain sleep/busy loop, because the walsender's latch gets - * set every time WAL is flushed. */ static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index ee20a48b2c5..b2266666c02 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -89,6 +89,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." +WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary." WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h index 72be2f76293..742c47609ae 100644 --- a/src/include/access/xlogwait.h +++ b/src/include/access/xlogwait.h @@ -86,5 +86,6 @@ extern void WaitLSNShmemInit(void); extern void WaitLSNWakeup(XLogRecPtr currentLSN); extern void WaitLSNCleanup(void); extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout); +extern void WaitForLSNFlush(XLogRecPtr targetLSN); #endif /* XLOG_WAIT_H */ -- 2.49.0