From ad0a3cfe10bdd2cccc4274849c4a77898b06e13c Mon Sep 17 00:00:00 2001 From: Anthonin Bonnefoy Date: Wed, 2 Jul 2025 09:58:52 +0200 Subject: Don't keep closed WAL segments in page cache after replay On a standby, the recovery process reads the WAL segments, applies changes and closes the segment. When closed, the segments will still be in page cache memory until they are evicted due to inactivity. The segments may be re-read if archive_mode is set to always, summarize_wal is enabled or if the standby is used for replication and has an active walsender. The presence of a replication slot is also a likely indicator that a walsender will be started and will need to read the WAL segments. Outside of those circumstances, the WAL segments won't be re-read and keeping them in the page cache generates unnecessary memory pressure. A POSIX_FADV_DONTNEED is sent before closing a replayed WAL segment to immediately free any cached pages. --- src/backend/access/transam/xlogrecovery.c | 21 +++++++++++++++++++++ src/backend/replication/slot.c | 16 ++++++++++++++++ src/backend/replication/walsender.c | 14 +++++++++++++- src/include/replication/slot.h | 10 +++++----- src/include/replication/walsender.h | 1 + src/include/replication/walsender_private.h | 4 ++++ 6 files changed, 60 insertions(+), 6 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index ecd66fd86a4..b5ffef515c7 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -49,6 +49,7 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "postmaster/walsummarizer.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/walreceiver.h" @@ -3370,6 +3371,26 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, } } + /* + * Once replayed, WAL segment files may be re-read in several cases: + * archive_mode is set to 
always, summarize_wal is enabled or the + * standby acts as a walsender for either logical or physical + * replication. Outside of those conditions, the WAL segment files + * shouldn't be re-read and we can signal the kernel to release any + * cached pages. We also check if any replication slot is used: If + * present, it's likely a walsender will start and will need to read + * the WAL segment files, so we may as well keep the pages in cache. + */ +#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED) + { + if (StandbyMode && + XLogArchiveMode != ARCHIVE_MODE_ALWAYS && + !summarize_wal && + WalSndNumActive() == 0 && + ReplicationSlotNumUsed() == 0) + (void) posix_fadvise(readFile, 0, 0, POSIX_FADV_DONTNEED); + } +#endif close(readFile); readFile = -1; readSource = XLOG_FROM_ANY; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 28c7019402b..9ea7272df4a 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -220,6 +220,7 @@ ReplicationSlotsShmemInit(void) /* First time through, so initialize */ MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize()); + pg_atomic_init_u32(&ReplicationSlotCtl->used_replication_slots, 0); for (i = 0; i < max_replication_slots; i++) { @@ -522,6 +523,9 @@ ReplicationSlotCreate(const char *name, bool db_specific, if (SlotIsLogical(slot)) pgstat_create_replslot(slot); + /* Update the used_replication_slots counter with the created slot */ + pg_atomic_fetch_add_u32(&ReplicationSlotCtl->used_replication_slots, 1); + /* * Now that the slot has been marked as in_use and active, it's safe to * let somebody else try to allocate a slot. 
@@ -579,6 +583,15 @@ ReplicationSlotIndex(ReplicationSlot *slot) return slot - ReplicationSlotCtl->replication_slots; } +/* + * Return the number of used replication slots + */ +int +ReplicationSlotNumUsed(void) +{ + return pg_atomic_read_u32(&ReplicationSlotCtl->used_replication_slots); +} + /* * If the slot at 'index' is unused, return false. Otherwise 'name' is set to * the slot's name and true is returned. @@ -1143,6 +1156,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) if (SlotIsLogical(slot)) pgstat_drop_replslot(slot); + /* Decrement the used_replication_slots counter */ + pg_atomic_sub_fetch_u32(&ReplicationSlotCtl->used_replication_slots, 1); + /* * We release this at the very end, so that nobody starts trying to create * a slot while we're still cleaning up the detritus of the old one. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2cde8ebc729..13fa279658f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3092,7 +3092,7 @@ InitWalSenderSlot(void) SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = walsnd; - + pg_atomic_fetch_add_u32(&WalSndCtl->active_wal_senders, 1); break; } } @@ -3117,6 +3117,7 @@ WalSndKill(int code, Datum arg) /* Mark WalSnd struct as no longer being in use. */ walsnd->pid = 0; SpinLockRelease(&walsnd->mutex); + pg_atomic_fetch_sub_u32(&WalSndCtl->active_wal_senders, 1); } /* XLogReaderRoutine->segment_open callback */ @@ -3697,6 +3698,15 @@ WalSndRqstFileReload(void) } } +/* + * Return the number of active walsender processes + */ +int +WalSndNumActive(void) +{ + return pg_atomic_read_u32(&WalSndCtl->active_wal_senders); +} + /* * Handle PROCSIG_WALSND_INIT_STOPPING signal. 
*/ @@ -3785,6 +3795,8 @@ WalSndShmemInit(void) SpinLockInit(&walsnd->mutex); } + pg_atomic_init_u32(&WalSndCtl->active_wal_senders, 0); + ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 4b4709f6e2c..491b8e53db5 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -292,11 +292,10 @@ typedef struct ReplicationSlot */ typedef struct ReplicationSlotCtlData { - /* - * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some - * reason you can't do that in an otherwise-empty struct. - */ - ReplicationSlot replication_slots[1]; + /* Number of used replication slots */ + pg_atomic_uint32 used_replication_slots; + + ReplicationSlot replication_slots[FLEXIBLE_ARRAY_MEMBER]; } ReplicationSlotCtlData; /* @@ -370,6 +369,7 @@ extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, TransactionId snapshotConflictHorizon); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); +extern int ReplicationSlotNumUsed(void); extern bool ReplicationSlotName(int index, Name name); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index a4df3b8e0ae..b0a3d9a72ba 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -48,6 +48,7 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +extern int WalSndNumActive(void); /* * Remember that we want to wakeup walsenders later diff --git 
a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 5de674d5410..98618a14e34 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -16,6 +16,7 @@ #include "lib/ilist.h" #include "nodes/nodes.h" #include "nodes/replnodes.h" +#include "port/atomics.h" #include "replication/syncrep.h" #include "storage/condition_variable.h" #include "storage/shmem.h" @@ -113,6 +114,9 @@ typedef struct */ ConditionVariable wal_confirm_rcv_cv; + /* Number of active walsenders. */ + pg_atomic_uint32 active_wal_senders; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; -- 2.52.0