From 3714219dde72857c033d0029c8ef9b30ca72f574 Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Tue, 7 Apr 2026 14:40:15 +0500 Subject: [PATCH v3 1/2] Pipelined Recovery - Producer This includes the producer specific code for the producer-consumer architecture for WAL replay that separates WAL decoding from the recovery process, enabling parallel processing between differemt steps of replay. The producer includes a background worker that reads and decodes WAL records, then send them to the startup process for the redo. IPC happens via shared memory message queues (shm_mq), allowing the decoder to run ahead of the apply process. This provides some improvement in recovery performance for CPU-bound workloads. New GUC: wal_pipeline (default: off) Author: Imran Zaheer Idea by: Ants Aasma --- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/meson.build | 1 + src/backend/access/transam/xlogpipeline.c | 577 ++++++++++++++++++ src/backend/access/transam/xlogrecovery.c | 57 +- src/backend/postmaster/bgworker.c | 5 + src/backend/utils/misc/guc_parameters.dat | 15 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/access/xlog.h | 2 + src/include/access/xlogpipeline.h | 158 +++++ src/include/access/xlogrecovery.h | 19 + src/include/storage/subsystemlist.h | 1 + src/test/recovery/meson.build | 1 + src/test/recovery/t/053_walpipeline.pl | 208 +++++++ 13 files changed, 1036 insertions(+), 11 deletions(-) create mode 100644 src/backend/access/transam/xlogpipeline.c create mode 100644 src/include/access/xlogpipeline.h create mode 100644 src/test/recovery/t/053_walpipeline.pl diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index a32f473e0a2..ba0bf343769 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -32,6 +32,7 @@ OBJS = \ xlogbackup.o \ xlogfuncs.o \ xloginsert.o \ + xlogpipeline.o \ xlogprefetcher.o \ xlogreader.o \ xlogrecovery.o \ diff --git 
a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build index 06aadc7f315..be37b40581d 100644 --- a/src/backend/access/transam/meson.build +++ b/src/backend/access/transam/meson.build @@ -20,6 +20,7 @@ backend_sources += files( 'xlogbackup.c', 'xlogfuncs.c', 'xloginsert.c', + 'xlogpipeline.c', 'xlogprefetcher.c', 'xlogrecovery.c', 'xlogstats.c', diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c new file mode 100644 index 00000000000..3f482c6ba44 --- /dev/null +++ b/src/backend/access/transam/xlogpipeline.c @@ -0,0 +1,577 @@ +/*------------------------------------------------------------------------- + * + * xlogpipeline.c + * WAL replay pipeline implementation + * + * This module implements a producer-consumer pipeline for WAL replay. + * The producer (background worker) reads and decodes WAL records in parallel + * with the consumer (startup process) that applies them. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/xlogpipeline.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include + +#include "access/heapam_xlog.h" +#include "access/rmgr.h" +#include "access/xlog.h" +#include "access/xlogpipeline.h" +#include "access/xlogprefetcher.h" +#include "access/xlogreader.h" +#include "access/xlogrecord.h" +#include "access/xlogrecovery.h" +#include "access/xlogutils.h" +#include "access/xlog_internal.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/startup.h" +#include "storage/bufmgr.h" +#include "storage/dsm.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/md.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "storage/smgr.h" +#include 
"storage/subsystems.h" +#include "tcop/tcopprot.h" +#include "utils/elog.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/rel.h" +#include "utils/timeout.h" + + + +/* Global shared memory control structure */ +WalPipelineShmCtl *WalPipelineShm = NULL; + +static void WalPipelineShmemRequest(void *arg); +static void WalPipelineShmemInit(void *arg); + +const ShmemCallbacks WalPipelineShmemCallbacks = { + .request_fn = WalPipelineShmemRequest, + .init_fn = WalPipelineShmemInit, +}; + +XLogPrefetcher *xlogprefetcher_pipelined = NULL; + +/* Local state for producer */ +static dsm_segment *producer_dsm_seg = NULL; +static shm_mq *producer_mq = NULL; +static shm_mq_handle *producer_mq_handle = NULL; + + +/* + * Flags set by interrupt handlers for later service in the redo loop. + */ +static volatile sig_atomic_t got_SIGHUP = false; + +/* Signal handlers */ +static void PipelineBgwSigHupHandler(SIGNAL_ARGS); + +/* Forward declarations */ +static void wal_pipeline_cleanup_callback(int code, Datum arg); +static Size serialize_wal_record(XLogReaderState *record, char **buffer); +static void cleanup_producer_resources(void); +static void cleanup_consumer_resources(void); + +/* copied from xlogrecovery.c */ +/* Parameters passed down from ReadRecord to the XLogPageRead callback. */ +typedef struct XLogPageReadPrivate +{ + int emode; + bool fetching_ckpt; /* are we fetching a checkpoint record? */ + bool randAccess; + TimeLineID replayTLI; +} XLogPageReadPrivate; + + +/* + * Register shared memory for WAL Pipeline + */ +static void +WalPipelineShmemRequest(void *arg) +{ + ShmemRequestStruct(.name = "WAL Pipeline Ctl", + .size = sizeof(WalPipelineShmCtl), + .ptr = (void **) &WalPipelineShm, + ); +} + +static void +WalPipelineShmemInit(void *arg) +{ + memset(WalPipelineShm, 0, sizeof(WalPipelineShmCtl)); + + SpinLockInit(&WalPipelineShm->mutex); +} + + +/* + * Producer Function. + * Main loop for the producer background worker. 
+ */ +void +WalPipeline_ProducerMain(Datum main_arg) +{ + dsm_handle handle = DatumGetUInt32(main_arg); + dsm_segment *seg; + shm_toc *toc; + WalPipelineParams *params; + XLogReaderState *xlogreader; + XLogPageReadPrivate *private; + XLogRecord *record; + TimeLineID replayTLI = 0; + bool end_of_wal = false; + uint64 records_sent; + uint64 records_received; + + /* + * Properly accept or ignore signals the postmaster might send us. + */ + pqsignal(SIGHUP, PipelineBgwSigHupHandler); /* reload config file */ + + /* Register cleanup callback */ + before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0); + + seg = dsm_attach(handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("[walpipeline] producer: could not map dynamic shared memory segment"))); + + toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment"))); + + /* Lookup params and queue */ + params = shm_toc_lookup(toc, 1, false); + producer_mq = shm_toc_lookup(toc, 2, false); + + /* Set up producer side of queue */ + producer_dsm_seg = seg; + shm_mq_set_sender(producer_mq, MyProc); + producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL); + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_pid = MyProcPid; + SpinLockRelease(&WalPipelineShm->mutex); + + /* DSM is now attached, so safe to unblock the signals */ + BackgroundWorkerUnblockSignals(); + + /* Set up WAL reading processor */ + private = palloc0(sizeof(XLogPageReadPrivate)); + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &XLogPageRead, + .segment_open = NULL, + .segment_close = wal_segment_close), + private); + + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating a WAL 
reading processor."))); + xlogreader->system_identifier = GetSystemIdentifier(); + + /* + * Set the WAL decode buffer size. This limits how far ahead we can read + * in the WAL. + */ + XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size); + + /* Init some important globals before starting */ + replayTLI = params->ReplayTLI; + InitializePipelineRecoveryEnv(params); + + /* Reinit the WAL prefetcher. */ + xlogprefetcher_pipelined = XLogPrefetcherAllocate(xlogreader); + + + elog(LOG, "[walpipeline] producer: started at %X/%X, TLI %u", + LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI); + + XLogPrefetcherBeginRead(xlogprefetcher_pipelined, params->NextRecPtr); + + /* Main decoding loop */ + while (true) + { + bool shutdown_requested; + + /* Check if consumer requested shutdown */ + SpinLockAcquire(&WalPipelineShm->mutex); + shutdown_requested = WalPipelineShm->shutdown_requested; + SpinLockRelease(&WalPipelineShm->mutex); + + if (shutdown_requested) + { + elog(DEBUG1, "[walpipeline] producer: shutdown requested by consumer"); + break; + } + + /* Read next WAL record */ + record = ReadRecord(xlogprefetcher_pipelined, LOG, false, replayTLI); + + if (record == NULL) + { + end_of_wal = true; + elog(DEBUG1, "[walpipeline] producer: reached end of WAL"); + break; + } + + /* + * Successfully decoded a record. Send it to the consumer. 
+ */ + if (!WalPipeline_SendRecord(xlogreader)) + { + elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached"); + break; + } + + /* Update our position for monitoring */ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_lsn = xlogreader->EndRecPtr; + SpinLockRelease(&WalPipelineShm->mutex); + + CHECK_FOR_INTERRUPTS(); + } + + + if (end_of_wal) + { + /* Notify consumer we need to exit early */ + WalPipeline_SendShutdown(); + + /* wait until consumer set the flag */ + WalPipeline_WaitForConsumerShutdownRequest(); + } + + SpinLockAcquire(&WalPipelineShm->mutex); + records_sent = WalPipelineShm->records_sent; + records_received = WalPipelineShm->records_received; + SpinLockRelease(&WalPipelineShm->mutex); + + elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT, + records_sent, records_received); + + /* Cleanup */ + XLogReaderFree(xlogreader); + DisownRecoveryWakeupLatch(); + pfree(private); + cleanup_producer_resources(); +} + +/* + * Producer Function. + * Send a decoded WAL record to the consumer + */ +bool +WalPipeline_SendRecord(XLogReaderState *record) +{ + char *buffer = NULL; + Size msglen; + shm_mq_result res; + + + if (!producer_mq_handle) + return false; + + /* Serialize the record */ + msglen = serialize_wal_record(record, &buffer); + + res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true); + + if (res == SHM_MQ_SUCCESS) + { + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->records_sent++; + WalPipelineShm->bytes_sent += msglen; + SpinLockRelease(&WalPipelineShm->mutex); + + pfree(buffer); + return true; + } + + if (res == SHM_MQ_DETACHED) + { + elog(PANIC, "[walpipeline] producer: consumer detached"); + pfree(buffer); + return false; + } + + /* Some other error */ + elog(PANIC, "[walpipeline] producer: shm_mq_send failed with result %d", res); + pfree(buffer); + return false; +} + +/* + * Producer Function. 
+ * Send shutdown message to consumer + */ +bool +WalPipeline_SendShutdown(void) +{ + WalRecordMsgHeader hdr; + shm_mq_result res; + + if (!producer_mq_handle) + return false; + + hdr.msg_type = WAL_MSG_SHUTDOWN; + hdr.endRecPtr = InvalidXLogRecPtr; + + res = shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true); + return (res == SHM_MQ_SUCCESS); +} + +/* + * Producer Function. + * Send error message to consumer + */ +bool +WalPipeline_SendError(int errcode, const char *errmsg) +{ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->error_code = errcode; + strlcpy(WalPipelineShm->error_message, errmsg, + sizeof(WalPipelineShm->error_message)); + SpinLockRelease(&WalPipelineShm->mutex); + + return true; +} + + +/* + * Producer Function. + * Producer may can exit without waiting for the consumer, but its better to + * wait until consumer request shutdown. This way log messages will show + * no of records_sent & records_received records equal to each other. + */ +static void +WalPipeline_WaitForConsumerShutdownRequest(void) +{ + int iters = 0; + + while (true) + { + bool shutdown_requested; + + SpinLockAcquire(&WalPipelineShm->mutex); + shutdown_requested = WalPipelineShm->shutdown_requested; + SpinLockRelease(&WalPipelineShm->mutex); + + if (shutdown_requested) + break; + + if (++iters >= MAX_SHUTDOWN_WAIT_ITERS) + { + elog(WARNING, + "[walpipeline] producer: timed out waiting for consumer " + "to acknowledge shutdown, exiting anyway"); + break; + } + + /* Allow SIGTERM / SIGHUP to interrupt the wait */ + ProcessPipelineBgwInterrupts(); + + pg_usleep(10000); /* sleep 10ms */ + } +} + + + +/* + * serialize_wal_record (Producer) + * + * Pack a WalRecordMsgHeader followed by the DecodedXLogRecord into a + * contiguous buffer, converting interior pointers to relative offsets. 
+ * + * Output buffer layout: + * [WalRecordMsgHeader][DecodedXLogRecord + trailing data] + */ +static Size +serialize_wal_record(XLogReaderState *xlogreader, char **outbuf) +{ + DecodedXLogRecord *dec = xlogreader->record; + DecodedXLogRecord *dec_copy; + WalRecordMsgHeader hdr; + Size payload_size = dec->size; + Size total = sizeof(WalRecordMsgHeader) + payload_size; + char *buf = palloc(total); + + /* build header */ + hdr.msg_type = WAL_MSG_RECORD; + hdr.readRecPtr = xlogreader->ReadRecPtr; + hdr.endRecPtr = xlogreader->EndRecPtr; + hdr.missingContrecPtr = xlogreader->missingContrecPtr; + hdr.abortedRecPtr = xlogreader->abortedRecPtr; + hdr.overwrittenRecPtr = xlogreader->overwrittenRecPtr; + hdr.decoded_size = payload_size; + + memcpy(buf, &hdr, sizeof(hdr)); + memcpy(buf + sizeof(hdr), dec, payload_size); + + /* + * Fix up the interior pointers: main_data and each block's data/bkp_image + * are absolute addresses in the producer. Convert them to byte offsets + * from the start of the copied DecodedXLogRecord so the consumer can + * reconstruct them. 
+ */ + dec_copy = (DecodedXLogRecord *)(buf + sizeof(hdr)); + + if (dec_copy->main_data_len > 0) + dec_copy->main_data = (char *)((char *)dec->main_data - (char *)dec); + + for (int i = 0; i <= dec_copy->max_block_id; i++) + { + DecodedBkpBlock *blk = &dec_copy->blocks[i]; + if (!blk->in_use) + continue; + if (blk->has_data) + blk->data = (char *)((char *)dec->blocks[i].data - (char *)dec); + if (blk->has_image) + blk->bkp_image = (char *)((char *)dec->blocks[i].bkp_image - (char *)dec); + } + + *outbuf = buf; + return total; +} + + +/* + * Clean up producer-side resources + */ +static void +cleanup_producer_resources(void) +{ + if (producer_mq_handle) + { + shm_mq_detach(producer_mq_handle); + producer_mq_handle = NULL; + } + + if (producer_dsm_seg) + { + dsm_detach(producer_dsm_seg); + producer_dsm_seg = NULL; + } + + producer_mq = NULL; + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_pid = 0; + WalPipelineShm->producer_exited = true; + SpinLockRelease(&WalPipelineShm->mutex); +} + + +/* + * Cleanup callback for process exit + */ +static void +wal_pipeline_cleanup_callback(int code, Datum arg) +{ + pid_t mypid = MyProcPid; + bool is_producer = false; + + if (WalPipelineShm) + { + SpinLockAcquire(&WalPipelineShm->mutex); + is_producer = (WalPipelineShm->producer_pid == mypid); + SpinLockRelease(&WalPipelineShm->mutex); + } + + if (is_producer) + cleanup_producer_resources(); + else + cleanup_consumer_resources(); +} + +/* -------------------------------- + * signal handler routines + * -------------------------------- + */ + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +PipelineBgwSigHupHandler(SIGNAL_ARGS) +{ + got_SIGHUP = true; + WakeupRecovery(); +} + +/* + * Re-read the config file. + * + * If one of the critical walreceiver options has changed, flag xlogrecovery.c + * to restart it. 
+ */ +static void +PipelineRereadConfig(void) +{ + char *conninfo = pstrdup(PrimaryConnInfo); + char *slotname = pstrdup(PrimarySlotName); + bool tempSlot = wal_receiver_create_temp_slot; + bool conninfoChanged; + bool slotnameChanged; + bool tempSlotChanged = false; + + ProcessConfigFile(PGC_SIGHUP); + + conninfoChanged = strcmp(conninfo, PrimaryConnInfo) != 0; + slotnameChanged = strcmp(slotname, PrimarySlotName) != 0; + + /* + * wal_receiver_create_temp_slot is used only when we have no slot + * configured. We do not need to track this change if it has no effect. + */ + if (!slotnameChanged && strcmp(PrimarySlotName, "") == 0) + tempSlotChanged = tempSlot != wal_receiver_create_temp_slot; + pfree(conninfo); + pfree(slotname); + + if (conninfoChanged || slotnameChanged || tempSlotChanged) + StartupRequestWalReceiverRestart(); +} + +/* + * Process any requests or signals received recently. + */ +void +ProcessPipelineBgwInterrupts(void) +{ + + bool shutdown_requested; + + if (got_SIGHUP) + { + got_SIGHUP = false; + PipelineRereadConfig(); + } + + SpinLockAcquire(&WalPipelineShm->mutex); + shutdown_requested = WalPipelineShm->shutdown_requested; + SpinLockRelease(&WalPipelineShm->mutex); + + if (shutdown_requested) + proc_exit(0); + + CHECK_FOR_INTERRUPTS(); +} \ No newline at end of file diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index c236e2b7969..88c0b43ac76 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -35,6 +35,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xlogarchive.h" +#include "access/xlogpipeline.h" #include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogrecovery.h" @@ -100,6 +101,8 @@ int recovery_min_apply_delay = 0; char *PrimaryConnInfo = NULL; char *PrimarySlotName = NULL; bool wal_receiver_create_temp_slot = false; +bool wal_pipeline_enabled = false; +int 
wal_pipeline_mq_size_mb = 128; /* * recoveryTargetTimeLineGoal: what the user requested, if any @@ -206,17 +209,6 @@ typedef struct XLogPageReadPrivate /* flag to tell XLogPageRead that we have started replaying */ static bool InRedo = false; -/* - * Codes indicating where we got a WAL file from during recovery, or where - * to attempt to get one. - */ -typedef enum -{ - XLOG_FROM_ANY = 0, /* request to read WAL from any source */ - XLOG_FROM_ARCHIVE, /* restored using restore_command */ - XLOG_FROM_PG_WAL, /* existing file in pg_wal */ - XLOG_FROM_STREAM, /* streamed from primary */ -} XLogSource; /* human-readable names for XLogSources, for debugging output */ static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"}; @@ -4739,6 +4731,49 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue } +/* + * Pipeline bgw should be aware of all the parameters thats been initialized by + * the startup process before performing the actual recoevery. Other then this + * there are also some variables that keep on changing. The pipeline & the startup + * process should be aware of the state change of such variables, we can use shared + * memory for such variables. 
+ */ +void +InitializePipelineRecoveryEnv(WalPipelineParams *params) +{ + StandbyMode = params->StandbyMode; + StandbyModeRequested = params->StandbyModeRequested; + ArchiveRecoveryRequested = params->ArchiveRecoveryRequested; + InArchiveRecovery = params->InArchiveRecovery; + recoveryTargetTLI = params->recoveryTargetTLI; + minRecoveryPointTLI = params->minRecoveryPointTLI; + minRecoveryPoint = params->minRecoveryPoint; + currentSource = params->currentSource; + lastSourceFailed = params->lastSourceFailed; + pendingWalRcvRestart = params->pendingWalRcvRestart; + RedoStartTLI = params->RedoStartTLI; + RedoStartLSN = params->RedoStartLSN; + standbyState = params->standbyState; + CheckPointLoc = params->CheckPointLoc; + CheckPointTLI = params->CheckPointTLI; + flushedUpto = params->flushedUpto; + receiveTLI = params->receiveTLI; + abortedRecPtr = params->abortedRecPtr; + missingContrecPtr = params->missingContrecPtr; + InRedo = params->InRedo; + backupEndRequired = params->backupEndRequired; + backupStartPoint = params->backupStartPoint; + backupEndPoint = params->backupEndPoint; + curFileTLI = params->curFileTLI; + InRecovery = true; + + /* + * As pipeline will be reading the wal, so better to own the latch to wait at. 
+ */ + if (ArchiveRecoveryRequested) + OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch); +} + /* * GUC check_hook for primary_slot_name */ diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 3914d22a514..b118bf11505 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "access/xlogpipeline.h" #include "commands/repack.h" #include "libpq/pqsignal.h" #include "miscadmin.h" @@ -166,6 +167,10 @@ static const struct { .fn_name = "DataChecksumsWorkerMain", .fn_addr = DataChecksumsWorkerMain + }, + { + .fn_name = "WalPipeline_ProducerMain", + .fn_addr = WalPipeline_ProducerMain } }; diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index fcb6ab80583..b1ae94215f3 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -3462,6 +3462,21 @@ boot_val => 'false', }, +{ name => 'wal_pipeline', type => 'bool', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY', + short_desc => 'Use parallel workers to speedup recovery.', + variable => 'wal_pipeline_enabled', + boot_val => 'false', +}, + +{ name => 'wal_pipeline_queue_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY', + short_desc => 'Size of the shared memory queue used by the WAL pipeline.', + flags => 'GUC_UNIT_MB', + variable => 'wal_pipeline_mq_size_mb', + boot_val => '128', + min => '1', + max => '1024', +}, + { name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.', variable => 'wal_receiver_create_temp_slot', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index e3e462f3efb..2b130cfbd6b 100644 --- 
a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -280,6 +280,8 @@ #recovery_prefetch = try # prefetch pages referenced in the WAL? #wal_decode_buffer_size = 512kB # lookahead window used for prefetching # (change requires restart) +#wal_pipeline = off # decode in parallel +#wal_pipeline_queue_size = 128MB # - Archiving - diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 437b4f32349..9465e52cf2b 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -40,6 +40,8 @@ extern PGDLLIMPORT int min_wal_size_mb; extern PGDLLIMPORT int max_wal_size_mb; extern PGDLLIMPORT int wal_keep_size_mb; extern PGDLLIMPORT int max_slot_wal_keep_size_mb; +extern PGDLLIMPORT int wal_pipeline_mq_size_mb; +extern PGDLLIMPORT bool wal_pipeline_enabled; extern PGDLLIMPORT int XLOGbuffers; extern PGDLLIMPORT int XLogArchiveTimeout; extern PGDLLIMPORT int wal_retrieve_retry_interval; diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h new file mode 100644 index 00000000000..b4cde273e1c --- /dev/null +++ b/src/include/access/xlogpipeline.h @@ -0,0 +1,158 @@ +/*------------------------------------------------------------------------- + * + * xlogpipeline.h + * WAL replay pipeline for parallel recovery + * + * This module implements a producer-consumer pipeline for WAL replay: + * - Producer: background worker that reads and decodes WAL records + * - Consumer: startup process: core redo loop + * + * The pipeline uses shared memory queues (shm_mq) to pass decoded WAL + * records from producer to consumer, enabling parallelism while + * maintaining sequential replay semantics. 
+ * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * + * src/include/access/xlogpipeline.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAL_PIPELINE_H +#define WAL_PIPELINE_H + +#include "access/xlogreader.h" +#include "access/xlogrecovery.h" +#include "access/xlogutils.h" +#include "storage/dsm.h" +#include "storage/shm_mq.h" +#include "storage/spin.h" + +/* + * Magic number for shared memory TOC + */ +#define PG_WAL_PIPELINE_MAGIC 0x57414C50 /* "WALP" */ + +/* + * Message types sent through the pipeline + */ +typedef enum WalMsgType +{ + WAL_MSG_INVALID = 0, + WAL_MSG_RECORD, /* Decoded WAL record */ + WAL_MSG_SHUTDOWN, /* Graceful shutdown request */ + WAL_MSG_ERROR /* Error occurred in producer */ +} WalMsgType; + +/* Wire header for a serialized WAL message */ +typedef struct WalRecordMsgHeader +{ + WalMsgType msg_type; /* WAL_MSG_RECORD etc */ + uint32 decoded_size; /* byte length of the payload that follows */ + XLogRecPtr readRecPtr; /* XLogReaderState->ReadRecPtr */ + XLogRecPtr endRecPtr; /* XLogReaderState->EndRecPtr */ + XLogRecPtr missingContrecPtr; /* XLogReaderState->missingContrecPtr */ + XLogRecPtr abortedRecPtr; /* XLogReaderState->abortedRecPtr */ + XLogRecPtr overwrittenRecPtr; /* XLogReaderState->overwrittenRecPtr */ +} WalRecordMsgHeader; + +/* + * Parameters passed from StartupXLOG (consumer side) + * to the WAL pipeline producer background worker. 
+ */ +typedef struct WalPipelineParams +{ + bool StandbyMode; + bool StandbyModeRequested; + bool ArchiveRecoveryRequested; + bool InArchiveRecovery; + bool InRedo; + bool lastSourceFailed; + bool pendingWalRcvRestart; + bool backupEndRequired; + + TimeLineID RedoStartTLI; + TimeLineID CheckPointTLI; + TimeLineID recoveryTargetTLI; + TimeLineID minRecoveryPointTLI; + TimeLineID ReplayTLI; + TimeLineID receiveTLI; + + XLogRecPtr backupStartPoint; + XLogRecPtr backupEndPoint; + XLogRecPtr CheckPointLoc; + XLogRecPtr RedoStartLSN; + XLogRecPtr NextRecPtr; + XLogRecPtr minRecoveryPoint; + XLogRecPtr flushedUpto; + XLogRecPtr abortedRecPtr; + XLogRecPtr missingContrecPtr; + + int readFile; + XLogSegNo readSegNo; + uint32 readOff; + uint32 readLen; + XLogSource readSource; + TimeLineID curFileTLI; + + + HotStandbyState standbyState; + XLogSource currentSource; + +} WalPipelineParams; + +/* + * Shared memory control structure for the WAL pipeline + */ +typedef struct WalPipelineShmCtl +{ + /* Lifecycle management */ + slock_t mutex; + bool initialized; + bool shutdown_requested; + bool producer_exited; + + /* Producer state */ + pid_t producer_pid; + ProcNumber producer_procnum; + XLogRecPtr producer_lsn; /* Last LSN read by producer */ + + /* Consumer state */ + pid_t consumer_pid; + XLogRecPtr consumer_lsn; /* Last LSN recieved by consumer */ + XLogRecPtr applied_lsn; /* Last LSN applied by consumer */ + + /* Queue handles */ + dsm_handle dsm_seg_handle; + shm_mq_handle *producer_mq_handle; + shm_mq_handle *consumer_mq_handle; + + /* Statistics */ + uint64 records_sent; + uint64 records_received; + uint64 bytes_sent; + uint64 bytes_received; + + /* Error state */ + int error_code; + char error_message[256]; +} WalPipelineShmCtl; + +/* consumer may have to compute prefetecher stats */ +extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined; + +/* + * Public API functions + */ + +/* Producer functions (called by background worker) */ +extern void 
WalPipeline_ProducerMain(Datum main_arg); +extern bool WalPipeline_SendRecord(XLogReaderState *record); +extern bool WalPipeline_SendShutdown(void); +extern bool WalPipeline_SendError(int errcode, const char *errmsg); + +extern void ProcessPipelineBgwInterrupts(void); + +/* Global shared memory pointer */ +extern WalPipelineShmCtl *WalPipelineShm; + +#endif /* WAL_PIPELINE_H */ \ No newline at end of file diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index ba7750dca0b..8b943b8f395 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -11,6 +11,7 @@ #ifndef XLOGRECOVERY_H #define XLOGRECOVERY_H +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "lib/stringinfo.h" @@ -60,6 +61,17 @@ typedef enum RecoveryPauseState RECOVERY_PAUSED, /* recovery is paused */ } RecoveryPauseState; +/* Codes indicating where we got a WAL file from during recovery, or where + * to attempt to get one. + */ +typedef enum +{ + XLOG_FROM_ANY = 0, /* request to read WAL from any source */ + XLOG_FROM_ARCHIVE, /* restored using restore_command */ + XLOG_FROM_PG_WAL, /* existing file in pg_wal */ + XLOG_FROM_STREAM, /* streamed from primary */ +} XLogSource; + /* * Shared-memory state for WAL recovery. 
*/ @@ -205,6 +217,8 @@ typedef struct bool recovery_signal_file_found; } EndOfWalRecoveryInfo; +struct WalPipelineParams; /* forward declaration */ + extern EndOfWalRecoveryInfo *FinishWalRecovery(void); extern void ShutdownWalRecovery(void); extern void RemovePromoteSignalFiles(void); @@ -217,6 +231,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern TimestampTz GetLatestXTime(void); extern TimestampTz GetCurrentChunkReplayStartTime(void); extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI); +extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, + bool fetching_ckpt, TimeLineID replayTLI); +extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf); extern bool PromoteIsTriggered(void); extern bool CheckPromoteSignal(void); @@ -226,6 +244,7 @@ extern void StartupRequestWalReceiverRestart(void); extern void XLogRequestWalReceiverReply(void); extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue); +extern void InitializePipelineRecoveryEnv(struct WalPipelineParams *params); extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); diff --git a/src/include/storage/subsystemlist.h b/src/include/storage/subsystemlist.h index 9ad619080be..e9ff6de9a1a 100644 --- a/src/include/storage/subsystemlist.h +++ b/src/include/storage/subsystemlist.h @@ -42,6 +42,7 @@ PG_SHMEM_SUBSYSTEM(MultiXactShmemCallbacks) PG_SHMEM_SUBSYSTEM(BufferManagerShmemCallbacks) PG_SHMEM_SUBSYSTEM(StrategyCtlShmemCallbacks) PG_SHMEM_SUBSYSTEM(BufTableShmemCallbacks) +PG_SHMEM_SUBSYSTEM(WalPipelineShmemCallbacks) /* lock manager */ PG_SHMEM_SUBSYSTEM(LockManagerShmemCallbacks) diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 36d789720a3..fc23acbaec2 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -61,6 +61,7 @@ tests += { 't/050_redo_segment_missing.pl', 
't/051_effective_wal_level.pl',
 't/052_checkpoint_segment_missing.pl',
+    't/053_walpipeline.pl',
   ],
 },
}
diff --git a/src/test/recovery/t/053_walpipeline.pl b/src/test/recovery/t/053_walpipeline.pl
new file mode 100644
index 00000000000..2fb790aadc5
--- /dev/null
+++ b/src/test/recovery/t/053_walpipeline.pl
@@ -0,0 +1,208 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+#
+# Tests for the WAL pipeline feature (wal_pipeline GUC).
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ----------
+# Helpers
+# ----------
+
+# Return all lines of a node's current server log.
+sub slurp_log
+{
+	my ($node) = @_;
+	open(my $fh, '<', $node->logfile()) or die "Cannot open log: $!";
+	my @lines = <$fh>;
+	close($fh);
+	return @lines;
+}
+
+# Return the log lines of $node matching regex $re (empty list if none).
+sub log_matches
+{
+	my ($node, $re) = @_;
+	return grep { /$re/ } slurp_log($node);
+}
+
+
+# ########################################
+# wal_pipeline = on, basic recovery
+# ########################################
+
+my $node1 = PostgreSQL::Test::Cluster->new('p1-recovery');
+$node1->init;
+$node1->start;
+
+$node1->safe_psql('postgres', q{
+	CREATE TABLE t (id serial PRIMARY KEY, v text);
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node1->safe_psql('postgres', q{
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node1->stop('immediate');
+
+# restart with the pipeline enabled, so crash recovery uses the producer
+$node1->append_conf('postgresql.conf', "wal_pipeline = on");
+$node1->start;
+
+
+# Producer started
+ok(scalar log_matches($node1, qr/\[walpipeline\] producer: started at/),
+	'producer started message found in log');
+
+# Pipeline stopped cleanly
+ok(scalar log_matches($node1, qr/\[walpipeline\] stopped/),
+	'pipeline stopped message found in log');
+
+# Consumer received shutdown from producer
+ok(scalar log_matches($node1, qr/\[walpipeline\] consumer: received shutdown message/),
+	'consumer received shutdown message from producer');
+
+# Producer-side counters must balance: every record sent was received
+my @exit_lines = log_matches($node1,
+	qr/\[walpipeline\] producer: exiting: sent=\d+ received=\d+/);
+ok(scalar @exit_lines >= 1, 'producer exiting line found in log');
+
+my ($sent, $recv) = $exit_lines[-1] =~ /sent=(\d+) received=(\d+)/;
+ok(defined $sent && $sent > 0, 'sent count is positive (' . ($sent // 'undef') . ')');
+ok(defined $recv && $recv > 0, 'received count is positive (' . ($recv // 'undef') . ')');
+is($sent, $recv, 'no records lost in pipeline queue: sent == received');
+
+# No PANIC
+ok(!(scalar log_matches($node1, qr/\bPANIC\b/)),
+	'no PANIC messages during pipeline recovery');
+
+# Data integrity
+my $count = $node1->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count + 0, 100_000, 'all 100000 rows visible after pipeline recovery');
+
+$node1->stop;
+
+# ##############################################################
+# wal_pipeline = off (baseline, no pipeline log messages)
+# ##############################################################
+
+my $node2 = PostgreSQL::Test::Cluster->new('p0-recovery');
+$node2->init;
+$node2->start;
+
+$node2->safe_psql('postgres', q{
+	CREATE TABLE t (id serial PRIMARY KEY, v text);
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node2->safe_psql('postgres', q{
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node2->stop('immediate');
+
+# restart with the pipeline disabled; recovery must not use the producer
+$node2->append_conf('postgresql.conf', "wal_pipeline = off");
+$node2->start;
+
+ok(!(scalar log_matches($node2, qr/\[walpipeline\] producer: started/)),
+	'no pipeline log messages when wal_pipeline = off');
+
+my $count2 = $node2->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count2 + 0, 100_000, 'all rows present after non-pipeline recovery');
+
+$node2->stop;
+
+
+
+# ###################################################################
+# pipeline on vs off produce identical data (checksum comparison)
+# ###################################################################
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+$primary->safe_psql('postgres', q{
+	CREATE TABLE t (id serial PRIMARY KEY, v text);
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1, 30000) i;
+});
+
+$primary->backup('backup3');
+
+$primary->safe_psql('postgres', q{
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1, 30000) i;
+	UPDATE t SET v = 'x' WHERE id % 10 = 0;
+});
+
+# ensure WAL boundary; remember the LSN both replicas must replay up to
+$primary->safe_psql('postgres', 'SELECT pg_switch_wal()');
+my $target_lsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_lsn()');
+
+my $replica_on = PostgreSQL::Test::Cluster->new('replica_p1');
+$replica_on->init_from_backup($primary, 'backup3',
+	has_streaming => 1);
+$replica_on->append_conf('postgresql.conf', "wal_pipeline = on");
+$replica_on->start;
+
+my $replica_off = PostgreSQL::Test::Cluster->new('replica_p0');
+$replica_off->init_from_backup($primary, 'backup3',
+	has_streaming => 1);
+$replica_off->append_conf('postgresql.conf', "wal_pipeline = off");
+$replica_off->start;
+
+# wait until both replicas have replayed past the recorded WAL boundary
+$primary->wait_for_catchup($replica_on, 'replay', $target_lsn);
+$primary->wait_for_catchup($replica_off, 'replay', $target_lsn);
+
+my $md5_on = $replica_on->safe_psql('postgres',
+	"SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+my $md5_off = $replica_off->safe_psql('postgres',
+	"SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+is($md5_on, $md5_off,
+	'table checksum identical between pipeline=on and pipeline=off');
+
+$replica_on->stop;
+$replica_off->stop;
+$primary->stop('fast');
+
+
+
+# #################################
+# pipeline when on tiny replay
+# #################################
+
+my $node3 = PostgreSQL::Test::Cluster->new('p1-small-replay');
+$node3->init;
+$node3->start;
+
+# crash stop to force WAL recovery
+$node3->stop('immediate');
+
+# restart with the pipeline enabled; almost no WAL needs replaying
+$node3->append_conf('postgresql.conf', "wal_pipeline = on");
+$node3->start;
+
+ok(scalar log_matches($node3, qr/\[walpipeline\] producer: exiting: sent=0 received=0/),
+	'pipeline producer sent zero records');
+
+ok((scalar log_matches($node3, qr/redo done at/)),
+	'pipeline redo done even with tiny replay');
+
+$node3->stop;
+
+done_testing();
-- 
2.49.0