From e8619cb11f065a8a86495a9a7aa16c073cd6e37e Mon Sep 17 00:00:00 2001
From: Imran Zaheer <imran.zaheer@cybertec.at>
Date: Tue, 17 Mar 2026 21:47:56 +0500
Subject: [PATCH v2 1/2] Pipelined Recovery - Producer

This adds the producer-specific half of a producer-consumer architecture
for WAL replay that separates WAL decoding from the recovery process,
enabling parallel processing between different steps of replay.

The producer is a background worker that reads and decodes WAL records,
then sends them to the startup process for redo. IPC happens via shared
memory message queues (shm_mq), allowing the decoder to run ahead of the apply process.

This provides some improvement in recovery performance for CPU-bound workloads.

New GUC: wal_pipeline (default: off)

Author: Imran Zaheer <imran.zhir@gmail.com>
Idea by: Ants Aasma <ants@cybertec.at>
---
 src/backend/access/transam/Makefile           |   1 +
 src/backend/access/transam/meson.build        |   1 +
 src/backend/access/transam/xlogpipeline.c     | 677 ++++++++++++++++++
 src/backend/access/transam/xlogrecovery.c     |  43 ++
 src/backend/postmaster/bgworker.c             |   5 +
 src/backend/storage/ipc/ipci.c                |   5 +
 src/backend/utils/misc/guc_parameters.dat     |  15 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/include/access/xlog.h                     |   2 +
 src/include/access/xlogpipeline.h             | 188 +++++
 src/include/access/xlogrecovery.h             |  19 +
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/053_walpipeline.pl        | 208 ++++++
 13 files changed, 1167 insertions(+)
 create mode 100644 src/backend/access/transam/xlogpipeline.c
 create mode 100644 src/include/access/xlogpipeline.h
 create mode 100644 src/test/recovery/t/053_walpipeline.pl

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index a32f473e0a2..ba0bf343769 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,6 +32,7 @@ OBJS = \
 	xlogbackup.o \
 	xlogfuncs.o \
 	xloginsert.o \
+	xlogpipeline.o \
 	xlogprefetcher.o \
 	xlogreader.o \
 	xlogrecovery.o \
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index 06aadc7f315..be37b40581d 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -20,6 +20,7 @@ backend_sources += files(
   'xlogbackup.c',
   'xlogfuncs.c',
   'xloginsert.c',
+  'xlogpipeline.c',
   'xlogprefetcher.c',
   'xlogrecovery.c',
   'xlogstats.c',
diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
new file mode 100644
index 00000000000..4b95a11d16b
--- /dev/null
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -0,0 +1,677 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.c
+ *    WAL replay pipeline implementation
+ *
+ * This module implements a producer-consumer pipeline for WAL replay.
+ * The producer (background worker) reads and decodes WAL records in parallel
+ * with the consumer (startup process) that applies them.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *    src/backend/access/transam/xlogpipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/xlog.h"
+#include "access/xlogpipeline.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "access/xlog_internal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/bufmgr.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/md.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/elog.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/rel.h"
+#include "utils/timeout.h"
+
+
+/* Global shared memory control structure */
+WalPipelineShmCtl *WalPipelineShm = NULL;
+
+XLogPrefetcher *xlogprefetcher_pipelined = NULL;
+
+/* Local state for producer */
+static dsm_segment *producer_dsm_seg = NULL;
+static shm_mq *producer_mq = NULL;
+static shm_mq_handle *producer_mq_handle = NULL;
+
+
+/*
+ * Flags set by interrupt handlers for later service in the redo loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+
+/* Signal handlers */
+static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
+
+/* Forward declarations */
+static void wal_pipeline_cleanup_callback(int code, Datum arg);
+static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static void cleanup_producer_resources(void);
+static void WalPipeline_WaitForConsumerShutdownRequest(void);
+
+/* copied from xlogrecovery.c */
+/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
+typedef struct XLogPageReadPrivate
+{
+	int			emode;
+	bool		fetching_ckpt;	/* are we fetching a checkpoint record? */
+	bool		randAccess;
+	TimeLineID	replayTLI;
+} XLogPageReadPrivate;
+
+/*
+ * Compute space needed for WAL pipeline shared memory.
+ *
+ * Only the fixed-size control struct is requested here.
+ */
+Size
+WalPipelineShmemSize(void)
+{
+	Size		request = add_size(0, sizeof(WalPipelineShmCtl));
+
+	return request;
+}
+
+/*
+ * Initialize WAL pipeline shared memory structures.
+ *
+ * Attaches to (or creates) the "WAL Pipeline Control" struct.  The process
+ * that creates it zeroes the whole struct first, so that every field has a
+ * defined value, and then sets the members whose "unset" value is not zero.
+ */
+void
+WalPipelineShmemInit(void)
+{
+	bool        found;
+
+	WalPipelineShm = (WalPipelineShmCtl *)
+		ShmemInitStruct("WAL Pipeline Control",
+						sizeof(WalPipelineShmCtl),
+						&found);
+
+	if (found)
+		return;
+
+	/*
+	 * First time through.  Zeroing gives every member a defined starting
+	 * value, including those with no dedicated assignment below (flags,
+	 * pids, counters, error state and the two queue handle pointers).
+	 */
+	memset(WalPipelineShm, 0, sizeof(WalPipelineShmCtl));
+	SpinLockInit(&WalPipelineShm->mutex);
+	WalPipelineShm->producer_procnum = INVALID_PROC_NUMBER;
+	WalPipelineShm->producer_lsn = InvalidXLogRecPtr;
+	WalPipelineShm->consumer_lsn = InvalidXLogRecPtr;
+	WalPipelineShm->applied_lsn = InvalidXLogRecPtr;
+	WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+}
+
+
+
+/*
+ * Producer Function.
+ * Main loop for the producer background worker.
+ */
+void
+WalPipeline_ProducerMain(Datum main_arg)
+{
+	dsm_handle           handle = DatumGetUInt32(main_arg);
+	dsm_segment        	*seg;
+	shm_toc            	*toc;
+	WalPipelineParams  	*params;
+	XLogReaderState    	*xlogreader;
+	XLogPageReadPrivate *private;
+	XLogRecord         	*record;
+	TimeLineID           replayTLI = 0;
+	bool 				 end_of_wal = false;
+	uint64				 records_sent;
+	uint64				 records_received;
+
+	/*
+	 * Properly accept or ignore signals the postmaster might send us.
+	 */
+	pqsignal(SIGHUP, PipelineBgwSigHupHandler); 	/* reload config file */
+
+	/* Register cleanup callback */
+	before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+	seg = dsm_attach(handle);
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("[walpipeline] producer: could not map dynamic shared memory segment")));
+
+	toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment")));
+
+	/* Lookup params and queue */
+	params = shm_toc_lookup(toc, 1, false);
+	producer_mq = shm_toc_lookup(toc, 2, false);
+
+	/* Set up producer side of queue */
+	producer_dsm_seg = seg;
+	shm_mq_set_sender(producer_mq, MyProc);
+	producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL);
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->producer_pid = MyProcPid;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	/* DSM is now attached, so safe to unblock the signals */
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up WAL reading processor */
+	private = palloc0(sizeof(XLogPageReadPrivate));
+	xlogreader =
+		XLogReaderAllocate(wal_segment_size, NULL,
+						   XL_ROUTINE(.page_read = &XLogPageRead,
+									  .segment_open = NULL,
+									  .segment_close = wal_segment_close),
+						   private);
+
+	if (!xlogreader)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+	xlogreader->system_identifier = GetSystemIdentifier();
+
+	/*
+	 * Set the WAL decode buffer size.  This limits how far ahead we can read
+	 * in the WAL.
+	 */
+	XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+	/* Init some important globals before starting */
+	replayTLI = params->ReplayTLI;
+	InitializePipelineRecoveryEnv(params);
+
+	/* Reinit the WAL prefetcher. */
+	xlogprefetcher_pipelined = XLogPrefetcherAllocate(xlogreader);
+
+
+	elog(LOG, "[walpipeline] producer: started at %X/%X, TLI %u",
+		 LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI);
+
+	XLogPrefetcherBeginRead(xlogprefetcher_pipelined, params->NextRecPtr);
+
+	/* Main decoding loop */
+	while (true)
+	{
+		bool shutdown_requested;
+
+		/* Check if consumer requested shutdown */
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		shutdown_requested = WalPipelineShm->shutdown_requested;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (shutdown_requested)
+		{
+			elog(DEBUG1, "[walpipeline] producer: shutdown requested by consumer");
+			break;
+		}
+
+		/* Read next WAL record */
+		record = ReadRecord(xlogprefetcher_pipelined, LOG, false, replayTLI);
+
+		if (record == NULL)
+		{
+			end_of_wal = true;
+			elog(DEBUG1, "[walpipeline] producer: reached end of WAL");
+			break;
+		}
+
+		/*
+		 * Successfully decoded a record. Send it to the consumer.
+		 */
+		if (!WalPipeline_SendRecord(xlogreader))
+		{
+			elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached");
+			break;
+		}
+
+		/* Update our position for monitoring */
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->producer_lsn = xlogreader->EndRecPtr;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+
+	if (end_of_wal)
+	{
+		/* Notify consumer we need to exit early */
+		WalPipeline_SendShutdown();
+
+		/* wait until consumer set the flag */
+		WalPipeline_WaitForConsumerShutdownRequest();
+	}
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	records_sent = WalPipelineShm->records_sent;
+	records_received = WalPipelineShm->records_received;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT,
+		 records_sent, records_received);
+
+	/* Cleanup */
+	XLogReaderFree(xlogreader);
+	DisownRecoveryWakeupLatch();
+	pfree(private);
+	cleanup_producer_resources();
+}
+
+/*
+ * Producer Function.
+ * Send a decoded WAL record to the consumer.
+ *
+ * Serializes the reader's current record and pushes it onto the queue with a
+ * blocking, force-flushed shm_mq_send().  Returns true once the message is
+ * queued.  Returns false when there is no queue handle, when there is no
+ * decoded record to serialize, or when the consumer has detached; the caller
+ * is expected to stop producing in that case.
+ */
+bool
+WalPipeline_SendRecord(XLogReaderState *record)
+{
+	char       *buffer = NULL;
+	Size        msglen;
+	shm_mq_result res;
+
+	if (!producer_mq_handle)
+		return false;
+
+	/* serialize_wal_record() allocates nothing and returns 0 without a record */
+	msglen = serialize_wal_record(record, &buffer);
+	if (msglen == 0 || buffer == NULL)
+		return false;
+
+	res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true);
+	pfree(buffer);
+
+	if (res == SHM_MQ_SUCCESS)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->records_sent++;
+		WalPipelineShm->bytes_sent += msglen;
+		SpinLockRelease(&WalPipelineShm->mutex);
+		return true;
+	}
+
+	/* A departed consumer is not a reason to PANIC the cluster; just stop */
+	if (res != SHM_MQ_DETACHED)
+		elog(ERROR, "[walpipeline] producer: shm_mq_send failed with result %d", res);
+	elog(LOG, "[walpipeline] producer: consumer detached");
+	return false;
+}
+
+/*
+ * Producer Function.
+ * Send a header-only WAL_MSG_SHUTDOWN message to the consumer.
+ * Returns true only if the queue accepted the message.
+ */
+bool
+WalPipeline_SendShutdown(void)
+{
+	WalRecordMsgHeader hdr;
+
+	if (!producer_mq_handle)
+		return false;
+
+	/* Zero first: the whole struct, padding included, is copied to the queue */
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.msg_type = WAL_MSG_SHUTDOWN;
+	hdr.endRecPtr = InvalidXLogRecPtr;
+	return shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true) == SHM_MQ_SUCCESS;
+}
+
+/*
+ * Producer Function.
+ * Record an error for the consumer in shared memory (nothing is queued).
+ * The message is truncated to fit; NULL is stored as "".  Returns true.
+ */
+bool
+WalPipeline_SendError(int errcode, const char *errmsg)
+{
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->error_code = errcode;
+	strlcpy(WalPipelineShm->error_message, errmsg ? errmsg : "",
+			sizeof(WalPipelineShm->error_message));
+	SpinLockRelease(&WalPipelineShm->mutex);
+	return true;
+}
+
+/*
+ * Producer Function.
+ * The producer could exit without waiting for the consumer, but it is better
+ * to wait until the consumer requests shutdown, so that the records_sent and
+ * records_received counts reported in the log agree with each other.
+ */
+static void
+WalPipeline_WaitForConsumerShutdownRequest(void)
+{
+	/* Poll the shared flag, giving up after MAX_SHUTDOWN_WAIT_ITERS checks */
+	for (int attempt = 1;; attempt++)
+	{
+		bool		requested;
+
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		requested = WalPipelineShm->shutdown_requested;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (requested)
+			return;
+
+		if (attempt >= MAX_SHUTDOWN_WAIT_ITERS)
+		{
+			elog(WARNING,
+					"[walpipeline] producer: timed out waiting for consumer "
+					"to acknowledge shutdown, exiting anyway");
+			return;
+		}
+
+		/* Service SIGHUP and pending interrupts between polls */
+		ProcessPipelineBgwInterrupts();
+
+		/* 10ms between polls */
+		pg_usleep(10000);
+	}
+}
+
+
+/*
+ * serialize_wal_record (Producer)
+ *
+ * Serialize the currently decoded WAL record owned by the given
+ * XLogReaderState into a contiguous buffer.
+ *
+ * Allocation/layout of the output buffer:
+ *
+ *   [WalRecordMsgHeader]
+ *   [XLogRecord]
+ *   [main_data]
+ *   Repeated for each in-use block:
+ *     [SerializedBlockMeta]
+ *     [backup image bytes]   (optional)
+ *     [block data bytes]     (optional)
+ *
+ * The resulting buffer contains no pointers and is safe to transfer
+ * across processes or store externally.
+ */
+static Size
+serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
+{
+	DecodedXLogRecord *dec = xlogreader->record;
+	WalRecordMsgHeader hdr;
+	Size total;
+	char *ptr;
+
+	if (dec == NULL)
+		return 0;
+
+	/* ---- size calculation ---- */
+	total =
+		sizeof(WalRecordMsgHeader) +
+		sizeof(XLogRecord) +
+		dec->main_data_len;
+
+	for (int i = 0; i <= dec->max_block_id; i++)
+	{
+		const DecodedBkpBlock *blk = &dec->blocks[i];
+
+		if (!blk->in_use)
+			continue;
+
+		total += sizeof(SerializedBlockMeta);
+
+		if (blk->has_image && blk->bkp_image && blk->bimg_len > 0)
+			total += blk->bimg_len;
+
+		if (blk->has_data && blk->data && blk->data_len > 0)
+			total += blk->data_len;
+	}
+
+	ptr = *outbuf = palloc(total);
+
+	/* ---- message header ---- */
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.msg_type      		= WAL_MSG_RECORD;
+	hdr.readRecPtr    		= xlogreader->ReadRecPtr;
+	hdr.abortedRecPtr    	= xlogreader->abortedRecPtr;
+	hdr.missingContrecPtr	= xlogreader->missingContrecPtr;
+	hdr.overwrittenRecPtr	= xlogreader->overwrittenRecPtr;
+	hdr.endRecPtr     		= xlogreader->EndRecPtr;
+	hdr.decoded_size  		= total - sizeof(WalRecordMsgHeader);
+	hdr.max_block_id  		= dec->max_block_id;
+	hdr.main_data_len 		= dec->main_data_len;
+	hdr.toplevel_xid 		= dec->toplevel_xid;
+	hdr.record_origin 		= dec->record_origin;
+
+	memcpy(ptr, &hdr, sizeof(hdr));
+	ptr += sizeof(hdr);
+
+	/* ---- XLogRecord ---- */
+	memcpy(ptr, &dec->header, sizeof(XLogRecord));
+	ptr += sizeof(XLogRecord);
+
+	/* ---- main data ---- */
+	if (dec->main_data_len > 0)
+	{
+		memcpy(ptr, dec->main_data, dec->main_data_len);
+		ptr += dec->main_data_len;
+	}
+
+	/* ---- blocks ---- */
+	for (int i = 0; i <= dec->max_block_id; i++)
+	{
+		const DecodedBkpBlock *blk = &dec->blocks[i];
+		SerializedBlockMeta meta;
+
+		if (!blk->in_use)
+			continue;
+
+		memset(&meta, 0, sizeof(meta));
+		meta.block_id    = i;
+		meta.in_use      = true;
+		meta.rlocator    = blk->rlocator;
+		meta.forknum     = blk->forknum;
+		meta.blkno       = blk->blkno;
+		meta.flags       = blk->flags;
+		meta.has_image   = blk->has_image;
+		meta.apply_image = blk->apply_image;
+		meta.has_data    = blk->has_data;
+		meta.bimg_len    = blk->bimg_len;
+		meta.bimg_info   = blk->bimg_info;
+		meta.hole_offset = blk->hole_offset;
+		meta.hole_length = blk->hole_length;
+		meta.data_len    = blk->data_len;
+
+		memcpy(ptr, &meta, sizeof(meta));
+		ptr += sizeof(meta);
+
+		if (blk->has_image && blk->bkp_image && blk->bimg_len > 0)
+		{
+			memcpy(ptr, blk->bkp_image, blk->bimg_len);
+			ptr += blk->bimg_len;
+		}
+
+		if (blk->has_data && blk->data && blk->data_len > 0)
+		{
+			memcpy(ptr, blk->data, blk->data_len);
+			ptr += blk->data_len;
+		}
+	}
+
+	Assert(ptr - *outbuf == total);
+	return total;
+}
+
+
+/*
+ * Report whether this process is the WAL pipeline worker: a background
+ * worker whose bgw_name starts with "wal pipeline".  Intended for assertions
+ * guarding code that only the pipeline worker should touch.
+ */
+bool
+AmWalPipeline(void)
+{
+	/* MyBgworkerEntry must be checked before bgw_name is dereferenced */
+	if (MyBackendType != B_BG_WORKER || MyBgworkerEntry == NULL)
+		return false;
+
+	return strncmp(MyBgworkerEntry->bgw_name, "wal pipeline", 12) == 0;
+}
+
+/*
+ * Clean up producer-side resources: detach from the queue and the DSM
+ * segment (each at most once), then mark the producer exited in shared memory.
+ */
+static void
+cleanup_producer_resources(void)
+{
+	if (producer_mq_handle != NULL)
+	{
+		shm_mq_detach(producer_mq_handle);
+		producer_mq_handle = NULL;
+	}
+	producer_mq = NULL;
+
+	if (producer_dsm_seg != NULL)
+	{
+		dsm_detach(producer_dsm_seg);
+		producer_dsm_seg = NULL;
+	}
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->producer_exited = true;
+	WalPipelineShm->producer_pid = 0;
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+
+/*
+ * Cleanup callback for process exit.  The pipeline worker always gets producer
+ * cleanup: producer_pid is 0 both before it is published and after a normal exit.
+ */
+static void
+wal_pipeline_cleanup_callback(int code, Datum arg)
+{
+	bool		is_producer = AmWalPipeline();
+
+	if (!is_producer && WalPipelineShm)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		is_producer = (WalPipelineShm->producer_pid == MyProcPid);
+		SpinLockRelease(&WalPipelineShm->mutex);
+	}
+
+	if (is_producer)
+		cleanup_producer_resources();
+	else
+		cleanup_consumer_resources();
+}
+
+/* --------------------------------
+ *		signal handler routines
+ * --------------------------------
+ */
+
+/* SIGHUP: set flag to re-read config file at next convenient time, then call WakeupRecovery() */
+static void
+PipelineBgwSigHupHandler(SIGNAL_ARGS)
+{
+	got_SIGHUP = true;
+	WakeupRecovery();
+}
+
+/*
+ * Re-read the config file.  Snapshot the walreceiver-related settings,
+ * reload, and compare; if any of them changed in a way that matters, flag
+ * xlogrecovery.c to restart the walreceiver.
+ */
+static void
+PipelineRereadConfig(void)
+{
+	char	   *old_conninfo = pstrdup(PrimaryConnInfo);
+	char	   *old_slotname = pstrdup(PrimarySlotName);
+	bool		old_temp_slot = wal_receiver_create_temp_slot;
+	bool		restart;
+
+	ProcessConfigFile(PGC_SIGHUP);
+
+	if (strcmp(old_conninfo, PrimaryConnInfo) != 0 ||
+		strcmp(old_slotname, PrimarySlotName) != 0)
+		restart = true;
+	else
+	{
+		/*
+		 * wal_receiver_create_temp_slot only matters while no slot is
+		 * configured, so a change to it is ignored otherwise.
+		 */
+		restart = PrimarySlotName[0] == '\0' &&
+			old_temp_slot != wal_receiver_create_temp_slot;
+	}
+	pfree(old_conninfo);
+	pfree(old_slotname);
+
+	if (restart)
+		StartupRequestWalReceiverRestart();
+}
+
+/*
+ * Process any requests or signals received recently: reload the config file
+ * after SIGHUP, exit once the consumer has requested shutdown, and otherwise
+ * service ordinary interrupts.
+ */
+void
+ProcessPipelineBgwInterrupts(void)
+{
+	bool		stop;
+
+	if (got_SIGHUP)
+	{
+		got_SIGHUP = false;
+		PipelineRereadConfig();
+	}
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	stop = WalPipelineShm->shutdown_requested;
+	SpinLockRelease(&WalPipelineShm->mutex);
+	if (stop)
+		proc_exit(0);
+
+	CHECK_FOR_INTERRUPTS();
+}
\ No newline at end of file
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 6d2c4a86b96..b66ec80fa25 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -4738,6 +4738,49 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
 }
 
 
+/*
+ * The pipeline bgworker must be aware of all the parameters that have been
+ * initialized by the startup process before performing the actual recovery.
+ * Other than these, there are also some variables that keep changing.  The
+ * pipeline and the startup process should both be aware of state changes to
+ * such variables; shared memory can be used for those.
+ */
+void
+InitializePipelineRecoveryEnv(WalPipelineParams *params)
+{
+	StandbyMode = params->StandbyMode;
+	StandbyModeRequested = params->StandbyModeRequested;
+	ArchiveRecoveryRequested = params->ArchiveRecoveryRequested;
+	InArchiveRecovery = params->InArchiveRecovery;
+	recoveryTargetTLI = params->recoveryTargetTLI;
+	minRecoveryPointTLI = params->minRecoveryPointTLI;
+	minRecoveryPoint = params->minRecoveryPoint;
+	currentSource = params->currentSource;
+	lastSourceFailed = params->lastSourceFailed;
+	pendingWalRcvRestart = params->pendingWalRcvRestart;
+	RedoStartTLI = params->RedoStartTLI;
+	RedoStartLSN = params->RedoStartLSN;
+	standbyState = params->standbyState;
+	CheckPointLoc = params->CheckPointLoc;
+	CheckPointTLI = params->CheckPointTLI;
+	flushedUpto = params->flushedUpto;
+	receiveTLI = params->receiveTLI;
+	abortedRecPtr = params->abortedRecPtr;
+	missingContrecPtr = params->missingContrecPtr;
+	InRedo = params->InRedo;
+	backupEndRequired = params->backupEndRequired;
+	backupStartPoint = params->backupStartPoint;
+	backupEndPoint = params->backupEndPoint;
+	curFileTLI = params->curFileTLI;
+	InRecovery = true;
+
+	/*
+	 * As pipeline will be reading the wal, so better to own the latch to wait at.
+	 */
+	if (ArchiveRecoveryRequested)
+		OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
+
 /*
  * GUC check_hook for primary_slot_name
  */
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 0104a86b9ec..192295ad695 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "access/xlogpipeline.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -143,6 +144,10 @@ static const struct
 	{
 		.fn_name = "SequenceSyncWorkerMain",
 		.fn_addr = SequenceSyncWorkerMain
+	},
+	{
+		.fn_name = "WalPipeline_ProducerMain",
+		.fn_addr = WalPipeline_ProducerMain
 	}
 };
 
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index a4785daf1e5..f5eaff675f0 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/syncscan.h"
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/xlogpipeline.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogwait.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(void)
 	size = add_size(size, AioShmemSize());
 	size = add_size(size, WaitLSNShmemSize());
 	size = add_size(size, LogicalDecodingCtlShmemSize());
+	size = add_size(size, WalPipelineShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -224,6 +226,9 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize dynamic shared memory facilities. */
 	dsm_postmaster_startup(shim);
 
+	/* Initialize WAL pipeline module */
+	WalPipelineShmemInit();
+
 	/*
 	 * Now give loadable modules a chance to set up their shmem allocations
 	 */
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index a5a0edf2534..3523e7d459f 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3398,6 +3398,21 @@
   boot_val => 'false',
 },
 
+{ name => 'wal_pipeline', type => 'bool', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+  short_desc => 'Use parallel workers to speedup recovery.',
+  variable => 'wal_pipeline_enabled',
+  boot_val => 'false',
+},
+
+{ name => 'wal_pipeline_queue_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+  short_desc => 'Size of the shared memory queue used by the WAL pipeline.',
+  flags => 'GUC_UNIT_MB',
+  variable => 'wal_pipeline_mq_size_mb',
+  boot_val => '128',
+  min => '1',
+  max => '1024',
+},
+
 { name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
   short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.',
   variable => 'wal_receiver_create_temp_slot',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e686d88afc4..1d2fdb9747a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -277,6 +277,8 @@
 #recovery_prefetch = try        # prefetch pages referenced in the WAL?
 #wal_decode_buffer_size = 512kB # lookahead window used for prefetching
                                 # (change requires restart)
+#wal_pipeline = off             # decode WAL in a separate worker (change requires restart)
+#wal_pipeline_queue_size = 128MB # message queue size (change requires restart)
 
 # - Archiving -
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index dcc12eb8cbe..a0c26f8005f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -40,6 +40,8 @@ extern PGDLLIMPORT int min_wal_size_mb;
 extern PGDLLIMPORT int max_wal_size_mb;
 extern PGDLLIMPORT int wal_keep_size_mb;
 extern PGDLLIMPORT int max_slot_wal_keep_size_mb;
+extern PGDLLIMPORT int wal_pipeline_mq_size_mb;
+extern PGDLLIMPORT bool wal_pipeline_enabled;
 extern PGDLLIMPORT int XLOGbuffers;
 extern PGDLLIMPORT int XLogArchiveTimeout;
 extern PGDLLIMPORT int wal_retrieve_retry_interval;
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
new file mode 100644
index 00000000000..5740b05f79c
--- /dev/null
+++ b/src/include/access/xlogpipeline.h
@@ -0,0 +1,188 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.h
+ *    WAL replay pipeline for parallel recovery
+ *
+ * This module implements a producer-consumer pipeline for WAL replay:
+ * - Producer: background worker that reads and decodes WAL records
+ * - Consumer: startup process: core redo loop
+ *
+ * The pipeline uses shared memory queues (shm_mq) to pass decoded WAL
+ * records from producer to consumer, enabling parallelism while
+ * maintaining sequential replay semantics.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogpipeline.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAL_PIPELINE_H
+#define WAL_PIPELINE_H
+
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+/*
+ * Magic number for shared memory TOC
+ */
+#define PG_WAL_PIPELINE_MAGIC 0x57414C50  /* "WALP" */
+
+/*
+ * Message types sent through the pipeline
+ */
+typedef enum WalMsgType
+{
+	WAL_MSG_INVALID = 0,
+	WAL_MSG_RECORD,         /* Decoded WAL record */
+	WAL_MSG_SHUTDOWN,       /* Graceful shutdown request */
+	WAL_MSG_ERROR           /* Error occurred in producer */
+} WalMsgType;
+
+/* Wire header for a serialized WAL message */
+typedef struct WalRecordMsgHeader
+{
+	WalMsgType  	 msg_type;       	/* WAL_MSG_RECORD etc */
+	XLogRecPtr  	readRecPtr;     	/* XLogReaderState->ReadRecPtr */
+	XLogRecPtr		abortedRecPtr;		/* XLogReaderState->abortedRecPtr */
+	XLogRecPtr		missingContrecPtr;	/* XLogReaderState->missingContrecPtr */
+	XLogRecPtr		overwrittenRecPtr;	/* XLogReaderState->overwrittenRecPtr */
+	ReplOriginId 	record_origin;		/* DecodedXLogRecord->record_origin */
+	TransactionId 	toplevel_xid; 		/* DecodedXLogRecord->toplevel_xid */
+	XLogRecPtr  	endRecPtr;      /* EndRecPtr */
+	uint32      	decoded_size;   /* total size of decoded record payload (not including this header) */
+	int32       	max_block_id;   /* highest block id (could be -1) */
+	uint32      	main_data_len;  /* length of main_data */
+	/* followed by decoded payload bytes */
+} WalRecordMsgHeader;
+
+/* Per-block metadata on the wire (no pointers) */
+typedef struct SerializedBlockMeta
+{
+	int32               block_id;
+	bool                in_use;
+	RelFileLocator      rlocator;   /* same as decoded block */
+	ForkNumber          forknum;
+	BlockNumber         blkno;
+	uint8               flags;
+	bool                has_image;
+	bool                apply_image;
+	bool                has_data;
+	uint16              bimg_len;
+	uint8               bimg_info;
+	uint16              hole_offset;
+	uint16              hole_length;
+	uint16              data_len;
+	/* followed by bimg bytes then data bytes */
+} SerializedBlockMeta;
+
+/*
+ * Parameters passed from StartupXLOG (consumer side)
+ * to the WAL pipeline producer background worker.
+ */
+typedef struct WalPipelineParams
+{
+	bool		StandbyMode;
+	bool		StandbyModeRequested;
+	bool		ArchiveRecoveryRequested;
+	bool		InArchiveRecovery;
+	bool		InRedo;
+	bool 		lastSourceFailed;
+	bool 		pendingWalRcvRestart;
+	bool 		backupEndRequired;
+
+	TimeLineID  RedoStartTLI;
+	TimeLineID  CheckPointTLI;
+	TimeLineID  recoveryTargetTLI;
+	TimeLineID	minRecoveryPointTLI;
+	TimeLineID  ReplayTLI;
+	TimeLineID	receiveTLI;
+
+	XLogRecPtr 	backupStartPoint;
+	XLogRecPtr 	backupEndPoint;
+	XLogRecPtr  CheckPointLoc;
+	XLogRecPtr  RedoStartLSN;
+	XLogRecPtr  NextRecPtr;
+	XLogRecPtr	minRecoveryPoint;
+	XLogRecPtr 	flushedUpto;
+	XLogRecPtr 	abortedRecPtr;
+	XLogRecPtr 	missingContrecPtr;
+
+	int	readFile;
+	XLogSegNo readSegNo;
+	uint32 readOff;
+	uint32 readLen;
+	XLogSource readSource;
+	TimeLineID curFileTLI;
+
+
+	HotStandbyState standbyState;
+	XLogSource 	currentSource;
+
+} WalPipelineParams;
+
+/*
+ * Shared memory control structure for the WAL pipeline
+ */
+typedef struct WalPipelineShmCtl
+{
+	/* Lifecycle management */
+	slock_t         mutex;
+	bool            initialized;
+	bool            shutdown_requested;
+	bool            producer_exited;
+
+	/* Producer state */
+	pid_t           producer_pid;
+	ProcNumber      producer_procnum;
+	XLogRecPtr      producer_lsn;   /* Last LSN read by producer */
+
+	/* Consumer state */
+	pid_t           consumer_pid;
+	XLogRecPtr      consumer_lsn;   /* Last LSN received by consumer */
+	XLogRecPtr      applied_lsn;   	/* Last LSN applied by consumer */
+
+	/* Queue handles */
+	dsm_handle      dsm_seg_handle;
+	shm_mq_handle   *producer_mq_handle;
+	shm_mq_handle   *consumer_mq_handle;
+
+	/* Statistics */
+	uint64          records_sent;
+	uint64          records_received;
+	uint64          bytes_sent;
+	uint64          bytes_received;
+
+	/* Error state */
+	int             error_code;
+	char            error_message[256];
+} WalPipelineShmCtl;
+
+/* The consumer may have to compute prefetcher stats */
+extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
+
+/*
+ * Public API functions
+ */
+
+/* Initialize the WAL pipeline shared memory structures */
+extern Size WalPipelineShmemSize(void);
+extern void WalPipelineShmemInit(void);
+
+/* Producer functions (called by background worker) */
+extern void WalPipeline_ProducerMain(Datum main_arg);
+extern bool WalPipeline_SendRecord(XLogReaderState *record);
+extern bool WalPipeline_SendShutdown(void);
+extern bool WalPipeline_SendError(int errcode, const char *errmsg);
+
+
+extern void ProcessPipelineBgwInterrupts(void);
+
+/* Global shared memory pointer */
+extern WalPipelineShmCtl *WalPipelineShm;
+
+#endif   /* WAL_PIPELINE_H */
\ No newline at end of file
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 2842106b285..e675ab8353d 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,7 @@
 #ifndef XLOGRECOVERY_H
 #define XLOGRECOVERY_H
 
+#include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
@@ -60,6 +61,17 @@ typedef enum RecoveryPauseState
 	RECOVERY_PAUSED,			/* recovery is paused */
 } RecoveryPauseState;
 
+/*
+ * Where a WAL file was obtained during recovery, or where to try to get one.
+ */
+typedef enum
+{
+	XLOG_FROM_ANY = 0,			/* request to read WAL from any source */
+	XLOG_FROM_ARCHIVE,			/* restored using restore_command */
+	XLOG_FROM_PG_WAL,			/* existing file in pg_wal */
+	XLOG_FROM_STREAM,			/* streamed from primary */
+} XLogSource;
+
 /*
  * Shared-memory state for WAL recovery.
  */
@@ -208,6 +220,8 @@ typedef struct
 	bool		recovery_signal_file_found;
 } EndOfWalRecoveryInfo;
 
+struct WalPipelineParams;   /* forward declaration */
+
 extern EndOfWalRecoveryInfo *FinishWalRecovery(void);
 extern void ShutdownWalRecovery(void);
 extern void RemovePromoteSignalFiles(void);
@@ -220,6 +234,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
 extern TimestampTz GetLatestXTime(void);
 extern TimestampTz GetCurrentChunkReplayStartTime(void);
 extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI);
+extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
+		   bool fetching_ckpt, TimeLineID replayTLI);
+extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+			 XLogRecPtr targetRecPtr, char *readBuf);
 
 extern bool PromoteIsTriggered(void);
 extern bool CheckPromoteSignal(void);
@@ -229,6 +247,7 @@ extern void StartupRequestWalReceiverRestart(void);
 extern void XLogRequestWalReceiverReply(void);
 
 extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue);
+extern void InitializePipelineRecoveryEnv(struct WalPipelineParams *params);
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 36d789720a3..fc23acbaec2 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -61,6 +61,7 @@ tests += {
       't/050_redo_segment_missing.pl',
       't/051_effective_wal_level.pl',
       't/052_checkpoint_segment_missing.pl',
+      't/053_walpipeline.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/053_walpipeline.pl b/src/test/recovery/t/053_walpipeline.pl
new file mode 100644
index 00000000000..2fb790aadc5
--- /dev/null
+++ b/src/test/recovery/t/053_walpipeline.pl
@@ -0,0 +1,208 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+#
+# Tests for the WAL pipeline feature (wal_pipeline GUC).
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ----------
+# Helpers
+# ----------
+
+sub slurp_log
+{
+	my $node = shift;
+	open(my $log, '<', $node->logfile()) or die "Cannot open log: $!";
+	my @content = readline($log);    # list context: one element per line
+	close($log);
+	return @content;
+}
+
+sub log_matches
+{
+	my ($node, $pattern) = @_;
+	return grep { $_ =~ $pattern } slurp_log($node);    # count in scalar context
+}
+
+
+# ########################################
+#   wal_pipeline = on, basic recovery
+# ########################################
+
+my $node1 = PostgreSQL::Test::Cluster->new('p1-recovery');
+$node1->init;
+$node1->start;
+
+$node1->safe_psql('postgres', q{
+    CREATE TABLE t (id serial PRIMARY KEY, v text);
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# second batch of 50000 rows to generate more WAL
+$node1->safe_psql('postgres', q{
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop (immediate mode) to force WAL recovery on the next start
+$node1->stop('immediate');
+
+# enable the pipeline, then restart; recovery runs during this start
+$node1->append_conf('postgresql.conf', "wal_pipeline = on");
+$node1->start;
+
+
+# Producer logged its start message
+ok(scalar log_matches($node1, qr/\[walpipeline\] producer: started at/),
+	'producer started message found in log');
+
+# Pipeline stopped cleanly
+ok(scalar log_matches($node1, qr/\[walpipeline\] stopped/),
+	'pipeline stopped message found in log');
+
+# Consumer received shutdown from producer
+ok(scalar log_matches($node1, qr/\[walpipeline\] consumer: received shutdown message/),
+	'consumer received shutdown message from producer');
+
+# Producer's exit line must report matching sent and received counts
+my @exit_lines = log_matches($node1,
+	qr/\[walpipeline\] producer: exiting: sent=\d+ received=\d+/);
+ok(scalar @exit_lines >= 1, 'producer exiting line found in log');
+
+my ($sent, $recv) = $exit_lines[-1] =~ /sent=(\d+) received=(\d+)/;
+ok(defined $sent && $sent > 0, "sent count ($sent) is positive");
+ok(defined $recv && $recv > 0, "received count ($recv) is positive");
+is($sent, $recv, "no records lost in pipeline queue: sent=$sent received=$recv");
+
+# No PANIC anywhere in the server log
+ok(!(scalar log_matches($node1, qr/\bPANIC\b/)),
+	'no PANIC messages during pipeline recovery');
+
+# Data integrity: both 50000-row batches must be visible
+my $count = $node1->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count + 0, 100_000, 'all 100000 rows visible after pipeline recovery');
+
+$node1->stop;
+
+# ##############################################################
+#    wal_pipeline = off (baseline, no pipeline log messages)
+# ##############################################################
+
+my $node2 = PostgreSQL::Test::Cluster->new('p0-recovery');
+$node2->init;
+$node2->start;
+
+$node2->safe_psql('postgres', q{
+    CREATE TABLE t (id serial PRIMARY KEY, v text);
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# second batch of 50000 rows to generate more WAL
+$node2->safe_psql('postgres', q{
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop (immediate mode) to force WAL recovery on the next start
+$node2->stop('immediate');
+
+# explicitly disable the pipeline, then restart; recovery runs during this start
+$node2->append_conf('postgresql.conf', "wal_pipeline = off");
+$node2->start;
+
+ok(!(scalar log_matches($node2, qr/\[walpipeline\] producer: started/)),
+	'no pipeline log messages when wal_pipeline = off');
+
+my $count2 = $node2->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count2 + 0, 100_000, 'all rows present after non-pipeline recovery');
+
+$node2->stop;
+
+
+
+# ###################################################################
+#    pipeline on vs off produce identical data (checksum comparison)
+# ###################################################################
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+$primary->safe_psql('postgres', q{
+    CREATE TABLE t (id serial PRIMARY KEY, v text);
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1, 30000) i;
+});
+
+$primary->backup('backup3');
+
+$primary->safe_psql('postgres', q{
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1, 30000) i;
+    UPDATE t SET v = 'x' WHERE id % 10 = 0;
+});
+
+# switch WAL segment; NOTE(review): $target_lsn below is never used afterwards
+$primary->safe_psql('postgres', 'SELECT pg_switch_wal()');
+my $target_lsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_lsn()');
+
+my $replica_on = PostgreSQL::Test::Cluster->new('replica_p1');
+$replica_on->init_from_backup($primary, 'backup3',
+    has_streaming => 1);
+$replica_on->append_conf('postgresql.conf', "wal_pipeline = on\n");
+$replica_on->start;
+
+my $replica_off = PostgreSQL::Test::Cluster->new('replica_p0');
+$replica_off->init_from_backup($primary, 'backup3',
+    has_streaming => 1);
+$replica_off->append_conf('postgresql.conf', "wal_pipeline = off\n");
+$replica_off->start;
+
+# wait until both replicas have caught up with the primary
+$primary->wait_for_catchup($replica_on);
+$primary->wait_for_catchup($replica_off);
+
+my $md5_on  = $replica_on->safe_psql('postgres',
+    "SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+my $md5_off = $replica_off->safe_psql('postgres',
+    "SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+is($md5_on, $md5_off,
+    'table checksum identical between pipeline=on and pipeline=off');
+
+$replica_on->stop;
+$replica_off->stop;
+$primary->stop('fast');
+
+
+
+# #################################
+#    pipeline on with a tiny replay
+# #################################
+
+my $node3 = PostgreSQL::Test::Cluster->new('p1-small-replay');
+$node3->init;
+$node3->start;
+
+# crash stop (immediate mode) right after startup, with no user activity
+$node3->stop('immediate');
+
+# restart with pipeline on; NOTE(review): assumes the producer sends zero records
+$node3->append_conf('postgresql.conf', "wal_pipeline = on");
+$node3->start;
+
+ok(scalar log_matches($node3, qr/\[walpipeline\] producer: exiting: sent=0 received=0/),
+	'pipeline producer sent zero records');
+
+ok((scalar log_matches($node3, qr/redo done at/)),
+	'pipeline redo done even with tiny replay');
+
+$node3->stop;
+
+done_testing();
-- 
2.34.1

