From 7d4099a7f46a516e75816a7006827ab655fcfbf1 Mon Sep 17 00:00:00 2001 From: Andrey Borodin Date: Wed, 8 Jan 2025 16:39:20 +0500 Subject: [PATCH v0] Compress big WAL records This approach replaces FPI compression --- src/backend/access/transam/xlog.c | 4 +- src/backend/access/transam/xloginsert.c | 387 +++++++++--------- src/backend/access/transam/xlogreader.c | 207 ++++++---- src/backend/utils/misc/guc_tables.c | 2 +- src/include/access/xloginsert.h | 1 + src/include/access/xlogreader.h | 2 + src/include/access/xlogrecord.h | 20 +- src/include/pg_config_manual.h | 2 +- .../recovery/t/026_overwrite_contrecord.pl | 2 +- 9 files changed, 326 insertions(+), 301 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index bf3dbda901..5674341368 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -121,7 +121,7 @@ char *XLogArchiveCommand = NULL; bool EnableHotStandby = false; bool fullPageWrites = true; bool wal_log_hints = false; -int wal_compression = WAL_COMPRESSION_NONE; +int wal_compression = WAL_COMPRESSION_ZSTD; /* NOTE(review): prototype default; must match the boot value in guc_tables.c and assumes a build with zstd support (USE_ZSTD) -- confirm before merging */ char *wal_consistency_checking_string = NULL; bool *wal_consistency_checking = NULL; bool wal_init_zero = true; @@ -1031,7 +1031,7 @@ XLogInsertRecord(XLogRecData *rdata, /* We also need temporary space to decode the record.
*/ record = (XLogRecord *) recordBuf.data; decoded = (DecodedXLogRecord *) - palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len)); + palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record))); if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index efed097092..ac57008b71 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -40,27 +40,6 @@ #include "storage/proc.h" #include "utils/memutils.h" -/* - * Guess the maximum buffer size required to store a compressed version of - * backup block image. - */ -#ifdef USE_LZ4 -#define LZ4_MAX_BLCKSZ LZ4_COMPRESSBOUND(BLCKSZ) -#else -#define LZ4_MAX_BLCKSZ 0 -#endif - -#ifdef USE_ZSTD -#define ZSTD_MAX_BLCKSZ ZSTD_COMPRESSBOUND(BLCKSZ) -#else -#define ZSTD_MAX_BLCKSZ 0 -#endif - -#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ) - -/* Buffer size required to store a compressed version of backup block image */ -#define COMPRESS_BUFSIZE Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ) - /* * For each block reference registered with XLogRegisterBuffer, we fill in * a registered_buffer struct. @@ -81,9 +60,6 @@ typedef struct XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to * backup block data in XLogRecordAssemble() */ - - /* buffer to store a compressed version of backup block image */ - char compressed_page[COMPRESS_BUFSIZE]; } registered_buffer; static registered_buffer *registered_buffers; @@ -113,6 +89,16 @@ static uint8 curinsert_flags = 0; static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; +/* + * Compression buffers are kept in StringInfos (prototype only). + * compression_buffer_current_size == -1 means we entered a critical section + * before we could prepare the compression buffers.
+ */ +static int32 compression_buffer_current_size; +static XLogRecData compressed_rdt_hdr; +static StringInfo data_before_compression = NULL; +static StringInfo compressed_data = NULL; + #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) @@ -137,9 +123,7 @@ static MemoryContext xloginsert_cxt; static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, XLogRecPtr *fpw_lsn, int *num_fpi, - bool *topxid_included); -static bool XLogCompressBackupBlock(const char *page, uint16 hole_offset, - uint16 hole_length, char *dest, uint16 *dlen); + bool *topxid_included, uint64 *rec_size); /* * Begin constructing a WAL record. This must be called before the @@ -160,6 +144,11 @@ XLogBeginInsert(void) elog(ERROR, "XLogBeginInsert was already called"); begininsert_called = true; + + if (data_before_compression) + resetStringInfo(data_before_compression); + if (compressed_data) + resetStringInfo(compressed_data); } /* @@ -231,6 +220,7 @@ XLogResetInsertion(void) mainrdata_len = 0; mainrdata_last = (XLogRecData *) &mainrdata_head; curinsert_flags = 0; + compression_buffer_current_size = 0; begininsert_called = false; } @@ -299,6 +289,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags) #endif regbuf->in_use = true; + + XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ); } /* @@ -352,6 +344,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum, #endif regbuf->in_use = true; + XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ); } /* @@ -386,6 +379,7 @@ XLogRegisterData(const char *data, uint32 len) mainrdata_last = rdata; mainrdata_len += len; + XLogEnsureCompressionBuffer(len); } /* @@ -440,6 +434,7 @@ XLogRegisterBufData(uint8 block_id, const char *data, uint32 len) regbuf->rdata_tail->next = rdata; regbuf->rdata_tail = rdata; regbuf->rdata_len += len; +
XLogEnsureCompressionBuffer(len); } /* @@ -459,6 +454,202 @@ XLogSetRecordFlags(uint8 flags) curinsert_flags |= flags; } +/* + * Make sure we have the buffers needed to compress the record being built. + * + * extraLen is the number of bytes the caller has just registered for the + * record. The buffers cannot be enlarged in a critical section, and XLogInsert() + * is usually called in one, so they are sized here while data is registered. + * Inside a critical section we can still compress if the buffers are already + * big enough; otherwise compression is disabled for the current record by + * setting compression_buffer_current_size to -1. + */ +void XLogEnsureCompressionBuffer(uint32 extraLen) +{ + uint64 compressed_buffer_size; + uint64 desired_buffer_size; + + if (wal_compression == WAL_COMPRESSION_NONE) + return; + + /* Compression has already been given up for the current record */ + if (compression_buffer_current_size == -1) + return; + + desired_buffer_size = (uint64) compression_buffer_current_size + extraLen + SizeOfXLogRecord; + + compressed_buffer_size = PGLZ_MAX_OUTPUT(desired_buffer_size); +#ifdef USE_LZ4 + compressed_buffer_size = Max(compressed_buffer_size, LZ4_COMPRESSBOUND(desired_buffer_size)); +#endif +#ifdef USE_ZSTD + compressed_buffer_size = Max(compressed_buffer_size, ZSTD_COMPRESSBOUND(desired_buffer_size)); +#endif + compressed_buffer_size = compressed_buffer_size + sizeof(XLogCompressionData); + + /* Records too big for a single palloc chunk are simply not compressed */ + if (compressed_buffer_size >= MaxAllocSize) + { + compression_buffer_current_size = -1; + return; + } + compression_buffer_current_size += extraLen; + + if (CritSectionCount > 0) + { + /* + * We cannot allocate in a critical section. Keep compressing only if + * the existing buffers are already big enough (enlargeStringInfo() + * also needs room for a trailing NUL byte). 
+ */ + if (desired_buffer_size >= (uint64) data_before_compression->maxlen || + compressed_buffer_size >= (uint64) compressed_data->maxlen) + compression_buffer_current_size = -1; + return; + } + + Assert(data_before_compression->len == 0); + enlargeStringInfo(data_before_compression, desired_buffer_size); + Assert(compressed_data->len == 0); + enlargeStringInfo(compressed_data, compressed_buffer_size); +} +
+/* + * Try to compress the assembled record chain "rdt" into compressed_data. + * + * Returns a one-element XLogRecData chain holding the compressed record (an + * XLogCompressionData header followed by the compressed payload), or NULL if + * the record should be written uncompressed: the buffers were not prepared, + * the method is not available in this build, compression failed, or it did + * not make the record smaller. Nothing is allocated here, so this is safe + * inside a critical section. xl_crc is computed later by the caller. + */ +static XLogRecData* +XLogCompressRdt(XLogRecData *rdt) +{ + XLogCompressionData *compressed_header; + XLogRecord *src_header; + uint32 total_len = ((XLogRecord *) rdt->data)->xl_tot_len; + uint32 orig_len; + int32 compr_len = -1; + int dst_capacity; + + if (compression_buffer_current_size == -1) + return NULL; + + Assert(wal_compression != WAL_COMPRESSION_NONE); + + /* + * Header bytes not accounted for by XLogEnsureCompressionBuffer() can make + * the record larger than the prepared buffer. Growing it here would + * allocate inside a critical section, so just don't compress. + */ + if (total_len >= (uint32) data_before_compression->maxlen) + return NULL; + + /* + * XLogInsert() assembles and inserts the record again if RedoRecPtr + * changed in the meantime, so always start from empty buffers. 
+ */ + resetStringInfo(data_before_compression); + resetStringInfo(compressed_data); + + /* Build the whole record */ + for (; rdt != NULL; rdt = rdt->next) + appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len); + Assert(data_before_compression->len == total_len); + + src_header = (XLogRecord*) data_before_compression->data; + compressed_header = (XLogCompressionData*) compressed_data->data; + dst_capacity = compressed_data->maxlen - (int) sizeof(XLogCompressionData); + + /* Zero the header first so that no uninitialized padding reaches WAL */ + memset(compressed_header, 0, sizeof(XLogCompressionData)); + memcpy(&compressed_header->record_header, src_header, SizeOfXLogRecord); + compressed_header->decompressed_length = data_before_compression->len; + + orig_len = total_len - SizeOfXLogRecord; + + switch ((WalCompression) wal_compression) + { + case WAL_COMPRESSION_PGLZ: + /* pglz_compress() takes no output limit; make sure the worst case fits */ + if ((int64) PGLZ_MAX_OUTPUT(orig_len) > dst_capacity) + break; + compressed_header->method = BKPIMAGE_COMPRESS_PGLZ; + compr_len = pglz_compress((char*)&src_header[1], orig_len, (char*)&compressed_header[1], PGLZ_strategy_default); + break; + + case WAL_COMPRESSION_LZ4: +#ifdef USE_LZ4 + compressed_header->method = BKPIMAGE_COMPRESS_LZ4; + compr_len = LZ4_compress_default((char*)&src_header[1], (char*)&compressed_header[1], orig_len, + dst_capacity); + if (compr_len == 0) + compr_len = -1; /* failure */ +#endif + break; + + case WAL_COMPRESSION_ZSTD: +#ifdef USE_ZSTD + { + /* Keep the size_t result: truncating it would hide ZSTD error codes */ + size_t zstd_len; + + compressed_header->method = BKPIMAGE_COMPRESS_ZSTD; + zstd_len = ZSTD_compress((char*)&compressed_header[1], dst_capacity, (char*)&src_header[1], orig_len, + ZSTD_CLEVEL_DEFAULT); + if (!ZSTD_isError(zstd_len)) + compr_len = (int32) zstd_len; + } +#endif + break; + + case WAL_COMPRESSION_NONE: + Assert(false); /* cannot happen */ + break; + /* no default case, so that 
compiler will warn */ + } + + /* + * Write the record uncompressed if compression failed, if the method is + * not supported by this build (an ERROR here would be promoted to PANIC + * inside a critical section), or if it did not save any space. + */ + if (compr_len < 0 || + sizeof(XLogCompressionData) + (uint32) compr_len >= total_len) + return NULL; + + compressed_header->record_header.xl_tot_len = sizeof(XLogCompressionData) + compr_len; + compressed_header->record_header.xl_info |= XLR_COMPRESSED; + + compressed_rdt_hdr.data = compressed_data->data; + compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len; + compressed_rdt_hdr.next = NULL; + + return &compressed_rdt_hdr; +} + +/* Checksum assembled record, possibly compressed */ +static void XLogChecksumRecord(XLogRecData *rdt) +{ + pg_crc32c rdata_crc; + XLogRecord *rechdr = (XLogRecord*) rdt->data;
+ /* + * Calculate CRC of the data + * + * Note that the record header isn't added into the CRC initially since we + * don't know the prev-link yet. Thus, the CRC will represent the CRC of + * the whole record in the order: rdata, then backup blocks, then record + * header. + */ + INIT_CRC32C(rdata_crc); + COMP_CRC32C(rdata_crc, rdt->data + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord); + for (rdt = rdt->next; rdt != NULL; rdt = rdt->next) + COMP_CRC32C(rdata_crc, rdt->data, rdt->len); + rechdr->xl_crc = rdata_crc; +} + /* * Insert an XLOG record having the specified RMID and info bytes, with the * body of the record being the data and buffer references registered earlier @@ -509,6 +640,8 @@ XLogInsert(RmgrId rmid, uint8 info) XLogRecPtr fpw_lsn; XLogRecData *rdt; int num_fpi = 0; + uint64 rec_size; + /* * Get values needed to decide whether to do full-page writes. Since @@ -518,7 +651,18 @@ XLogInsert(RmgrId rmid, uint8 info) GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, - &fpw_lsn, &num_fpi, &topxid_included); + &fpw_lsn, &num_fpi, &topxid_included, + &rec_size); + + /* Only records larger than 512 bytes are compressed; XLogCompressRdt() returns NULL to keep the uncompressed chain. TODO: invent a good name for a GUC controlling this record-size threshold */ + if (rec_size > 512 && wal_compression != WAL_COMPRESSION_NONE) + { + XLogRecData *rdt_compressed = XLogCompressRdt(rdt); + if (rdt_compressed != NULL) + rdt = rdt_compressed; + } + + XLogChecksumRecord(rdt); /* CRC covers the bytes actually written, compressed or not */ EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi, topxid_included); @@ -547,12 +691,11 @@ XLogInsert(RmgrId rmid, uint8 info) static XLogRecData * XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included) + XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included, + uint64 *rec_size) { - XLogRecData *rdt; uint64 total_len = 0; int block_id; - pg_crc32c rdata_crc; registered_buffer *prev_regbuf = NULL; XLogRecData *rdt_datas_last;
XLogRecord *rechdr; @@ -593,9 +736,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, bool needs_data; XLogRecordBlockHeader bkpb; XLogRecordBlockImageHeader bimg; - XLogRecordBlockCompressHeader cbimg = {0}; + uint32 hole_length; bool samerel; - bool is_compressed = false; bool include_image; if (!regbuf->in_use) @@ -649,7 +791,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, if (include_image) { const char *page = regbuf->page; - uint16 compressed_len = 0; /* * The page needs to be backed up, so calculate its hole length @@ -666,32 +807,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, upper <= BLCKSZ) { bimg.hole_offset = lower; - cbimg.hole_length = upper - lower; + hole_length = upper - lower; } else { /* No "hole" to remove */ bimg.hole_offset = 0; - cbimg.hole_length = 0; + hole_length = 0; } } else { /* Not a standard page header, don't try to eliminate "hole" */ bimg.hole_offset = 0; - cbimg.hole_length = 0; - } - - /* - * Try to compress a block image if wal_compression is enabled - */ - if (wal_compression != WAL_COMPRESSION_NONE) - { - is_compressed = - XLogCompressBackupBlock(page, bimg.hole_offset, - cbimg.hole_length, - regbuf->compressed_page, - &compressed_len); + hole_length = 0; } /* @@ -709,7 +838,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, rdt_datas_last->next = &regbuf->bkp_rdatas[0]; rdt_datas_last = rdt_datas_last->next; - bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE; + bimg.bimg_info = (hole_length == 0) ?
0 : BKPIMAGE_HAS_HOLE; /* * If WAL consistency checking is enabled for the resource manager @@ -720,48 +849,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, if (needs_backup) bimg.bimg_info |= BKPIMAGE_APPLY; - if (is_compressed) - { - /* The current compression is stored in the WAL record */ - bimg.length = compressed_len; - - /* Set the compression method used for this block */ - switch ((WalCompression) wal_compression) - { - case WAL_COMPRESSION_PGLZ: - bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ; - break; - - case WAL_COMPRESSION_LZ4: -#ifdef USE_LZ4 - bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4; -#else - elog(ERROR, "LZ4 is not supported by this build"); -#endif - break; - - case WAL_COMPRESSION_ZSTD: -#ifdef USE_ZSTD - bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD; -#else - elog(ERROR, "zstd is not supported by this build"); -#endif - break; - - case WAL_COMPRESSION_NONE: - Assert(false); /* cannot happen */ - break; - /* no default case, so that compiler will warn */ - } - - rdt_datas_last->data = regbuf->compressed_page; - rdt_datas_last->len = compressed_len; - } - else { - bimg.length = BLCKSZ - cbimg.hole_length; + bimg.length = BLCKSZ - hole_length; - if (cbimg.hole_length == 0) + if (hole_length == 0) { rdt_datas_last->data = page; rdt_datas_last->len = BLCKSZ; @@ -776,9 +867,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, rdt_datas_last = rdt_datas_last->next; rdt_datas_last->data = - page + (bimg.hole_offset + cbimg.hole_length); + page + (bimg.hole_offset + hole_length); rdt_datas_last->len = - BLCKSZ - (bimg.hole_offset + cbimg.hole_length); + BLCKSZ - (bimg.hole_offset + hole_length); } } @@ -821,12 +912,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, { memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader); scratch += SizeOfXLogRecordBlockImageHeader; - if (cbimg.hole_length != 0 && is_compressed) - { - memcpy(scratch, &cbimg, - SizeOfXLogRecordBlockCompressHeader); - scratch += SizeOfXLogRecordBlockCompressHeader; - } } if (!samerel) { @@ -892,19
+977,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, hdr_rdt.len = (scratch - hdr_scratch); total_len += hdr_rdt.len; - /* - * Calculate CRC of the data - * - * Note that the record header isn't added into the CRC initially since we - * don't know the prev-link yet. Thus, the CRC will represent the CRC of - * the whole record in the order: rdata, then backup blocks, then record - * header. - */ - INIT_CRC32C(rdata_crc); - COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); - for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) - COMP_CRC32C(rdata_crc, rdt->data, rdt->len); - /* * Ensure that the XLogRecord is not too large. * @@ -928,92 +1000,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, rechdr->xl_info = info; rechdr->xl_rmid = rmid; rechdr->xl_prev = InvalidXLogRecPtr; - rechdr->xl_crc = rdata_crc; + rechdr->xl_crc = 0; /* filled in later by XLogChecksumRecord() */ - return &hdr_rdt; -} + *rec_size = rechdr->xl_tot_len; /* uncompressed size; XLogInsert() uses it to decide on compression */ -/* - * Create a compressed version of a backup block image. - * - * Returns false if compression fails (i.e., compressed result is actually - * bigger than original). Otherwise, returns true and sets 'dlen' to - * the length of compressed block image. - */ -static bool -XLogCompressBackupBlock(const char *page, uint16 hole_offset, uint16 hole_length, - char *dest, uint16 *dlen) -{ - int32 orig_len = BLCKSZ - hole_length; - int32 len = -1; - int32 extra_bytes = 0; - const char *source; - PGAlignedBlock tmp; - - if (hole_length != 0) - { - /* must skip the hole */ - memcpy(tmp.data, page, hole_offset); - memcpy(tmp.data + hole_offset, - page + (hole_offset + hole_length), - BLCKSZ - (hole_length + hole_offset)); - source = tmp.data; - - /* - * Extra data needs to be stored in WAL record for the compressed - * version of block image if the hole exists.
- */ - extra_bytes = SizeOfXLogRecordBlockCompressHeader; - } - else - source = page; - - switch ((WalCompression) wal_compression) - { - case WAL_COMPRESSION_PGLZ: - len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default); - break; - - case WAL_COMPRESSION_LZ4: -#ifdef USE_LZ4 - len = LZ4_compress_default(source, dest, orig_len, - COMPRESS_BUFSIZE); - if (len <= 0) - len = -1; /* failure */ -#else - elog(ERROR, "LZ4 is not supported by this build"); -#endif - break; - - case WAL_COMPRESSION_ZSTD: -#ifdef USE_ZSTD - len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len, - ZSTD_CLEVEL_DEFAULT); - if (ZSTD_isError(len)) - len = -1; /* failure */ -#else - elog(ERROR, "zstd is not supported by this build"); -#endif - break; - - case WAL_COMPRESSION_NONE: - Assert(false); /* cannot happen */ - break; - /* no default case, so that compiler will warn */ - } - - /* - * We recheck the actual size even if compression reports success and see - * if the number of bytes saved by compression is larger than the length - * of extra data needed for the compressed version of block image.
- */ - if (len >= 0 && - len + extra_bytes < orig_len) - { - *dlen = (uint16) len; /* successful compression */ - return true; - } - return false; + return &hdr_rdt; } /* @@ -1389,4 +1380,21 @@ InitXLogInsert(void) if (hdr_scratch == NULL) hdr_scratch = MemoryContextAllocZero(xloginsert_cxt, HEADER_SCRATCH_SIZE); + + /* + * Buffers for wal_compression. Allocate them in xloginsert_cxt, like + * hdr_scratch, rather than in whatever context is current; later + * enlargeStringInfo() calls repalloc them and so they stay there. + */ + if (data_before_compression == NULL || compressed_data == NULL) + { + MemoryContext oldcxt = MemoryContextSwitchTo(xloginsert_cxt); + + if (data_before_compression == NULL) + data_before_compression = makeStringInfo(); + if (compressed_data == NULL) + compressed_data = makeStringInfo(); + MemoryContextSwitchTo(oldcxt); + } + XLogEnsureCompressionBuffer(SizeOfXLogRecord); } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 3596af0617..0f7702350a 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -53,6 +53,7 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, static void ResetDecoder(XLogReaderState *state); static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir); +static XLogRecord* XLogDecompressRecordIfNeeded(XLogRecord *record); /* size of the buffer allocated for error message. */ #define MAX_ERRORMSG_LEN 1000 @@ -524,6 +525,24 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi return NULL; } +/* + * Length of "record" once decompressed: the decompressed_length stored in + * its XLogCompressionData header if it is XLR_COMPRESSED, else xl_tot_len. + * The caller must make sure that the XLogRecord header and, for compressed + * records, the whole (larger) XLogCompressionData header are readable. 
+ */ +uint32 +XLogGetRecordTotalLen(XLogRecord *record) +{ + if (record->xl_info & XLR_COMPRESSED) + { + XLogCompressionData *c = (XLogCompressionData*) record; + + return c->decompressed_length; + } + return record->xl_tot_len; +} + static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) { @@ -532,7 +543,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) XLogRecPtr targetPagePtr; bool randAccess; uint32 len, - total_len; + total_len_decomp, + total_len_phisical; uint32 targetRecOff; uint32 pageHeaderSize; bool assembled; @@ -643,7 +655,18 @@ restart: * whole header.
*/ record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; + total_len_phisical = record->xl_tot_len; + + /* + * Only xl_tot_len is guaranteed to be readable here, so look at the + * decompressed length only when the whole XLogCompressionData header is + * already in readBuf. Otherwise leave it zero: decode space is then + * allocated below, once the whole record has been read and validated. + */ + if (targetRecOff + sizeof(XLogCompressionData) <= state->readLen) + total_len_decomp = XLogGetRecordTotalLen(record); + else + total_len_decomp = 0; /* * If the whole record header is on this page, validate it immediately. @@ -663,12 +676,12 @@ restart: else { /* There may be no next page if it's too small. */ - if (total_len < SizeOfXLogRecord) + if (total_len_phisical < SizeOfXLogRecord) { report_invalid_record(state, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), - (uint32) SizeOfXLogRecord, total_len); + (uint32) SizeOfXLogRecord, total_len_phisical); goto err; } /* We'll validate the header once we have the next page. */ @@ -681,7 +694,10 @@ restart: * validated that total_len isn't garbage bytes from a recycled WAL page. */ - decoded = XLogReadRecordAlloc(state, - total_len, - false /* allow_oversized */ ); + if (total_len_decomp > 0) + decoded = XLogReadRecordAlloc(state, + total_len_decomp, + false /* allow_oversized */ ); + else + decoded = NULL; if (decoded == NULL && nonblocking) { @@ -694,7 +707,7 @@ restart: } len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; - if (total_len > len) + if (total_len_phisical > len) { /* Need to reassemble record */ char *contdata; @@ -724,7 +737,7 @@ restart: /* Wait for the next page to become available */ readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, + Min(total_len_phisical - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ)); if (readOff == XLREAD_WOULDBLOCK) @@ -765,12 +778,12 @@ restart: * we expect there to be left.
*/ if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) + total_len_phisical != (pageHeader->xlp_rem_len + gotlen)) { report_invalid_record(state, "invalid contrecord length %u (expected %lld) at %X/%X", pageHeader->xlp_rem_len, - ((long long) total_len) - gotlen, + ((long long) total_len_phisical) - gotlen, LSN_FORMAT_ARGS(RecPtr)); goto err; } @@ -813,7 +826,7 @@ restart: * also cross-checked total_len against xlp_rem_len on the second * page, and verified xlp_pageaddr on both. */ - if (total_len > state->readRecordBufSize) + if (total_len_phisical > state->readRecordBufSize) /* the buffer holds the on-disk (possibly compressed) bytes */ { char save_copy[XLOG_BLCKSZ * 2]; @@ -824,11 +837,11 @@ restart: Assert(gotlen <= lengthof(save_copy)); Assert(gotlen <= state->readRecordBufSize); memcpy(save_copy, state->readRecordBuf, gotlen); - allocate_recordbuf(state, total_len); + allocate_recordbuf(state, total_len_phisical); memcpy(state->readRecordBuf, save_copy, gotlen); buffer = state->readRecordBuf + gotlen; } - } while (gotlen < total_len); + } while (gotlen < total_len_phisical); Assert(gotheader); record = (XLogRecord *) state->readRecordBuf; @@ -844,7 +857,7 @@ restart: { /* Wait for the record data to become available */ readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ)); + Min(targetRecOff + total_len_phisical, XLOG_BLCKSZ)); if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readOff < 0) @@ -854,7 +867,7 @@ restart: if (!ValidXLogRecord(state, record, RecPtr)) goto err; - state->NextRecPtr = RecPtr + MAXALIGN(total_len); + state->NextRecPtr = RecPtr + MAXALIGN(total_len_phisical); state->DecodeRecPtr = RecPtr; } @@ -878,7 +891,7 @@ restart: { Assert(!nonblocking); decoded = XLogReadRecordAlloc(state, - total_len, + XLogGetRecordTotalLen(record), /* the whole record is read and validated by now */ true /* allow_oversized */ ); /* allocation should always happen under 
allow_oversized */ Assert(decoded != NULL); @@ -1646,6 +1659,96 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len) return size;
} +/* + * Buffer holding the most recently decompressed record. It is reused and + * grown across calls, so a pointer returned by XLogDecompressRecordIfNeeded() + * is only valid until the next call. + * + * NOTE(review): the buffer is palloc'd in CurrentMemoryContext when it grows. + * If a caller decodes records inside a context that is later reset, this + * static pointer is left dangling; it probably belongs in XLogReaderState. + */ +static char* decompression_buffer = NULL; +static uint32 decompression_buffer_len = 0; + +/* + * If "record" is XLR_COMPRESSED, decompress it into decompression_buffer and + * return the reconstructed record: the original header with xl_tot_len set to + * the decompressed length and XLR_COMPRESSED cleared, followed by the payload. + * Uncompressed records are returned unchanged. Returns NULL if the record + * cannot be decompressed: inconsistent lengths, a method that is unknown or + * not supported by this build, or a payload that does not decompress to + * exactly the recorded length. + */ +static XLogRecord* XLogDecompressRecordIfNeeded(XLogRecord *record) +{ + XLogCompressionData *src; + uint32 srclen; + uint32 payload_len; + char *dst; + XLogRecord *dst_h; + bool decomp_success = false; + + if ((record->xl_info & XLR_COMPRESSED) == 0) + return record; + + src = (XLogCompressionData*) record; + + /* Sanity-check the lengths before using them to size buffers */ + if (src->record_header.xl_tot_len < sizeof(XLogCompressionData) || + src->decompressed_length < SizeOfXLogRecord) + return NULL; + srclen = src->record_header.xl_tot_len - sizeof(XLogCompressionData); + payload_len = src->decompressed_length - SizeOfXLogRecord; + + if (decompression_buffer_len < src->decompressed_length) + { + if (decompression_buffer) + pfree(decompression_buffer); + /* Avoid small steps in growths, we compress only big records */ + decompression_buffer_len = TYPEALIGN(BLCKSZ, src->decompressed_length); + decompression_buffer = palloc(decompression_buffer_len + 
SizeOfXLogRecord); + } + dst_h = (XLogRecord*) decompression_buffer; + *dst_h = src->record_header; + dst_h->xl_tot_len = src->decompressed_length; + /* The decompressed copy must not look compressed to later readers */ + dst_h->xl_info &= ~XLR_COMPRESSED; + dst = (char*) &dst_h[1]; + + /* Decompress the payload that follows the XLogRecord header */ + if (src->method == BKPIMAGE_COMPRESS_PGLZ) + { + /* rawsize must be exact, since check_complete is true */ + decomp_success = pglz_decompress((char*) &src[1], srclen, dst, + payload_len, true) == (int32) payload_len; + } + else if (src->method == BKPIMAGE_COMPRESS_LZ4) + { +#ifdef USE_LZ4 + decomp_success = LZ4_decompress_safe((char*) &src[1], dst, + srclen, payload_len) == (int) payload_len; +#endif + } + else if (src->method == BKPIMAGE_COMPRESS_ZSTD) + { +#ifdef USE_ZSTD + size_t decomp_result = ZSTD_decompress(dst, + payload_len, + (char*) &src[1], srclen); + + decomp_success = !ZSTD_isError(decomp_result) && + decomp_result == payload_len; +#endif + } + + /* Unknown method, method not supported by this build, or corrupt data */ + if (!decomp_success) + return NULL; + + return dst_h; +} +
/* * Decode a record. "decoded" must point to a MAXALIGNed memory area that has * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On @@ -1684,6 +1782,14 @@ DecodeXLogRecord(XLogReaderState *state, RelFileLocator *rlocator = NULL; uint8 block_id; + record = XLogDecompressRecordIfNeeded(record); + if (record == NULL) + { + report_invalid_record(state, "could not decompress record at %X/%X", + LSN_FORMAT_ARGS(lsn)); + goto err; + } + decoded->header = *record; decoded->lsn = lsn; decoded->next = NULL; @@ -1794,10 +1894,14 @@ DecodeXLogRecord(XLogReaderState *state, if (BKPIMAGE_COMPRESSED(blk->bimg_info)) { - if (blk->bimg_info & BKPIMAGE_HAS_HOLE) - COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16)); - else - blk->hole_length = 0; + /* + * Only whole records are compressed now, never individual block + * images, so a compressed image cannot be decoded. + */ + report_invalid_record(state, + "unexpected compressed block image at %X/%X", + LSN_FORMAT_ARGS(lsn)); + goto err; } else blk->hole_length = BLCKSZ - blk->bimg_len; @@ -1836,19 +1933,6 @@ DecodeXLogRecord(XLogReaderState *state, goto err; } - /* - * Cross-check that bimg_len < BLCKSZ if it is compressed. - */ - if (BKPIMAGE_COMPRESSED(blk->bimg_info) && - blk->bimg_len == BLCKSZ) - { - report_invalid_record(state, - "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X", - (unsigned int) blk->bimg_len, - LSN_FORMAT_ARGS(state->ReadRecPtr)); - goto err; - } - /* * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is * set nor COMPRESSED().
@@ -2057,7 +2141,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) { DecodedBkpBlock *bkpb; char *ptr; - PGAlignedBlock tmp; if (block_id > record->record->max_block_id || !record->record->blocks[block_id].in_use) @@ -2081,63 +2164,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) if (BKPIMAGE_COMPRESSED(bkpb->bimg_info)) { - /* If a backup block image is compressed, decompress it */ - bool decomp_success = true; - - if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0) - { - if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data, - BLCKSZ - bkpb->hole_length, true) < 0) - decomp_success = false; - } - else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0) - { -#ifdef USE_LZ4 - if (LZ4_decompress_safe(ptr, tmp.data, - bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0) - decomp_success = false; -#else - report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d", - LSN_FORMAT_ARGS(record->ReadRecPtr), - "LZ4", - block_id); - return false; -#endif - } - else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0) - { -#ifdef USE_ZSTD - size_t decomp_result = ZSTD_decompress(tmp.data, - BLCKSZ - bkpb->hole_length, - ptr, bkpb->bimg_len); - - if (ZSTD_isError(decomp_result)) - decomp_success = false; -#else - report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d", - LSN_FORMAT_ARGS(record->ReadRecPtr), - "zstd", - block_id); - return false; -#endif - } - else - { - report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d", - LSN_FORMAT_ARGS(record->ReadRecPtr), - block_id); - return false; - } - - if (!decomp_success) - { - report_invalid_record(record, "could not decompress image at %X/%X, block %d", - LSN_FORMAT_ARGS(record->ReadRecPtr), - block_id); - return false; - } - - ptr = tmp.data; + /* + * Only whole records are compressed now, never individual block + * images, so a compressed image 
cannot be restored. + */ + report_invalid_record(record, "could not restore image at %X/%X: unexpected compressed image, block %d", + LSN_FORMAT_ARGS(record->ReadRecPtr), + block_id); + return false; } /* generate page, taking into account hole if
necessary */ diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index c9d8cd796a..a6b2420204 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -5057,7 +5057,7 @@ struct config_enum ConfigureNamesEnum[] = NULL }, &wal_compression, - WAL_COMPRESSION_NONE, wal_compression_options, + WAL_COMPRESSION_ZSTD, wal_compression_options, NULL, NULL, NULL }, diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h index 71894262fb..0266e224bb 100644 --- a/src/include/access/xloginsert.h +++ b/src/include/access/xloginsert.h @@ -44,6 +44,7 @@ extern void XLogBeginInsert(void); extern void XLogSetRecordFlags(uint8 flags); extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info); extern void XLogEnsureRecordSpace(int max_block_id, int ndatas); +extern void XLogEnsureCompressionBuffer(uint32 extraLen); extern void XLogRegisterData(const char *data, uint32 len); extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags); extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 9738462d3c..0cf64ab1b0 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -375,6 +375,9 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, /* Forget error produced by XLogReaderValidatePageHeader(). */ extern void XLogReaderResetError(XLogReaderState *state); +/* Length of a record after decompression (xl_tot_len if not compressed) */ +extern uint32 XLogGetRecordTotalLen(XLogRecord *record); + /* * Error information from WALRead that both backend and frontend caller can * process. Currently only errors from pg_pread can be reported.
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index a06833ce0a..d113cd6aa4 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -90,6 +90,13 @@ typedef struct XLogRecord */ #define XLR_CHECK_CONSISTENCY 0x02 +/* + * XLR_COMPRESSED indicates that the record body is compressed: the record + * starts with an XLogCompressionData header, and xl_tot_len is the length + * of the compressed (on-disk) record. + */ +#define XLR_COMPRESSED 0x04 + /* * Header info for block data appended to an XLOG record. * @@ -166,17 +168,22 @@ typedef struct XLogRecordBlockImageHeader ((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \ BKPIMAGE_COMPRESS_ZSTD)) != 0) -/* - * Extra header information used when page image has "hole" and - * is compressed. - */ -typedef struct XLogRecordBlockCompressHeader +/* + * Header of an XLR_COMPRESSED record, followed by the compressed payload + * (everything that followed the XLogRecord header in the uncompressed + * record). record_header.xl_tot_len is the compressed length including this + * header; decompressed_length is the length of the original record + * including its XLogRecord header; method is a BKPIMAGE_COMPRESS_* value. + * + * TODO: replace sizeof(XLogCompressionData) everywhere, presumably with a + * SizeOf macro in the style of SizeOfXLogRecord. + */ +typedef struct XLogCompressionData { - uint16 hole_length; /* number of bytes in "hole" */ -} XLogRecordBlockCompressHeader; - -#define SizeOfXLogRecordBlockCompressHeader \ - sizeof(XLogRecordBlockCompressHeader) + XLogRecord record_header; + char method; + uint32 decompressed_length; +} XLogCompressionData; /* * Maximum size of the header for a block reference. This is used to size a @@ -185,7 +184,6 @@ typedef struct XLogRecordBlockCompressHeader #define MaxSizeOfXLogRecordBlockHeader \ (SizeOfXLogRecordBlockHeader + \ SizeOfXLogRecordBlockImageHeader + \ - SizeOfXLogRecordBlockCompressHeader + \ sizeof(RelFileLocator) + \ sizeof(BlockNumber))
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl index f408d4f69b..942cf26f3f 100644 --- a/src/test/recovery/t/026_overwrite_contrecord.pl +++ b/src/test/recovery/t/026_overwrite_contrecord.pl @@ -56,7 +56,9 @@ $$; my $initfile = $node->safe_psql('postgres', 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())'); +# Disable wal_compression: this record must stay large enough to cross into +# the next WAL file (see $initfile/$endfile), and its repetitive payload compresses very well. $node->safe_psql('postgres', - qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))} + qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))} ); #$node->safe_psql('postgres', qq{create table foo ()}); my $endfile = $node->safe_psql('postgres', -- 2.39.5 (Apple Git-154)