From 683c008b71d173dea34810ba445bba6f51f52af6 Mon Sep 17 00:00:00 2001 From: Yura Sokolov Date: Sat, 18 Jan 2025 23:50:09 +0300 Subject: [PATCH v5 1/2] Get rid of WALBufMappingLock Allow many backends to concurrently initialize XLog buffers. This way, `MemSet((char *) NewPage, 0, XLOG_BLCKSZ);` is no longer performed under a single LWLock held in exclusive mode. Algorithm: - a backend first reserves a page for initialization, - then it ensures the page's previous contents were written out, - then it initializes the page and signals concurrent initializers using a ConditionVariable, - once enough pages have been reserved for initialization by this backend, it waits until all required pages have completed initialization. Many backends concurrently reserve pages, initialize them, and advance XLogCtl->InitializedUpTo to point to the latest initialized page. --- src/backend/access/transam/xlog.c | 176 +++++++++++------- .../utils/activity/wait_event_names.txt | 2 +- src/include/storage/lwlocklist.h | 2 +- 3 files changed, 111 insertions(+), 69 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9c270e7d466..c70ac26026e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -302,10 +302,7 @@ static bool doPageWrites; * so it's a plain spinlock. The other locks are held longer (potentially * over I/O operations), so we use LWLocks for them. These locks are: * - * WALBufMappingLock: must be held to replace a page in the WAL buffer cache. - * It is only held while initializing and changing the mapping. If the - * contents of the buffer being replaced haven't been written yet, the mapping - * lock is released while the write is done, and reacquired afterwards. + * (TODO: describe AdvanceXLInsertBuffer) * * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or * XLogFlush). 
@@ -444,6 +441,11 @@ typedef struct XLogCtlInsert WALInsertLockPadded *WALInsertLocks; } XLogCtlInsert; +typedef struct XLBlocks +{ + pg_atomic_uint64 bound; +} XLBlocks; + /* * Total shared-memory state for XLOG. */ @@ -472,25 +474,37 @@ typedef struct XLogCtlData pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */ pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */ + /* + * Latest initialized or reserved for inititalization page in the cache + * (last byte position + 1). + * + * It should be advanced before identity of a buffer will be changed to. + * To change the identity of a buffer that's still dirty, the old page + * needs to be written out first, and for that you need WALWriteLock, and + * you need to ensure that there are no in-progress insertions to the page + * by calling WaitXLogInsertionsToFinish(). + */ + pg_atomic_uint64 InitializeReserved; + /* * Latest initialized page in the cache (last byte position + 1). * - * To change the identity of a buffer (and InitializedUpTo), you need to - * hold WALBufMappingLock. To change the identity of a buffer that's - * still dirty, the old page needs to be written out first, and for that - * you need WALWriteLock, and you need to ensure that there are no - * in-progress insertions to the page by calling - * WaitXLogInsertionsToFinish(). + * It is updated to successfully initialized buffer's identities, perhaps + * waiting on conditional variable bound to buffer. */ - XLogRecPtr InitializedUpTo; + pg_atomic_uint64 InitializedUpTo; + + ConditionVariable InitializedUpToCondVar; /* * These values do not change after startup, although the pointed-to pages - * and xlblocks values certainly do. xlblocks values are protected by - * WALBufMappingLock. + * and xlblocks values certainly do. xlblocks values are changed + * lock-free with cooperation with InitializeReserved+InitializedUpTo and + * check for write position. 
*/ char *pages; /* buffers for unwritten XLOG pages */ - pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ + XLBlocks *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ (and condvar + * for) */ int XLogCacheBlck; /* highest allocated xlog buffer index */ /* @@ -810,8 +824,8 @@ XLogInsertRecord(XLogRecData *rdata, * fullPageWrites from changing until the insertion is finished. * * Step 2 can usually be done completely in parallel. If the required WAL - * page is not initialized yet, you have to grab WALBufMappingLock to - * initialize it, but the WAL writer tries to do that ahead of insertions + * page is not initialized yet, you have to go through AdvanceXLInsertBuffer, + * which will ensure it is initialized. But the WAL writer tries to do that ahead of insertions * to avoid that from happening in the critical path. * *---------- @@ -1671,7 +1685,7 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) expectedEndPtr = ptr; expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ; - endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]); + endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx].bound); if (expectedEndPtr != endptr) { XLogRecPtr initializedUpto; @@ -1702,11 +1716,11 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) WALInsertLockUpdateInsertingAt(initializedUpto); AdvanceXLInsertBuffer(ptr, tli, false); - endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]); + endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx].bound); if (expectedEndPtr != endptr) - elog(PANIC, "could not find WAL buffer for %X/%X", - LSN_FORMAT_ARGS(ptr)); + elog(PANIC, "could not find WAL buffer for %X/%X: %X/%X != %X/%X", + LSN_FORMAT_ARGS(ptr), LSN_FORMAT_ARGS(expectedEndPtr), LSN_FORMAT_ARGS(endptr)); } else { @@ -1803,7 +1817,7 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, * First verification step: check that the correct page is present in * the WAL buffers. 
*/ - endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]); + endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx].bound); if (expectedEndPtr != endptr) break; @@ -1835,7 +1849,7 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, * Second verification step: check that the page we read from wasn't * evicted while we were copying the data. */ - endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]); + endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx].bound); if (expectedEndPtr != endptr) break; @@ -1991,32 +2005,45 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr; XLogRecPtr NewPageBeginPtr; XLogPageHeader NewPage; + XLogRecPtr ReservedPtr; int npages pg_attribute_unused() = 0; - LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); - /* - * Now that we have the lock, check if someone initialized the page - * already. + * Try to initialize pages we need in WAL buffer. */ - while (upto >= XLogCtl->InitializedUpTo || opportunistic) + ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved); + while (upto >= ReservedPtr || opportunistic) { - nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo); - /* - * Get ending-offset of the buffer page we need to replace (this may - * be zero if the buffer hasn't been used yet). Fall through if it's - * already written out. + * Get ending-offset of the buffer page we need to replace. + * + * We don't lookup into xlblocks, but rather calculate position we + * must wait to be written. If it was written, xlblocks will have this + * position (or uninitialized) */ - OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]); - if (LogwrtResult.Write < OldPageRqstPtr) + if (ReservedPtr + XLOG_BLCKSZ > XLOG_BLCKSZ * XLOGbuffers) + OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - XLOG_BLCKSZ * XLOGbuffers; + else + OldPageRqstPtr = InvalidXLogRecPtr; + + if (LogwrtResult.Write < OldPageRqstPtr && opportunistic) { /* - * Nope, got work to do. 
If we just want to pre-initialize as much - * as we can without flushing, give up now. + * If we just want to pre-initialize as much as we can without + * flushing, give up now. */ - if (opportunistic) - break; + upto = ReservedPtr - 1; + break; + } + + /* Actually reserve the page for initialization. */ + if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved, &ReservedPtr, ReservedPtr + XLOG_BLCKSZ)) + continue; + + /* Fall through if it's already written out. */ + if (LogwrtResult.Write < OldPageRqstPtr) + { + /* Nope, got work to do. */ /* Advance shared memory write request position */ SpinLockAcquire(&XLogCtl->info_lck); @@ -2031,14 +2058,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) RefreshXLogWriteResult(LogwrtResult); if (LogwrtResult.Write < OldPageRqstPtr) { - /* - * Must acquire write lock. Release WALBufMappingLock first, - * to make sure that all insertions that we need to wait for - * can finish (up to this same position). Otherwise we risk - * deadlock. - */ - LWLockRelease(WALBufMappingLock); - WaitXLogInsertionsToFinish(OldPageRqstPtr); LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); @@ -2060,9 +2079,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) PendingWalStats.wal_buffers_full++; TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE(); } - /* Re-acquire WALBufMappingLock and retry */ - LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); - continue; } } @@ -2070,19 +2086,26 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) * Now the next buffer slot is free and we can set it up to be the * next output page. 
*/ - NewPageBeginPtr = XLogCtl->InitializedUpTo; + NewPageBeginPtr = ReservedPtr; NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ; + nextidx = XLogRecPtrToBufIdx(ReservedPtr); - Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx); +#ifdef USE_ASSERT_CHECKING + { + XLogRecPtr storedBound = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx].bound); + + Assert(storedBound == OldPageRqstPtr || storedBound == InvalidXLogRecPtr); + } +#endif NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); /* - * Mark the xlblock with InvalidXLogRecPtr and issue a write barrier - * before initializing. Otherwise, the old page may be partially - * zeroed but look valid. + * Mark the xlblock with (InvalidXLogRecPtr+1) and issue a write + * barrier before initializing. Otherwise, the old page may be + * partially zeroed but look valid. */ - pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], InvalidXLogRecPtr); + pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx].bound, InvalidXLogRecPtr + 1); pg_write_barrier(); /* @@ -2138,12 +2161,26 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) */ pg_write_barrier(); - pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr); - XLogCtl->InitializedUpTo = NewPageEndPtr; + pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx].bound, NewPageEndPtr); + //ConditionVariableBroadcast(&XLogCtl->xlblocks[nextidx].condvar); - npages++; + while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr)) + { + NewPageBeginPtr = NewPageEndPtr; + NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ; + nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr); + + if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx].bound) != NewPageEndPtr) + { + ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar); + break; + } + } } - LWLockRelease(WALBufMappingLock); + + while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo)) + ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, 
WAIT_EVENT_WAL_BUFFER_INIT); + ConditionVariableCancelSleep(); #ifdef WAL_DEBUG if (XLOG_DEBUG && npages > 0) @@ -2356,7 +2393,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) * if we're passed a bogus WriteRqst.Write that is past the end of the * last page that's been initialized by AdvanceXLInsertBuffer. */ - XLogRecPtr EndPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[curridx]); + XLogRecPtr EndPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[curridx].bound); if (LogwrtResult.Write >= EndPtr) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", @@ -4920,7 +4957,7 @@ XLOGShmemSize(void) /* WAL insertion locks, plus alignment */ size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1)); /* xlblocks array */ - size = add_size(size, mul_size(sizeof(pg_atomic_uint64), XLOGbuffers)); + size = add_size(size, mul_size(sizeof(XLBlocks), XLOGbuffers)); /* extra alignment padding for XLOG I/O buffers */ size = add_size(size, Max(XLOG_BLCKSZ, PG_IO_ALIGN_SIZE)); /* and the buffers themselves */ @@ -4998,12 +5035,12 @@ XLOGShmemInit(void) * needed here. */ allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData); - XLogCtl->xlblocks = (pg_atomic_uint64 *) allocptr; - allocptr += sizeof(pg_atomic_uint64) * XLOGbuffers; + XLogCtl->xlblocks = (XLBlocks *) allocptr; + allocptr += sizeof(XLBlocks) * XLOGbuffers; for (i = 0; i < XLOGbuffers; i++) { - pg_atomic_init_u64(&XLogCtl->xlblocks[i], InvalidXLogRecPtr); + pg_atomic_init_u64(&XLogCtl->xlblocks[i].bound, InvalidXLogRecPtr); } /* WAL insertion locks. 
Ensure they're aligned to the full padded size */ @@ -5044,6 +5081,10 @@ XLOGShmemInit(void) pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr); + + pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr); + pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr); + ConditionVariableInit(&XLogCtl->InitializedUpToCondVar); } /* @@ -6062,8 +6103,8 @@ StartupXLOG(void) memcpy(page, endOfRecoveryInfo->lastPage, len); memset(page + len, 0, XLOG_BLCKSZ - len); - pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ); - XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ; + pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx].bound, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ); + pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ); } else { @@ -6072,8 +6113,9 @@ StartupXLOG(void) * let the first attempt to insert a log record to initialize the next * buffer. */ - XLogCtl->InitializedUpTo = EndOfLog; + pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog); } + pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo)); /* * Update local and shared status. This is OK to do without any locks diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index e199f071628..ccf73781d81 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -155,6 +155,7 @@ REPLICATION_SLOT_DROP "Waiting for a replication slot to become inactive so it c RESTORE_COMMAND "Waiting for to complete." SAFE_SNAPSHOT "Waiting to obtain a valid snapshot for a READ ONLY DEFERRABLE transaction." 
SYNC_REP "Waiting for confirmation from a remote server during synchronous replication." +WAL_BUFFER_INIT "Waiting on WAL buffer to be initialized." WAL_RECEIVER_EXIT "Waiting for the WAL receiver to exit." WAL_RECEIVER_WAIT_START "Waiting for startup process to send initial data for streaming replication." WAL_SUMMARY_READY "Waiting for a new WAL summary to be generated." @@ -310,7 +311,6 @@ XidGen "Waiting to allocate a new transaction ID." ProcArray "Waiting to access the shared per-process data structures (typically, to get a snapshot or report a session's transaction ID)." SInvalRead "Waiting to retrieve messages from the shared catalog invalidation queue." SInvalWrite "Waiting to add a message to the shared catalog invalidation queue." -WALBufMapping "Waiting to replace a page in WAL buffers." WALWrite "Waiting for WAL buffers to be written to disk." ControlFile "Waiting to read or update the pg_control file or create a new WAL file." MultiXactGen "Waiting to read or update shared multixact state." diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index cf565452382..ff897515769 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -37,7 +37,7 @@ PG_LWLOCK(3, XidGen) PG_LWLOCK(4, ProcArray) PG_LWLOCK(5, SInvalRead) PG_LWLOCK(6, SInvalWrite) -PG_LWLOCK(7, WALBufMapping) +/* 7 was WALBufMapping */ PG_LWLOCK(8, WALWrite) PG_LWLOCK(9, ControlFile) /* 10 was CheckpointLock */ -- 2.39.5 (Apple Git-154)