From d8f11b8382310eac70259e29d996e13435c40902 Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Thu, 29 Jan 2026 20:07:43 +0500 Subject: [PATCH v1] Pipelined Recoveries Implement a producer-consumer architecture for WAL replay that separates WAL decoding from the recovery process, enabling parallel processing between different steps of replay. A background worker reads and decodes WAL records while the startup process applies them. IPC happens via shared memory message queues (shm_mq), allowing the decoder to run ahead of the apply process. This provides some improvement in recovery performance for CPU-bound workloads. New GUC: wal_pipeline (default: off) Author: Imran Zaheer Idea by: Ants Aasma --- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/wal_pipeline.c | 1004 +++++++++++++++++++ src/backend/access/transam/xlogprefetcher.c | 2 +- src/backend/access/transam/xlogrecovery.c | 143 ++- src/backend/postmaster/bgworker.c | 4 + src/backend/storage/ipc/ipci.c | 5 + src/backend/utils/misc/guc_parameters.dat | 6 + src/include/access/wal_pipeline.h | 184 ++++ src/include/access/xlogrecovery.h | 12 + src/include/utils/guc.h | 2 + 10 files changed, 1347 insertions(+), 16 deletions(-) create mode 100644 src/backend/access/transam/wal_pipeline.c create mode 100644 src/include/access/wal_pipeline.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index a32f473e0a2..2cd0425f7a1 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -25,6 +25,7 @@ OBJS = \ transam.o \ twophase.o \ twophase_rmgr.o \ + wal_pipeline.o \ varsup.o \ xact.o \ xlog.o \ diff --git a/src/backend/access/transam/wal_pipeline.c b/src/backend/access/transam/wal_pipeline.c new file mode 100644 index 00000000000..a27d9f2083f --- /dev/null +++ b/src/backend/access/transam/wal_pipeline.c @@ -0,0 +1,1004 @@ +/*------------------------------------------------------------------------- + * + * wal_pipeline.c + * WAL
replay pipeline implementation + * + * This module implements a producer-consumer pipeline for WAL replay. + * The producer (background worker) reads and decodes WAL records in parallel + * with the consumer (startup process) that applies them. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/wal_pipeline.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include + +#include "access/heapam_xlog.h" +#include "access/rmgr.h" +#include "access/wal_pipeline.h" +#include "access/xlog.h" +#include "access/xlogprefetcher.h" +#include "access/xlogreader.h" +#include "access/xlogrecord.h" +#include "access/xlogrecovery.h" +#include "access/xlogutils.h" +#include "access/xlog_internal.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "storage/bufmgr.h" +#include "storage/dsm.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/md.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "storage/smgr.h" +#include "tcop/tcopprot.h" +#include "utils/elog.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/rel.h" + + +/* Global shared memory control structure */ +WalPipelineShmCtl *WalPipelineShm = NULL; + +/* Local state for producer */ +static dsm_segment *producer_dsm_seg = NULL; +static shm_mq *producer_mq = NULL; +static shm_mq_handle *producer_mq_handle = NULL; + +/* Local state for consumer */ +static dsm_segment *consumer_dsm_seg = NULL; +static shm_mq *consumer_mq = NULL; +static shm_mq_handle *consumer_mq_handle = NULL; + +/* Forward declarations */ +static void wal_pipeline_cleanup_callback(int code, Datum arg); +static Size serialize_wal_record(XLogReaderState *record, char **buffer); +static DecodedXLogRecord 
*deserialize_wal_record(const char *buffer, Size len); +static void cleanup_producer_resources(void); +static void cleanup_consumer_resources(void); + +/* copeied from xlogrecovery.c */ +/* Parameters passed down from ReadRecord to the XLogPageRead callback. */ +typedef struct XLogPageReadPrivate +{ + int emode; + bool fetching_ckpt; /* are we fetching a checkpoint record? */ + bool randAccess; + TimeLineID replayTLI; +} XLogPageReadPrivate; + +/* + * WalPipelineShmemSize + * Compute space needed for WAL pipeline shared memory + */ +Size +WalPipelineShmemSize(void) +{ + Size size = 0; + + size = add_size(size, sizeof(WalPipelineShmCtl)); + + return size; +} + +/* + * WalPipelineShmemInit + * Initialize WAL pipeline shared memory structures + */ +void +WalPipelineShmemInit(void) +{ + bool found; + + WalPipelineShm = (WalPipelineShmCtl *) + ShmemInitStruct("WAL Pipeline Control", + sizeof(WalPipelineShmCtl), + &found); + + if (!found) + { + /* First time through, initialize */ + SpinLockInit(&WalPipelineShm->mutex); + WalPipelineShm->initialized = false; + WalPipelineShm->shutdown_requested = false; + WalPipelineShm->producer_exited = false; + WalPipelineShm->producer_pid = 0; + WalPipelineShm->consumer_pid = 0; + WalPipelineShm->producer_lsn = InvalidXLogRecPtr; + WalPipelineShm->consumer_lsn = InvalidXLogRecPtr; + WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID; + WalPipelineShm->producer_mq_handle = NULL; + WalPipelineShm->consumer_mq_handle = NULL; + WalPipelineShm->records_sent = 0; + WalPipelineShm->records_received = 0; + WalPipelineShm->bytes_sent = 0; + WalPipelineShm->bytes_received = 0; + WalPipelineShm->error_code = 0; + WalPipelineShm->error_message[0] = '\0'; + } +} + + +/* + * WalPipeline_Start + * Initialize and start the WAL pipeline + */ +void +WalPipeline_Start(WalPipelineParams *params) +{ + BackgroundWorker worker; + BackgroundWorkerHandle *handle; + dsm_segment *seg; + shm_toc_estimator e; + shm_toc *toc; + Size segsize; + shm_mq *mq; + 
WalPipelineParams *shared_params; + pid_t pid; + BgwHandleStatus status; + + SpinLockAcquire(&WalPipelineShm->mutex); + + if (WalPipelineShm->initialized) + { + SpinLockRelease(&WalPipelineShm->mutex); + return; /* Already started */ + } + + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(WalPipelineParams)); + shm_toc_estimate_chunk(&e, WAL_PIPELINE_QUEUE_SIZE); + shm_toc_estimate_keys(&e, 2); /* key=1 → params, key=2 → mq */ + segsize = shm_toc_estimate(&e); + + seg = dsm_create(segsize, 0); + dsm_pin_segment(seg); + + toc = shm_toc_create(PG_WAL_PIPELINE_MAGIC, + dsm_segment_address(seg), segsize); + + /* + * These are some global variables from xlogrecovery.c that our + * pipeline should be aware of, so passing it through the shmem. + * + * WalPipelineParams is shared-memory safe and contains + * only plain data (no pointers). + */ + shared_params = shm_toc_allocate(toc, sizeof(WalPipelineParams)); + shm_toc_insert(toc, 1, shared_params); + *shared_params = *params; + + /* create the message queue */ + mq = shm_mq_create(shm_toc_allocate(toc, WAL_PIPELINE_QUEUE_SIZE), + WAL_PIPELINE_QUEUE_SIZE); + shm_toc_insert(toc, 2, mq); + + /* update shared state */ + WalPipelineShm->dsm_seg_handle = dsm_segment_handle(seg); + WalPipelineShm->consumer_pid = MyProcPid; + WalPipelineShm->initialized = true; + + SpinLockRelease(&WalPipelineShm->mutex); + + /* Set up consumer side of the queue */ + consumer_dsm_seg = seg; + consumer_mq = mq; + shm_mq_set_receiver(consumer_mq, MyProc); + + /* Register cleanup callback */ + before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0); + + /* Register background worker */ + memset(&worker, 0, sizeof(worker)); + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_PostmasterStart; + worker.bgw_restart_time = BGW_NEVER_RESTART; + sprintf(worker.bgw_library_name, "postgres"); + sprintf(worker.bgw_function_name, "WalPipeline_ProducerMain"); + snprintf(worker.bgw_name, BGW_MAXLEN, "wal 
pipeline producer"); + snprintf(worker.bgw_type, BGW_MAXLEN, "wal pipeline producer"); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background worker for WAL pipeline"))); + + status = WaitForBackgroundWorkerStartup(handle, &pid); + + if (status != BGWH_STARTED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not start background process"), + errhint("More details may be available in the server log."))); + else + ereport(LOG, (errmsg("[walpipeline] started."))); + + consumer_mq_handle = shm_mq_attach(consumer_mq, seg, NULL); +} + +/* + * WalPipeline_Stop + * Stop the WAL pipeline + */ +void +WalPipeline_Stop(void) +{ + if (!WalPipelineShm || !WalPipelineShm->initialized) + return; + + WalPipeline_RequestShutdown(); + + /* Wait for producer to exit (with timeout) */ + for (int i = 0; i < 100; i++) /* 10 second timeout */ + { + SpinLockAcquire(&WalPipelineShm->mutex); + if (WalPipelineShm->producer_pid == 0) + { + SpinLockRelease(&WalPipelineShm->mutex); + break; + } + SpinLockRelease(&WalPipelineShm->mutex); + + pg_usleep(100000); /* 100ms */ + } + + cleanup_consumer_resources(); + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->initialized = false; + SpinLockRelease(&WalPipelineShm->mutex); + + elog(LOG, "WAL pipeline stopped"); +} + +/* + * WalPipeline_RequestShutdown + * Request graceful shutdown of the pipeline + */ +void +WalPipeline_RequestShutdown(void) +{ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->shutdown_requested = true; + SpinLockRelease(&WalPipelineShm->mutex); + + /* Send shutdown message if queue is available */ + if (consumer_mq_handle) + WalPipeline_SendShutdown(); +} + +/* + * WalPipeline_ProducerMain + * Main loop for the producer background worker + */ +void 
+WalPipeline_ProducerMain(Datum main_arg) +{ + dsm_handle handle = DatumGetUInt32(main_arg); + dsm_segment *seg; + shm_toc *toc; + WalPipelineParams *params; + XLogReaderState *xlogreader; + XLogPrefetcher *xlogprefetcher; + XLogPageReadPrivate *private; + XLogRecord *record; + TimeLineID replayTLI = 0; + + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + seg = dsm_attach(handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("[walpipeline] producer: could not map dynamic shared memory segment"))); + + toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment"))); + + /* Lookup params and queue */ + params = shm_toc_lookup(toc, 1, false); + producer_mq = shm_toc_lookup(toc, 2, false); + + /* Set up producer side of queue */ + producer_dsm_seg = seg; + shm_mq_set_sender(producer_mq, MyProc); + producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL); + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_pid = MyProcPid; + SpinLockRelease(&WalPipelineShm->mutex); + + /* Set up WAL reading processor */ + private = palloc0(sizeof(XLogPageReadPrivate)); + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &XLogPageRead, + .segment_open = NULL, + .segment_close = wal_segment_close), + private); + + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating a WAL reading processor."))); + xlogreader->system_identifier = GetSystemIdentifier(); + + /* + * Set the WAL decode buffer size. This limits how far ahead we can read + * in the WAL. + */ + XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size); + + /* Create a WAL prefetcher. 
*/ + xlogprefetcher = XLogPrefetcherAllocate(xlogreader); + + /* Init some important globals before starting */ + StandbyModeRequested = params->StandbyModeRequested; + ArchiveRecoveryRequested = params->ArchiveRecoveryRequested; + InArchiveRecovery = params->InArchiveRecovery; + replayTLI = params->ReplayTLI; + recoveryTargetTLI = params->recoveryTargetTLI; + minRecoveryPointTLI = params->minRecoveryPointTLI; + minRecoveryPoint = params->minRecoveryPoint; + InRedo = params->InRedo; + InRecovery = true; + + elog(DEBUG1, "[walpipeline] producer: started at %X/%X, TLI %u buffer", + LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI); + + XLogPrefetcherBeginRead(xlogprefetcher, params->NextRecPtr); + + /* Main decoding loop */ + while (true) + { + record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); + + if (record == NULL) + { + /* + * No error message means we've caught up to the end of available WAL. + * Check if there's a valid EndRecPtr - if not, we're truly at the end. + */ + if (XLogRecPtrIsInvalid(xlogreader->EndRecPtr)) + { + /* + * We've reached the end of available WAL. + * In crash recovery, this means we're done. + * In streaming replication, wait for more WAL. + */ + if (!RecoveryInProgress()) + { + elog(LOG, "[walpipeline] producer: recovery completed"); + break; + } + + /* Wait for more WAL to arrive */ + pg_usleep(10000); /* 10ms */ + continue; + } + else + { + /* + * Cannot read more records, shut it down. + */ + WalPipeline_SendShutdown(); + break; + } + } + + /* + * Successfully read and decoded a record. + * Send it to the consumer. 
+ */ + if (!WalPipeline_SendRecord(xlogreader)) + { + elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached"); + break; + } + + /* Update our position for monitoring */ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_lsn = xlogreader->EndRecPtr; + SpinLockRelease(&WalPipelineShm->mutex); + + CHECK_FOR_INTERRUPTS(); + } + + /* Wait until consumer finished processing records and sent a shutdown request */ + while (true) + { + SpinLockAcquire(&WalPipelineShm->mutex); + if (WalPipelineShm->shutdown_requested) + { + SpinLockRelease(&WalPipelineShm->mutex); + break; + } + SpinLockRelease(&WalPipelineShm->mutex); + pg_usleep(10000); /* sleep 10ms */ + } + + SpinLockAcquire(&WalPipelineShm->mutex); + elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT, + WalPipelineShm->records_sent, WalPipelineShm->records_received); + SpinLockRelease(&WalPipelineShm->mutex); + + /* Cleanup */ + XLogReaderFree(xlogreader); + pfree(private); + cleanup_producer_resources(); +} + +/* + * WalPipeline_SendRecord + * Send a decoded WAL record to the consumer + */ +bool +WalPipeline_SendRecord(XLogReaderState *record) +{ + char *buffer = NULL; + Size msglen; + shm_mq_result res; + + + if (!producer_mq_handle) + return false; + + /* Serialize the record */ + msglen = serialize_wal_record(record, &buffer); + + if (msglen > WAL_PIPELINE_MAX_MSG_SIZE) + { + elog(WARNING, "[walpipeline] producer: wal record at %X/%X too large (%zu bytes), skipping", + LSN_FORMAT_ARGS(record->ReadRecPtr), msglen); + pfree(buffer); + return true; + } + + res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true); + + if (res == SHM_MQ_SUCCESS) + { + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->records_sent++; + WalPipelineShm->bytes_sent += msglen; + SpinLockRelease(&WalPipelineShm->mutex); + + pfree(buffer); + return true; + } + + if (res == SHM_MQ_DETACHED) + { + elog(WARNING, "[walpipeline] producer: 
consumer detached"); + pfree(buffer); + return false; + } + + /* Some other error */ + elog(WARNING, "[walpipeline] producer: shm_mq_send failed with result %d", res); + pfree(buffer); + return false; +} + +/* + * WalPipeline_SendShutdown + * Send shutdown message to consumer + */ +bool +WalPipeline_SendShutdown(void) +{ + WalMsgHeader hdr; + shm_mq_result res; + + if (!producer_mq_handle) + return false; + + hdr.msg_type = WAL_MSG_SHUTDOWN; + hdr.msg_len = sizeof(WalMsgHeader); + hdr.lsn = InvalidXLogRecPtr; + + res = shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true); + return (res == SHM_MQ_SUCCESS); +} + +/* + * WalPipeline_SendError + * Send error message to consumer + */ +bool +WalPipeline_SendError(int errcode, const char *errmsg) +{ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->error_code = errcode; + strlcpy(WalPipelineShm->error_message, errmsg, + sizeof(WalPipelineShm->error_message)); + SpinLockRelease(&WalPipelineShm->mutex); + + return true; +} + +/* + * WalPipeline_ReceiveRecord + * Receive and deserialize a WAL record from the producer + */ +DecodedXLogRecord * +WalPipeline_ReceiveRecord(void) +{ + shm_mq_result res; + Size nbytes; + void *data; + WalMsgHeader *hdr; + DecodedXLogRecord *record; + + if (!consumer_mq_handle) + return NULL; + + /* Receive message from queue */ + res = shm_mq_receive(consumer_mq_handle, &nbytes, &data, false); + + if (res != SHM_MQ_SUCCESS) + { + /* Detached or error */ + return NULL; + } + + hdr = (WalMsgHeader *) data; + + /* Handle different message types */ + switch (hdr->msg_type) + { + case WAL_MSG_RECORD: + record = deserialize_wal_record((char *) data, nbytes); + + /* Update statistics */ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->records_received++; + WalPipelineShm->bytes_received += nbytes; + WalPipelineShm->consumer_lsn = hdr->lsn; + SpinLockRelease(&WalPipelineShm->mutex); + + return record; + + case WAL_MSG_SHUTDOWN: + elog(LOG, "[walpipeline] consumer: 
received shutdown message from the producer"); + return NULL; + + case WAL_MSG_ERROR: + SpinLockAcquire(&WalPipelineShm->mutex); + ereport(ERROR, + (errcode(WalPipelineShm->error_code), + errmsg("[walpipeline] consumer: received error from the producer: %s", + WalPipelineShm->error_message))); + SpinLockRelease(&WalPipelineShm->mutex); + return NULL; + + default: + elog(WARNING, "[walpipeline] consumer: unknown message type: %d", + hdr->msg_type); + return NULL; + } +} + +/* + * WalPipeline_CheckProducerAlive + * Check if producer is still running + */ +bool +WalPipeline_CheckProducerAlive(void) +{ + pid_t pid; + bool alive; + + SpinLockAcquire(&WalPipelineShm->mutex); + pid = WalPipelineShm->producer_pid; + SpinLockRelease(&WalPipelineShm->mutex); + + if (pid == 0) + return false; + + alive = (kill(pid, 0) == 0); + + if (!alive) + { + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_pid = 0; + SpinLockRelease(&WalPipelineShm->mutex); + } + + return alive; +} + +/* + * WalPipeline_IsActive + * Check if pipeline is active + */ +bool +WalPipeline_IsActive(void) +{ + bool active; + + if (!WalPipelineShm) + return false; + + SpinLockAcquire(&WalPipelineShm->mutex); + active = WalPipelineShm->initialized && !WalPipelineShm->shutdown_requested; + SpinLockRelease(&WalPipelineShm->mutex); + + return active; +} + +/* + * WalPipeline_GetStats + * Get pipeline statistics + */ +void +WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received, + XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn) +{ + SpinLockAcquire(&WalPipelineShm->mutex); + + if (records_sent) + *records_sent = WalPipelineShm->records_sent; + if (records_received) + *records_received = WalPipelineShm->records_received; + if (producer_lsn) + *producer_lsn = WalPipelineShm->producer_lsn; + if (consumer_lsn) + *consumer_lsn = WalPipelineShm->consumer_lsn; + + SpinLockRelease(&WalPipelineShm->mutex); +} + +/* + * serialize_wal_record + * + * Serialize the currently decoded WAL record 
owned by the given + * XLogReaderState into a contiguous buffer. + * + * Allocation/layout of the output buffer: + * + * [WalRecordMsgHeader] + * [XLogRecord] + * [main_data] + * Repeated for each in-use block: + * [SerializedBlockMeta] + * [backup image bytes] (optional) + * [block data bytes] (optional) + * + * The resulting buffer contains no pointers and is safe to transfer + * across processes or store externally. + */ +static Size +serialize_wal_record(XLogReaderState *xlogreader, char **outbuf) +{ + DecodedXLogRecord *dec = xlogreader->record; + WalRecordMsgHeader hdr; + Size total; + char *ptr; + + if (dec == NULL) + return 0; + + /* ---- size calculation ---- */ + total = + sizeof(WalRecordMsgHeader) + + sizeof(XLogRecord) + + dec->main_data_len; + + for (int i = 0; i <= dec->max_block_id; i++) + { + const DecodedBkpBlock *blk = &dec->blocks[i]; + + if (!blk->in_use) + continue; + + total += sizeof(SerializedBlockMeta); + + if (blk->has_image && blk->bkp_image && blk->bimg_len > 0) + total += blk->bimg_len; + + if (blk->has_data && blk->data && blk->data_len > 0) + total += blk->data_len; + } + + ptr = *outbuf = palloc(total); + + /* ---- message header ---- */ + memset(&hdr, 0, sizeof(hdr)); + hdr.msg_type = WAL_MSG_RECORD; + hdr.readRecPtr = xlogreader->ReadRecPtr; + hdr.endRecPtr = xlogreader->EndRecPtr; + hdr.decoded_size = total - sizeof(WalRecordMsgHeader); + hdr.max_block_id = dec->max_block_id; + hdr.main_data_len = dec->main_data_len; + + memcpy(ptr, &hdr, sizeof(hdr)); + ptr += sizeof(hdr); + + /* ---- XLogRecord ---- */ + memcpy(ptr, &dec->header, sizeof(XLogRecord)); + ptr += sizeof(XLogRecord); + + /* ---- main data ---- */ + if (dec->main_data_len > 0) + { + memcpy(ptr, dec->main_data, dec->main_data_len); + ptr += dec->main_data_len; + } + + /* ---- blocks ---- */ + for (int i = 0; i <= dec->max_block_id; i++) + { + const DecodedBkpBlock *blk = &dec->blocks[i]; + SerializedBlockMeta meta; + + if (!blk->in_use) + continue; + + memset(&meta, 
0, sizeof(meta)); + meta.block_id = i; + meta.in_use = true; + meta.rlocator = blk->rlocator; + meta.forknum = blk->forknum; + meta.blkno = blk->blkno; + meta.flags = blk->flags; + meta.has_image = blk->has_image; + meta.apply_image = blk->apply_image; + meta.has_data = blk->has_data; + meta.bimg_len = blk->bimg_len; + meta.bimg_info = blk->bimg_info; + meta.hole_offset = blk->hole_offset; + meta.hole_length = blk->hole_length; + meta.data_len = blk->data_len; + + memcpy(ptr, &meta, sizeof(meta)); + ptr += sizeof(meta); + + if (blk->has_image && blk->bkp_image && blk->bimg_len > 0) + { + memcpy(ptr, blk->bkp_image, blk->bimg_len); + ptr += blk->bimg_len; + } + + if (blk->has_data && blk->data && blk->data_len > 0) + { + memcpy(ptr, blk->data, blk->data_len); + ptr += blk->data_len; + } + } + + Assert(ptr - *outbuf == total); + return total; +} + +/* + * deserialize_wal_record + * + * Deserialize a WAL record from a buffer into a DecodedXLogRecord. + * + * Memory layout: + * [DecodedXLogRecord + blocks][main_data][block_images][block_data] + */ +DecodedXLogRecord * +deserialize_wal_record(const char *buf, Size len) +{ + const char *ptr = buf; + const char *end = buf + len; + WalRecordMsgHeader hdr; + DecodedXLogRecord *dec = NULL; + char *alloc_ptr; + int nblocks; + Size total; + + if (len < sizeof(WalRecordMsgHeader)) + return NULL; + + memcpy(&hdr, ptr, sizeof(hdr)); + ptr += sizeof(hdr); + + if (hdr.decoded_size != len - sizeof(WalRecordMsgHeader)) + return NULL; + + nblocks = (hdr.max_block_id >= 0) ? 
hdr.max_block_id + 1 : 0; + + /* ---- space allocation ---- */ + total = MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock)) + + MAXALIGN(hdr.decoded_size); + + dec = palloc(total); + memset(dec, 0, total); + + alloc_ptr = (char *)dec + MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock)); + + /* ---- record metadata ---- */ + dec->lsn = hdr.readRecPtr; + dec->next_lsn = hdr.endRecPtr; + dec->max_block_id = hdr.max_block_id; + dec->main_data_len = hdr.main_data_len; + + /* ---- XLogRecord ---- */ + if (ptr + sizeof(XLogRecord) > end) + goto fail; + + memcpy(&dec->header, ptr, sizeof(XLogRecord)); + ptr += sizeof(XLogRecord); + + /* ---- main data ---- */ + if (hdr.main_data_len > 0) + { + if (ptr + hdr.main_data_len > end) + goto fail; + + dec->main_data = alloc_ptr; + memcpy(dec->main_data, ptr, hdr.main_data_len); + ptr += hdr.main_data_len; + alloc_ptr += MAXALIGN(hdr.main_data_len); + } + + /* ---- blocks ---- */ + for (int i = 0; i < nblocks && ptr < end; i++) + { + SerializedBlockMeta meta; + DecodedBkpBlock *blk; + + if (ptr + sizeof(meta) > end) + break; + + memcpy(&meta, ptr, sizeof(meta)); + ptr += sizeof(meta); + + if (!meta.in_use) + continue; + + if (meta.block_id < 0 || meta.block_id >= nblocks) + goto fail; + + blk = &dec->blocks[meta.block_id]; + + blk->in_use = true; + blk->rlocator = meta.rlocator; + blk->forknum = meta.forknum; + blk->blkno = meta.blkno; + blk->flags = meta.flags; + blk->has_image = meta.has_image; + blk->apply_image = meta.apply_image; + blk->has_data = meta.has_data; + blk->bimg_len = meta.bimg_len; + blk->bimg_info = meta.bimg_info; + blk->hole_offset = meta.hole_offset; + blk->hole_length = meta.hole_length; + blk->data_len = meta.data_len; + + if (blk->has_image && blk->bimg_len > 0) + { + if (ptr + blk->bimg_len > end) + goto fail; + + blk->bkp_image = alloc_ptr; + memcpy(blk->bkp_image, ptr, blk->bimg_len); + ptr += blk->bimg_len; + alloc_ptr += 
MAXALIGN(blk->bimg_len); + } + + if (blk->has_data && blk->data_len > 0) + { + if (ptr + blk->data_len > end) + goto fail; + + blk->data = alloc_ptr; + memcpy(blk->data, ptr, blk->data_len); + ptr += blk->data_len; + alloc_ptr += MAXALIGN(blk->data_len); + } + } + + dec->size = alloc_ptr - (char *)dec; + dec->oversized = false; + + return dec; + +fail: + if (dec) + pfree(dec); + + elog(LOG, "deserialize_wal_record: failed"); + return NULL; +} + +/* + * cleanup_producer_resources + * Clean up producer-side resources + */ +static void +cleanup_producer_resources(void) +{ + if (producer_mq_handle) + { + shm_mq_detach(producer_mq_handle); + producer_mq_handle = NULL; + } + + if (producer_dsm_seg) + { + dsm_detach(producer_dsm_seg); + producer_dsm_seg = NULL; + } + + producer_mq = NULL; + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_pid = 0; + WalPipelineShm->producer_exited = true; + SpinLockRelease(&WalPipelineShm->mutex); +} + +/* + * cleanup_consumer_resources + * Clean up consumer-side resources + */ +static void +cleanup_consumer_resources(void) +{ + if (consumer_mq_handle) + { + shm_mq_detach(consumer_mq_handle); + consumer_mq_handle = NULL; + } + + if (consumer_dsm_seg) + { + dsm_unpin_segment(dsm_segment_handle(consumer_dsm_seg)); + dsm_detach(consumer_dsm_seg); + consumer_dsm_seg = NULL; + } + + consumer_mq = NULL; + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->consumer_pid = 0; + WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID; + SpinLockRelease(&WalPipelineShm->mutex); +} + +/* + * wal_pipeline_cleanup_callback + * Cleanup callback for process exit + */ +static void +wal_pipeline_cleanup_callback(int code, Datum arg) +{ + pid_t mypid = MyProcPid; + bool is_producer = false; + + if (WalPipelineShm) + { + SpinLockAcquire(&WalPipelineShm->mutex); + is_producer = (WalPipelineShm->producer_pid == mypid); + SpinLockRelease(&WalPipelineShm->mutex); + } + + if (is_producer) + cleanup_producer_resources(); + else + 
cleanup_consumer_resources(); +} diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c index 3c3f067aafb..2ffa82db913 100644 --- a/src/backend/access/transam/xlogprefetcher.c +++ b/src/backend/access/transam/xlogprefetcher.c @@ -350,7 +350,7 @@ XLogPrefetchReconfigure(void) static inline void XLogPrefetchIncrement(pg_atomic_uint64 *counter) { - Assert(AmStartupProcess() || !IsUnderPostmaster); + // Assert(AmStartupProcess() || !IsUnderPostmaster); pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1); } diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index a81dcbb5d79..0da26c32608 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -32,6 +32,7 @@ #include "access/timeline.h" #include "access/transam.h" +#include "access/wal_pipeline.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xlogarchive.h" @@ -98,6 +99,7 @@ int recovery_min_apply_delay = 0; char *PrimaryConnInfo = NULL; char *PrimarySlotName = NULL; bool wal_receiver_create_temp_slot = false; +bool wal_pipeline_enabled = false; /* * recoveryTargetTimeLineGoal: what the user requested, if any @@ -145,7 +147,7 @@ bool InArchiveRecovery = false; * in standby mode. These variables are only valid in the startup process. * They work similarly to ArchiveRecoveryRequested and InArchiveRecovery. */ -static bool StandbyModeRequested = false; +bool StandbyModeRequested = false; bool StandbyMode = false; /* was a signal file present at startup? */ @@ -202,7 +204,7 @@ typedef struct XLogPageReadPrivate } XLogPageReadPrivate; /* flag to tell XLogPageRead that we have started replaying */ -static bool InRedo = false; +bool InRedo = false; /* * Codes indicating where we got a WAL file from during recovery, or where @@ -277,8 +279,8 @@ static TimeLineID receiveTLI = 0; * file. 
But this copy of minRecoveryPoint variable reflects the value at the * beginning of recovery, and is *not* updated after consistency is reached. */ -static XLogRecPtr minRecoveryPoint; -static TimeLineID minRecoveryPointTLI; +XLogRecPtr minRecoveryPoint; +TimeLineID minRecoveryPointTLI; static XLogRecPtr backupStartPoint; static XLogRecPtr backupEndPoint; @@ -419,12 +421,6 @@ static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); static void ConfirmRecoveryPaused(void); -static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, - int emode, bool fetching_ckpt, - TimeLineID replayTLI); - -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, @@ -1663,6 +1659,87 @@ ShutdownWalRecovery(void) DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch); } +/* + * Get the next record for redo. + * Use the pipeline if enabled for performance else read it directly. + * `reader_state_out` will only be used and updated in case pipeline is + * enabled. 
+ */ +static XLogRecord * +ReceiveRecord(XLogPrefetcher *xlogprefetcher, int emode, + bool fetching_ckpt, TimeLineID replayTLI, + XLogReaderState **localreader, uint64 loop_count) +{ + + XLogRecord *record = NULL; + XLogReaderState *reader = *localreader; + + /* + * If pipeline not enabled read the record directly + */ + if (!wal_pipeline_enabled) + { + record = ReadRecord(xlogprefetcher, emode, fetching_ckpt, replayTLI); + return record; + } + + /* + * Get record from the pipeline + */ + if (WalPipeline_IsActive()) + { + DecodedXLogRecord *decoded_record = NULL; + + decoded_record = WalPipeline_ReceiveRecord(); + + if (decoded_record) + { + record = &decoded_record->header; + + /* + * The previous decoded record has been deserialized from + * from the pipeline and hence need to free the memory after + * use. + * + * But for the first iteration memory space for `reader->record` + * was allocated from the `decode_buffer`, and freeing this + * memory can be fatal. This memory will be freed automatically + * at the end of the recovery in `finishwalrecovery()`. So we + * will skip pfree for the fist iteration. + */ + if (reader->record && loop_count != 0) + pfree(reader->record); + + /* + * Update to local reader state. We don't have to update the + * internal reader state, only updating the primary parameters. + */ + reader->record = decoded_record; + reader->ReadRecPtr = decoded_record->lsn; + reader->DecodeRecPtr = decoded_record->lsn; + reader->EndRecPtr = decoded_record->next_lsn; + reader->NextRecPtr = decoded_record->next_lsn; + reader->decode_queue_head = decoded_record; + reader->decode_queue_tail = decoded_record; + + return record; + } + else + { + /* + * We will end up here only when pipeline couldn't read more + * records and have sent a shutdown msg. We will acknowldge this + * and will send back the shutdown request to properly stop the workers. 
+ */ + WalPipeline_Stop(); + return NULL; + } + } + + elog(FATAL, "[walpipeline] consumer: either pipeline not active, or no record available from pipeline."); + return record; +} + /* * Perform WAL recovery. * @@ -1749,11 +1826,42 @@ PerformWalRecovery(void) { TimestampTz xtime; PGRUsage ru0; + uint64 loop_count = 0; pg_rusage_init(&ru0); InRedo = true; + if(wal_pipeline_enabled) + { + /* + * Start the WAL pipeline to decode the WAL in parallel. Also pass some + * important global parameters. These parameters are already set + * for the startup process but not for our pipeline worker. So in order + * to start decoding through the pipeline, these variables should be + * set. + * + * ArchiveRecoveryRequested T + * InArchiveRecovery. |____ used by ReadRecord when called by the pipeline process + * minRecoveryPoint; | + * minRecoveryPointTLI; ⊥ + * RedoStartLSN T____ used by XLogPageRead when called by the pipeline process + * RedoStartTLI ⊥ + */ + WalPipelineParams *params = palloc0(sizeof(WalPipelineParams)); + params->ReplayTLI = replayTLI; + params->NextRecPtr = xlogreader->NextRecPtr; + params->recoveryTargetTLI = recoveryTargetTLI; + params->StandbyModeRequested = StandbyModeRequested; + params->ArchiveRecoveryRequested = ArchiveRecoveryRequested; + params->InArchiveRecovery = InArchiveRecovery; + params->minRecoveryPointTLI = minRecoveryPointTLI; + params->minRecoveryPoint = minRecoveryPoint; + params->InRedo = InRedo; + + WalPipeline_Start(params); + } + RmgrStartup(); ereport(LOG, @@ -1859,7 +1967,7 @@ PerformWalRecovery(void) } /* Else, try to fetch the next WAL record */ - record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); + record = ReceiveRecord(xlogprefetcher, LOG, false, replayTLI, &xlogreader, loop_count++); } while (record != NULL); /* @@ -1868,6 +1976,9 @@ PerformWalRecovery(void) if (reachedRecoveryTarget) { + if (wal_pipeline_enabled) + WalPipeline_Stop(); + if (!reachedConsistency) ereport(FATAL, (errmsg("requested recovery stop point is before
consistent recovery point"))); @@ -1901,6 +2012,8 @@ PerformWalRecovery(void) RmgrCleanup(); + // XXX: testing purpose only + ereport(DEBUG1, (errmsg("replay loop fiinished apply loop count: " UINT64_FORMAT, loop_count))); ereport(LOG, errmsg("redo done at %X/%08X system usage: %s", LSN_FORMAT_ARGS(xlogreader->ReadRecPtr), @@ -3160,7 +3273,7 @@ ConfirmRecoveryPaused(void) * (emode must be either PANIC, LOG). In standby mode, retries until a valid * record is available. */ -static XLogRecord * +XLogRecord * ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, TimeLineID replayTLI) { @@ -3168,7 +3281,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher); XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - Assert(AmStartupProcess() || !IsUnderPostmaster); + // Assert(AmStartupProcess() || !IsUnderPostmaster); /* Pass through parameters to XLogPageRead */ private->fetching_ckpt = fetching_ckpt; @@ -3329,7 +3442,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. 
*/ -static int +int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf) { @@ -3341,7 +3454,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, int r; instr_time io_start; - Assert(AmStartupProcess() || !IsUnderPostmaster); + // Assert(AmStartupProcess() || !IsUnderPostmaster); XLByteToSeg(targetPagePtr, targetSegNo, wal_segment_size); targetPageOff = XLogSegmentOffset(targetPagePtr, wal_segment_size); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 65deabe91a7..a86111e4a1f 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "access/wal_pipeline.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -136,6 +137,9 @@ static const struct }, { "SequenceSyncWorkerMain", SequenceSyncWorkerMain + }, + { + "WalPipeline_ProducerMain", WalPipeline_ProducerMain } }; diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2a3dfedf7e9..0fe2dd8de57 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/syncscan.h" #include "access/transam.h" #include "access/twophase.h" +#include "access/wal_pipeline.h" #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" #include "access/xlogwait.h" @@ -141,6 +142,7 @@ CalculateShmemSize(void) size = add_size(size, AioShmemSize()); size = add_size(size, WaitLSNShmemSize()); size = add_size(size, LogicalDecodingCtlShmemSize()); + size = add_size(size, WalPipelineShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -225,6 +227,9 @@ CreateSharedMemoryAndSemaphores(void) /* Initialize dynamic shared memory facilities. 
*/ dsm_postmaster_startup(shim); + /* Initialize WAL pipeline module */ + WalPipelineShmemInit(); + /* * Now give loadable modules a chance to set up their shmem allocations */ diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index f0260e6e412..d538fc7be0c 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -3379,6 +3379,12 @@ boot_val => 'false', }, +{ name => 'wal_pipeline', type => 'bool', context => 'PGC_SIGHUP', group => 'WAL_RECOVERY', + short_desc => 'Use parallel workers to speedup recovery.', + variable => 'wal_pipeline_enabled', + boot_val => 'false', +}, + { name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.', variable => 'wal_receiver_create_temp_slot', diff --git a/src/include/access/wal_pipeline.h b/src/include/access/wal_pipeline.h new file mode 100644 index 00000000000..0463b8e2c76 --- /dev/null +++ b/src/include/access/wal_pipeline.h @@ -0,0 +1,184 @@ +/*------------------------------------------------------------------------- + * + * wal_pipeline.h + * WAL replay pipeline for parallel recovery + * + * This module implements a producer-consumer pipeline for WAL replay: + * - Producer: background worker that reads and decodes WAL records + * - Consumer: startup process: core redo loop + * + * The pipeline uses shared memory queues (shm_mq) to pass decoded WAL + * records from producer to consumer, enabling parallelism while + * maintaining sequential replay semantics. 
+ * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * + * src/include/access/wal_pipeline.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAL_PIPELINE_H +#define WAL_PIPELINE_H + +#include "access/xlogreader.h" +#include "storage/dsm.h" +#include "storage/shm_mq.h" +#include "storage/spin.h" + +/* + * Magic number for shared memory TOC + */ +#define PG_WAL_PIPELINE_MAGIC 0x57414C50 /* "WALP" */ + +/* + * Message types sent through the pipeline + */ +typedef enum WalMsgType +{ + WAL_MSG_INVALID = 0, + WAL_MSG_RECORD, /* Decoded WAL record */ + WAL_MSG_SHUTDOWN, /* Graceful shutdown request */ + WAL_MSG_ERROR /* Error occurred in producer */ +} WalMsgType; + +/* + * Message header for all pipeline messages + * Each message starts with this header, followed by type-specific data + */ +typedef struct WalMsgHeader +{ + WalMsgType msg_type; + uint32 msg_len; /* Total message length including header */ + XLogRecPtr lsn; /* LSN of the record (for WAL_MSG_RECORD) */ +} WalMsgHeader; + +/* Wire header for a serialized WAL message */ +typedef struct WalRecordMsgHeader +{ + uint32 msg_type; /* WAL_MSG_RECORD etc */ + XLogRecPtr readRecPtr; /* XLogReaderState->ReadRecPtr */ + XLogRecPtr endRecPtr; /* EndRecPtr */ + uint32 decoded_size; /* total size of decoded record payload (not including this header) */ + int32 max_block_id; /* highest block id (could be -1) */ + uint32 main_data_len; /* length of main_data */ + /* followed by decoded payload bytes */ +} WalRecordMsgHeader; + +/* Per-block metadata on the wire (no pointers) */ +typedef struct SerializedBlockMeta +{ + int32 block_id; + bool in_use; + RelFileLocator rlocator; /* same as decoded block */ + ForkNumber forknum; + BlockNumber blkno; + uint8 flags; + bool has_image; + bool apply_image; + bool has_data; + uint16 bimg_len; + uint8 bimg_info; + uint16 hole_offset; + uint16 hole_length; + uint16 data_len; + /* followed by bimg bytes then data 
bytes */ +} SerializedBlockMeta; + +/* + * Parameters passed from StartupXLOG (consumer side) + * to the WAL pipeline producer background worker. + */ +typedef struct WalPipelineParams +{ + TimeLineID RedoStartTLI; + TimeLineID CheckPointTLI; + TimeLineID ReplayTLI; + TimeLineID recoveryTargetTLI; + TimeLineID minRecoveryPointTLI; + XLogRecPtr RedoStartLSN; + XLogRecPtr CheckPointLoc; + XLogRecPtr NextRecPtr; + XLogRecPtr minRecoveryPoint; + bool haveBackupLabel; + bool StandbyModeRequested; + bool ArchiveRecoveryRequested; + bool InArchiveRecovery; + bool InRedo; +} WalPipelineParams; + +/* + * Shared memory control structure for the WAL pipeline + */ +typedef struct WalPipelineShmCtl +{ + /* Lifecycle management */ + slock_t mutex; + bool initialized; + bool shutdown_requested; + bool producer_exited; + + /* Producer state */ + pid_t producer_pid; + ProcNumber producer_procnum; + XLogRecPtr producer_lsn; /* Last LSN read by producer */ + + /* Consumer state */ + pid_t consumer_pid; + XLogRecPtr consumer_lsn; /* Last LSN applied by consumer */ + + /* Queue handles */ + dsm_handle dsm_seg_handle; + shm_mq_handle *producer_mq_handle; + shm_mq_handle *consumer_mq_handle; + + /* Statistics */ + uint64 records_sent; + uint64 records_received; + uint64 bytes_sent; + uint64 bytes_received; + + /* Error state */ + int error_code; + char error_message[256]; +} WalPipelineShmCtl; + +/* Size of the shared memory queue (can be made configurable) */ +#define WAL_PIPELINE_QUEUE_SIZE (128 * 1024 * 1024) /* 128 MB */ + +/* Maximum size of a single message */ +#define WAL_PIPELINE_MAX_MSG_SIZE (2 * 1024 * 1024) /* 2 MB */ + +/* + * Public API functions + */ + +/* Initialize the WAL pipeline shared memory structures */ +extern Size WalPipelineShmemSize(void); +extern void WalPipelineShmemInit(void); + +/* Start/stop the pipeline */ +extern void WalPipeline_Start(WalPipelineParams *params); +extern void WalPipeline_Stop(void); +extern void WalPipeline_RequestShutdown(void); + +/*
Producer functions (called by background worker) */ +extern void WalPipeline_ProducerMain(Datum main_arg); +extern bool WalPipeline_SendRecord(XLogReaderState *record); +extern bool WalPipeline_SendShutdown(void); +extern bool WalPipeline_SendError(int errcode, const char *errmsg); + +/* Consumer functions (called by startup/walreceiver process) */ +extern DecodedXLogRecord *WalPipeline_ReceiveRecord(void); +extern bool WalPipeline_CheckProducerAlive(void); +extern void WalPipeline_FreeRecord(XLogReaderState *record); +extern void WalPipeline_UpdateReaderState(XLogReaderState *dst, XLogReaderState *src); + +/* Status and monitoring */ +extern bool WalPipeline_IsActive(void); +extern void WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received, + XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn); + +/* Global shared memory pointer */ +extern WalPipelineShmCtl *WalPipelineShm; + +#endif /* WAL_PIPELINE_H */ \ No newline at end of file diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index f926d89cb2f..76094982a49 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -11,6 +11,7 @@ #ifndef XLOGRECOVERY_H #define XLOGRECOVERY_H +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "lib/stringinfo.h" @@ -85,6 +86,13 @@ extern PGDLLIMPORT bool reachedConsistency; /* Are we currently in standby mode? 
*/ extern PGDLLIMPORT bool StandbyMode; +extern PGDLLIMPORT bool StandbyModeRequested; + +/* flag to tell XLogPageRead that we have started replaying */ +extern PGDLLIMPORT bool InRedo; + +extern PGDLLIMPORT XLogRecPtr minRecoveryPoint; +extern PGDLLIMPORT TimeLineID minRecoveryPointTLI; extern Size XLogRecoveryShmemSize(void); extern void XLogRecoveryShmemInit(void); @@ -153,6 +161,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern TimestampTz GetLatestXTime(void); extern TimestampTz GetCurrentChunkReplayStartTime(void); extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI); +extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, + bool fetching_ckpt, TimeLineID replayTLI); +extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf); extern bool PromoteIsTriggered(void); extern bool CheckPromoteSignal(void); diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index bf39878c43e..7dc1101c95b 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -324,6 +324,8 @@ extern PGDLLIMPORT int tcp_user_timeout; extern PGDLLIMPORT char *role_string; extern PGDLLIMPORT bool in_hot_standby_guc; extern PGDLLIMPORT bool trace_sort; +extern PGDLLIMPORT bool wal_pipeline_enabled; + #ifdef DEBUG_BOUNDED_SORT extern PGDLLIMPORT bool optimize_bounded_sort; -- 2.49.0