From 545ddb9055dfff3eff520d5fc854a8f4abfdf029 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Wed, 12 Feb 2020 18:17:24 +1300 Subject: [PATCH 5/5] Prefetch referenced blocks during recovery. Introduce a new GUC wal_prefetch_distance. If it is set to a positive number of bytes, then read ahead in the WAL at most that distance and initiate asynchronous reading of referenced blocks, in the hope of avoiding I/O stalls. The number of concurrent asynchronous reads is limited by both effective_io_concurrency and wal_prefetch_distance. Author: Thomas Munro Reviewed-by: Tomas Vondra Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- doc/src/sgml/config.sgml | 38 ++ src/backend/access/transam/Makefile | 1 + src/backend/access/transam/xlog.c | 65 +++ src/backend/access/transam/xlogprefetcher.c | 456 ++++++++++++++++++++ src/backend/access/transam/xlogutils.c | 23 +- src/backend/replication/logical/logical.c | 2 +- src/backend/utils/misc/guc.c | 25 ++ src/include/access/xlog.h | 4 + src/include/access/xlogprefetcher.h | 25 ++ src/include/access/xlogutils.h | 20 + src/include/storage/bufmgr.h | 5 + src/include/utils/guc.h | 2 + 12 files changed, 664 insertions(+), 2 deletions(-) create mode 100644 src/backend/access/transam/xlogprefetcher.c create mode 100644 src/include/access/xlogprefetcher.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index c1128f89ec..415b0793e1 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3082,6 +3082,44 @@ include_dir 'conf.d' + + wal_prefetch_distance (integer) + + wal_prefetch_distance configuration parameter + + + + + The maximum distance to look ahead in the WAL during recovery, to find + blocks to prefetch. Prefetching blocks that will soon be needed can + reduce I/O wait times. The number of concurrent prefetches is limited + by this setting as well as effective_io_concurrency. + If this value is specified without units, it is taken as bytes. 
+ The default is -1, meaning that WAL prefetching is disabled. + + + + + + wal_prefetch_fpw (boolean) + + wal_prefetch_fpw configuration parameter + + + + + Whether to prefetch blocks with full page images during recovery. + Usually this doesn't help, since such blocks will not be read. However, + on file systems with a block size larger than + PostgreSQL's, prefetching can avoid a costly + read-before-write when blocks are later written. + This setting has no effect unless + wal_prefetch_distance is set to a positive number. + The default is off. + + + + diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..20e044c7c8 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -31,6 +31,7 @@ OBJS = \ xlogarchive.o \ xlogfuncs.o \ xloginsert.o \ + xlogprefetcher.o \ xlogreader.o \ xlogutils.o diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0c389e9315..0f27a4da54 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -34,11 +34,13 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xloginsert.h" +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" #include "catalog/pg_database.h" +#include "catalog/storage_xlog.h" #include "commands/tablespace.h" #include "common/controldata_utils.h" #include "miscadmin.h" @@ -104,6 +106,8 @@ int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; +int wal_prefetch_distance = -1; +bool wal_prefetch_fpw = false; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -801,6 +805,7 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */ */ static XLogSource currentSource = 0; /* XLOG_FROM_* code */ static bool lastSourceFailed = 
false; +static bool reset_wal_prefetcher = false; typedef struct XLogPageReadPrivate { @@ -6191,6 +6196,7 @@ CheckRequiredParameterValues(void) } } + /* * This must be called ONCE during postmaster or standalone-backend startup */ @@ -7046,6 +7052,7 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; + XLogPrefetcher *prefetcher = NULL; InRedo = true; @@ -7053,6 +7060,9 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); + /* the first time through, see if we need to enable prefetching */ + ResetWalPrefetcher(); + /* * main redo apply loop */ @@ -7082,6 +7092,31 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); + /* + * The first time through, or if any relevant settings or the + * WAL source changes, we'll restart the prefetching machinery + * as appropriate. This is simpler than trying to handle + * various complicated state changes. + */ + if (unlikely(reset_wal_prefetcher)) + { + /* If we had one already, destroy it. */ + if (prefetcher) + { + XLogPrefetcherFree(prefetcher); + prefetcher = NULL; + } + /* If we want one, create it. */ + if (wal_prefetch_distance > 0) + prefetcher = XLogPrefetcherAllocate(xlogreader->ReadRecPtr, + currentSource == XLOG_FROM_STREAM); + reset_wal_prefetcher = false; + } + + /* Perform WAL prefetching, if enabled. */ + if (prefetcher) + XLogPrefetcherReadAhead(prefetcher, xlogreader->ReadRecPtr); + /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7269,6 +7304,8 @@ StartupXLOG(void) /* * end of main redo apply loop */ + if (prefetcher) + XLogPrefetcherFree(prefetcher); if (reachedRecoveryTarget) { @@ -10128,6 +10165,24 @@ assign_xlog_sync_method(int new_sync_method, void *extra) } } +void +assign_wal_prefetch_distance(int new_value, void *extra) +{ + /* Reset the WAL prefetcher, because a setting it depends on changed. 
*/ + wal_prefetch_distance = new_value; + if (AmStartupProcess()) + ResetWalPrefetcher(); +} + +void +assign_wal_prefetch_fpw(bool new_value, void *extra) +{ + /* Reset the WAL prefetcher, because a setting it depends on changed. */ + wal_prefetch_fpw = new_value; + if (AmStartupProcess()) + ResetWalPrefetcher(); +} + /* * Issue appropriate kind of fsync (if any) for an XLOG output file. @@ -11911,6 +11966,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * and move on to the next state. */ currentSource = XLOG_FROM_STREAM; + ResetWalPrefetcher(); break; case XLOG_FROM_STREAM: @@ -12334,3 +12390,12 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Schedule a WAL prefetcher reset, on change of relevant settings. + */ +void +ResetWalPrefetcher(void) +{ + reset_wal_prefetcher = true; +} diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c new file mode 100644 index 0000000000..6b565dc313 --- /dev/null +++ b/src/backend/access/transam/xlogprefetcher.c @@ -0,0 +1,456 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetcher.c + * Prefetching support for PostgreSQL write-ahead log manager + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/transam/xlogprefetcher.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "access/xlogprefetcher.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/storage_xlog.h" +#include "utils/hsearch.h" + +/* + * Internal state used for book-keeping. + */ +struct XLogPrefetcher +{ + /* Reader and current reading state. 
*/ + XLogReaderState *reader; + XLogReadLocalOptions options; + bool have_record; + bool shutdown; + int next_block_id; + + /* Book-keeping required to avoid accessing non-existing blocks. */ + HTAB *filter_table; + dlist_head filter_queue; + + /* Book-keeping required to limit concurrent prefetches. */ + XLogRecPtr *prefetch_queue; + int prefetch_queue_size; + int prefetch_head; + int prefetch_tail; + + /* Details of last prefetched block. */ + SMgrRelation last_reln; + RelFileNode last_rnode; + BlockNumber last_blkno; +}; + +/* + * A temporary filter used to track block ranges that haven't been created + * yet, whole relations that haven't been created yet, and whole relations + * that we must assume have already been dropped. + */ +typedef struct XLogPrefetcherFilter +{ + RelFileNode rnode; + XLogRecPtr filter_until_replayed; + BlockNumber filter_from_block; + dlist_node link; +} XLogPrefetcherFilter; + +static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno, + XLogRecPtr lsn); +static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno); +static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn); +static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher); + +/* + * Create a prefetcher that is ready to begin prefetching blocks referenced by + * WAL that is ahead of the given lsn. 
+ */ +XLogPrefetcher * +XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming) +{ + static HASHCTL hash_table_ctl = { + .keysize = sizeof(RelFileNode), + .entrysize = sizeof(XLogPrefetcherFilter) + }; + XLogPrefetcher *prefetcher = palloc0(sizeof(*prefetcher)); + + prefetcher->options.nowait = true; + if (streaming) + { + /* + * We're only allowed to read as far as the WAL receiver has written. + * We don't have to wait for it to be flushed, though, as recovery + * does, so that gives us a chance to get a bit further ahead. + */ + prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN; + } + else + { + /* We're allowed to read as far as we can. */ + prefetcher->options.read_upto_policy = XLRO_LSN; + prefetcher->options.lsn = (XLogRecPtr) -1; + } + prefetcher->reader = XLogReaderAllocate(wal_segment_size, + NULL, + read_local_xlog_page, + &prefetcher->options); + prefetcher->filter_table = hash_create("PrefetchFilterTable", 1024, + &hash_table_ctl, + HASH_ELEM | HASH_BLOBS); + dlist_init(&prefetcher->filter_queue); + + /* + * The size of the queue is determined by target_prefetch_pages, which is + * derived from effective_io_concurrency. In theory we might have a + * separate queue for each tablespace, but it's not clear how that should + * work, so for now we'll just use the system-wide GUC to rate-limit all + * prefetching. + */ + prefetcher->prefetch_queue_size = target_prefetch_pages; + prefetcher->prefetch_queue = palloc0(sizeof(XLogRecPtr) * prefetcher->prefetch_queue_size); + prefetcher->prefetch_head = prefetcher->prefetch_tail = 0; + + /* Prepare to read at the given LSN. */ + XLogBeginRead(prefetcher->reader, lsn); + + return prefetcher; +} + +/* + * Destroy a prefetcher and release all resources. 
+ */ +void +XLogPrefetcherFree(XLogPrefetcher *prefetcher) +{ + XLogReaderFree(prefetcher->reader); + hash_destroy(prefetcher->filter_table); + pfree(prefetcher->prefetch_queue); + pfree(prefetcher); +} + +/* + * Read ahead in the WAL, as far as we can within the limits set by the user. + * Begin fetching any referenced blocks that are not already in the buffer + * pool. + */ +void +XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + /* + * If an error has occurred or we've hit the end of the WAL or a timeline + * change, do nothing. Eventually we might be restarted by the recovery + * loop deciding to reset us due to a new timeline or a GUC change. + */ + if (prefetcher->shutdown) + return; + + /* + * Have any in-flight prefetches definitely completed, judging by the LSN + * that is currently being replayed? + */ + XLogPrefetcherCompletedIO(prefetcher, replaying_lsn); + + /* + * Do we already have the maximum permitted number of IOs running + * (according to the information we have)? If so, we have to wait for at + * least one to complete, so give up early. + */ + if (XLogPrefetcherSaturated(prefetcher)) + return; + + /* Can we drop any filters yet, due to problem records being replayed? */ + XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn); + + /* Main prefetch loop. */ + for (;;) + { + XLogReaderState *reader = prefetcher->reader; + char *error; + + /* If we don't already have a record, then try to read one. */ + if (!prefetcher->have_record) + { + if (!XLogReadRecord(reader, &error)) + { + /* If we got an error, log it and give up. */ + if (error) + { + elog(LOG, "WAL prefetch: %s", error); + prefetcher->shutdown = true; + } + /* Otherwise, we'll try again later when more data is here. */ + return; + } + prefetcher->have_record = true; + prefetcher->next_block_id = 0; + } + + /* Are we too far ahead of replay? 
*/ + if (prefetcher->reader->ReadRecPtr >= replaying_lsn + wal_prefetch_distance) + break; + + /* + * If this is a record that creates a new SMGR relation, we'll avoid + * prefetching anything from that rnode until it has been replayed. + */ + if (replaying_lsn < reader->ReadRecPtr && + XLogRecGetRmid(reader) == RM_SMGR_ID && + (XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader); + + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, + reader->ReadRecPtr); + } + + /* + * Scan the record for block references. We might already have been + * partway through processing this record when we hit maximum I/O + * concurrency, so start where we left off. + */ + for (int i = prefetcher->next_block_id; i <= reader->max_block_id; ++i) + { + DecodedBkpBlock *block = &reader->blocks[i]; + SMgrRelation reln; + + /* Ignore everything but the main fork for now. */ + if (block->forknum != MAIN_FORKNUM) + continue; + + /* + * If there is a full page image attached, we won't be reading the + * page, so you might think we should skip it. However, if the + * underlying filesystem uses larger logical blocks than us, it + * might still need to perform a read-before-write some time later. + * Therefore, only prefetch if configured to do so. + */ + if (block->has_image && !wal_prefetch_fpw) + continue; + + /* + * If this block will initialize a new page then it's probably an + * extension. Since it might create a new segment, we can't try + * to prefetch this block until the record has been replayed, or we + * might try to open a file that doesn't exist yet. + */ + if (block->flags & BKPBLOCK_WILL_INIT) + { + XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno, + reader->ReadRecPtr); + continue; + } + + /* Should we skip this block due to a filter? */ + if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, + block->blkno)) + continue; + + /* Fast path for repeated references to the same relation. 
*/ + if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode)) + { + /* + * If this is a repeat or sequential access, then skip it. We + * expect the kernel to detect sequential access on its own + * and do a better job than we could. + */ + if (block->blkno == prefetcher->last_blkno || + block->blkno == prefetcher->last_blkno + 1) + { + prefetcher->last_blkno = block->blkno; + continue; + } + + /* We can avoid calling smgropen(). */ + reln = prefetcher->last_reln; + } + else + { + /* Otherwise we have to open it. */ + reln = smgropen(block->rnode, InvalidBackendId); + prefetcher->last_rnode = block->rnode; + prefetcher->last_reln = reln; + } + prefetcher->last_blkno = block->blkno; + + /* Try to prefetch this block! */ + switch (SharedPrefetchBuffer(reln, block->forknum, block->blkno)) + { + case PREFETCH_BUFFER_HIT: + /* It's already cached, so do nothing. */ + break; + case PREFETCH_BUFFER_MISS: + /* + * I/O has possibly been initiated (though we don't know if it + * was already cached by the kernel, so we just have to assume + * that it has due to lack of better information). Record + * this as an I/O in progress until eventually we replay this + * LSN. + */ + XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr); + /* + * If the queue is now full, we'll have to wait before + * processing any more blocks from this record. + */ + if (XLogPrefetcherSaturated(prefetcher)) + { + prefetcher->next_block_id = i + 1; + return; + } + break; + case PREFETCH_BUFFER_NOREL: + /* + * The underlying segment file doesn't exist. Presumably it + * will be unlinked by a later WAL record. When recovery + * reads this block, it will use the EXTENSION_CREATE_RECOVERY + * flag. We certainly don't want to do that sort of thing + * while merely prefetching, so let's just ignore references + * to this relation until this record is replayed, and let + * recovery create the dummy file or complain if something is + * wrong. 
+ */ + XLogPrefetcherAddFilter(prefetcher, block->rnode, 0, + reader->ReadRecPtr); + break; + } + } + + /* Advance to the next record. */ + prefetcher->have_record = false; + } +} + +/* + * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn' + * has been replayed. + */ +static inline void +XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno, XLogRecPtr lsn) +{ + XLogPrefetcherFilter *filter; + bool found; + + filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found); + if (!found) + { + /* + * Don't allow any prefetching of this block or higher until replayed. + */ + filter->filter_until_replayed = lsn; + filter->filter_from_block = blockno; + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } + else + { + /* + * We were already filtering this rnode. Extend the filter's lifetime + * to cover this WAL record, but leave the (presumably lower) block + * number there because we don't want to have to track individual + * blocks. + */ + filter->filter_until_replayed = lsn; + dlist_delete(&filter->link); + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } +} + +/* + * Have we replayed the records that caused us to begin filtering a block + * range? That means that relations should have been created, extended or + * dropped as required, so we can drop relevant filters. + */ +static inline void +XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter, + link, + &prefetcher->filter_queue); + + if (filter->filter_until_replayed >= replaying_lsn) + break; + dlist_delete(&filter->link); + hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL); + } +} + +/* + * Check if a given block should be skipped due to a filter. 
+ */ +static inline bool +XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno) +{ + /* + * Test for empty queue first, because we expect it to be empty most of the + * time and we can avoid the hash table lookup in that case. + */ + if (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode, + HASH_FIND, NULL); + + if (filter && filter->filter_from_block <= blockno) + return true; + } + + return false; +} + +/* + * Insert an LSN into the queue. The queue must not be full already. This + * tracks the fact that we have (to the best of our knowledge) initiated an + * IO, so that we can impose a cap on concurrent prefetching. + */ +static inline void +XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn) +{ + Assert(!XLogPrefetcherSaturated(prefetcher)); + prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn; + prefetcher->prefetch_head %= prefetcher->prefetch_queue_size; +} + +/* + * Have we replayed the records that caused us to initiate the oldest + * prefetches yet? That means that they're definitely finished, so we can + * forget about them and allow ourselves to initiate more prefetches. For now + * we don't have any awareness of when IO really completes. + */ +static inline void +XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (prefetcher->prefetch_head != prefetcher->prefetch_tail && + prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn) + { + prefetcher->prefetch_tail++; + prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size; + } +} + +/* + * Check if the maximum allowed number of IOs is already in flight. 
+ */ +static inline bool +XLogPrefetcherSaturated(XLogPrefetcher *prefetcher) +{ + return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size == + prefetcher->prefetch_tail; +} diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b217ffa52f..fad2acb514 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -25,6 +25,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -827,6 +828,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + XLogReadLocalOptions *options = (XLogReadLocalOptions *) state->private_data; loc = targetPagePtr + reqLen; @@ -841,7 +843,23 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * notices recovery finishes, so we only have to maintain it for the * local process until recovery ends. 
*/ - if (!RecoveryInProgress()) + if (options) + { + switch (options->read_upto_policy) + { + case XLRO_WALRCV_WRITTEN: + read_upto = GetWalRcvWriteRecPtr(); + break; + case XLRO_LSN: + read_upto = options->lsn; + break; + default: + read_upto = 0; + elog(ERROR, "unknown read_upto_policy value"); + break; + } + } + else if (!RecoveryInProgress()) read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); @@ -879,6 +897,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, if (loc <= read_upto) break; + if (options && options->nowait) + break; + CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index e3da7d3625..34f3017871 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, NULL); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 8228e1f390..2e07e2394a 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1240,6 +1240,18 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"wal_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Prefetch blocks that have full page images in the WAL"), + gettext_noop("On some systems, there is no benefit to prefetching pages that will be " + "entirely overwritten, but if the logical page size of the filesystem is " + "larger than PostgreSQL's, this can be beneficial. 
This option has no " + "effect unless wal_prefetch_distance is set to a positive number.") + }, + &wal_prefetch_fpw, + false, + NULL, assign_wal_prefetch_fpw, NULL + }, { {"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS, @@ -2626,6 +2638,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"wal_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, + gettext_noop("How many bytes to read ahead in the WAL to prefetch referenced blocks."), + gettext_noop("Set to -1 to disable WAL prefetching."), + GUC_UNIT_BYTE + }, + &wal_prefetch_distance, + -1, -1, INT_MAX, + NULL, assign_wal_prefetch_distance, NULL + }, + { {"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the number of WAL files held for standby servers."), @@ -11484,6 +11507,8 @@ assign_effective_io_concurrency(int newval, void *extra) { #ifdef USE_PREFETCH target_prefetch_pages = *((int *) extra); + if (AmStartupProcess()) + ResetWalPrefetcher(); #endif /* USE_PREFETCH */ } diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 98b033fc20..0a31edfba4 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -111,6 +111,8 @@ extern int wal_keep_segments; extern int XLOGbuffers; extern int XLogArchiveTimeout; extern int wal_retrieve_retry_interval; +extern int wal_prefetch_distance; +extern bool wal_prefetch_fpw; extern char *XLogArchiveCommand; extern bool EnableHotStandby; extern bool fullPageWrites; @@ -319,6 +321,8 @@ extern void SetWalWriterSleeping(bool sleeping); extern void XLogRequestWalReceiverReply(void); +extern void ResetWalPrefetcher(void); + extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h new file mode 100644 index 0000000000..070ffc5c85 --- /dev/null +++ b/src/include/access/xlogprefetcher.h @@ -0,0 +1,25 @@ 
+/*------------------------------------------------------------------------- + * + * xlogprefetcher.h + * Declarations for the XLog prefetching facility + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/access/xlogprefetcher.h + *------------------------------------------------------------------------- + */ +#ifndef XLOGPREFETCHER_H +#define XLOGPREFETCHER_H + +#include "access/xlogdefs.h" + +struct XLogPrefetcher; +typedef struct XLogPrefetcher XLogPrefetcher; + +extern XLogPrefetcher *XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming); +extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher); +extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetch, XLogRecPtr replaying_lsn); + +#endif diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 5181a077d9..1c8e67d74a 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,6 +47,26 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); +/* + * A pointer to an XLogReadLocalOptions struct can be supplied as the private + * data for an xlog reader, causing read_local_xlog_page to modify its + * behavior. + */ +typedef struct XLogReadLocalOptions +{ + /* Don't block waiting for new WAL to arrive. */ + bool nowait; + + /* How far to read. */ + enum { + XLRO_WALRCV_WRITTEN, + XLRO_LSN + } read_upto_policy; + + /* If read_upto_policy is XLRO_LSN, the LSN. 
*/ + XLogRecPtr lsn; +} XLogReadLocalOptions; + extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 5d7a796ba0..6e91c33f3d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -159,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount; */ #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer)) +/* + * When you try to prefetch a buffer, there are three possibilities: it's + * already cached in our buffer pool, it's not cached but we can tell the kernel + * we'll be loading it soon, or the relation file doesn't exist. + */ typedef enum PrefetchBufferResult { PREFETCH_BUFFER_HIT, diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index ce93ace76c..903b0ec02b 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -438,5 +438,7 @@ extern void assign_search_path(const char *newval, void *extra); /* in access/transam/xlog.c */ extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +extern void assign_wal_prefetch_distance(int new_value, void *extra); +extern void assign_wal_prefetch_fpw(bool new_value, void *extra); #endif /* GUC_H */ -- 2.23.0