Compression of bigger WAL records

Started by Andrey M. Borodin12 months ago10 messages
#1Andrey M. Borodin
x4mmm@yandex-team.ru
1 attachment(s)

Hi hackers!

I propose a slight change to WAL compression: compress body of big records, if it's bigger than some threshold.

===Rationale===
0. Better compression ratio for full page images when pages are compressed together.

Consider following test:

set wal_compression to 'zstd';
create table a as select random() from generate_series(1,1e7);
create index on a(random ); -- warmup to avoid FPI for hint on the heap
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;

B-tree index will emit 97Mb of WAL instead of 125Mb when FPIs are compressed independently.

1. Compression of big records, that are not FPI. E.g. 2-pc records might be big enough to cross a threshold.

2. This might be a path to full WAL compression. In future I plan to propose a compression context: retaining compression dictionary between records. Obviously, the context cannot cross checkpoint borders. And a pool of contexts would be needed to fully utilize efficiency of compression codecs. Anyway - it's too early to theorize.

===Propotype===
I attach a prototype patch. It is functional, but some world tests fail. Probably, because they expect to generate more WAL without putting too much of entropy. Or, perhaps, I missed some bugs. In present version WAL_DEBUG does not indicate any problems. But a lot of quality assurance and commenting work is needed. It's a prototype.

To indicate that WAL record is compressed I use a bit in record->xl_info (XLR_COMPRESSED == 0x04). I found no places that use this bit...
If the record is compressed, record header is continued with information about compression: codec byte and uint32 of uncompressed xl_tot_len.

Currently, compression is done on StringInfo buffers, that are expanded before actual WALInsert() happens. If palloc() is needed during critical section, the compression is canceled. I do not like memory accounting before WALInsert, probably, something clever can be done about it.

WAL_DEBUG and wal_compression are enabled for debugging purposes. Of course, I do not propose to turn them on by default.

What do you think? Does this approach seem viable?

Best regards, Andrey Borodin.

Attachments:

v0-0001-Compress-big-WAL-records.patchapplication/octet-stream; name=v0-0001-Compress-big-WAL-records.patch; x-unix-mode=0644Download
From 7d4099a7f46a516e75816a7006827ab655fcfbf1 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v0] Compress big WAL records

This approach replaces FPI compression
---
 src/backend/access/transam/xlog.c             |   4 +-
 src/backend/access/transam/xloginsert.c       | 387 +++++++++---------
 src/backend/access/transam/xlogreader.c       | 207 ++++++----
 src/backend/utils/misc/guc_tables.c           |   2 +-
 src/include/access/xloginsert.h               |   1 +
 src/include/access/xlogreader.h               |   2 +
 src/include/access/xlogrecord.h               |  20 +-
 src/include/pg_config_manual.h                |   2 +-
 .../recovery/t/026_overwrite_contrecord.pl    |   2 +-
 9 files changed, 326 insertions(+), 301 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bf3dbda901..5674341368 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -121,7 +121,7 @@ char	   *XLogArchiveCommand = NULL;
 bool		EnableHotStandby = false;
 bool		fullPageWrites = true;
 bool		wal_log_hints = false;
-int			wal_compression = WAL_COMPRESSION_NONE;
+int			wal_compression = WAL_COMPRESSION_ZSTD;
 char	   *wal_consistency_checking_string = NULL;
 bool	   *wal_consistency_checking = NULL;
 bool		wal_init_zero = true;
@@ -1031,7 +1031,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index efed097092..ac57008b71 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -40,27 +40,6 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 
-/*
- * Guess the maximum buffer size required to store a compressed version of
- * backup block image.
- */
-#ifdef USE_LZ4
-#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
-#else
-#define LZ4_MAX_BLCKSZ		0
-#endif
-
-#ifdef USE_ZSTD
-#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
-#else
-#define ZSTD_MAX_BLCKSZ		0
-#endif
-
-#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)
-
-/* Buffer size required to store a compressed version of backup block image */
-#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
-
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -81,9 +60,6 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
-
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -113,6 +89,16 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+/*
+ * Compression buffers are kep in StringInfor for a prototype
+ * compression_buffer_current_size == -1 means we entered critical section
+ * before could prepare compression buffers.
+ */
+static int32			compression_buffer_current_size;
+static XLogRecData		compressed_rdt_hdr;
+static StringInfo		data_before_compression = NULL;
+static StringInfo		compressed_data = NULL;
+
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
 #define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
@@ -137,9 +123,7 @@ static MemoryContext xloginsert_cxt;
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi,
-									   bool *topxid_included);
-static bool XLogCompressBackupBlock(const char *page, uint16 hole_offset,
-									uint16 hole_length, char *dest, uint16 *dlen);
+									   bool *topxid_included, uint64 *rec_size);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -160,6 +144,11 @@ XLogBeginInsert(void)
 		elog(ERROR, "XLogBeginInsert was already called");
 
 	begininsert_called = true;
+
+	if (data_before_compression)
+		resetStringInfo(data_before_compression);
+	if (compressed_data)
+		resetStringInfo(compressed_data);
 }
 
 /*
@@ -231,6 +220,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buffer_current_size = 0;
 	begininsert_called = false;
 }
 
@@ -299,6 +289,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 #endif
 
 	regbuf->in_use = true;
+
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -352,6 +344,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
 #endif
 
 	regbuf->in_use = true;
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -386,6 +379,7 @@ XLogRegisterData(const char *data, uint32 len)
 	mainrdata_last = rdata;
 
 	mainrdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -440,6 +434,7 @@ XLogRegisterBufData(uint8 block_id, const char *data, uint32 len)
 	regbuf->rdata_tail->next = rdata;
 	regbuf->rdata_tail = rdata;
 	regbuf->rdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -459,6 +454,142 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+/*
+ * Make sure we have buffers needed for compression.
+ * We cannot do it during WALInsert(), because we will be in a critial section.
+ */
+void XLogEnsureCompressionBuffer(uint32 extraLen)
+{
+	uint64 compressed_buffer_size;
+	uint64 desired_buffer_size;
+
+	if (wal_compression == WAL_COMPRESSION_NONE)
+		return;
+
+	if (CritSectionCount > 0 || compression_buffer_current_size == -1)
+	{
+		/* We cannot prepare buffer during critical section, so bail out early */
+		compression_buffer_current_size = -1;
+		return;
+	}
+
+	compression_buffer_current_size += extraLen;
+	desired_buffer_size = compression_buffer_current_size + SizeOfXLogRecord;
+	Assert(data_before_compression->len == 0);
+	enlargeStringInfo(data_before_compression, desired_buffer_size);
+
+	compressed_buffer_size = PGLZ_MAX_OUTPUT(desired_buffer_size);
+
+#ifdef USE_LZ4
+	compressed_buffer_size = Max(compressed_buffer_size, LZ4_COMPRESSBOUND(desired_buffer_size));
+#endif
+#ifdef USE_ZSTD
+	compressed_buffer_size = Max(compressed_buffer_size, ZSTD_COMPRESSBOUND(desired_buffer_size));
+#endif
+	compressed_buffer_size = compressed_buffer_size + sizeof(XLogCompressionData);
+	Assert(compressed_data->len == 0);
+	enlargeStringInfo(compressed_data, compressed_buffer_size);
+}
+
+/* Compress assembled record on top of compression buffers */
+static XLogRecData*
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionData *compressed_header;
+	XLogRecord *src_header;
+	uint32 orig_len;
+	uint32 compr_len;
+
+	if (compression_buffer_current_size == -1)
+		return NULL;
+
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	
+	Assert(compression_buffer_current_size <= data_before_compression->maxlen);
+
+	/* Build the whole record */
+	for (; rdt != NULL; rdt = rdt->next)
+		appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len);
+
+	src_header = (XLogRecord*) data_before_compression->data;
+	compressed_header = (XLogCompressionData*) compressed_data->data;
+
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = data_before_compression->len;
+
+	orig_len = src_header->xl_tot_len - SizeOfXLogRecord;
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = BKPIMAGE_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char*)&src_header[1], orig_len, (char*)&compressed_header[1], PGLZ_strategy_default);
+			if (compr_len == -1)
+				return NULL;
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = BKPIMAGE_COMPRESS_LZ4;
+			compr_len = LZ4_compress_default((char*)&src_header[1], (char*)&compressed_header[1], orig_len,
+									   compressed_data->maxlen);
+			if (compr_len <= 0)
+				return NULL;
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = BKPIMAGE_COMPRESS_ZSTD;
+			compr_len = ZSTD_compress((char*)&compressed_header[1], compressed_data->maxlen, (char*)&src_header[1], orig_len,
+								ZSTD_CLEVEL_DEFAULT);
+			if (ZSTD_isError(compr_len))
+				return NULL;
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			return NULL;
+			break;
+			/* no default case, so that compiler will warn */
+	}
+
+	compressed_header->record_header.xl_tot_len = sizeof(XLogCompressionData) + compr_len;
+
+	compressed_header->record_header.xl_info = compressed_header->record_header.xl_info | XLR_COMPRESSED;
+
+	compressed_rdt_hdr.data = compressed_data->data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum assebled record, possibly compressed */
+static void XLogChecksumRecord(XLogRecData *rdt)
+{
+	pg_crc32c	rdata_crc;
+	XLogRecord *rechdr = (XLogRecord*) rdt->data;
+	/*
+	 * Calculate CRC of the data
+	 *
+	 * Note that the record header isn't added into the CRC initially since we
+	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
+	 * the whole record in the order: rdata, then backup blocks, then record
+	 * header.
+	 */
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, rdt->data + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (rdt = rdt->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -509,6 +640,8 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecPtr	fpw_lsn;
 		XLogRecData *rdt;
 		int			num_fpi = 0;
+		uint64		rec_size;
+
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -518,7 +651,18 @@ XLogInsert(RmgrId rmid, uint8 info)
 		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
-								 &fpw_lsn, &num_fpi, &topxid_included);
+								 &fpw_lsn, &num_fpi, &topxid_included,
+								 &rec_size);
+
+		// TODO: Invent good name for a GUC controlling compression threshold for a record size
+		if (rec_size > 512 && wal_compression != WAL_COMPRESSION_NONE)
+		{
+			XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+			if (rdt_compressed != NULL)
+				rdt = rdt_compressed;
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  topxid_included);
@@ -547,12 +691,11 @@ XLogInsert(RmgrId rmid, uint8 info)
 static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
-				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
+				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included,
+				   uint64 *rec_size)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -593,9 +736,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
-		XLogRecordBlockCompressHeader cbimg = {0};
+		uint32		hole_length;
 		bool		samerel;
-		bool		is_compressed = false;
 		bool		include_image;
 
 		if (!regbuf->in_use)
@@ -649,7 +791,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		if (include_image)
 		{
 			const char *page = regbuf->page;
-			uint16		compressed_len = 0;
 
 			/*
 			 * The page needs to be backed up, so calculate its hole length
@@ -666,32 +807,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper <= BLCKSZ)
 				{
 					bimg.hole_offset = lower;
-					cbimg.hole_length = upper - lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to remove */
 					bimg.hole_offset = 0;
-					cbimg.hole_length = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
 				bimg.hole_offset = 0;
-				cbimg.hole_length = 0;
-			}
-
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
-			{
-				is_compressed =
-					XLogCompressBackupBlock(page, bimg.hole_offset,
-											cbimg.hole_length,
-											regbuf->compressed_page,
-											&compressed_len);
+				hole_length = 0;
 			}
 
 			/*
@@ -709,7 +838,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 			rdt_datas_last = rdt_datas_last->next;
 
-			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+			bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
 			/*
 			 * If WAL consistency checking is enabled for the resource manager
@@ -720,48 +849,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			if (needs_backup)
 				bimg.bimg_info |= BKPIMAGE_APPLY;
 
-			if (is_compressed)
-			{
-				/* The current compression is stored in the WAL record */
-				bimg.length = compressed_len;
-
-				/* Set the compression method used for this block */
-				switch ((WalCompression) wal_compression)
-				{
-					case WAL_COMPRESSION_PGLZ:
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
-						break;
-
-					case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
-#else
-						elog(ERROR, "LZ4 is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
-#else
-						elog(ERROR, "zstd is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_NONE:
-						Assert(false);	/* cannot happen */
-						break;
-						/* no default case, so that compiler will warn */
-				}
-
-				rdt_datas_last->data = regbuf->compressed_page;
-				rdt_datas_last->len = compressed_len;
-			}
-			else
 			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
+				bimg.length = BLCKSZ - hole_length;
 
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -776,9 +867,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -821,12 +912,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
-			if (cbimg.hole_length != 0 && is_compressed)
-			{
-				memcpy(scratch, &cbimg,
-					   SizeOfXLogRecordBlockCompressHeader);
-				scratch += SizeOfXLogRecordBlockCompressHeader;
-			}
 		}
 		if (!samerel)
 		{
@@ -892,19 +977,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -928,92 +1000,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
 
-	return &hdr_rdt;
-}
+	*rec_size = rechdr->xl_tot_len;
 
-/*
- * Create a compressed version of a backup block image.
- *
- * Returns false if compression fails (i.e., compressed result is actually
- * bigger than original). Otherwise, returns true and sets 'dlen' to
- * the length of compressed block image.
- */
-static bool
-XLogCompressBackupBlock(const char *page, uint16 hole_offset, uint16 hole_length,
-						char *dest, uint16 *dlen)
-{
-	int32		orig_len = BLCKSZ - hole_length;
-	int32		len = -1;
-	int32		extra_bytes = 0;
-	const char *source;
-	PGAlignedBlock tmp;
-
-	if (hole_length != 0)
-	{
-		/* must skip the hole */
-		memcpy(tmp.data, page, hole_offset);
-		memcpy(tmp.data + hole_offset,
-			   page + (hole_offset + hole_length),
-			   BLCKSZ - (hole_length + hole_offset));
-		source = tmp.data;
-
-		/*
-		 * Extra data needs to be stored in WAL record for the compressed
-		 * version of block image if the hole exists.
-		 */
-		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
-	}
-	else
-		source = page;
-
-	switch ((WalCompression) wal_compression)
-	{
-		case WAL_COMPRESSION_PGLZ:
-			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
-			break;
-
-		case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
-			if (len <= 0)
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "LZ4 is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								ZSTD_CLEVEL_DEFAULT);
-			if (ZSTD_isError(len))
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "zstd is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_NONE:
-			Assert(false);		/* cannot happen */
-			break;
-			/* no default case, so that compiler will warn */
-	}
-
-	/*
-	 * We recheck the actual size even if compression reports success and see
-	 * if the number of bytes saved by compression is larger than the length
-	 * of extra data needed for the compressed version of block image.
-	 */
-	if (len >= 0 &&
-		len + extra_bytes < orig_len)
-	{
-		*dlen = (uint16) len;	/* successful compression */
-		return true;
-	}
-	return false;
+	return &hdr_rdt;
 }
 
 /*
@@ -1389,4 +1380,10 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	if (data_before_compression == NULL)
+		data_before_compression = makeStringInfo();
+	if (compressed_data == NULL)
+		compressed_data = makeStringInfo();
+	XLogEnsureCompressionBuffer(SizeOfXLogRecord);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3596af0617..0f7702350a 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -53,6 +53,7 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogRecord *record);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -524,6 +525,16 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi
 	return NULL;
 }
 
+uint32 XLogGetRecordTotalLen(XLogRecord *record)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionData *c = (XLogCompressionData*) record;
+		return c->decompressed_length;
+	}
+	return record->xl_tot_len;
+}
+
 static XLogPageReadResult
 XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 {
@@ -532,7 +543,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_phisical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -643,7 +655,8 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_decomp = XLogGetRecordTotalLen(record);
+	total_len_phisical = record->xl_tot_len;
 
 	/*
 	 * If the whole record header is on this page, validate it immediately.
@@ -663,12 +676,12 @@ restart:
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_phisical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_phisical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -681,7 +694,7 @@ restart:
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
 	decoded = XLogReadRecordAlloc(state,
-								  total_len,
+								  total_len_decomp,
 								  false /* allow_oversized */ );
 	if (decoded == NULL && nonblocking)
 	{
@@ -694,7 +707,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_phisical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -724,7 +737,7 @@ restart:
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_phisical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 
 			if (readOff == XLREAD_WOULDBLOCK)
@@ -765,12 +778,12 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_phisical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_phisical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
@@ -813,7 +826,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_decomp > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -824,11 +837,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_decomp);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_phisical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -844,7 +857,7 @@ restart:
 	{
 		/* Wait for the record data to become available */
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   Min(targetRecOff + total_len_phisical, XLOG_BLCKSZ));
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -854,7 +867,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_phisical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -878,7 +891,7 @@ restart:
 	{
 		Assert(!nonblocking);
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1646,6 +1659,91 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+static char* decompression_buffer = NULL;
+static uint32 decompression_buffer_len = 0;
+
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogRecord *record)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionData	*src = (XLogCompressionData*) record;
+		bool				decomp_success = true;
+		uint32				srclen = src->record_header.xl_tot_len - sizeof(XLogCompressionData);
+		char				*dst;
+		XLogRecord			*dst_h;
+
+		if (decompression_buffer_len < src->decompressed_length)
+		{
+			if (decompression_buffer)
+				pfree(decompression_buffer);
+			/* Avoid small steps in growths, we compress only big records */
+			decompression_buffer_len = TYPEALIGN(BLCKSZ, src->decompressed_length);
+			decompression_buffer = palloc(decompression_buffer_len + SizeOfXLogRecord);
+		}
+		dst_h = (XLogRecord*) decompression_buffer;
+		*dst_h = src->record_header;
+		dst_h->xl_tot_len = src->decompressed_length;
+		dst = (char*) &dst_h[1];
+
+		/* If a backup block image is compressed, decompress it */
+
+		if (src->method == BKPIMAGE_COMPRESS_PGLZ)
+		{
+			if (pglz_decompress((char*) &src[1], srclen, dst,
+								decompression_buffer_len, true) < 0)
+				decomp_success = false;
+		}
+		else if (src->method == BKPIMAGE_COMPRESS_LZ4)
+		{
+#ifdef USE_LZ4
+			if (LZ4_decompress_safe((char*) &src[1], dst,
+									srclen, decompression_buffer_len) <= 0)
+				decomp_success = false;
+#else
+			// report_invalid_record(src, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			// 					  LSN_FORMAT_ARGS((XLogRecPtr)0),
+			// 					  "LZ4",
+			// 					  0);
+			return NULL;
+#endif
+		}
+		else if (src->method == BKPIMAGE_COMPRESS_ZSTD)
+		{
+#ifdef USE_ZSTD
+			size_t		decomp_result = ZSTD_decompress(dst,
+														decompression_buffer_len,
+														(char*) &src[1], srclen);
+			if (ZSTD_isError(decomp_result))
+				decomp_success = false;
+#else
+			// report_invalid_record(src, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			// 					  LSN_FORMAT_ARGS((XLogRecPtr)0),
+			// 					  "zstd",
+			// 					  0);
+			return NULL;
+#endif
+		}
+		else
+		{
+			// report_invalid_record(src, "could not restore image at %X/%X compressed with unknown method, block %d",
+			// 					  LSN_FORMAT_ARGS((XLogRecPtr)0),
+			// 					  0);
+			return NULL;
+		}
+
+		if (!decomp_success)
+		{
+			// report_invalid_record(src, "could not decompress image at %X/%X, block %d",
+			// 					  LSN_FORMAT_ARGS((XLogRecPtr)0),
+			// 					  0);
+			return NULL;
+		}
+
+		return (XLogRecord*) decompression_buffer;
+	}
+	return record;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1684,6 +1782,8 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(record);
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1794,10 +1894,7 @@ DecodeXLogRecord(XLogReaderState *state,
 
 				if (BKPIMAGE_COMPRESSED(blk->bimg_info))
 				{
-					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
-						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-					else
-						blk->hole_length = 0;
+					Assert(false);
 				}
 				else
 					blk->hole_length = BLCKSZ - blk->bimg_len;
@@ -1836,19 +1933,6 @@ DecodeXLogRecord(XLogReaderState *state,
 					goto err;
 				}
 
-				/*
-				 * Cross-check that bimg_len < BLCKSZ if it is compressed.
-				 */
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
-					blk->bimg_len == BLCKSZ)
-				{
-					report_invalid_record(state,
-										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
-										  (unsigned int) blk->bimg_len,
-										  LSN_FORMAT_ARGS(state->ReadRecPtr));
-					goto err;
-				}
-
 				/*
 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
 				 * set nor COMPRESSED().
@@ -2057,7 +2141,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
 	char	   *ptr;
-	PGAlignedBlock tmp;
 
 	if (block_id > record->record->max_block_id ||
 		!record->record->blocks[block_id].in_use)
@@ -2081,63 +2164,7 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
 	{
-		/* If a backup block image is compressed, decompress it */
-		bool		decomp_success = true;
-
-		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
-				decomp_success = false;
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-		{
-#ifdef USE_LZ4
-			if (LZ4_decompress_safe(ptr, tmp.data,
-									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "LZ4",
-								  block_id);
-			return false;
-#endif
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-		{
-#ifdef USE_ZSTD
-			size_t		decomp_result = ZSTD_decompress(tmp.data,
-														BLCKSZ - bkpb->hole_length,
-														ptr, bkpb->bimg_len);
-
-			if (ZSTD_isError(decomp_result))
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "zstd",
-								  block_id);
-			return false;
-#endif
-		}
-		else
-		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		if (!decomp_success)
-		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		ptr = tmp.data;
+		Assert(false);
 	}
 
 	/* generate page, taking into account hole if necessary */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c9d8cd796a..a6b2420204 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -5057,7 +5057,7 @@ struct config_enum ConfigureNamesEnum[] =
 			NULL
 		},
 		&wal_compression,
-		WAL_COMPRESSION_NONE, wal_compression_options,
+		WAL_COMPRESSION_ZSTD, wal_compression_options,
 		NULL, NULL, NULL
 	},
 
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 71894262fb..0266e224bb 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -44,6 +44,7 @@ extern void XLogBeginInsert(void);
 extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
+extern void XLogEnsureCompressionBuffer(uint32 extraLen);
 extern void XLogRegisterData(const char *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
 extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9738462d3c..0cf64ab1b0 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -375,6 +375,8 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 /* Forget error produced by XLogReaderValidatePageHeader(). */
 extern void XLogReaderResetError(XLogReaderState *state);
 
+extern uint32 XLogGetRecordTotalLen(XLogRecord *record);
+
 /*
  * Error information from WALRead that both backend and frontend caller can
  * process.  Currently only errors from pg_pread can be reported.
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index a06833ce0a..d113cd6aa4 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -90,6 +90,8 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -166,17 +168,14 @@ typedef struct XLogRecordBlockImageHeader
 	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
 			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
 
-/*
- * Extra header information used when page image has "hole" and
- * is compressed.
- */
-typedef struct XLogRecordBlockCompressHeader
+/* Record of a compressed header */
+// TODO: replace sizeof(XLogCompressionData) everywhere
+typedef struct XLogCompressionData
 {
-	uint16		hole_length;	/* number of bytes in "hole" */
-} XLogRecordBlockCompressHeader;
-
-#define SizeOfXLogRecordBlockCompressHeader \
-	sizeof(XLogRecordBlockCompressHeader)
+	XLogRecord record_header;
+	char	method;
+	uint32	decompressed_length;
+} XLogCompressionData;
 
 /*
  * Maximum size of the header for a block reference. This is used to size a
@@ -185,7 +184,6 @@ typedef struct XLogRecordBlockCompressHeader
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
-	 SizeOfXLogRecordBlockCompressHeader + \
 	 sizeof(RelFileLocator) + \
 	 sizeof(BlockNumber))
 
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..9ff11e3e6a 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -355,7 +355,7 @@
  * Enable debugging print statements for WAL-related operations; see
  * also the wal_debug GUC var.
  */
-/* #define WAL_DEBUG */
+#define WAL_DEBUG
 
 /*
  * Enable tracing of syncscan operations (see also the trace_syncscan GUC var).
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
index f408d4f69b..942cf26f3f 100644
--- a/src/test/recovery/t/026_overwrite_contrecord.pl
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -56,7 +56,7 @@ $$;
 my $initfile = $node->safe_psql('postgres',
 	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
 $node->safe_psql('postgres',
-	qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+	qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
 );
 #$node->safe_psql('postgres', qq{create table foo ()});
 my $endfile = $node->safe_psql('postgres',
-- 
2.39.5 (Apple Git-154)

#2Kirill Reshke
reshkekirill@gmail.com
In reply to: Andrey M. Borodin (#1)
Re: Compression of bigger WAL records

I

./pgbin/bin/pg_waldump

On Sun, 12 Jan 2025 at 17:43, Andrey M. Borodin <x4mmm@yandex-team.ru> wrote:

Hi hackers!

I propose a slight change to WAL compression: compress body of big records, if it's bigger than some threshold.

Hi,
initdb fails when configured with --without-zstd

```
reshke@ygp-jammy:~/postgres$ ./pgbin/bin/initdb -D db
The files belonging to this database system will be owned by user "reshke".
This user must also own the server process.

The database cluster will be initialized with locale "C.UTF-8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".

Data page checksums are enabled.

creating directory db ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default "max_connections" ... 100
selecting default "autovacuum_worker_slots" ... 16
selecting default "shared_buffers" ... 128MB
selecting default time zone ... Etc/UTC
creating configuration files ... ok
running bootstrap script ... 2025-01-12 18:10:47.657 UTC [4167965]
FATAL: zstd is not supported by this build
2025-01-12 18:10:47.657 UTC [4167965] PANIC: cannot abort transaction
1, it was already committed
Aborted (core dumped)
child process exited with exit code 134
initdb: removing data directory "db"
```

Also pg_waldump fails with

```
corrupted size vs. prev_size
Aborted (core dumped)
```

Best regards,
Kirill Reshke

#3Andrey Borodin
x4mmm@yandex-team.ru
In reply to: Kirill Reshke (#2)
Re: Compression of bigger WAL records

Hi! Thanks for looking into this!

On 12 Jan 2025, at 23:36, Kirill Reshke <reshkekirill@gmail.com> wrote:

initdb fails when configured with --without-zstd

Yes, the patch is intended to demonstrate improvement when using Zstd.

On 12 Jan 2025, at 17:43, Andrey M. Borodin <x4mmm@yandex-team.ru> wrote:

WAL_DEBUG and wal_compression are enabled for debugging purposes. Of course, I do not propose to turn them on by default.

And this does not work well --without-zstd.

Also pg_waldump fails with

```
corrupted size vs. prev_size
Aborted (core dumped)
```

I’ll fix that, thanks!
Also seems like I forgot to bump WAL_FILE_MAGIC…

What do you think about proposed approach?

Best regards, Andrey Borodin.

#4Andrey M. Borodin
x4mmm@yandex-team.ru
In reply to: Andrey M. Borodin (#1)
1 attachment(s)
Re: Compression of bigger WAL records

On 12 Jan 2025, at 17:43, Andrey M. Borodin <x4mmm@yandex-team.ru> wrote:

I attach a prototype patch.

Here's v2, now it passes all the tests with wal_debug.

Some stats. On this test

create table a as select random() from generate_series(1,1e7);
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;
set wal_compression to 'lz4';
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;
set wal_compression to 'pglz';
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;
set wal_compression to 'zstd';
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;

I observe WAL size of the index:
method HEAD patched
pglz 193 MB 193 MB
lz4 160 MB 132 MB
zstd 125 MB 97 MB

So, for lz4 and zstd this seems to be a significant reduction.

I'm planning to work on improving the patch quality.

Thanks!

Best regards, Andrey Borodin.

Attachments:

v2-0001-Compress-big-WAL-records.patchapplication/octet-stream; name=v2-0001-Compress-big-WAL-records.patch; x-unix-mode=0644Download
From e52919c7656ac598b9aa63d1ad70e8dff34f6685 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v2] Compress big WAL records

This approach replaces FPI compression
---
 contrib/pg_walinspect/pg_walinspect.c         |   6 -
 src/backend/access/rmgrdesc/xlogdesc.c        |  44 +-
 src/backend/access/transam/xlog.c             |  19 +-
 src/backend/access/transam/xloginsert.c       | 388 +++++++++---------
 src/backend/access/transam/xlogreader.c       | 248 ++++++-----
 src/backend/utils/misc/guc_tables.c           |  11 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/access/xloginsert.h               |   1 +
 src/include/access/xlogreader.h               |   6 +
 src/include/access/xlogrecord.h               |  37 +-
 src/include/pg_config_manual.h                |   2 +-
 .../recovery/t/026_overwrite_contrecord.pl    |   2 +-
 src/test/recovery/t/039_end_of_wal.pl         |   2 +-
 14 files changed, 401 insertions(+), 367 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 9e60941578..511bd2978a 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -311,12 +311,6 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 				flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
 			if (blk->apply_image)
 				flags[cnt++] = CStringGetTextDatum("APPLY");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
 
 			Assert(cnt <= bitcnt);
 			block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 58040f2865..55dcfee59d 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -267,46 +267,18 @@ XLogRecGetBlockRefInfo(XLogReaderState *record, bool pretty,
 
 			if (XLogRecHasBlockImage(record, block_id))
 			{
-				uint8		bimg_info = XLogRecGetBlock(record, block_id)->bimg_info;
-
 				/* Calculate the amount of FPI data in the record. */
 				if (fpi_len)
 					*fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
 
-				if (BKPIMAGE_COMPRESSED(bimg_info))
-				{
-					const char *method;
-
-					if ((bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-						method = "pglz";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-						method = "lz4";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-						method = "zstd";
-					else
-						method = "unknown";
-
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u, "
-									 "compression saved: %u, method: %s",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length,
-									 BLCKSZ -
-									 XLogRecGetBlock(record, block_id)->hole_length -
-									 XLogRecGetBlock(record, block_id)->bimg_len,
-									 method);
-				}
-				else
-				{
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length);
-				}
+				
+				appendStringInfo(buf,
+								 " (FPW%s); hole: offset: %u, length: %u",
+								 XLogRecBlockImageApply(record, block_id) ?
+								 "" : " for WAL verification",
+								 XLogRecGetBlock(record, block_id)->hole_offset,
+								 XLogRecGetBlock(record, block_id)->hole_length);
+				
 			}
 
 			if (pretty)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bf3dbda901..b54b4260ff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -135,6 +135,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int			wal_compression_threshold = 512;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -715,6 +716,22 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static uint32 XLogGetRecordTotalLen(XLogRecord *record);
+
+
+/* Read length of a record, accounting for possible compression */
+static uint32
+XLogGetRecordTotalLen(XLogRecord *record)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+		Assert(((int32_t)c->decompressed_length) > 0);
+		return c->decompressed_length;
+	}
+	return record->xl_tot_len;
+}
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1031,7 +1048,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index efed097092..6ae7e4015a 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -40,27 +40,6 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 
-/*
- * Guess the maximum buffer size required to store a compressed version of
- * backup block image.
- */
-#ifdef USE_LZ4
-#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
-#else
-#define LZ4_MAX_BLCKSZ		0
-#endif
-
-#ifdef USE_ZSTD
-#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
-#else
-#define ZSTD_MAX_BLCKSZ		0
-#endif
-
-#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)
-
-/* Buffer size required to store a compressed version of backup block image */
-#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
-
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -81,9 +60,6 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
-
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -113,6 +89,16 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+/*
+ * Compression buffers are kep in StringInfor for a prototype
+ * compression_buffer_current_size == -1 means we entered critical section
+ * before could prepare compression buffers.
+ */
+static int32			compression_buffer_current_size;
+static XLogRecData		compressed_rdt_hdr;
+static StringInfo		data_before_compression = NULL;
+static StringInfo		compressed_data = NULL;
+
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
 #define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
@@ -137,9 +123,7 @@ static MemoryContext xloginsert_cxt;
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi,
-									   bool *topxid_included);
-static bool XLogCompressBackupBlock(const char *page, uint16 hole_offset,
-									uint16 hole_length, char *dest, uint16 *dlen);
+									   bool *topxid_included, uint64 *rec_size);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -160,6 +144,11 @@ XLogBeginInsert(void)
 		elog(ERROR, "XLogBeginInsert was already called");
 
 	begininsert_called = true;
+
+	if (data_before_compression)
+		resetStringInfo(data_before_compression);
+	if (compressed_data)
+		resetStringInfo(compressed_data);
 }
 
 /*
@@ -231,6 +220,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buffer_current_size = 0;
 	begininsert_called = false;
 }
 
@@ -299,6 +289,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 #endif
 
 	regbuf->in_use = true;
+
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -352,6 +344,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
 #endif
 
 	regbuf->in_use = true;
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -386,6 +379,7 @@ XLogRegisterData(const char *data, uint32 len)
 	mainrdata_last = rdata;
 
 	mainrdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -440,6 +434,7 @@ XLogRegisterBufData(uint8 block_id, const char *data, uint32 len)
 	regbuf->rdata_tail->next = rdata;
 	regbuf->rdata_tail = rdata;
 	regbuf->rdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -459,6 +454,144 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+/*
+ * Make sure we have buffers needed for compression.
+ * We cannot do it during WALInsert(), because we will be in a critial section.
+ */
+void XLogEnsureCompressionBuffer(uint32 extraLen)
+{
+	uint64 compressed_buffer_size;
+	uint64 desired_buffer_size;
+
+	if (wal_compression == WAL_COMPRESSION_NONE)
+		return;
+
+	if (CritSectionCount > 0 || compression_buffer_current_size == -1)
+	{
+		/* We cannot prepare buffer during critical section, so bail out early */
+		compression_buffer_current_size = -1;
+		return;
+	}
+
+	compression_buffer_current_size += extraLen;
+	desired_buffer_size = compression_buffer_current_size + SizeOfXLogRecord;
+	Assert(data_before_compression->len == 0);
+	enlargeStringInfo(data_before_compression, desired_buffer_size);
+
+	compressed_buffer_size = PGLZ_MAX_OUTPUT(desired_buffer_size);
+
+#ifdef USE_LZ4
+	compressed_buffer_size = Max(compressed_buffer_size, LZ4_COMPRESSBOUND(desired_buffer_size));
+#endif
+#ifdef USE_ZSTD
+	compressed_buffer_size = Max(compressed_buffer_size, ZSTD_COMPRESSBOUND(desired_buffer_size));
+#endif
+	compressed_buffer_size = compressed_buffer_size + SizeOfXLogCompressedRecord;
+	Assert(compressed_data->len == 0);
+	enlargeStringInfo(compressed_data, compressed_buffer_size);
+}
+
+/* Compress assembled record on top of compression buffers */
+static XLogRecData*
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionHeader *compressed_header;
+	XLogRecord *src_header;
+	uint32 orig_len;
+	int32 compr_len = -1;
+
+	if (compression_buffer_current_size == -1)
+		return NULL;
+
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	
+	Assert(compression_buffer_current_size <= data_before_compression->maxlen);
+
+	/* Build the whole record */
+	for (; rdt != NULL; rdt = rdt->next)
+		appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len);
+
+	src_header = (XLogRecord*) data_before_compression->data;
+	compressed_header = (XLogCompressionHeader*) compressed_data->data;
+
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = data_before_compression->len;
+
+	orig_len = src_header->xl_tot_len - SizeOfXLogRecord;
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = XLR_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char*)&src_header[1], orig_len, (char*)&compressed_header[1], PGLZ_strategy_default);
+			if (compr_len == -1)
+				return NULL;
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = XLR_COMPRESS_LZ4;
+			compr_len = LZ4_compress_default((char*)&src_header[1], (char*)&compressed_header[1], orig_len,
+									   compressed_data->maxlen);
+			if (compr_len <= 0)
+				return NULL;
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = XLR_COMPRESS_ZSTD;
+			compr_len = ZSTD_compress((char*)&compressed_header[1], compressed_data->maxlen, (char*)&src_header[1], orig_len,
+								ZSTD_CLEVEL_DEFAULT);
+			if (ZSTD_isError(compr_len))
+				return NULL;
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			return NULL;
+			break;
+			/* no default case, so that compiler will warn */
+	}
+
+	Assert(compr_len > 0);
+
+	compressed_header->record_header.xl_tot_len = SizeOfXLogCompressedRecord + compr_len;
+
+	compressed_header->record_header.xl_info = compressed_header->record_header.xl_info | XLR_COMPRESSED;
+
+	compressed_rdt_hdr.data = compressed_data->data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum assebled record, possibly compressed */
+static void XLogChecksumRecord(XLogRecData *rdt)
+{
+	pg_crc32c	rdata_crc;
+	XLogRecord *rechdr = (XLogRecord*) rdt->data;
+	/*
+	 * Calculate CRC of the data
+	 *
+	 * Note that the record header isn't added into the CRC initially since we
+	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
+	 * the whole record in the order: rdata, then backup blocks, then record
+	 * header.
+	 */
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, rdt->data + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (rdt = rdt->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -509,6 +642,8 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecPtr	fpw_lsn;
 		XLogRecData *rdt;
 		int			num_fpi = 0;
+		uint64		rec_size;
+
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -518,7 +653,17 @@ XLogInsert(RmgrId rmid, uint8 info)
 		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
-								 &fpw_lsn, &num_fpi, &topxid_included);
+								 &fpw_lsn, &num_fpi, &topxid_included,
+								 &rec_size);
+
+		if (rec_size > wal_compression_threshold && wal_compression != WAL_COMPRESSION_NONE)
+		{
+			XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+			if (rdt_compressed != NULL)
+				rdt = rdt_compressed;
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  topxid_included);
@@ -547,12 +692,11 @@ XLogInsert(RmgrId rmid, uint8 info)
 static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
-				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
+				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included,
+				   uint64 *rec_size)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -593,9 +737,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
-		XLogRecordBlockCompressHeader cbimg = {0};
+		uint32		hole_length;
 		bool		samerel;
-		bool		is_compressed = false;
 		bool		include_image;
 
 		if (!regbuf->in_use)
@@ -649,7 +792,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		if (include_image)
 		{
 			const char *page = regbuf->page;
-			uint16		compressed_len = 0;
 
 			/*
 			 * The page needs to be backed up, so calculate its hole length
@@ -666,32 +808,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper <= BLCKSZ)
 				{
 					bimg.hole_offset = lower;
-					cbimg.hole_length = upper - lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to remove */
 					bimg.hole_offset = 0;
-					cbimg.hole_length = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
 				bimg.hole_offset = 0;
-				cbimg.hole_length = 0;
-			}
-
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
-			{
-				is_compressed =
-					XLogCompressBackupBlock(page, bimg.hole_offset,
-											cbimg.hole_length,
-											regbuf->compressed_page,
-											&compressed_len);
+				hole_length = 0;
 			}
 
 			/*
@@ -709,7 +839,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 			rdt_datas_last = rdt_datas_last->next;
 
-			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+			bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
 			/*
 			 * If WAL consistency checking is enabled for the resource manager
@@ -720,48 +850,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			if (needs_backup)
 				bimg.bimg_info |= BKPIMAGE_APPLY;
 
-			if (is_compressed)
-			{
-				/* The current compression is stored in the WAL record */
-				bimg.length = compressed_len;
-
-				/* Set the compression method used for this block */
-				switch ((WalCompression) wal_compression)
-				{
-					case WAL_COMPRESSION_PGLZ:
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
-						break;
-
-					case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
-#else
-						elog(ERROR, "LZ4 is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
-#else
-						elog(ERROR, "zstd is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_NONE:
-						Assert(false);	/* cannot happen */
-						break;
-						/* no default case, so that compiler will warn */
-				}
-
-				rdt_datas_last->data = regbuf->compressed_page;
-				rdt_datas_last->len = compressed_len;
-			}
-			else
 			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
+				bimg.length = BLCKSZ - hole_length;
 
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -776,9 +868,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -821,12 +913,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
-			if (cbimg.hole_length != 0 && is_compressed)
-			{
-				memcpy(scratch, &cbimg,
-					   SizeOfXLogRecordBlockCompressHeader);
-				scratch += SizeOfXLogRecordBlockCompressHeader;
-			}
 		}
 		if (!samerel)
 		{
@@ -892,19 +978,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -928,92 +1001,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
 
-	return &hdr_rdt;
-}
+	*rec_size = rechdr->xl_tot_len;
 
-/*
- * Create a compressed version of a backup block image.
- *
- * Returns false if compression fails (i.e., compressed result is actually
- * bigger than original). Otherwise, returns true and sets 'dlen' to
- * the length of compressed block image.
- */
-static bool
-XLogCompressBackupBlock(const char *page, uint16 hole_offset, uint16 hole_length,
-						char *dest, uint16 *dlen)
-{
-	int32		orig_len = BLCKSZ - hole_length;
-	int32		len = -1;
-	int32		extra_bytes = 0;
-	const char *source;
-	PGAlignedBlock tmp;
-
-	if (hole_length != 0)
-	{
-		/* must skip the hole */
-		memcpy(tmp.data, page, hole_offset);
-		memcpy(tmp.data + hole_offset,
-			   page + (hole_offset + hole_length),
-			   BLCKSZ - (hole_length + hole_offset));
-		source = tmp.data;
-
-		/*
-		 * Extra data needs to be stored in WAL record for the compressed
-		 * version of block image if the hole exists.
-		 */
-		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
-	}
-	else
-		source = page;
-
-	switch ((WalCompression) wal_compression)
-	{
-		case WAL_COMPRESSION_PGLZ:
-			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
-			break;
-
-		case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
-			if (len <= 0)
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "LZ4 is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								ZSTD_CLEVEL_DEFAULT);
-			if (ZSTD_isError(len))
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "zstd is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_NONE:
-			Assert(false);		/* cannot happen */
-			break;
-			/* no default case, so that compiler will warn */
-	}
-
-	/*
-	 * We recheck the actual size even if compression reports success and see
-	 * if the number of bytes saved by compression is larger than the length
-	 * of extra data needed for the compressed version of block image.
-	 */
-	if (len >= 0 &&
-		len + extra_bytes < orig_len)
-	{
-		*dlen = (uint16) len;	/* successful compression */
-		return true;
-	}
-	return false;
+	return &hdr_rdt;
 }
 
 /*
@@ -1389,4 +1381,10 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	if (data_before_compression == NULL)
+		data_before_compression = makeStringInfo();
+	if (compressed_data == NULL)
+		compressed_data = makeStringInfo();
+	XLogEnsureCompressionBuffer(SizeOfXLogRecord);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3596af0617..558f40c1fa 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "replication/origin.h"
+#include "utils/memutils.h"
 
 #ifndef FRONTEND
 #include "pgstat.h"
@@ -53,6 +54,8 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state, XLogRecord *record,
+												XLogRecPtr recptr);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -169,6 +172,8 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+	if (state->decompression_buffer)
+		pfree(state->decompression_buffer);
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -532,7 +537,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_phisical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -643,8 +649,27 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_phisical = record->xl_tot_len;
 
+	/* TODO: Actually, we should not trust this compression bit too... */
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogCompressedRecord)
+		{
+			total_len_decomp = -1; /* Need reassemble to know the size */
+		}
+		else
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			// Assert(((int32_t)c->decompressed_length) > 0); // We cannot assert this, this might be a garbage
+			total_len_decomp = c->decompressed_length;
+		}
+	}
+	else
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogRecord)
+			total_len_decomp = record->xl_tot_len;
+		else 
+			total_len_decomp = -1; /* We are not sure record is not compressed */
 	/*
 	 * If the whole record header is on this page, validate it immediately.
 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
@@ -659,16 +684,18 @@ restart:
 								   randAccess))
 			goto err;
 		gotheader = true;
+		if (record->xl_info & XLR_COMPRESSED)
+			gotheader = targetRecOff <= XLOG_BLCKSZ - SizeOfXLogCompressedRecord;
 	}
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_phisical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_phisical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -680,9 +707,11 @@ restart:
 	 * calling palloc.  If we can't, we'll try again below after we've
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
-	decoded = XLogReadRecordAlloc(state,
-								  total_len,
+	if (total_len_decomp != -1)
+		decoded = XLogReadRecordAlloc(state,
+								  total_len_decomp,
 								  false /* allow_oversized */ );
+
 	if (decoded == NULL && nonblocking)
 	{
 		/*
@@ -694,7 +723,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_phisical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -724,7 +753,7 @@ restart:
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_phisical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 
 			if (readOff == XLREAD_WOULDBLOCK)
@@ -765,12 +794,12 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_phisical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_phisical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
@@ -813,7 +842,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_phisical > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -824,11 +853,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_phisical);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_phisical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -843,8 +872,9 @@ restart:
 	else
 	{
 		/* Wait for the record data to become available */
+		Assert(targetRecOff + total_len_phisical <= XLOG_BLCKSZ);
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   targetRecOff + total_len_phisical);
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -854,7 +884,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_phisical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -877,8 +907,19 @@ restart:
 	if (decoded == NULL)
 	{
 		Assert(!nonblocking);
+
+		/* total_len_decomp might be not actual */
+		if (record->xl_info & XLR_COMPRESSED)
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			Assert(((int32_t)c->decompressed_length) > 0);
+			Assert(((int32_t)c->decompressed_length) < MaxAllocSize);
+			total_len_decomp = c->decompressed_length;
+		}
+		else
+			total_len_decomp = record->xl_tot_len;
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1646,6 +1687,84 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state,
+												XLogRecord *record,
+												XLogRecPtr recptr)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader	*src = (XLogCompressionHeader*) record;
+		bool				decomp_success = true;
+		uint32				srclen = src->record_header.xl_tot_len - SizeOfXLogCompressedRecord;
+		char				*dst;
+		XLogRecord			*dst_h;
+
+		if (state->decompression_buffer_size < src->decompressed_length + SizeOfXLogRecord)
+		{
+			if (state->decompression_buffer)
+				pfree(state->decompression_buffer);
+			/* Avoid small steps in growths, we compress only big records */
+			state->decompression_buffer_size = TYPEALIGN(BLCKSZ, src->decompressed_length + SizeOfXLogRecord);
+			state->decompression_buffer = palloc(state->decompression_buffer_size);
+		}
+		dst_h = (XLogRecord*) state->decompression_buffer;
+		*dst_h = src->record_header;
+		dst_h->xl_tot_len = src->decompressed_length;
+		dst = (char*) &dst_h[1];
+
+		/* If a backup block image is compressed, decompress it */
+
+		if (src->method == XLR_COMPRESS_PGLZ)
+		{
+			if (pglz_decompress((char*) &src[1], srclen, dst,
+								state->decompression_buffer_size, true) < 0)
+				decomp_success = false;
+		}
+		else if (src->method == XLR_COMPRESS_LZ4)
+		{
+#ifdef USE_LZ4
+			if (LZ4_decompress_safe((char*) &src[1], dst,
+									srclen, state->decompression_buffer_size) <= 0)
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "lz4");
+			return NULL;
+#endif
+		}
+		else if (src->method == XLR_COMPRESS_ZSTD)
+		{
+#ifdef USE_ZSTD
+			size_t		decomp_result = ZSTD_decompress(dst,
+														state->decompression_buffer_size,
+														(char*) &src[1], srclen);
+			if (ZSTD_isError(decomp_result))
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "zstd");
+			return NULL;
+#endif
+		}
+		else
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with unknown method",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		if (!decomp_success)
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		return (XLogRecord*) state->decompression_buffer;
+	}
+	return record;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1684,6 +1803,14 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(state, record, lsn);
+
+	if (!record)
+	{
+		/* Decompression failed, error must be reported already */
+		return false;
+	}
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1791,16 +1918,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
 
 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
-
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info))
-				{
-					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
-						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-					else
-						blk->hole_length = 0;
-				}
-				else
-					blk->hole_length = BLCKSZ - blk->bimg_len;
+				blk->hole_length = BLCKSZ - blk->bimg_len;
 				datatotal += blk->bimg_len;
 
 				/*
@@ -1836,29 +1954,15 @@ DecodeXLogRecord(XLogReaderState *state,
 					goto err;
 				}
 
-				/*
-				 * Cross-check that bimg_len < BLCKSZ if it is compressed.
-				 */
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
-					blk->bimg_len == BLCKSZ)
-				{
-					report_invalid_record(state,
-										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
-										  (unsigned int) blk->bimg_len,
-										  LSN_FORMAT_ARGS(state->ReadRecPtr));
-					goto err;
-				}
-
 				/*
 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
 				 * set nor COMPRESSED().
 				 */
 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
-					!BKPIMAGE_COMPRESSED(blk->bimg_info) &&
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
-										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X",
+										  "BKPIMAGE_HAS_HOLE is not set, but block image length is %u at %X/%X",
 										  (unsigned int) blk->data_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
@@ -2057,7 +2161,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
 	char	   *ptr;
-	PGAlignedBlock tmp;
 
 	if (block_id > record->record->max_block_id ||
 		!record->record->blocks[block_id].in_use)
@@ -2079,67 +2182,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	bkpb = &record->record->blocks[block_id];
 	ptr = bkpb->bkp_image;
 
-	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
-	{
-		/* If a backup block image is compressed, decompress it */
-		bool		decomp_success = true;
-
-		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
-				decomp_success = false;
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-		{
-#ifdef USE_LZ4
-			if (LZ4_decompress_safe(ptr, tmp.data,
-									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "LZ4",
-								  block_id);
-			return false;
-#endif
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-		{
-#ifdef USE_ZSTD
-			size_t		decomp_result = ZSTD_decompress(tmp.data,
-														BLCKSZ - bkpb->hole_length,
-														ptr, bkpb->bimg_len);
-
-			if (ZSTD_isError(decomp_result))
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "zstd",
-								  block_id);
-			return false;
-#endif
-		}
-		else
-		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		if (!decomp_success)
-		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		ptr = tmp.data;
-	}
-
 	/* generate page, taking into account hole if necessary */
 	if (bkpb->hole_length == 0)
 	{
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c9d8cd796a..795e93a014 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2980,6 +2980,17 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_compression_threshold", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Minimum WAL record length to engage compression."),
+			NULL,
+			GUC_UNIT_BYTE
+		},
+		&wal_compression_threshold,
+		512, 32, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_writer_flush_after", PGC_SIGHUP, WAL_SETTINGS,
 			gettext_noop("Amount of WAL written out by WAL writer that triggers a flush."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b2bc43383d..90245c9b8d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -239,6 +239,7 @@
 					# (change requires restart)
 #wal_compression = off			# enables compression of full-page writes;
 					# off, pglz, lz4, zstd, or on
+#wal_compression_threshold = 512	# min 32, minimal record length to be compressed
 #wal_init_zero = on			# zero-fill new WAL files
 #wal_recycle = on			# recycle WAL files
 #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 4411c1468a..f40c2ae76b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -56,6 +56,7 @@ extern PGDLLIMPORT int CommitDelay;
 extern PGDLLIMPORT int CommitSiblings;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int wal_compression_threshold;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 71894262fb..0266e224bb 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -44,6 +44,7 @@ extern void XLogBeginInsert(void);
 extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
+extern void XLogEnsureCompressionBuffer(uint32 extraLen);
 extern void XLogRegisterData(const char *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
 extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9738462d3c..f41106cbdc 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -252,6 +252,12 @@ struct XLogReaderState
 	char	   *decode_buffer_head; /* data is read from the head */
 	char	   *decode_buffer_tail; /* new data is written at the tail */
 
+	/*
+	 * Buffer to decompress records
+	 */
+	char	   *decompression_buffer;
+	uint32 		decompression_buffer_size;
+
 	/*
 	 * Queue of records that have been decoded.  This is a linked list that
 	 * usually consists of consecutive records in decode_buffer, but may also
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index a06833ce0a..9261760272 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -48,7 +48,7 @@ typedef struct XLogRecord
 	/* 2 bytes of padding here, initialize to zero */
 	pg_crc32c	xl_crc;			/* CRC for this record */
 
-	/* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */
+	/* XLogRecordBlockHeaders, XLogRecordDataHeader or compression header follow, no padding */
 
 } XLogRecord;
 
@@ -90,6 +90,9 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+/* This bit in xl_info means the record is compressed */
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -143,11 +146,6 @@ typedef struct XLogRecordBlockImageHeader
 	uint16		length;			/* number of page image bytes */
 	uint16		hole_offset;	/* number of bytes before "hole" */
 	uint8		bimg_info;		/* flag bits, see below */
-
-	/*
-	 * If BKPIMAGE_HAS_HOLE and BKPIMAGE_COMPRESSED(), an
-	 * XLogRecordBlockCompressHeader struct follows.
-	 */
 } XLogRecordBlockImageHeader;
 
 #define SizeOfXLogRecordBlockImageHeader	\
@@ -158,25 +156,19 @@ typedef struct XLogRecordBlockImageHeader
 #define BKPIMAGE_APPLY			0x02	/* page image should be restored
 										 * during replay */
 /* compression methods supported */
-#define BKPIMAGE_COMPRESS_PGLZ	0x04
-#define BKPIMAGE_COMPRESS_LZ4	0x08
-#define BKPIMAGE_COMPRESS_ZSTD	0x10
-
-#define	BKPIMAGE_COMPRESSED(info) \
-	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
-			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
+#define XLR_COMPRESS_PGLZ	0x04
+#define XLR_COMPRESS_LZ4	0x08
+#define XLR_COMPRESS_ZSTD	0x10
 
-/*
- * Extra header information used when page image has "hole" and
- * is compressed.
- */
-typedef struct XLogRecordBlockCompressHeader
+/* Record of a compressed header */
+typedef struct XLogCompressionHeader
 {
-	uint16		hole_length;	/* number of bytes in "hole" */
-} XLogRecordBlockCompressHeader;
+	XLogRecord record_header;
+	char	method;
+	uint32	decompressed_length;
+} XLogCompressionHeader;
 
-#define SizeOfXLogRecordBlockCompressHeader \
-	sizeof(XLogRecordBlockCompressHeader)
+#define SizeOfXLogCompressedRecord	(offsetof(XLogCompressionHeader, decompressed_length) + sizeof(uint32))
 
 /*
  * Maximum size of the header for a block reference. This is used to size a
@@ -185,7 +177,6 @@ typedef struct XLogRecordBlockCompressHeader
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
-	 SizeOfXLogRecordBlockCompressHeader + \
 	 sizeof(RelFileLocator) + \
 	 sizeof(BlockNumber))
 
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..9ff11e3e6a 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -355,7 +355,7 @@
  * Enable debugging print statements for WAL-related operations; see
  * also the wal_debug GUC var.
  */
-/* #define WAL_DEBUG */
+#define WAL_DEBUG
 
 /*
  * Enable tracing of syncscan operations (see also the trace_syncscan GUC var).
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
index f408d4f69b..942cf26f3f 100644
--- a/src/test/recovery/t/026_overwrite_contrecord.pl
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -56,7 +56,7 @@ $$;
 my $initfile = $node->safe_psql('postgres',
 	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
 $node->safe_psql('postgres',
-	qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+	qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
 );
 #$node->safe_psql('postgres', qq{create table foo ()});
 my $endfile = $node->safe_psql('postgres',
diff --git a/src/test/recovery/t/039_end_of_wal.pl b/src/test/recovery/t/039_end_of_wal.pl
index ab751eb271..d4c3d65877 100644
--- a/src/test/recovery/t/039_end_of_wal.pl
+++ b/src/test/recovery/t/039_end_of_wal.pl
@@ -81,7 +81,7 @@ sub emit_message
 	return int(
 		$node->safe_psql(
 			'postgres',
-			"SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'"
+			"SET wal_compression to off;SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'"
 		));
 }
 
-- 
2.39.5 (Apple Git-154)

#5Japin Li
japinli@hotmail.com
In reply to: Andrey M. Borodin (#4)
Re: Compression of bigger WAL records

On Tue, 21 Jan 2025 at 23:24, "Andrey M. Borodin" <x4mmm@yandex-team.ru> wrote:

On 12 Jan 2025, at 17:43, Andrey M. Borodin <x4mmm@yandex-team.ru> wrote:

I attach a prototype patch.

Here's v2, now it passes all the tests with wal_debug.

Some stats. On this test

create table a as select random() from generate_series(1,1e7);
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;
set wal_compression to 'lz4';
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;
set wal_compression to 'pglz';
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;
set wal_compression to 'zstd';
select pg_stat_reset_shared('wal'); create index on a(random ); select pg_size_pretty(wal_bytes) from pg_stat_wal;

I observe WAL size of the index:
method HEAD patched
pglz 193 MB 193 MB
lz4 160 MB 132 MB
zstd 125 MB 97 MB

So, for lz4 and zstd this seems to be a significant reduction.

I'm planning to work on improving the patch quality.

Thanks!

Hi, Andrey Borodin

I find this feature interesting; however, it cannot be applied to the current
master (b35434b134b) due to commit 32a18cc0a73.

Applying: Compress big WAL records
.git/rebase-apply/patch:83: trailing whitespace.

.git/rebase-apply/patch:90: trailing whitespace.

.git/rebase-apply/patch:315: trailing whitespace.

.git/rebase-apply/patch:780: trailing whitespace.
else
error: contrib/pg_walinspect/pg_walinspect.c: does not match index
error: src/backend/access/rmgrdesc/xlogdesc.c: does not match index
error: src/backend/access/transam/xlog.c: does not match index
error: src/backend/access/transam/xloginsert.c: does not match index
error: src/backend/access/transam/xlogreader.c: does not match index
error: src/backend/utils/misc/guc_tables.c: does not match index
error: src/backend/utils/misc/postgresql.conf.sample: does not match index
error: src/include/access/xlog.h: does not match index
error: src/include/access/xloginsert.h: does not match index
error: src/include/access/xlogreader.h: does not match index
error: src/include/access/xlogrecord.h: does not match index
error: src/include/pg_config_manual.h: does not match index
error: src/test/recovery/t/026_overwrite_contrecord.pl: does not match index
error: patch failed: src/test/recovery/t/039_end_of_wal.pl:81
error: src/test/recovery/t/039_end_of_wal.pl: patch does not apply
Patch failed at 0001 Compress big WAL records
hint: Use 'git am --show-current-patch=diff' to see the failed patch
When you have resolved this problem, run "git am --continue".
If you prefer to skip this patch, run "git am --skip" instead.
To restore the original branch and stop patching, run "git am --abort".

I see the patch compresses the WAL record according to the wal_compression,
IIRC the wal_compression is only used for FPI, right? Maybe we should update
the description of this parameter.

I see that the wal_compression_threshold defaults to 512. I wonder if you
chose this value based on testing or randomly.

--
Regrads,
Japin Li

#6Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Andrey M. Borodin (#4)
Re: Compression of bigger WAL records

On 2025/01/22 3:24, Andrey M. Borodin wrote:

On 12 Jan 2025, at 17:43, Andrey M. Borodin <x4mmm@yandex-team.ru> wrote:

I attach a prototype patch.

Here's v2, now it passes all the tests with wal_debug.

I like the idea of WAL compression more.

With the current approach, each backend needs to allocate memory twice
the size of the total WAL record. Right? One area is for the gathered
WAL record data (from rdt and registered_buffers), and the other is for
storing the compressed data. Could this lead to potential memory usage
concerns? Perhaps we should consider setting a limit on the maximum
memory each backend can use for WAL compression?

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

#7Andrey Borodin
x4mmm@yandex-team.ru
In reply to: Fujii Masao (#6)
1 attachment(s)
Re: Compression of bigger WAL records

On 23 Jan 2025, at 20:13, Japin Li <japinli@hotmail.com> wrote:

I find this feature interesting;

Thank you for your interest in the patch!

however, it cannot be applied to the current
master (b35434b134b) due to commit 32a18cc0a73.

PFA a rebased version.

I see the patch compresses the WAL record according to the wal_compression,
IIRC the wal_compression is only used for FPI, right? Maybe we should update
the description of this parameter.

Yes, I'll udpate documentation in future versions too.

I see that the wal_compression_threshold defaults to 512. I wonder if you
chose this value based on testing or randomly.

Voices in my head told me it's a good number.

On 28 Jan 2025, at 22:10, Fujii Masao <masao.fujii@oss.nttdata.com> wrote:

I like the idea of WAL compression more.

Thank you!

With the current approach, each backend needs to allocate memory twice
the size of the total WAL record. Right? One area is for the gathered
WAL record data (from rdt and registered_buffers), and the other is for
storing the compressed data.

Yes, exactly. And also a decompression buffer for each WAL reader.

Could this lead to potential memory usage
concerns? Perhaps we should consider setting a limit on the maximum
memory each backend can use for WAL compression?

Yes, the limit makes sense.

Also, we can reduce memory consumption by employing a streaming compression. Currently, I'm working on a prototype of such technology, because it would allow wholesale WAL compression. The idea is to reuse compression context from previous records to better compress new records. This would allow efficient compression of even very small records. However, there is exactly 0 chance to get it done in a decent shape before feature freeze.

The chances of getting currently proposed approach to v18 seems slim either... I'm hesitating to register this patch on the CF. What do you think?

Best regards, Andrey Borodin.

Attachments:

v3-0001-Compress-big-WAL-records.patchapplication/octet-stream; name=v3-0001-Compress-big-WAL-records.patch; x-unix-mode=0644Download
From cbe90a3a40fb0513a15ccbd2b592689fccbccbba Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v3] Compress big WAL records

This approach replaces FPI compression
---
 contrib/pg_walinspect/pg_walinspect.c         |   6 -
 src/backend/access/rmgrdesc/xlogdesc.c        |  44 +-
 src/backend/access/transam/xlog.c             |  19 +-
 src/backend/access/transam/xloginsert.c       | 389 +++++++++---------
 src/backend/access/transam/xlogreader.c       | 248 ++++++-----
 src/backend/utils/misc/guc_tables.c           |  11 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/access/xloginsert.h               |   1 +
 src/include/access/xlogreader.h               |   6 +
 src/include/access/xlogrecord.h               |  37 +-
 src/include/pg_config_manual.h                |   2 +-
 .../recovery/t/026_overwrite_contrecord.pl    |   2 +-
 13 files changed, 401 insertions(+), 366 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 9e60941578..511bd2978a 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -311,12 +311,6 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 				flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
 			if (blk->apply_image)
 				flags[cnt++] = CStringGetTextDatum("APPLY");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
 
 			Assert(cnt <= bitcnt);
 			block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 58040f2865..55dcfee59d 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -267,46 +267,18 @@ XLogRecGetBlockRefInfo(XLogReaderState *record, bool pretty,
 
 			if (XLogRecHasBlockImage(record, block_id))
 			{
-				uint8		bimg_info = XLogRecGetBlock(record, block_id)->bimg_info;
-
 				/* Calculate the amount of FPI data in the record. */
 				if (fpi_len)
 					*fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
 
-				if (BKPIMAGE_COMPRESSED(bimg_info))
-				{
-					const char *method;
-
-					if ((bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-						method = "pglz";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-						method = "lz4";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-						method = "zstd";
-					else
-						method = "unknown";
-
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u, "
-									 "compression saved: %u, method: %s",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length,
-									 BLCKSZ -
-									 XLogRecGetBlock(record, block_id)->hole_length -
-									 XLogRecGetBlock(record, block_id)->bimg_len,
-									 method);
-				}
-				else
-				{
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length);
-				}
+				
+				appendStringInfo(buf,
+								 " (FPW%s); hole: offset: %u, length: %u",
+								 XLogRecBlockImageApply(record, block_id) ?
+								 "" : " for WAL verification",
+								 XLogRecGetBlock(record, block_id)->hole_offset,
+								 XLogRecGetBlock(record, block_id)->hole_length);
+				
 			}
 
 			if (pretty)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bf3dbda901..b54b4260ff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -135,6 +135,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int			wal_compression_threshold = 512;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -715,6 +716,22 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static uint32 XLogGetRecordTotalLen(XLogRecord *record);
+
+
+/* Read length of a record, accounting for possible compression */
+static uint32
+XLogGetRecordTotalLen(XLogRecord *record)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+		Assert(((int32_t)c->decompressed_length) > 0);
+		return c->decompressed_length;
+	}
+	return record->xl_tot_len;
+}
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1031,7 +1048,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index c66012c3a8..1eb724f61c 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -40,27 +40,6 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 
-/*
- * Guess the maximum buffer size required to store a compressed version of
- * backup block image.
- */
-#ifdef USE_LZ4
-#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
-#else
-#define LZ4_MAX_BLCKSZ		0
-#endif
-
-#ifdef USE_ZSTD
-#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
-#else
-#define ZSTD_MAX_BLCKSZ		0
-#endif
-
-#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)
-
-/* Buffer size required to store a compressed version of backup block image */
-#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
-
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -81,9 +60,6 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
-
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -113,6 +89,16 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+/*
+ * Compression buffers are kep in StringInfor for a prototype
+ * compression_buffer_current_size == -1 means we entered critical section
+ * before could prepare compression buffers.
+ */
+static int32			compression_buffer_current_size;
+static XLogRecData		compressed_rdt_hdr;
+static StringInfo		data_before_compression = NULL;
+static StringInfo		compressed_data = NULL;
+
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
 #define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
@@ -137,9 +123,7 @@ static MemoryContext xloginsert_cxt;
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi,
-									   bool *topxid_included);
-static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
-									uint16 hole_length, void *dest, uint16 *dlen);
+									   bool *topxid_included, uint64 *rec_size);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -160,6 +144,11 @@ XLogBeginInsert(void)
 		elog(ERROR, "XLogBeginInsert was already called");
 
 	begininsert_called = true;
+
+	if (data_before_compression)
+		resetStringInfo(data_before_compression);
+	if (compressed_data)
+		resetStringInfo(compressed_data);
 }
 
 /*
@@ -231,6 +220,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buffer_current_size = 0;
 	begininsert_called = false;
 }
 
@@ -299,6 +289,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 #endif
 
 	regbuf->in_use = true;
+
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -352,6 +344,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
 #endif
 
 	regbuf->in_use = true;
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -386,6 +379,7 @@ XLogRegisterData(const char *data, uint32 len)
 	mainrdata_last = rdata;
 
 	mainrdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -440,6 +434,7 @@ XLogRegisterBufData(uint8 block_id, const char *data, uint32 len)
 	regbuf->rdata_tail->next = rdata;
 	regbuf->rdata_tail = rdata;
 	regbuf->rdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -459,6 +454,144 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+/*
+ * Make sure we have buffers needed for compression.
+ * We cannot do it during WALInsert(), because we will be in a critial section.
+ */
+void XLogEnsureCompressionBuffer(uint32 extraLen)
+{
+	uint64 compressed_buffer_size;
+	uint64 desired_buffer_size;
+
+	if (wal_compression == WAL_COMPRESSION_NONE)
+		return;
+
+	if (CritSectionCount > 0 || compression_buffer_current_size == -1)
+	{
+		/* We cannot prepare buffer during critical section, so bail out early */
+		compression_buffer_current_size = -1;
+		return;
+	}
+
+	compression_buffer_current_size += extraLen;
+	desired_buffer_size = compression_buffer_current_size + SizeOfXLogRecord;
+	Assert(data_before_compression->len == 0);
+	enlargeStringInfo(data_before_compression, desired_buffer_size);
+
+	compressed_buffer_size = PGLZ_MAX_OUTPUT(desired_buffer_size);
+
+#ifdef USE_LZ4
+	compressed_buffer_size = Max(compressed_buffer_size, LZ4_COMPRESSBOUND(desired_buffer_size));
+#endif
+#ifdef USE_ZSTD
+	compressed_buffer_size = Max(compressed_buffer_size, ZSTD_COMPRESSBOUND(desired_buffer_size));
+#endif
+	compressed_buffer_size = compressed_buffer_size + SizeOfXLogCompressedRecord;
+	Assert(compressed_data->len == 0);
+	enlargeStringInfo(compressed_data, compressed_buffer_size);
+}
+
+/* Compress assembled record on top of compression buffers */
+static XLogRecData*
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionHeader *compressed_header;
+	XLogRecord *src_header;
+	uint32 orig_len;
+	int32 compr_len = -1;
+
+	if (compression_buffer_current_size == -1)
+		return NULL;
+
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	
+	Assert(compression_buffer_current_size <= data_before_compression->maxlen);
+
+	/* Build the whole record */
+	for (; rdt != NULL; rdt = rdt->next)
+		appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len);
+
+	src_header = (XLogRecord*) data_before_compression->data;
+	compressed_header = (XLogCompressionHeader*) compressed_data->data;
+
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = data_before_compression->len;
+
+	orig_len = src_header->xl_tot_len - SizeOfXLogRecord;
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = XLR_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char*)&src_header[1], orig_len, (char*)&compressed_header[1], PGLZ_strategy_default);
+			if (compr_len == -1)
+				return NULL;
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = XLR_COMPRESS_LZ4;
+			compr_len = LZ4_compress_default((char*)&src_header[1], (char*)&compressed_header[1], orig_len,
+									   compressed_data->maxlen);
+			if (compr_len <= 0)
+				return NULL;
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = XLR_COMPRESS_ZSTD;
+			compr_len = ZSTD_compress((char*)&compressed_header[1], compressed_data->maxlen, (char*)&src_header[1], orig_len,
+								ZSTD_CLEVEL_DEFAULT);
+			if (ZSTD_isError(compr_len))
+				return NULL;
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			return NULL;
+			break;
+			/* no default case, so that compiler will warn */
+	}
+
+	Assert(compr_len > 0);
+
+	compressed_header->record_header.xl_tot_len = SizeOfXLogCompressedRecord + compr_len;
+
+	compressed_header->record_header.xl_info = compressed_header->record_header.xl_info | XLR_COMPRESSED;
+
+	compressed_rdt_hdr.data = compressed_data->data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum assebled record, possibly compressed */
+static void XLogChecksumRecord(XLogRecData *rdt)
+{
+	pg_crc32c	rdata_crc;
+	XLogRecord *rechdr = (XLogRecord*) rdt->data;
+	/*
+	 * Calculate CRC of the data
+	 *
+	 * Note that the record header isn't added into the CRC initially since we
+	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
+	 * the whole record in the order: rdata, then backup blocks, then record
+	 * header.
+	 */
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, rdt->data + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (rdt = rdt->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -509,6 +642,8 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecPtr	fpw_lsn;
 		XLogRecData *rdt;
 		int			num_fpi = 0;
+		uint64		rec_size;
+
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -518,7 +653,17 @@ XLogInsert(RmgrId rmid, uint8 info)
 		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
-								 &fpw_lsn, &num_fpi, &topxid_included);
+								 &fpw_lsn, &num_fpi, &topxid_included,
+								 &rec_size);
+
+		if (rec_size > wal_compression_threshold && wal_compression != WAL_COMPRESSION_NONE)
+		{
+			XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+			if (rdt_compressed != NULL)
+				rdt = rdt_compressed;
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  topxid_included);
@@ -547,12 +692,11 @@ XLogInsert(RmgrId rmid, uint8 info)
 static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
-				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
+				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included,
+				   uint64 *rec_size)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -593,9 +737,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
-		XLogRecordBlockCompressHeader cbimg = {0};
+		uint32		hole_length;
 		bool		samerel;
-		bool		is_compressed = false;
 		bool		include_image;
 
 		if (!regbuf->in_use)
@@ -648,8 +791,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 		if (include_image)
 		{
-			const PageData *page = regbuf->page;
 			uint16		compressed_len = 0;
+			const char *page = regbuf->page;
 
 			/*
 			 * The page needs to be backed up, so calculate its hole length
@@ -666,32 +809,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper <= BLCKSZ)
 				{
 					bimg.hole_offset = lower;
-					cbimg.hole_length = upper - lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to remove */
 					bimg.hole_offset = 0;
-					cbimg.hole_length = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
 				bimg.hole_offset = 0;
-				cbimg.hole_length = 0;
-			}
-
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
-			{
-				is_compressed =
-					XLogCompressBackupBlock(page, bimg.hole_offset,
-											cbimg.hole_length,
-											regbuf->compressed_page,
-											&compressed_len);
+				hole_length = 0;
 			}
 
 			/*
@@ -709,7 +840,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 			rdt_datas_last = rdt_datas_last->next;
 
-			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+			bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
 			/*
 			 * If WAL consistency checking is enabled for the resource manager
@@ -720,48 +851,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			if (needs_backup)
 				bimg.bimg_info |= BKPIMAGE_APPLY;
 
-			if (is_compressed)
-			{
-				/* The current compression is stored in the WAL record */
-				bimg.length = compressed_len;
-
-				/* Set the compression method used for this block */
-				switch ((WalCompression) wal_compression)
-				{
-					case WAL_COMPRESSION_PGLZ:
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
-						break;
-
-					case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
-#else
-						elog(ERROR, "LZ4 is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
-#else
-						elog(ERROR, "zstd is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_NONE:
-						Assert(false);	/* cannot happen */
-						break;
-						/* no default case, so that compiler will warn */
-				}
-
-				rdt_datas_last->data = regbuf->compressed_page;
-				rdt_datas_last->len = compressed_len;
-			}
-			else
 			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
+				bimg.length = BLCKSZ - hole_length;
 
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -776,9 +869,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -821,12 +914,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
-			if (cbimg.hole_length != 0 && is_compressed)
-			{
-				memcpy(scratch, &cbimg,
-					   SizeOfXLogRecordBlockCompressHeader);
-				scratch += SizeOfXLogRecordBlockCompressHeader;
-			}
 		}
 		if (!samerel)
 		{
@@ -892,19 +979,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -928,92 +1002,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
 
-	return &hdr_rdt;
-}
+	*rec_size = rechdr->xl_tot_len;
 
-/*
- * Create a compressed version of a backup block image.
- *
- * Returns false if compression fails (i.e., compressed result is actually
- * bigger than original). Otherwise, returns true and sets 'dlen' to
- * the length of compressed block image.
- */
-static bool
-XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
-						void *dest, uint16 *dlen)
-{
-	int32		orig_len = BLCKSZ - hole_length;
-	int32		len = -1;
-	int32		extra_bytes = 0;
-	const void *source;
-	PGAlignedBlock tmp;
-
-	if (hole_length != 0)
-	{
-		/* must skip the hole */
-		memcpy(tmp.data, page, hole_offset);
-		memcpy(tmp.data + hole_offset,
-			   page + (hole_offset + hole_length),
-			   BLCKSZ - (hole_length + hole_offset));
-		source = tmp.data;
-
-		/*
-		 * Extra data needs to be stored in WAL record for the compressed
-		 * version of block image if the hole exists.
-		 */
-		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
-	}
-	else
-		source = page;
-
-	switch ((WalCompression) wal_compression)
-	{
-		case WAL_COMPRESSION_PGLZ:
-			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
-			break;
-
-		case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
-			if (len <= 0)
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "LZ4 is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								ZSTD_CLEVEL_DEFAULT);
-			if (ZSTD_isError(len))
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "zstd is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_NONE:
-			Assert(false);		/* cannot happen */
-			break;
-			/* no default case, so that compiler will warn */
-	}
-
-	/*
-	 * We recheck the actual size even if compression reports success and see
-	 * if the number of bytes saved by compression is larger than the length
-	 * of extra data needed for the compressed version of block image.
-	 */
-	if (len >= 0 &&
-		len + extra_bytes < orig_len)
-	{
-		*dlen = (uint16) len;	/* successful compression */
-		return true;
-	}
-	return false;
+	return &hdr_rdt;
 }
 
 /*
@@ -1389,4 +1382,10 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	if (data_before_compression == NULL)
+		data_before_compression = makeStringInfo();
+	if (compressed_data == NULL)
+		compressed_data = makeStringInfo();
+	XLogEnsureCompressionBuffer(SizeOfXLogRecord);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3596af0617..558f40c1fa 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "replication/origin.h"
+#include "utils/memutils.h"
 
 #ifndef FRONTEND
 #include "pgstat.h"
@@ -53,6 +54,8 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state, XLogRecord *record,
+												XLogRecPtr recptr);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -169,6 +172,8 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+	if (state->decompression_buffer)
+		pfree(state->decompression_buffer);
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -532,7 +537,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_phisical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -643,8 +649,27 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_phisical = record->xl_tot_len;
 
+	/* TODO: Actually, we should not trust this compression bit too... */
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogCompressedRecord)
+		{
+			total_len_decomp = -1; /* Need reassemble to know the size */
+		}
+		else
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			// Assert(((int32_t)c->decompressed_length) > 0); // We cannot assert this, this might be a garbage
+			total_len_decomp = c->decompressed_length;
+		}
+	}
+	else
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogRecord)
+			total_len_decomp = record->xl_tot_len;
+		else 
+			total_len_decomp = -1; /* We are not sure record is not compressed */
 	/*
 	 * If the whole record header is on this page, validate it immediately.
 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
@@ -659,16 +684,18 @@ restart:
 								   randAccess))
 			goto err;
 		gotheader = true;
+		if (record->xl_info & XLR_COMPRESSED)
+			gotheader = targetRecOff <= XLOG_BLCKSZ - SizeOfXLogCompressedRecord;
 	}
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_phisical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_phisical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -680,9 +707,11 @@ restart:
 	 * calling palloc.  If we can't, we'll try again below after we've
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
-	decoded = XLogReadRecordAlloc(state,
-								  total_len,
+	if (total_len_decomp != -1)
+		decoded = XLogReadRecordAlloc(state,
+								  total_len_decomp,
 								  false /* allow_oversized */ );
+
 	if (decoded == NULL && nonblocking)
 	{
 		/*
@@ -694,7 +723,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_phisical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -724,7 +753,7 @@ restart:
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_phisical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 
 			if (readOff == XLREAD_WOULDBLOCK)
@@ -765,12 +794,12 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_phisical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_phisical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
@@ -813,7 +842,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_phisical > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -824,11 +853,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_phisical);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_phisical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -843,8 +872,9 @@ restart:
 	else
 	{
 		/* Wait for the record data to become available */
+		Assert(targetRecOff + total_len_phisical <= XLOG_BLCKSZ);
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   targetRecOff + total_len_phisical);
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -854,7 +884,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_phisical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -877,8 +907,19 @@ restart:
 	if (decoded == NULL)
 	{
 		Assert(!nonblocking);
+
+		/* total_len_decomp might be not actual */
+		if (record->xl_info & XLR_COMPRESSED)
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			Assert(((int32_t)c->decompressed_length) > 0);
+			Assert(((int32_t)c->decompressed_length) < MaxAllocSize);
+			total_len_decomp = c->decompressed_length;
+		}
+		else
+			total_len_decomp = record->xl_tot_len;
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1646,6 +1687,84 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state,
+												XLogRecord *record,
+												XLogRecPtr recptr)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader	*src = (XLogCompressionHeader*) record;
+		bool				decomp_success = true;
+		uint32				srclen = src->record_header.xl_tot_len - SizeOfXLogCompressedRecord;
+		char				*dst;
+		XLogRecord			*dst_h;
+
+		if (state->decompression_buffer_size < src->decompressed_length + SizeOfXLogRecord)
+		{
+			if (state->decompression_buffer)
+				pfree(state->decompression_buffer);
+			/* Avoid small steps in growths, we compress only big records */
+			state->decompression_buffer_size = TYPEALIGN(BLCKSZ, src->decompressed_length + SizeOfXLogRecord);
+			state->decompression_buffer = palloc(state->decompression_buffer_size);
+		}
+		dst_h = (XLogRecord*) state->decompression_buffer;
+		*dst_h = src->record_header;
+		dst_h->xl_tot_len = src->decompressed_length;
+		dst = (char*) &dst_h[1];
+
+		/* If a backup block image is compressed, decompress it */
+
+		if (src->method == XLR_COMPRESS_PGLZ)
+		{
+			if (pglz_decompress((char*) &src[1], srclen, dst,
+								state->decompression_buffer_size, true) < 0)
+				decomp_success = false;
+		}
+		else if (src->method == XLR_COMPRESS_LZ4)
+		{
+#ifdef USE_LZ4
+			if (LZ4_decompress_safe((char*) &src[1], dst,
+									srclen, state->decompression_buffer_size) <= 0)
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "lz4");
+			return NULL;
+#endif
+		}
+		else if (src->method == XLR_COMPRESS_ZSTD)
+		{
+#ifdef USE_ZSTD
+			size_t		decomp_result = ZSTD_decompress(dst,
+														state->decompression_buffer_size,
+														(char*) &src[1], srclen);
+			if (ZSTD_isError(decomp_result))
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "zstd");
+			return NULL;
+#endif
+		}
+		else
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with unknown method",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		if (!decomp_success)
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		return (XLogRecord*) state->decompression_buffer;
+	}
+	return record;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1684,6 +1803,14 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(state, record, lsn);
+
+	if (!record)
+	{
+		/* Decompression failed, error must be reported already */
+		return false;
+	}
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1791,16 +1918,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
 
 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
-
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info))
-				{
-					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
-						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-					else
-						blk->hole_length = 0;
-				}
-				else
-					blk->hole_length = BLCKSZ - blk->bimg_len;
+				blk->hole_length = BLCKSZ - blk->bimg_len;
 				datatotal += blk->bimg_len;
 
 				/*
@@ -1836,29 +1954,15 @@ DecodeXLogRecord(XLogReaderState *state,
 					goto err;
 				}
 
-				/*
-				 * Cross-check that bimg_len < BLCKSZ if it is compressed.
-				 */
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
-					blk->bimg_len == BLCKSZ)
-				{
-					report_invalid_record(state,
-										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
-										  (unsigned int) blk->bimg_len,
-										  LSN_FORMAT_ARGS(state->ReadRecPtr));
-					goto err;
-				}
-
 				/*
 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
 				 * set nor COMPRESSED().
 				 */
 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
-					!BKPIMAGE_COMPRESSED(blk->bimg_info) &&
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
-										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X",
+										  "BKPIMAGE_HAS_HOLE is not set, but block image length is %u at %X/%X",
 										  (unsigned int) blk->data_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
@@ -2057,7 +2161,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
 	char	   *ptr;
-	PGAlignedBlock tmp;
 
 	if (block_id > record->record->max_block_id ||
 		!record->record->blocks[block_id].in_use)
@@ -2079,67 +2182,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	bkpb = &record->record->blocks[block_id];
 	ptr = bkpb->bkp_image;
 
-	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
-	{
-		/* If a backup block image is compressed, decompress it */
-		bool		decomp_success = true;
-
-		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
-				decomp_success = false;
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-		{
-#ifdef USE_LZ4
-			if (LZ4_decompress_safe(ptr, tmp.data,
-									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "LZ4",
-								  block_id);
-			return false;
-#endif
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-		{
-#ifdef USE_ZSTD
-			size_t		decomp_result = ZSTD_decompress(tmp.data,
-														BLCKSZ - bkpb->hole_length,
-														ptr, bkpb->bimg_len);
-
-			if (ZSTD_isError(decomp_result))
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "zstd",
-								  block_id);
-			return false;
-#endif
-		}
-		else
-		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		if (!decomp_success)
-		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		ptr = tmp.data;
-	}
-
 	/* generate page, taking into account hole if necessary */
 	if (bkpb->hole_length == 0)
 	{
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 38cb9e970d..d37d601f83 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2981,6 +2981,17 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_compression_threshold", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Minimum WAL record length to engage compression."),
+			NULL,
+			GUC_UNIT_BYTE
+		},
+		&wal_compression_threshold,
+		512, 32, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_writer_flush_after", PGC_SIGHUP, WAL_SETTINGS,
 			gettext_noop("Amount of WAL written out by WAL writer that triggers a flush."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 079efa1baa..c5a34c0e7f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -231,6 +231,7 @@
 					# (change requires restart)
 #wal_compression = off			# enables compression of full-page writes;
 					# off, pglz, lz4, zstd, or on
+#wal_compression_threshold = 512	# min 32, minimal record length to be compressed
 #wal_init_zero = on			# zero-fill new WAL files
 #wal_recycle = on			# recycle WAL files
 #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 4411c1468a..f40c2ae76b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -56,6 +56,7 @@ extern PGDLLIMPORT int CommitDelay;
 extern PGDLLIMPORT int CommitSiblings;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int wal_compression_threshold;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 3f6b351052..497bd89b91 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -45,6 +45,7 @@ extern void XLogBeginInsert(void);
 extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
+extern void XLogEnsureCompressionBuffer(uint32 extraLen);
 extern void XLogRegisterData(const char *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
 extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9738462d3c..f41106cbdc 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -252,6 +252,12 @@ struct XLogReaderState
 	char	   *decode_buffer_head; /* data is read from the head */
 	char	   *decode_buffer_tail; /* new data is written at the tail */
 
+	/*
+	 * Buffer to decompress records
+	 */
+	char	   *decompression_buffer;
+	uint32 		decompression_buffer_size;
+
 	/*
 	 * Queue of records that have been decoded.  This is a linked list that
 	 * usually consists of consecutive records in decode_buffer, but may also
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index a06833ce0a..9261760272 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -48,7 +48,7 @@ typedef struct XLogRecord
 	/* 2 bytes of padding here, initialize to zero */
 	pg_crc32c	xl_crc;			/* CRC for this record */
 
-	/* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */
+	/* XLogRecordBlockHeaders, XLogRecordDataHeader or compression header follow, no padding */
 
 } XLogRecord;
 
@@ -90,6 +90,9 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+/* This bit in xl_info means the record is compressed */
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -143,11 +146,6 @@ typedef struct XLogRecordBlockImageHeader
 	uint16		length;			/* number of page image bytes */
 	uint16		hole_offset;	/* number of bytes before "hole" */
 	uint8		bimg_info;		/* flag bits, see below */
-
-	/*
-	 * If BKPIMAGE_HAS_HOLE and BKPIMAGE_COMPRESSED(), an
-	 * XLogRecordBlockCompressHeader struct follows.
-	 */
 } XLogRecordBlockImageHeader;
 
 #define SizeOfXLogRecordBlockImageHeader	\
@@ -158,25 +156,19 @@ typedef struct XLogRecordBlockImageHeader
 #define BKPIMAGE_APPLY			0x02	/* page image should be restored
 										 * during replay */
 /* compression methods supported */
-#define BKPIMAGE_COMPRESS_PGLZ	0x04
-#define BKPIMAGE_COMPRESS_LZ4	0x08
-#define BKPIMAGE_COMPRESS_ZSTD	0x10
-
-#define	BKPIMAGE_COMPRESSED(info) \
-	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
-			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
+#define XLR_COMPRESS_PGLZ	0x04
+#define XLR_COMPRESS_LZ4	0x08
+#define XLR_COMPRESS_ZSTD	0x10
 
-/*
- * Extra header information used when page image has "hole" and
- * is compressed.
- */
-typedef struct XLogRecordBlockCompressHeader
+/* Record of a compressed header */
+typedef struct XLogCompressionHeader
 {
-	uint16		hole_length;	/* number of bytes in "hole" */
-} XLogRecordBlockCompressHeader;
+	XLogRecord record_header;
+	char	method;
+	uint32	decompressed_length;
+} XLogCompressionHeader;
 
-#define SizeOfXLogRecordBlockCompressHeader \
-	sizeof(XLogRecordBlockCompressHeader)
+#define SizeOfXLogCompressedRecord	(offsetof(XLogCompressionHeader, decompressed_length) + sizeof(uint32))
 
 /*
  * Maximum size of the header for a block reference. This is used to size a
@@ -185,7 +177,6 @@ typedef struct XLogRecordBlockCompressHeader
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
-	 SizeOfXLogRecordBlockCompressHeader + \
 	 sizeof(RelFileLocator) + \
 	 sizeof(BlockNumber))
 
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..9ff11e3e6a 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -355,7 +355,7 @@
  * Enable debugging print statements for WAL-related operations; see
  * also the wal_debug GUC var.
  */
-/* #define WAL_DEBUG */
+#define WAL_DEBUG
 
 /*
  * Enable tracing of syncscan operations (see also the trace_syncscan GUC var).
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
index f408d4f69b..942cf26f3f 100644
--- a/src/test/recovery/t/026_overwrite_contrecord.pl
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -56,7 +56,7 @@ $$;
 my $initfile = $node->safe_psql('postgres',
 	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
 $node->safe_psql('postgres',
-	qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+	qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
 );
 #$node->safe_psql('postgres', qq{create table foo ()});
 my $endfile = $node->safe_psql('postgres',
-- 
2.39.5 (Apple Git-154)

#8wenhui qiu
qiuwenhuifx@gmail.com
In reply to: Andrey Borodin (#7)
Re: Compression of bigger WAL records

Hi Andery
I have a question ,If wal_compression_threshold is set to more than
the block size of the wal log, then the FPI is not compressed, and if so,
it might make sense to have a maximum value of this parameter that does not
exceed the block size of the wal log?

Best regards

On Thu, Jan 30, 2025 at 9:26 PM Andrey Borodin <x4mmm@yandex-team.ru> wrote:

Show quoted text

On 23 Jan 2025, at 20:13, Japin Li <japinli@hotmail.com> wrote:

I find this feature interesting;

Thank you for your interest in the patch!

however, it cannot be applied to the current
master (b35434b134b) due to commit 32a18cc0a73.

PFA a rebased version.

I see the patch compresses the WAL record according to the

wal_compression,

IIRC the wal_compression is only used for FPI, right? Maybe we should

update

the description of this parameter.

Yes, I'll udpate documentation in future versions too.

I see that the wal_compression_threshold defaults to 512. I wonder if you
chose this value based on testing or randomly.

Voices in my head told me it's a good number.

On 28 Jan 2025, at 22:10, Fujii Masao <masao.fujii@oss.nttdata.com>

wrote:

I like the idea of WAL compression more.

Thank you!

With the current approach, each backend needs to allocate memory twice
the size of the total WAL record. Right? One area is for the gathered
WAL record data (from rdt and registered_buffers), and the other is for
storing the compressed data.

Yes, exactly. And also a decompression buffer for each WAL reader.

Could this lead to potential memory usage
concerns? Perhaps we should consider setting a limit on the maximum
memory each backend can use for WAL compression?

Yes, the limit makes sense.

Also, we can reduce memory consumption by employing a streaming
compression. Currently, I'm working on a prototype of such technology,
because it would allow wholesale WAL compression. The idea is to reuse
compression context from previous records to better compress new records.
This would allow efficient compression of even very small records. However,
there is exactly 0 chance to get it done in a decent shape before feature
freeze.

The chances of getting currently proposed approach to v18 seems slim
either... I'm hesitating to register this patch on the CF. What do you
think?

Best regards, Andrey Borodin.

#9Andrey Borodin
x4mmm@yandex-team.ru
In reply to: wenhui qiu (#8)
1 attachment(s)
Re: Compression of bigger WAL records

On 31 Jan 2025, at 08:37, wenhui qiu <qiuwenhuifx@gmail.com> wrote:

Hi Andery
I have a question ,If wal_compression_threshold is set to more than the block size of the wal log, then the FPI is not compressed, and if so, it might make sense to have a maximum value of this parameter that does not exceed the block size of the wal log?

Oops, looks like I missed your question. Sorry for so long delay.

User might want to compress only megabyte+ records, there's nothing wrong with it. WAL record itself is capped by 1Gb (XLogRecordMaxSize), I do not see a reason to restrict wal_compression_threshold by lower value.

PFA rebased version.

Best regards, Andrey Borodin.

Attachments:

v4-0001-Compress-big-WAL-records.patchapplication/octet-stream; name=v4-0001-Compress-big-WAL-records.patch; x-unix-mode=0644Download
From 9b49a06fd1e59dcf2da5768118a357d08adbe15d Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v4] Compress big WAL records

This approach replaces FPI compression
---
 contrib/pg_walinspect/pg_walinspect.c         |   6 -
 src/backend/access/rmgrdesc/xlogdesc.c        |  44 +-
 src/backend/access/transam/xlog.c             |  19 +-
 src/backend/access/transam/xloginsert.c       | 390 +++++++++---------
 src/backend/access/transam/xlogreader.c       | 248 ++++++-----
 src/backend/utils/misc/guc_tables.c           |  11 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/access/xloginsert.h               |   1 +
 src/include/access/xlogreader.h               |   6 +
 src/include/access/xlogrecord.h               |  37 +-
 .../recovery/t/026_overwrite_contrecord.pl    |   2 +-
 12 files changed, 400 insertions(+), 366 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 0398ad82cec..0d2a1d5a462 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -314,12 +314,6 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 				flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
 			if (blk->apply_image)
 				flags[cnt++] = CStringGetTextDatum("APPLY");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
 
 			Assert(cnt <= bitcnt);
 			block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index cd6c2a2f650..585dd3fcec8 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -267,46 +267,18 @@ XLogRecGetBlockRefInfo(XLogReaderState *record, bool pretty,
 
 			if (XLogRecHasBlockImage(record, block_id))
 			{
-				uint8		bimg_info = XLogRecGetBlock(record, block_id)->bimg_info;
-
 				/* Calculate the amount of FPI data in the record. */
 				if (fpi_len)
 					*fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
 
-				if (BKPIMAGE_COMPRESSED(bimg_info))
-				{
-					const char *method;
-
-					if ((bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-						method = "pglz";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-						method = "lz4";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-						method = "zstd";
-					else
-						method = "unknown";
-
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u, "
-									 "compression saved: %u, method: %s",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length,
-									 BLCKSZ -
-									 XLogRecGetBlock(record, block_id)->hole_length -
-									 XLogRecGetBlock(record, block_id)->bimg_len,
-									 method);
-				}
-				else
-				{
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length);
-				}
+				
+				appendStringInfo(buf,
+								 " (FPW%s); hole: offset: %u, length: %u",
+								 XLogRecBlockImageApply(record, block_id) ?
+								 "" : " for WAL verification",
+								 XLogRecGetBlock(record, block_id)->hole_offset,
+								 XLogRecGetBlock(record, block_id)->hole_length);
+				
 			}
 
 			if (pretty)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e7827c6ed9..f6c22af81ee 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -135,6 +135,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int			wal_compression_threshold = 512;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -725,6 +726,22 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static uint32 XLogGetRecordTotalLen(XLogRecord *record);
+
+
+/* Read length of a record, accounting for possible compression */
+static uint32
+XLogGetRecordTotalLen(XLogRecord *record)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+		Assert(((int32_t)c->decompressed_length) > 0);
+		return c->decompressed_length;
+	}
+	return record->xl_tot_len;
+}
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1041,7 +1058,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index c7571429e8e..a1d677419c0 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -40,27 +40,6 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 
-/*
- * Guess the maximum buffer size required to store a compressed version of
- * backup block image.
- */
-#ifdef USE_LZ4
-#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
-#else
-#define LZ4_MAX_BLCKSZ		0
-#endif
-
-#ifdef USE_ZSTD
-#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
-#else
-#define ZSTD_MAX_BLCKSZ		0
-#endif
-
-#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)
-
-/* Buffer size required to store a compressed version of backup block image */
-#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
-
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -81,9 +60,6 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
-
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -113,6 +89,16 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+/*
+ * Compression buffers are kep in StringInfor for a prototype
+ * compression_buffer_current_size == -1 means we entered critical section
+ * before could prepare compression buffers.
+ */
+static int32			compression_buffer_current_size;
+static XLogRecData		compressed_rdt_hdr;
+static StringInfo		data_before_compression = NULL;
+static StringInfo		compressed_data = NULL;
+
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
 #define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
@@ -137,9 +123,7 @@ static MemoryContext xloginsert_cxt;
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi,
-									   bool *topxid_included);
-static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
-									uint16 hole_length, void *dest, uint16 *dlen);
+									   bool *topxid_included, uint64 *rec_size);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -160,6 +144,11 @@ XLogBeginInsert(void)
 		elog(ERROR, "XLogBeginInsert was already called");
 
 	begininsert_called = true;
+
+	if (data_before_compression)
+		resetStringInfo(data_before_compression);
+	if (compressed_data)
+		resetStringInfo(compressed_data);
 }
 
 /*
@@ -231,6 +220,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buffer_current_size = 0;
 	begininsert_called = false;
 }
 
@@ -299,6 +289,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 #endif
 
 	regbuf->in_use = true;
+
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -352,6 +344,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
 #endif
 
 	regbuf->in_use = true;
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -386,6 +379,7 @@ XLogRegisterData(const void *data, uint32 len)
 	mainrdata_last = rdata;
 
 	mainrdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -440,6 +434,7 @@ XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
 	regbuf->rdata_tail->next = rdata;
 	regbuf->rdata_tail = rdata;
 	regbuf->rdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -459,6 +454,144 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+/*
+ * Make sure we have buffers needed for compression.
+ * We cannot do it during WALInsert(), because we will be in a critial section.
+ */
+void XLogEnsureCompressionBuffer(uint32 extraLen)
+{
+	uint64 compressed_buffer_size;
+	uint64 desired_buffer_size;
+
+	if (wal_compression == WAL_COMPRESSION_NONE)
+		return;
+
+	if (CritSectionCount > 0 || compression_buffer_current_size == -1)
+	{
+		/* We cannot prepare buffer during critical section, so bail out early */
+		compression_buffer_current_size = -1;
+		return;
+	}
+
+	compression_buffer_current_size += extraLen;
+	desired_buffer_size = compression_buffer_current_size + SizeOfXLogRecord;
+	Assert(data_before_compression->len == 0);
+	enlargeStringInfo(data_before_compression, desired_buffer_size);
+
+	compressed_buffer_size = PGLZ_MAX_OUTPUT(desired_buffer_size);
+
+#ifdef USE_LZ4
+	compressed_buffer_size = Max(compressed_buffer_size, LZ4_COMPRESSBOUND(desired_buffer_size));
+#endif
+#ifdef USE_ZSTD
+	compressed_buffer_size = Max(compressed_buffer_size, ZSTD_COMPRESSBOUND(desired_buffer_size));
+#endif
+	compressed_buffer_size = compressed_buffer_size + SizeOfXLogCompressedRecord;
+	Assert(compressed_data->len == 0);
+	enlargeStringInfo(compressed_data, compressed_buffer_size);
+}
+
+/* Compress assembled record on top of compression buffers */
+static XLogRecData*
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionHeader *compressed_header;
+	XLogRecord *src_header;
+	uint32 orig_len;
+	int32 compr_len = -1;
+
+	if (compression_buffer_current_size == -1)
+		return NULL;
+
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	
+	Assert(compression_buffer_current_size <= data_before_compression->maxlen);
+
+	/* Build the whole record */
+	for (; rdt != NULL; rdt = rdt->next)
+		appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len);
+
+	src_header = (XLogRecord*) data_before_compression->data;
+	compressed_header = (XLogCompressionHeader*) compressed_data->data;
+
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = data_before_compression->len;
+
+	orig_len = src_header->xl_tot_len - SizeOfXLogRecord;
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = XLR_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char*)&src_header[1], orig_len, (char*)&compressed_header[1], PGLZ_strategy_default);
+			if (compr_len == -1)
+				return NULL;
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = XLR_COMPRESS_LZ4;
+			compr_len = LZ4_compress_default((char*)&src_header[1], (char*)&compressed_header[1], orig_len,
+									   compressed_data->maxlen);
+			if (compr_len <= 0)
+				return NULL;
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = XLR_COMPRESS_ZSTD;
+			compr_len = ZSTD_compress((char*)&compressed_header[1], compressed_data->maxlen, (char*)&src_header[1], orig_len,
+								ZSTD_CLEVEL_DEFAULT);
+			if (ZSTD_isError(compr_len))
+				return NULL;
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			return NULL;
+			break;
+			/* no default case, so that compiler will warn */
+	}
+
+	Assert(compr_len > 0);
+
+	compressed_header->record_header.xl_tot_len = SizeOfXLogCompressedRecord + compr_len;
+
+	compressed_header->record_header.xl_info = compressed_header->record_header.xl_info | XLR_COMPRESSED;
+
+	compressed_rdt_hdr.data = compressed_data->data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum assebled record, possibly compressed */
+static void XLogChecksumRecord(XLogRecData *rdt)
+{
+	pg_crc32c	rdata_crc;
+	XLogRecord *rechdr = (XLogRecord*) rdt->data;
+	/*
+	 * Calculate CRC of the data
+	 *
+	 * Note that the record header isn't added into the CRC initially since we
+	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
+	 * the whole record in the order: rdata, then backup blocks, then record
+	 * header.
+	 */
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, ((char *)rdt->data) + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (rdt = rdt->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -509,6 +642,8 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecPtr	fpw_lsn;
 		XLogRecData *rdt;
 		int			num_fpi = 0;
+		uint64		rec_size;
+
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -518,7 +653,17 @@ XLogInsert(RmgrId rmid, uint8 info)
 		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
-								 &fpw_lsn, &num_fpi, &topxid_included);
+								 &fpw_lsn, &num_fpi, &topxid_included,
+								 &rec_size);
+
+		if (rec_size > wal_compression_threshold && wal_compression != WAL_COMPRESSION_NONE)
+		{
+			XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+			if (rdt_compressed != NULL)
+				rdt = rdt_compressed;
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  topxid_included);
@@ -559,12 +704,11 @@ XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
 static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
-				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
+				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included,
+				   uint64 *rec_size)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -605,9 +749,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
-		XLogRecordBlockCompressHeader cbimg = {0};
+		uint32		hole_length;
 		bool		samerel;
-		bool		is_compressed = false;
 		bool		include_image;
 
 		if (!regbuf->in_use)
@@ -660,8 +803,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 		if (include_image)
 		{
-			const PageData *page = regbuf->page;
-			uint16		compressed_len = 0;
+			const char *page = regbuf->page;
 
 			/*
 			 * The page needs to be backed up, so calculate its hole length
@@ -678,32 +820,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper <= BLCKSZ)
 				{
 					bimg.hole_offset = lower;
-					cbimg.hole_length = upper - lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to remove */
 					bimg.hole_offset = 0;
-					cbimg.hole_length = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
 				bimg.hole_offset = 0;
-				cbimg.hole_length = 0;
-			}
-
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
-			{
-				is_compressed =
-					XLogCompressBackupBlock(page, bimg.hole_offset,
-											cbimg.hole_length,
-											regbuf->compressed_page,
-											&compressed_len);
+				hole_length = 0;
 			}
 
 			/*
@@ -721,7 +851,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 			rdt_datas_last = rdt_datas_last->next;
 
-			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+			bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
 			/*
 			 * If WAL consistency checking is enabled for the resource manager
@@ -732,48 +862,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			if (needs_backup)
 				bimg.bimg_info |= BKPIMAGE_APPLY;
 
-			if (is_compressed)
-			{
-				/* The current compression is stored in the WAL record */
-				bimg.length = compressed_len;
-
-				/* Set the compression method used for this block */
-				switch ((WalCompression) wal_compression)
-				{
-					case WAL_COMPRESSION_PGLZ:
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
-						break;
-
-					case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
-#else
-						elog(ERROR, "LZ4 is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
-#else
-						elog(ERROR, "zstd is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_NONE:
-						Assert(false);	/* cannot happen */
-						break;
-						/* no default case, so that compiler will warn */
-				}
-
-				rdt_datas_last->data = regbuf->compressed_page;
-				rdt_datas_last->len = compressed_len;
-			}
-			else
 			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
+				bimg.length = BLCKSZ - hole_length;
 
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -788,9 +880,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -833,12 +925,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
-			if (cbimg.hole_length != 0 && is_compressed)
-			{
-				memcpy(scratch, &cbimg,
-					   SizeOfXLogRecordBlockCompressHeader);
-				scratch += SizeOfXLogRecordBlockCompressHeader;
-			}
 		}
 		if (!samerel)
 		{
@@ -904,19 +990,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -940,92 +1013,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
 
-	return &hdr_rdt;
-}
+	*rec_size = rechdr->xl_tot_len;
 
-/*
- * Create a compressed version of a backup block image.
- *
- * Returns false if compression fails (i.e., compressed result is actually
- * bigger than original). Otherwise, returns true and sets 'dlen' to
- * the length of compressed block image.
- */
-static bool
-XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
-						void *dest, uint16 *dlen)
-{
-	int32		orig_len = BLCKSZ - hole_length;
-	int32		len = -1;
-	int32		extra_bytes = 0;
-	const void *source;
-	PGAlignedBlock tmp;
-
-	if (hole_length != 0)
-	{
-		/* must skip the hole */
-		memcpy(tmp.data, page, hole_offset);
-		memcpy(tmp.data + hole_offset,
-			   page + (hole_offset + hole_length),
-			   BLCKSZ - (hole_length + hole_offset));
-		source = tmp.data;
-
-		/*
-		 * Extra data needs to be stored in WAL record for the compressed
-		 * version of block image if the hole exists.
-		 */
-		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
-	}
-	else
-		source = page;
-
-	switch ((WalCompression) wal_compression)
-	{
-		case WAL_COMPRESSION_PGLZ:
-			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
-			break;
-
-		case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
-			if (len <= 0)
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "LZ4 is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								ZSTD_CLEVEL_DEFAULT);
-			if (ZSTD_isError(len))
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "zstd is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_NONE:
-			Assert(false);		/* cannot happen */
-			break;
-			/* no default case, so that compiler will warn */
-	}
-
-	/*
-	 * We recheck the actual size even if compression reports success and see
-	 * if the number of bytes saved by compression is larger than the length
-	 * of extra data needed for the compressed version of block image.
-	 */
-	if (len >= 0 &&
-		len + extra_bytes < orig_len)
-	{
-		*dlen = (uint16) len;	/* successful compression */
-		return true;
-	}
-	return false;
+	return &hdr_rdt;
 }
 
 /*
@@ -1401,4 +1393,10 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	if (data_before_compression == NULL)
+		data_before_compression = makeStringInfo();
+	if (compressed_data == NULL)
+		compressed_data = makeStringInfo();
+	XLogEnsureCompressionBuffer(SizeOfXLogRecord);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index ac1f801b1eb..b5e9f8ae1bb 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "replication/origin.h"
+#include "utils/memutils.h"
 
 #ifndef FRONTEND
 #include "pgstat.h"
@@ -54,6 +55,8 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state, XLogRecord *record,
+												XLogRecPtr recptr);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -170,6 +173,8 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+	if (state->decompression_buffer)
+		pfree(state->decompression_buffer);
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -533,7 +538,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_phisical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -644,8 +650,27 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_phisical = record->xl_tot_len;
 
+	/* TODO: Actually, we should not trust this compression bit too... */
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogCompressedRecord)
+		{
+			total_len_decomp = -1; /* Need reassemble to know the size */
+		}
+		else
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			// Assert(((int32_t)c->decompressed_length) > 0); // We cannot assert this, this might be a garbage
+			total_len_decomp = c->decompressed_length;
+		}
+	}
+	else
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogRecord)
+			total_len_decomp = record->xl_tot_len;
+		else 
+			total_len_decomp = -1; /* We are not sure record is not compressed */
 	/*
 	 * If the whole record header is on this page, validate it immediately.
 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
@@ -660,16 +685,18 @@ restart:
 								   randAccess))
 			goto err;
 		gotheader = true;
+		if (record->xl_info & XLR_COMPRESSED)
+			gotheader = targetRecOff <= XLOG_BLCKSZ - SizeOfXLogCompressedRecord;
 	}
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_phisical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%08X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_phisical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -681,9 +708,11 @@ restart:
 	 * calling palloc.  If we can't, we'll try again below after we've
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
-	decoded = XLogReadRecordAlloc(state,
-								  total_len,
+	if (total_len_decomp != -1)
+		decoded = XLogReadRecordAlloc(state,
+								  total_len_decomp,
 								  false /* allow_oversized */ );
+
 	if (decoded == NULL && nonblocking)
 	{
 		/*
@@ -695,7 +724,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_phisical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -725,7 +754,7 @@ restart:
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_phisical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 
 			if (readOff == XLREAD_WOULDBLOCK)
@@ -766,12 +795,12 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_phisical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%08X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_phisical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
@@ -814,7 +843,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_phisical > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -825,11 +854,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_phisical);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_phisical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -844,8 +873,9 @@ restart:
 	else
 	{
 		/* Wait for the record data to become available */
+		Assert(targetRecOff + total_len_phisical <= XLOG_BLCKSZ);
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   targetRecOff + total_len_phisical);
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -855,7 +885,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_phisical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -878,8 +908,19 @@ restart:
 	if (decoded == NULL)
 	{
 		Assert(!nonblocking);
+
+		/* total_len_decomp might be not actual */
+		if (record->xl_info & XLR_COMPRESSED)
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			Assert(((int32_t)c->decompressed_length) > 0);
+			Assert(((int32_t)c->decompressed_length) < MaxAllocSize);
+			total_len_decomp = c->decompressed_length;
+		}
+		else
+			total_len_decomp = record->xl_tot_len;
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1656,6 +1697,84 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state,
+												XLogRecord *record,
+												XLogRecPtr recptr)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader	*src = (XLogCompressionHeader*) record;
+		bool				decomp_success = true;
+		uint32				srclen = src->record_header.xl_tot_len - SizeOfXLogCompressedRecord;
+		char				*dst;
+		XLogRecord			*dst_h;
+
+		if (state->decompression_buffer_size < src->decompressed_length + SizeOfXLogRecord)
+		{
+			if (state->decompression_buffer)
+				pfree(state->decompression_buffer);
+			/* Avoid small steps in growths, we compress only big records */
+			state->decompression_buffer_size = TYPEALIGN(BLCKSZ, src->decompressed_length + SizeOfXLogRecord);
+			state->decompression_buffer = palloc(state->decompression_buffer_size);
+		}
+		dst_h = (XLogRecord*) state->decompression_buffer;
+		*dst_h = src->record_header;
+		dst_h->xl_tot_len = src->decompressed_length;
+		dst = (char*) &dst_h[1];
+
+		/* If a backup block image is compressed, decompress it */
+
+		if (src->method == XLR_COMPRESS_PGLZ)
+		{
+			if (pglz_decompress((char*) &src[1], srclen, dst,
+								state->decompression_buffer_size, true) < 0)
+				decomp_success = false;
+		}
+		else if (src->method == XLR_COMPRESS_LZ4)
+		{
+#ifdef USE_LZ4
+			if (LZ4_decompress_safe((char*) &src[1], dst,
+									srclen, state->decompression_buffer_size) <= 0)
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "lz4");
+			return NULL;
+#endif
+		}
+		else if (src->method == XLR_COMPRESS_ZSTD)
+		{
+#ifdef USE_ZSTD
+			size_t		decomp_result = ZSTD_decompress(dst,
+														state->decompression_buffer_size,
+														(char*) &src[1], srclen);
+			if (ZSTD_isError(decomp_result))
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "zstd");
+			return NULL;
+#endif
+		}
+		else
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with unknown method",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		if (!decomp_success)
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		return (XLogRecord*) state->decompression_buffer;
+	}
+	return record;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1694,6 +1813,14 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(state, record, lsn);
+
+	if (!record)
+	{
+		/* Decompression failed, error must be reported already */
+		return false;
+	}
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1801,16 +1928,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
 
 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
-
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info))
-				{
-					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
-						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-					else
-						blk->hole_length = 0;
-				}
-				else
-					blk->hole_length = BLCKSZ - blk->bimg_len;
+				blk->hole_length = BLCKSZ - blk->bimg_len;
 				datatotal += blk->bimg_len;
 
 				/*
@@ -1846,29 +1964,15 @@ DecodeXLogRecord(XLogReaderState *state,
 					goto err;
 				}
 
-				/*
-				 * Cross-check that bimg_len < BLCKSZ if it is compressed.
-				 */
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
-					blk->bimg_len == BLCKSZ)
-				{
-					report_invalid_record(state,
-										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%08X",
-										  (unsigned int) blk->bimg_len,
-										  LSN_FORMAT_ARGS(state->ReadRecPtr));
-					goto err;
-				}
-
 				/*
 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
 				 * set nor COMPRESSED().
 				 */
 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
-					!BKPIMAGE_COMPRESSED(blk->bimg_info) &&
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
-										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%08X",
+										  "BKPIMAGE_HAS_HOLE is not set, but block image length is %u at %X/%X",
 										  (unsigned int) blk->data_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
@@ -2067,7 +2171,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
 	char	   *ptr;
-	PGAlignedBlock tmp;
 
 	if (block_id > record->record->max_block_id ||
 		!record->record->blocks[block_id].in_use)
@@ -2089,67 +2192,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	bkpb = &record->record->blocks[block_id];
 	ptr = bkpb->bkp_image;
 
-	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
-	{
-		/* If a backup block image is compressed, decompress it */
-		bool		decomp_success = true;
-
-		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
-				decomp_success = false;
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-		{
-#ifdef USE_LZ4
-			if (LZ4_decompress_safe(ptr, tmp.data,
-									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "LZ4",
-								  block_id);
-			return false;
-#endif
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-		{
-#ifdef USE_ZSTD
-			size_t		decomp_result = ZSTD_decompress(tmp.data,
-														BLCKSZ - bkpb->hole_length,
-														ptr, bkpb->bimg_len);
-
-			if (ZSTD_isError(decomp_result))
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "zstd",
-								  block_id);
-			return false;
-#endif
-		}
-		else
-		{
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with unknown method, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		if (!decomp_success)
-		{
-			report_invalid_record(record, "could not decompress image at %X/%08X, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		ptr = tmp.data;
-	}
-
 	/* generate page, taking into account hole if necessary */
 	if (bkpb->hole_length == 0)
 	{
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..665a02983fe 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3028,6 +3028,17 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_compression_threshold", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Minimum WAL record length to engage compression."),
+			NULL,
+			GUC_UNIT_BYTE
+		},
+		&wal_compression_threshold,
+		512, 32, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_writer_flush_after", PGC_SIGHUP, WAL_SETTINGS,
 			gettext_noop("Amount of WAL written out by WAL writer that triggers a flush."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a9d8293474a..400dfbe8c36 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -247,6 +247,7 @@
 					# (change requires restart)
 #wal_compression = off			# enables compression of full-page writes;
 					# off, pglz, lz4, zstd, or on
+#wal_compression_threshold = 512	# min 32, minimal record length to be compressed
 #wal_init_zero = on			# zero-fill new WAL files
 #wal_recycle = on			# recycle WAL files
 #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index d12798be3d8..56328c771db 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -56,6 +56,7 @@ extern PGDLLIMPORT int CommitDelay;
 extern PGDLLIMPORT int CommitSiblings;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int wal_compression_threshold;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index d6a71415d4f..25f6bd1c610 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -46,6 +46,7 @@ extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value);
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
+extern void XLogEnsureCompressionBuffer(uint32 extraLen);
 extern void XLogRegisterData(const void *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
 extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9738462d3c9..f41106cbdc1 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -252,6 +252,12 @@ struct XLogReaderState
 	char	   *decode_buffer_head; /* data is read from the head */
 	char	   *decode_buffer_tail; /* new data is written at the tail */
 
+	/*
+	 * Buffer to decompress records
+	 */
+	char	   *decompression_buffer;
+	uint32 		decompression_buffer_size;
+
 	/*
 	 * Queue of records that have been decoded.  This is a linked list that
 	 * usually consists of consecutive records in decode_buffer, but may also
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index a06833ce0a3..9261760272d 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -48,7 +48,7 @@ typedef struct XLogRecord
 	/* 2 bytes of padding here, initialize to zero */
 	pg_crc32c	xl_crc;			/* CRC for this record */
 
-	/* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */
+	/* XLogRecordBlockHeaders, XLogRecordDataHeader or compression header follow, no padding */
 
 } XLogRecord;
 
@@ -90,6 +90,9 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+/* This bit in xl_info means the record is compressed */
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -143,11 +146,6 @@ typedef struct XLogRecordBlockImageHeader
 	uint16		length;			/* number of page image bytes */
 	uint16		hole_offset;	/* number of bytes before "hole" */
 	uint8		bimg_info;		/* flag bits, see below */
-
-	/*
-	 * If BKPIMAGE_HAS_HOLE and BKPIMAGE_COMPRESSED(), an
-	 * XLogRecordBlockCompressHeader struct follows.
-	 */
 } XLogRecordBlockImageHeader;
 
 #define SizeOfXLogRecordBlockImageHeader	\
@@ -158,25 +156,19 @@ typedef struct XLogRecordBlockImageHeader
 #define BKPIMAGE_APPLY			0x02	/* page image should be restored
 										 * during replay */
 /* compression methods supported */
-#define BKPIMAGE_COMPRESS_PGLZ	0x04
-#define BKPIMAGE_COMPRESS_LZ4	0x08
-#define BKPIMAGE_COMPRESS_ZSTD	0x10
-
-#define	BKPIMAGE_COMPRESSED(info) \
-	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
-			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
+#define XLR_COMPRESS_PGLZ	0x04
+#define XLR_COMPRESS_LZ4	0x08
+#define XLR_COMPRESS_ZSTD	0x10
 
-/*
- * Extra header information used when page image has "hole" and
- * is compressed.
- */
-typedef struct XLogRecordBlockCompressHeader
+/* Record of a compressed header */
+typedef struct XLogCompressionHeader
 {
-	uint16		hole_length;	/* number of bytes in "hole" */
-} XLogRecordBlockCompressHeader;
+	XLogRecord record_header;
+	char	method;
+	uint32	decompressed_length;
+} XLogCompressionHeader;
 
-#define SizeOfXLogRecordBlockCompressHeader \
-	sizeof(XLogRecordBlockCompressHeader)
+#define SizeOfXLogCompressedRecord	(offsetof(XLogCompressionHeader, decompressed_length) + sizeof(uint32))
 
 /*
  * Maximum size of the header for a block reference. This is used to size a
@@ -185,7 +177,6 @@ typedef struct XLogRecordBlockCompressHeader
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
-	 SizeOfXLogRecordBlockCompressHeader + \
 	 sizeof(RelFileLocator) + \
 	 sizeof(BlockNumber))
 
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
index f408d4f69b6..942cf26f3fa 100644
--- a/src/test/recovery/t/026_overwrite_contrecord.pl
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -56,7 +56,7 @@ $$;
 my $initfile = $node->safe_psql('postgres',
 	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
 $node->safe_psql('postgres',
-	qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+	qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
 );
 #$node->safe_psql('postgres', qq{create table foo ()});
 my $endfile = $node->safe_psql('postgres',
-- 
2.39.5 (Apple Git-154)

#10Andrey Borodin
x4mmm@yandex-team.ru
In reply to: Andrey Borodin (#9)
1 attachment(s)
Re: Compression of bigger WAL records

On 14 Jul 2025, at 23:22, Andrey Borodin <x4mmm@yandex-team.ru> wrote:

PFA rebased version.

Here's a rebased version. Also I fixed a problem of possible wrong memory context used for allocating compression buffer.

Best regards, Andrey Borodin.

Attachments:

v5-0001-Compress-big-WAL-records.patchapplication/octet-stream; name=v5-0001-Compress-big-WAL-records.patch; x-unix-mode=0644Download
From 3ee78ef282e11ef05025ace175e2b4a214b7955a Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v5] Compress big WAL records

This approach replaces FPI compression
---
 contrib/pg_walinspect/pg_walinspect.c         |   6 -
 src/backend/access/rmgrdesc/xlogdesc.c        |  44 +-
 src/backend/access/transam/xlog.c             |  19 +-
 src/backend/access/transam/xloginsert.c       | 395 +++++++++---------
 src/backend/access/transam/xlogreader.c       | 249 +++++++----
 src/backend/utils/misc/guc_parameters.dat     |   9 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/access/xloginsert.h               |   1 +
 src/include/access/xlogreader.h               |   6 +
 src/include/access/xlogrecord.h               |  37 +-
 .../recovery/t/026_overwrite_contrecord.pl    |   2 +-
 12 files changed, 418 insertions(+), 352 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 716a0922c6b..d9e3153c449 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -314,12 +314,6 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 				flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
 			if (blk->apply_image)
 				flags[cnt++] = CStringGetTextDatum("APPLY");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
 
 			Assert(cnt <= bitcnt);
 			block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index ff078f22264..b05633a12bb 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -278,46 +278,18 @@ XLogRecGetBlockRefInfo(XLogReaderState *record, bool pretty,
 
 			if (XLogRecHasBlockImage(record, block_id))
 			{
-				uint8		bimg_info = XLogRecGetBlock(record, block_id)->bimg_info;
-
 				/* Calculate the amount of FPI data in the record. */
 				if (fpi_len)
 					*fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
 
-				if (BKPIMAGE_COMPRESSED(bimg_info))
-				{
-					const char *method;
-
-					if ((bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-						method = "pglz";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-						method = "lz4";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-						method = "zstd";
-					else
-						method = "unknown";
-
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u, "
-									 "compression saved: %u, method: %s",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length,
-									 BLCKSZ -
-									 XLogRecGetBlock(record, block_id)->hole_length -
-									 XLogRecGetBlock(record, block_id)->bimg_len,
-									 method);
-				}
-				else
-				{
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length);
-				}
+				
+				appendStringInfo(buf,
+								 " (FPW%s); hole: offset: %u, length: %u",
+								 XLogRecBlockImageApply(record, block_id) ?
+								 "" : " for WAL verification",
+								 XLogRecGetBlock(record, block_id)->hole_offset,
+								 XLogRecGetBlock(record, block_id)->hole_length);
+				
 			}
 
 			if (pretty)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 81dc86847c0..c9c7a1b255b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -138,6 +138,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int			wal_compression_threshold = 512;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -717,6 +718,22 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static uint32 XLogGetRecordTotalLen(XLogRecord *record);
+
+
+/* Read length of a record, accounting for possible compression */
+static uint32
+XLogGetRecordTotalLen(XLogRecord *record)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+		Assert(((int32_t)c->decompressed_length) > 0);
+		return c->decompressed_length;
+	}
+	return record->xl_tot_len;
+}
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1034,7 +1051,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 92c48e768c3..2e6a55a48b2 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -42,27 +42,6 @@
 #include "utils/memutils.h"
 #include "utils/pgstat_internal.h"
 
-/*
- * Guess the maximum buffer size required to store a compressed version of
- * backup block image.
- */
-#ifdef USE_LZ4
-#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
-#else
-#define LZ4_MAX_BLCKSZ		0
-#endif
-
-#ifdef USE_ZSTD
-#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
-#else
-#define ZSTD_MAX_BLCKSZ		0
-#endif
-
-#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)
-
-/* Buffer size required to store a compressed version of backup block image */
-#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
-
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -83,9 +62,6 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
-
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -115,6 +91,16 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+/*
+ * Compression buffers are kept in StringInfo for the prototype.
+ * compression_buffer_current_size == -1 means we entered critical section
+ * before we could prepare compression buffers.
+ */
+static int32			compression_buffer_current_size;
+static XLogRecData		compressed_rdt_hdr;
+static StringInfo		data_before_compression = NULL;
+static StringInfo		compressed_data = NULL;
+
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
 #define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
@@ -140,9 +126,7 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi,
 									   uint64 *fpi_bytes,
-									   bool *topxid_included);
-static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
-									uint16 hole_length, void *dest, uint16 *dlen);
+									   bool *topxid_included, uint64 *rec_size);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -163,6 +147,11 @@ XLogBeginInsert(void)
 		elog(ERROR, "XLogBeginInsert was already called");
 
 	begininsert_called = true;
+
+	if (data_before_compression)
+		resetStringInfo(data_before_compression);
+	if (compressed_data)
+		resetStringInfo(compressed_data);
 }
 
 /*
@@ -234,6 +223,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buffer_current_size = 0;
 	begininsert_called = false;
 }
 
@@ -303,6 +293,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 #endif
 
 	regbuf->in_use = true;
+
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -356,6 +348,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
 #endif
 
 	regbuf->in_use = true;
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -390,6 +383,7 @@ XLogRegisterData(const void *data, uint32 len)
 	mainrdata_last = rdata;
 
 	mainrdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -444,6 +438,7 @@ XLogRegisterBufData(uint8 block_id, const void *data, uint32 len)
 	regbuf->rdata_tail->next = rdata;
 	regbuf->rdata_tail = rdata;
 	regbuf->rdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -463,6 +458,144 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+/*
+ * Make sure we have buffers needed for compression.
+ * We cannot do it during WALInsert(), because we will be in a critial section.
+ */
+void XLogEnsureCompressionBuffer(uint32 extraLen)
+{
+	uint64 compressed_buffer_size;
+	uint64 desired_buffer_size;
+
+	if (wal_compression == WAL_COMPRESSION_NONE)
+		return;
+
+	if (CritSectionCount > 0 || compression_buffer_current_size == -1)
+	{
+		/* We cannot prepare buffer during critical section, so bail out early */
+		compression_buffer_current_size = -1;
+		return;
+	}
+
+	compression_buffer_current_size += extraLen;
+	desired_buffer_size = compression_buffer_current_size + SizeOfXLogRecord;
+	Assert(data_before_compression->len == 0);
+	enlargeStringInfo(data_before_compression, desired_buffer_size);
+
+	compressed_buffer_size = PGLZ_MAX_OUTPUT(desired_buffer_size);
+
+#ifdef USE_LZ4
+	compressed_buffer_size = Max(compressed_buffer_size, LZ4_COMPRESSBOUND(desired_buffer_size));
+#endif
+#ifdef USE_ZSTD
+	compressed_buffer_size = Max(compressed_buffer_size, ZSTD_COMPRESSBOUND(desired_buffer_size));
+#endif
+	compressed_buffer_size = compressed_buffer_size + SizeOfXLogCompressedRecord;
+	Assert(compressed_data->len == 0);
+	enlargeStringInfo(compressed_data, compressed_buffer_size);
+}
+
+/* Compress assembled record on top of compression buffers */
+static XLogRecData*
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionHeader *compressed_header;
+	XLogRecord *src_header;
+	uint32 orig_len;
+	int32 compr_len = -1;
+
+	if (compression_buffer_current_size == -1)
+		return NULL;
+
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	
+	Assert(compression_buffer_current_size <= data_before_compression->maxlen);
+
+	/* Build the whole record */
+	for (; rdt != NULL; rdt = rdt->next)
+		appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len);
+
+	src_header = (XLogRecord*) data_before_compression->data;
+	compressed_header = (XLogCompressionHeader*) compressed_data->data;
+
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = data_before_compression->len;
+
+	orig_len = src_header->xl_tot_len - SizeOfXLogRecord;
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = XLR_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char*)&src_header[1], orig_len, (char*)&compressed_header[1], PGLZ_strategy_default);
+			if (compr_len == -1)
+				return NULL;
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = XLR_COMPRESS_LZ4;
+			compr_len = LZ4_compress_default((char*)&src_header[1], (char*)&compressed_header[1], orig_len,
+									   compressed_data->maxlen);
+			if (compr_len <= 0)
+				return NULL;
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = XLR_COMPRESS_ZSTD;
+			compr_len = ZSTD_compress((char*)&compressed_header[1], compressed_data->maxlen, (char*)&src_header[1], orig_len,
+								ZSTD_CLEVEL_DEFAULT);
+			if (ZSTD_isError(compr_len))
+				return NULL;
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			return NULL;
+			break;
+			/* no default case, so that compiler will warn */
+	}
+
+	Assert(compr_len > 0);
+
+	compressed_header->record_header.xl_tot_len = SizeOfXLogCompressedRecord + compr_len;
+
+	compressed_header->record_header.xl_info = compressed_header->record_header.xl_info | XLR_COMPRESSED;
+
+	compressed_rdt_hdr.data = compressed_data->data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum assebled record, possibly compressed */
+static void XLogChecksumRecord(XLogRecData *rdt)
+{
+	pg_crc32c	rdata_crc;
+	XLogRecord *rechdr = (XLogRecord*) rdt->data;
+	/*
+	 * Calculate CRC of the data
+	 *
+	 * Note that the record header isn't added into the CRC initially since we
+	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
+	 * the whole record in the order: rdata, then backup blocks, then record
+	 * header.
+	 */
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, ((char *)rdt->data) + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (rdt = rdt->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -514,6 +647,7 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecData *rdt;
 		int			num_fpi = 0;
 		uint64		fpi_bytes = 0;
+		uint64		rec_size;
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -524,7 +658,16 @@ XLogInsert(RmgrId rmid, uint8 info)
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
 								 &fpw_lsn, &num_fpi, &fpi_bytes,
-								 &topxid_included);
+								 &topxid_included, &rec_size);
+
+		if (rec_size > wal_compression_threshold && wal_compression != WAL_COMPRESSION_NONE)
+		{
+			XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+			if (rdt_compressed != NULL)
+				rdt = rdt_compressed;
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  fpi_bytes, topxid_included);
@@ -566,12 +709,10 @@ static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
 				   XLogRecPtr *fpw_lsn, int *num_fpi, uint64 *fpi_bytes,
-				   bool *topxid_included)
+				   bool *topxid_included, uint64 *rec_size)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -612,9 +753,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
-		XLogRecordBlockCompressHeader cbimg = {0};
+		uint32		hole_length;
 		bool		samerel;
-		bool		is_compressed = false;
 		bool		include_image;
 
 		if (!regbuf->in_use)
@@ -667,8 +807,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 		if (include_image)
 		{
-			const PageData *page = regbuf->page;
-			uint16		compressed_len = 0;
+			const char *page = regbuf->page;
 
 			/*
 			 * The page needs to be backed up, so calculate its hole length
@@ -685,32 +824,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper <= BLCKSZ)
 				{
 					bimg.hole_offset = lower;
-					cbimg.hole_length = upper - lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to remove */
 					bimg.hole_offset = 0;
-					cbimg.hole_length = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
 				bimg.hole_offset = 0;
-				cbimg.hole_length = 0;
-			}
-
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
-			{
-				is_compressed =
-					XLogCompressBackupBlock(page, bimg.hole_offset,
-											cbimg.hole_length,
-											regbuf->compressed_page,
-											&compressed_len);
+				hole_length = 0;
 			}
 
 			/*
@@ -728,7 +855,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 			rdt_datas_last = rdt_datas_last->next;
 
-			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+			bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
 			/*
 			 * If WAL consistency checking is enabled for the resource manager
@@ -739,48 +866,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			if (needs_backup)
 				bimg.bimg_info |= BKPIMAGE_APPLY;
 
-			if (is_compressed)
 			{
-				/* The current compression is stored in the WAL record */
-				bimg.length = compressed_len;
+				bimg.length = BLCKSZ - hole_length;
 
-				/* Set the compression method used for this block */
-				switch ((WalCompression) wal_compression)
-				{
-					case WAL_COMPRESSION_PGLZ:
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
-						break;
-
-					case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
-#else
-						elog(ERROR, "LZ4 is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
-#else
-						elog(ERROR, "zstd is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_NONE:
-						Assert(false);	/* cannot happen */
-						break;
-						/* no default case, so that compiler will warn */
-				}
-
-				rdt_datas_last->data = regbuf->compressed_page;
-				rdt_datas_last->len = compressed_len;
-			}
-			else
-			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
-
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -795,9 +884,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -843,12 +932,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
-			if (cbimg.hole_length != 0 && is_compressed)
-			{
-				memcpy(scratch, &cbimg,
-					   SizeOfXLogRecordBlockCompressHeader);
-				scratch += SizeOfXLogRecordBlockCompressHeader;
-			}
 		}
 		if (!samerel)
 		{
@@ -914,19 +997,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -950,92 +1020,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
 
-	return &hdr_rdt;
-}
+	*rec_size = rechdr->xl_tot_len;
 
-/*
- * Create a compressed version of a backup block image.
- *
- * Returns false if compression fails (i.e., compressed result is actually
- * bigger than original). Otherwise, returns true and sets 'dlen' to
- * the length of compressed block image.
- */
-static bool
-XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length,
-						void *dest, uint16 *dlen)
-{
-	int32		orig_len = BLCKSZ - hole_length;
-	int32		len = -1;
-	int32		extra_bytes = 0;
-	const void *source;
-	PGAlignedBlock tmp;
-
-	if (hole_length != 0)
-	{
-		/* must skip the hole */
-		memcpy(tmp.data, page, hole_offset);
-		memcpy(tmp.data + hole_offset,
-			   page + (hole_offset + hole_length),
-			   BLCKSZ - (hole_length + hole_offset));
-		source = tmp.data;
-
-		/*
-		 * Extra data needs to be stored in WAL record for the compressed
-		 * version of block image if the hole exists.
-		 */
-		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
-	}
-	else
-		source = page;
-
-	switch ((WalCompression) wal_compression)
-	{
-		case WAL_COMPRESSION_PGLZ:
-			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
-			break;
-
-		case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
-			if (len <= 0)
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "LZ4 is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								ZSTD_CLEVEL_DEFAULT);
-			if (ZSTD_isError(len))
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "zstd is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_NONE:
-			Assert(false);		/* cannot happen */
-			break;
-			/* no default case, so that compiler will warn */
-	}
-
-	/*
-	 * We recheck the actual size even if compression reports success and see
-	 * if the number of bytes saved by compression is larger than the length
-	 * of extra data needed for the compressed version of block image.
-	 */
-	if (len >= 0 &&
-		len + extra_bytes < orig_len)
-	{
-		*dlen = (uint16) len;	/* successful compression */
-		return true;
-	}
-	return false;
+	return &hdr_rdt;
 }
 
 /*
@@ -1411,4 +1400,18 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	/*
+	 * Allocate compression buffers in xloginsert_cxt to ensure they persist
+	 * for the lifetime of the backend, matching other WAL insertion buffers.
+	 */
+	if (data_before_compression == NULL)
+	{
+		MemoryContext oldcxt = MemoryContextSwitchTo(xloginsert_cxt);
+
+		data_before_compression = makeStringInfo();
+		compressed_data = makeStringInfo();
+		MemoryContextSwitchTo(oldcxt);
+	}
+	XLogEnsureCompressionBuffer(SizeOfXLogRecord);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c60aa9a51e9..3facf39842f 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "replication/origin.h"
+#include "utils/memutils.h"
 
 #ifndef FRONTEND
 #include "pgstat.h"
@@ -54,6 +55,8 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state, XLogRecord *record,
+												XLogRecPtr recptr);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -170,6 +173,8 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+	if (state->decompression_buffer)
+		pfree(state->decompression_buffer);
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -533,7 +538,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_physical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -644,8 +650,27 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_physical = record->xl_tot_len;
 
+	/* TODO: Actually, we should not trust this compression bit too... */
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogCompressedRecord)
+		{
+			total_len_decomp = -1; /* Need reassemble to know the size */
+		}
+		else
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			// Assert(((int32_t)c->decompressed_length) > 0); // We cannot assert this, this might be a garbage
+			total_len_decomp = c->decompressed_length;
+		}
+	}
+	else
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogRecord)
+			total_len_decomp = record->xl_tot_len;
+		else 
+			total_len_decomp = -1; /* We are not sure record is not compressed */
 	/*
 	 * If the whole record header is on this page, validate it immediately.
 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
@@ -660,16 +685,18 @@ restart:
 								   randAccess))
 			goto err;
 		gotheader = true;
+		if (record->xl_info & XLR_COMPRESSED)
+			gotheader = targetRecOff <= XLOG_BLCKSZ - SizeOfXLogCompressedRecord;
 	}
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_physical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%08X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_physical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -681,9 +708,11 @@ restart:
 	 * calling palloc.  If we can't, we'll try again below after we've
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
-	decoded = XLogReadRecordAlloc(state,
-								  total_len,
+	if (total_len_decomp != -1)
+		decoded = XLogReadRecordAlloc(state,
+								  total_len_decomp,
 								  false /* allow_oversized */ );
+
 	if (decoded == NULL && nonblocking)
 	{
 		/*
@@ -695,7 +724,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_physical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -728,7 +757,9 @@ restart:
 			 * can handle the case where the previous record ended as being a
 			 * partial one.
 			 */
-			readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD);
+			readOff = ReadPageInternal(state, targetPagePtr,
+									   Min(total_len_physical - gotlen + SizeOfXLogShortPHD,
+										   XLOG_BLCKSZ));
 			if (readOff == XLREAD_WOULDBLOCK)
 				return XLREAD_WOULDBLOCK;
 			else if (readOff < 0)
@@ -767,19 +798,19 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_physical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%08X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_physical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_physical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 			if (readOff == XLREAD_WOULDBLOCK)
 				return XLREAD_WOULDBLOCK;
@@ -824,7 +855,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_physical > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -835,11 +866,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_physical);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_physical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -854,8 +885,9 @@ restart:
 	else
 	{
 		/* Wait for the record data to become available */
+		Assert(targetRecOff + total_len_physical <= XLOG_BLCKSZ);
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   targetRecOff + total_len_physical);
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -865,7 +897,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_physical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -888,8 +920,19 @@ restart:
 	if (decoded == NULL)
 	{
 		Assert(!nonblocking);
+
+		/* total_len_decomp might be not actual */
+		if (record->xl_info & XLR_COMPRESSED)
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			Assert(((int32_t)c->decompressed_length) > 0);
+			Assert(((int32_t)c->decompressed_length) < MaxAllocSize);
+			total_len_decomp = c->decompressed_length;
+		}
+		else
+			total_len_decomp = record->xl_tot_len;
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1666,6 +1709,93 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state,
+												XLogRecord *record,
+												XLogRecPtr recptr)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader	*src = (XLogCompressionHeader*) record;
+		bool				decomp_success = true;
+		uint32				srclen = src->record_header.xl_tot_len - SizeOfXLogCompressedRecord;
+		char				*dst;
+		XLogRecord			*dst_h;
+
+		if (state->decompression_buffer_size < src->decompressed_length + SizeOfXLogRecord)
+		{
+			if (state->decompression_buffer)
+				pfree(state->decompression_buffer);
+			/* Avoid small steps in growths, we compress only big records */
+			state->decompression_buffer_size = TYPEALIGN(BLCKSZ, src->decompressed_length + SizeOfXLogRecord);
+			state->decompression_buffer = palloc_extended(state->decompression_buffer_size,
+														  MCXT_ALLOC_NO_OOM);
+			if (!state->decompression_buffer)
+			{
+				state->decompression_buffer_size = 0;
+				report_invalid_record(state,
+									  "out of memory while decompressing record at %X/%X",
+									  LSN_FORMAT_ARGS(recptr));
+				return NULL;
+			}
+		}
+		dst_h = (XLogRecord*) state->decompression_buffer;
+		*dst_h = src->record_header;
+		dst_h->xl_tot_len = src->decompressed_length;
+		dst = (char*) &dst_h[1];
+
+		/* If a backup block image is compressed, decompress it */
+
+		if (src->method == XLR_COMPRESS_PGLZ)
+		{
+			if (pglz_decompress((char*) &src[1], srclen, dst,
+								state->decompression_buffer_size, true) < 0)
+				decomp_success = false;
+		}
+		else if (src->method == XLR_COMPRESS_LZ4)
+		{
+#ifdef USE_LZ4
+			if (LZ4_decompress_safe((char*) &src[1], dst,
+									srclen, state->decompression_buffer_size) <= 0)
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "lz4");
+			return NULL;
+#endif
+		}
+		else if (src->method == XLR_COMPRESS_ZSTD)
+		{
+#ifdef USE_ZSTD
+			size_t		decomp_result = ZSTD_decompress(dst,
+														state->decompression_buffer_size,
+														(char*) &src[1], srclen);
+			if (ZSTD_isError(decomp_result))
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "zstd");
+			return NULL;
+#endif
+		}
+		else
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with unknown method",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		if (!decomp_success)
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		return (XLogRecord*) state->decompression_buffer;
+	}
+	return record;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1704,6 +1834,14 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(state, record, lsn);
+
+	if (!record)
+	{
+		/* Decompression failed, error must be reported already */
+		return false;
+	}
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1811,16 +1949,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
 
 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
-
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info))
-				{
-					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
-						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-					else
-						blk->hole_length = 0;
-				}
-				else
-					blk->hole_length = BLCKSZ - blk->bimg_len;
+				blk->hole_length = BLCKSZ - blk->bimg_len;
 				datatotal += blk->bimg_len;
 
 				/*
@@ -1878,8 +2007,8 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
-										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %d at %X/%08X",
-										  blk->data_len,
+										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%08X",
+										  (unsigned int) blk->bimg_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
 				}
@@ -2077,7 +2206,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
 	char	   *ptr;
-	PGAlignedBlock tmp;
 
 	if (block_id > record->record->max_block_id ||
 		!record->record->blocks[block_id].in_use)
@@ -2099,67 +2227,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	bkpb = &record->record->blocks[block_id];
 	ptr = bkpb->bkp_image;
 
-	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
-	{
-		/* If a backup block image is compressed, decompress it */
-		bool		decomp_success = true;
-
-		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
-				decomp_success = false;
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-		{
-#ifdef USE_LZ4
-			if (LZ4_decompress_safe(ptr, tmp.data,
-									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "LZ4",
-								  block_id);
-			return false;
-#endif
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-		{
-#ifdef USE_ZSTD
-			size_t		decomp_result = ZSTD_decompress(tmp.data,
-														BLCKSZ - bkpb->hole_length,
-														ptr, bkpb->bimg_len);
-
-			if (ZSTD_isError(decomp_result))
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "zstd",
-								  block_id);
-			return false;
-#endif
-		}
-		else
-		{
-			report_invalid_record(record, "could not restore image at %X/%08X compressed with unknown method, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		if (!decomp_success)
-		{
-			report_invalid_record(record, "could not decompress image at %X/%08X, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		ptr = tmp.data;
-	}
-
 	/* generate page, taking into account hole if necessary */
 	if (bkpb->hole_length == 0)
 	{
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 7c60b125564..182f45e4de6 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3325,6 +3325,15 @@
   options => 'wal_compression_options',
 },
 
+{ name => 'wal_compression_threshold', type => 'int', context => 'PGC_SIGHUP', group => 'WAL_SETTINGS',
+  short_desc => 'Minimum WAL record length to engage compression.',
+  flags => 'GUC_UNIT_BYTE',
+  variable => 'wal_compression_threshold',
+  boot_val => '512',
+  min => '32',
+  max => 'INT_MAX',
+},
+
 { name => 'wal_consistency_checking', type => 'string', context => 'PGC_SUSET', group => 'DEVELOPER_OPTIONS',
   short_desc => 'Sets the WAL resource managers for which WAL consistency checks are done.',
   long_desc => 'Full-page images will be logged for all data blocks and cross-checked against the results of WAL replay.',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index dc9e2255f8a..3693f3bf5a7 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -247,6 +247,7 @@
                                         # (change requires restart)
 #wal_compression = off                  # enables compression of full-page writes;
                                         # off, pglz, lz4, zstd, or on
+#wal_compression_threshold = 512        # min 32, minimal record length to be compressed
 #wal_init_zero = on                     # zero-fill new WAL files
 #wal_recycle = on                       # recycle WAL files
 #wal_buffers = -1                       # min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0591a885dd1..11c7c9e5719 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -57,6 +57,7 @@ extern PGDLLIMPORT int CommitDelay;
 extern PGDLLIMPORT int CommitSiblings;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int wal_compression_threshold;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 16ebc76e743..c06e8037c1b 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -46,6 +46,7 @@ extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value);
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
+extern void XLogEnsureCompressionBuffer(uint32 extraLen);
 extern void XLogRegisterData(const void *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
 extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9b63b6aff75..97066eecf29 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -251,6 +251,12 @@ struct XLogReaderState
 	char	   *decode_buffer_head; /* data is read from the head */
 	char	   *decode_buffer_tail; /* new data is written at the tail */
 
+	/*
+	 * Buffer to decompress records
+	 */
+	char	   *decompression_buffer;
+	uint32 		decompression_buffer_size;
+
 	/*
 	 * Queue of records that have been decoded.  This is a linked list that
 	 * usually consists of consecutive records in decode_buffer, but may also
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index e8999d3fe91..05aa8e4b46e 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -48,7 +48,7 @@ typedef struct XLogRecord
 	/* 2 bytes of padding here, initialize to zero */
 	pg_crc32c	xl_crc;			/* CRC for this record */
 
-	/* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */
+	/* XLogRecordBlockHeaders, XLogRecordDataHeader or compression header follow, no padding */
 
 } XLogRecord;
 
@@ -90,6 +90,9 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+/* This bit in xl_info means the record is compressed */
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -143,11 +146,6 @@ typedef struct XLogRecordBlockImageHeader
 	uint16		length;			/* number of page image bytes */
 	uint16		hole_offset;	/* number of bytes before "hole" */
 	uint8		bimg_info;		/* flag bits, see below */
-
-	/*
-	 * If BKPIMAGE_HAS_HOLE and BKPIMAGE_COMPRESSED(), an
-	 * XLogRecordBlockCompressHeader struct follows.
-	 */
 } XLogRecordBlockImageHeader;
 
 #define SizeOfXLogRecordBlockImageHeader	\
@@ -158,25 +156,23 @@ typedef struct XLogRecordBlockImageHeader
 #define BKPIMAGE_APPLY			0x02	/* page image should be restored
 										 * during replay */
 /* compression methods supported */
-#define BKPIMAGE_COMPRESS_PGLZ	0x04
-#define BKPIMAGE_COMPRESS_LZ4	0x08
-#define BKPIMAGE_COMPRESS_ZSTD	0x10
+#define XLR_COMPRESS_PGLZ	0x04
+#define XLR_COMPRESS_LZ4	0x08
+#define XLR_COMPRESS_ZSTD	0x10
 
 #define	BKPIMAGE_COMPRESSED(info) \
-	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
-			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
+	(((info) & (XLR_COMPRESS_PGLZ | XLR_COMPRESS_LZ4 | \
+			  XLR_COMPRESS_ZSTD)) != 0)
 
-/*
- * Extra header information used when page image has "hole" and
- * is compressed.
- */
-typedef struct XLogRecordBlockCompressHeader
+/* Record of a compressed header */
+typedef struct XLogCompressionHeader
 {
-	uint16		hole_length;	/* number of bytes in "hole" */
-} XLogRecordBlockCompressHeader;
+	XLogRecord record_header;
+	char	method;
+	uint32	decompressed_length;
+} XLogCompressionHeader;
 
-#define SizeOfXLogRecordBlockCompressHeader \
-	sizeof(XLogRecordBlockCompressHeader)
+#define SizeOfXLogCompressedRecord	(offsetof(XLogCompressionHeader, decompressed_length) + sizeof(uint32))
 
 /*
  * Maximum size of the header for a block reference. This is used to size a
@@ -185,7 +181,6 @@ typedef struct XLogRecordBlockCompressHeader
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
-	 SizeOfXLogRecordBlockCompressHeader + \
 	 sizeof(RelFileLocator) + \
 	 sizeof(BlockNumber))
 
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
index 4f501169f42..a8044752b92 100644
--- a/src/test/recovery/t/026_overwrite_contrecord.pl
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -56,7 +56,7 @@ $$;
 my $initfile = $node->safe_psql('postgres',
 	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
 $node->safe_psql('postgres',
-	qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+	qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
 );
 #$node->safe_psql('postgres', qq{create table foo ()});
 my $endfile = $node->safe_psql('postgres',
-- 
2.51.2