From c6275a297e2478acf68260ce9f21c45c1e6da223 Mon Sep 17 00:00:00 2001 From: alterego665 <824662526@qq.com> Date: Wed, 27 Aug 2025 23:28:12 +0800 Subject: [PATCH v2] Improve read_local_xlog_page_guts by replacing polling with latch-based waiting Replace inefficient polling loops in read_local_xlog_page_guts with latch-based waiting when WAL data is not yet available. This eliminates CPU-intensive busy waiting and improves responsiveness by waking processes immediately when their target LSN becomes available. --- src/backend/access/transam/xlog.c | 18 +++++ src/backend/access/transam/xlogutils.c | 48 +++++++++++--- src/backend/access/transam/xlogwait.c | 66 +++++++++++++++++++ .../utils/activity/wait_event_names.txt | 1 + src/include/access/xlogwait.h | 1 + 5 files changed, 126 insertions(+), 8 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f5257dfa689..4af8f22f166 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2913,6 +2913,15 @@ XLogFlush(XLogRecPtr record) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSNState && + (LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedLSN))) + WaitLSNWakeup(LogwrtResult.Flush); + /* * If we still haven't flushed to the request point then we have a * problem; most likely, the requested flush point is past end of XLOG. @@ -3088,6 +3097,15 @@ XLogBackgroundFlush(void) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. 
+ */ + if (waitLSNState && + (LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedLSN))) + WaitLSNWakeup(LogwrtResult.Flush); + /* * Great, done. To take some work off the critical path, try to initialize * as many of the no-longer-needed WAL buffers for future use as we can. diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 27ea52fdfee..b7ba3f6a737 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -23,6 +23,7 @@ #include "access/xlogrecovery.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "storage/fd.h" #include "storage/smgr.h" @@ -880,12 +881,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, loc = targetPagePtr + reqLen; /* - * Loop waiting for xlog to be available if necessary - * - * TODO: The walsender has its own version of this function, which uses a - * condition variable to wake up whenever WAL is flushed. We could use the - * same infrastructure here, instead of the check/sleep/repeat style of - * loop. + * Loop waiting for xlog to become available, if necessary. */ while (1) { @@ -927,7 +923,6 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, if (state->currTLI == currTLI) { - if (loc <= read_upto) break; @@ -947,7 +942,44 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, } CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + + /* + * Wait for the LSN using the method appropriate to the server state. + */ + if (!RecoveryInProgress()) + { + /* Primary: wait for flush */ + WaitForLSNFlush(loc); + } + else + { + /* Standby: wait for replay */ + WaitLSNResult result = WaitForLSNReplay(loc, 0); + + switch (result) + { + case WAIT_LSN_RESULT_SUCCESS: + /* LSN was replayed, loop back to recheck timeline */ + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + /* + * Promoted while waiting. 
This is the tricky case. + * We're now a primary, so loop back and use flush + * logic instead of replay logic. + */ + break; + + default: + /* Shouldn't happen without timeout */ + elog(ERROR, "unexpected wait result"); + } + } + + /* + * Loop back to recheck everything. + * Timeline might have changed during our wait. + */ } else { diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c index 2cc9312e836..20c75e93cfa 100644 --- a/src/backend/access/transam/xlogwait.c +++ b/src/backend/access/transam/xlogwait.c @@ -386,3 +386,69 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) return WAIT_LSN_RESULT_SUCCESS; } + +/* + * Wait until WAL has been flushed up to targetLSN on a primary server. + * Returns (void) once GetFlushRecPtr() reports targetLSN as flushed. + */ +void +WaitForLSNFlush(XLogRecPtr targetLSN) +{ + XLogRecPtr currentLSN; + int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; + + /* Shouldn't be called when shmem isn't initialized */ + Assert(waitLSNState); + + /* Should have a valid proc number */ + Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS); + + /* We can only wait for flush when we are not in recovery */ + Assert(!RecoveryInProgress()); + + /* Quick exit if already flushed */ + currentLSN = GetFlushRecPtr(NULL); + if (targetLSN <= currentLSN) + return; + + /* Add to waiters */ + addLSNWaiter(targetLSN); + + /* Wait loop */ + for (;;) + { + int rc; + + /* Check whether the awaited LSN has been flushed */ + currentLSN = GetFlushRecPtr(NULL); + if (targetLSN <= currentLSN) + break; + + CHECK_FOR_INTERRUPTS(); + + rc = WaitLatch(MyLatch, wake_events, -1, + WAIT_EVENT_WAIT_FOR_WAL_FLUSH); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. 
+ */ + if (rc & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"), + errcontext("while waiting for LSN flush"))); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * Delete our process from the shared memory pairing heap. We might + * already be deleted by the waker process. The 'inHeap' flag guards + * against double deletion. + */ + deleteLSNWaiter(); + + return; +} diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index ee20a48b2c5..b2266666c02 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -89,6 +89,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." +WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary." WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h index 72be2f76293..742c47609ae 100644 --- a/src/include/access/xlogwait.h +++ b/src/include/access/xlogwait.h @@ -86,5 +86,6 @@ extern void WaitLSNShmemInit(void); extern void WaitLSNWakeup(XLogRecPtr currentLSN); extern void WaitLSNCleanup(void); extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout); +extern void WaitForLSNFlush(XLogRecPtr targetLSN); #endif /* XLOG_WAIT_H */ -- 2.49.0