From 58c94d4772407125ea7f6a577a047719bced56d8 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Mon, 19 Jun 2023 17:46:45 +0200 Subject: [PATCH v2 6/6] Reformat the XLog record header Many WAL records don't utilize all bytes of the XLog header, resulting in a lot of wasted bytes. This new XLog header format saves bytes in several ways: - Records that don't need the XID won't include it. This potentially saves 4 bytes/record. - Records that are small enough don't need 4 bytes to describe their length. By varint encoding the length, we save up to 3 bytes/record (up to 4 bytes/record if dataless records are considered). - Only include xl_rmgrinfo if it is non-0. Several RMGRs have a record that they use most often. We save 1 byte/record if that record id is 0. - Reduce alignment losses. We save 1 byte/record by removing one byte of alignment padding. --- src/backend/access/transam/xlog.c | 133 ++++++++++++++------- src/backend/access/transam/xloginsert.c | 135 ++++++++++++++++------ src/backend/access/transam/xlogreader.c | 93 ++++++++------- src/backend/access/transam/xlogrecovery.c | 4 +- src/bin/pg_resetwal/pg_resetwal.c | 38 +++--- src/include/access/rmgr.h | 2 +- src/include/access/rmgrlist.h | 1 + src/include/access/xlog_internal.h | 102 +++++++++++++++- src/include/access/xlogreader.h | 4 +- src/include/access/xlogrecord.h | 125 +++++++++++++++++--- 10 files changed, 483 insertions(+), 154 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4b1ff0d1aa..5937b43f50 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -737,17 +737,18 @@ XLogInsertRecord(XLogRecData *rdata, XLogCtlInsert *Insert = &XLogCtl->Insert; pg_crc32c rdata_crc; bool inserted; - XLogRecord *rechdr = (XLogRecord *) rdata->data; - uint8 rmgrinfo = rechdr->xl_rmgrinfo; - bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID && - rmgrinfo == XLOG_SWITCH); + XLogRecHdr rechdr; + bool isLogSwitch; XLogRecPtr 
StartPos; XLogRecPtr EndPos; + XLogRecPtr xl_prev; bool prevDoPageWrites = doPageWrites; TimeLineID insertTLI; + int rec_payload_len, + rec_hdr_len; - /* we assume that all of the record header is in the first chunk */ - Assert(rdata->len >= SizeOfXLogRecord); + /* we assume that all data of the record header is in the first chunk */ + Assert(rdata->len >= XLogRecordMinHdrSize); /* cross-check on whether we should be here or not */ if (!XLogInsertAllowed()) @@ -758,6 +759,17 @@ XLogInsertRecord(XLogRecData *rdata, * change, so we can read it without a lock. */ insertTLI = XLogCtl->InsertTimeLineID; + rechdr = (XLogRecHdr) rdata->data; + + { + XLogRecord record = {0}; + XLogReadRecHdrInto(rechdr, (Size) rdata->len, &record); + + isLogSwitch = (record.xl_rmid == RM_XLOG_ID && + record.xl_rmgrinfo == XLOG_SWITCH); + rec_hdr_len = (int) XLogRecordHdrLen(rechdr->xl_info); + rec_payload_len = (int) record.xl_payload_len; + } /*---------- * @@ -834,34 +846,44 @@ XLogInsertRecord(XLogRecData *rdata, } /* - * Reserve space for the record in the WAL. This also sets the xl_prev - * pointer. + * Reserve space for the record in the WAL. */ if (isLogSwitch) - inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev); + inserted = ReserveXLogSwitch(&StartPos, &EndPos, &xl_prev); else { - ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos, - &rechdr->xl_prev); + ReserveXLogInsertLocation(rec_hdr_len + rec_payload_len, + &StartPos, &EndPos, &xl_prev); inserted = true; } if (inserted) { + char *rec = (char *) rechdr; + + /* fill in xl_prev */ + memcpy(rec + rec_hdr_len - sizeof(pg_crc32c) - sizeof(XLogRecPtr), + &xl_prev, sizeof(XLogRecPtr)); + /* * Now that xl_prev has been filled in, calculate CRC of the record * header. 
*/ - rdata_crc = rechdr->xl_crc; - COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc)); + + memcpy(&rdata_crc, rec + rec_hdr_len - sizeof(pg_crc32c), + sizeof(pg_crc32c)); + + COMP_CRC32C(rdata_crc, rec, rec_hdr_len - sizeof(pg_crc32c)); FIN_CRC32C(rdata_crc); - rechdr->xl_crc = rdata_crc; + + memcpy(rec + rec_hdr_len - sizeof(pg_crc32c), + &rdata_crc, sizeof(pg_crc32c)); /* * All the record data, including the header, is now ready to be * inserted. Copy the record in the space reserved. */ - CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata, + CopyXLogRecordToWAL(rec_hdr_len + rec_payload_len, isLogSwitch, rdata, StartPos, EndPos, insertTLI); /* @@ -932,7 +954,8 @@ XLogInsertRecord(XLogRecData *rdata, */ if (inserted) { - EndPos = StartPos + SizeOfXLogRecord; + /* xlog switch is minimal record header, plus a byte for rmgrinfo */ + EndPos = StartPos + XLogRecordMinHdrSize + sizeof(uint8); if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) { uint64 offset = XLogSegmentOffset(EndPos, wal_segment_size); @@ -949,7 +972,7 @@ XLogInsertRecord(XLogRecData *rdata, if (XLOG_DEBUG) { static XLogReaderState *debug_reader = NULL; - XLogRecord *record; + XLogRecHdr record; DecodedXLogRecord *decoded; StringInfoData buf; StringInfoData recordBuf; @@ -971,9 +994,9 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); /* We also need temporary space to decode the record. */ - record = (XLogRecord *) recordBuf.data; + record = (XLogRecHdr) recordBuf.data; decoded = (DecodedXLogRecord *) - palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len)); + palloc(DecodeXLogRecordRequiredSpace(recordBuf.len)); if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, @@ -1018,7 +1041,7 @@ XLogInsertRecord(XLogRecData *rdata, /* Report WAL traffic to the instrumentation. 
*/ if (inserted) { - pgWalUsage.wal_bytes += rechdr->xl_tot_len; + pgWalUsage.wal_bytes += rec_hdr_len + rec_payload_len; pgWalUsage.wal_records++; pgWalUsage.wal_fpi += num_fpi; } @@ -1049,10 +1072,10 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, uint64 endbytepos; uint64 prevbytepos; - size = MAXALIGN(size); + size = XLP_ALIGN(size); /* All (non xlog-switch) records should contain data. */ - Assert(size > SizeOfXLogRecord); + Assert(size > XLogRecordMinHdrSize); /* * The duration the spinlock needs to be held is minimized by minimizing @@ -1103,7 +1126,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) uint64 startbytepos; uint64 endbytepos; uint64 prevbytepos; - uint32 size = MAXALIGN(SizeOfXLogRecord); + uint32 size = XLP_ALIGN(XLogRecordMinHdrSize + sizeof(uint8)); XLogRecPtr ptr; uint32 segleft; @@ -1176,10 +1199,10 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, freespace = INSERT_FREESPACE(CurrPos); /* - * there should be enough space for at least the first field (xl_tot_len) + * there should be enough space for at least the first XLP_ALIGN quantum * on this page. 
*/ - Assert(freespace >= sizeof(uint32)); + Assert(freespace >= XLP_ALIGN(1)); /* Copy record data */ written = 0; @@ -1247,7 +1270,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, if (isLogSwitch && XLogSegmentOffset(CurrPos, wal_segment_size) != 0) { /* An xlog-switch record doesn't contain any data besides the header */ - Assert(write_len == SizeOfXLogRecord); + Assert(write_len == XLogRecordMinHdrSize + sizeof(uint8)); /* Assert that we did reserve the right amount of space */ Assert(XLogSegmentOffset(EndPos, wal_segment_size) == 0); @@ -1291,7 +1314,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, else { /* Align the end position, so that the next record starts aligned */ - CurrPos = MAXALIGN64(CurrPos); + CurrPos = XLP_ALIGN(CurrPos); } if (CurrPos != EndPos) @@ -4649,11 +4672,15 @@ BootStrapXLOG(void) char *buffer; XLogPageHeader page; XLogLongPageHeader longpage; - XLogRecord *record; + XLogRecHdr rec_hdr; + char *baserecptr; char *recptr; uint64 sysidentifier; struct timeval tv; pg_crc32c crc; + const int rec_hdr_len = offsetof(XLogRecHdrData, xl_hdrdata) + + sizeof(uint8) + sizeof(XLogRecPtr) + sizeof(pg_crc32c); + const int rec_payload_len = sizeof(uint8) * 2 + sizeof(CheckPoint); /* allow ordinary WAL segment creation, like StartupXLOG() would */ SetInstallXLogFileSegmentActive(); @@ -4724,28 +4751,52 @@ BootStrapXLOG(void) longpage->xlp_seg_size = wal_segment_size; longpage->xlp_xlog_blcksz = XLOG_BLCKSZ; + /* if this changes, we need to update the code below */ + Assert(XLogLengthToSizeClass(SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint), XLS_UINT32) == XLS_UINT8); + /* if this changes, we need to add XLR_HAS_RMGRINFO */ + Assert(XLOG_CHECKPOINT_SHUTDOWN == 0); + /* Insert the initial checkpoint record */ - recptr = ((char *) page + SizeOfXLogLongPHD); - record = (XLogRecord *) recptr; - record->xl_prev = 0; - record->xl_xid = InvalidTransactionId; - record->xl_tot_len = SizeOfXLogRecord 
+ SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint); - record->xl_info = 0; - record->xl_rmgrinfo = XLOG_CHECKPOINT_SHUTDOWN; - record->xl_rmid = RM_XLOG_ID; - recptr += SizeOfXLogRecord; + baserecptr = recptr = (((char *) page) + SizeOfXLogLongPHD); + + rec_hdr = (XLogRecHdr) baserecptr; + rec_hdr->xl_info = (char) XLS_UINT8; + rec_hdr->xl_rmid = (char) RM_XLOG_ID; + recptr += offsetof(XLogRecHdrData, xl_hdrdata); + + recptr += XLogWriteLength(SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint), + XLS_UINT8, XLS_UINT8, recptr); + + /* include prevptr */ + { + XLogRecPtr prevptr = 0; + memcpy(recptr, &prevptr, sizeof(XLogRecPtr)); + recptr += sizeof(XLogRecPtr); + } + + /* reserve location of crc */ + recptr += sizeof(pg_crc32c); + + Assert(recptr - baserecptr == XLogRecordHdrLen(rec_hdr->xl_info)); + Assert(recptr - baserecptr == rec_hdr_len); + /* fill the XLogRecordDataHeaderShort struct */ *(recptr++) = (char) XLR_BLOCK_ID_DATA_SHORT; *(recptr++) = sizeof(checkPoint); memcpy(recptr, &checkPoint, sizeof(checkPoint)); recptr += sizeof(checkPoint); - Assert(recptr - (char *) record == record->xl_tot_len); + + /* Assert length of record matches expectations */ + Assert(recptr - baserecptr == XLogRecordTotalLength(rec_hdr)); + Assert((rec_hdr)->xl_info == XLS_UINT8); INIT_CRC32C(crc); - COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); - COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); + COMP_CRC32C(crc, baserecptr + rec_hdr_len, rec_payload_len); + COMP_CRC32C(crc, baserecptr, rec_hdr_len - sizeof(pg_crc32c)); FIN_CRC32C(crc); - record->xl_crc = crc; + + memcpy(baserecptr + rec_hdr_len - sizeof(pg_crc32c), + &crc, sizeof(pg_crc32c)); /* Create first XLOG segment file */ openLogTLI = BootstrapTimeLineID; diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 4816c5284b..194196f596 100644 --- a/src/backend/access/transam/xloginsert.c +++ 
b/src/backend/access/transam/xloginsert.c @@ -545,12 +545,16 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rmgr_info, XLogRecPtr RedoRecP XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included) { XLogRecData *rdt; - uint64 total_len = 0; + uint64 payload_len = 0; + XLogSizeClass payload_sizeclass = XLS_EMPTY; int block_id; pg_crc32c rdata_crc; registered_buffer *prev_regbuf = NULL; XLogRecData *rdt_datas_last; - XLogRecord *rechdr; + TransactionId xid; + XLogRecHdr rechdr; + uint8 xlr_flags = 0; + uint32 rec_hdr_len; char *scratch = hdr_scratch; /* @@ -558,9 +562,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rmgr_info, XLogRecPtr RedoRecP * All the modifications we do to the rdata chains below must handle that. */ - /* The record begins with the fixed-size header */ - rechdr = (XLogRecord *) scratch; - scratch += SizeOfXLogRecord; + /* The record begins with the variable-size header */ + rechdr = (XLogRecHdr) scratch; + + scratch += XLogRecordMaxHdrSize; hdr_rdt.next = NULL; rdt_datas_last = &hdr_rdt; @@ -779,7 +784,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rmgr_info, XLogRecPtr RedoRecP } } - total_len += bimg.length; + payload_len += bimg.length; } if (needs_data) @@ -797,7 +802,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rmgr_info, XLogRecPtr RedoRecP bkpb.fork_flags |= BKPBLOCK_HAS_DATA; data_length = (uint16) regbuf->rdata_len; data_sizeclass = XLogLengthToSizeClass(data_length, XLS_UINT16); - total_len += regbuf->rdata_len; + payload_len += regbuf->rdata_len; rdt_datas_last->next = regbuf->rdata_head; rdt_datas_last = regbuf->rdata_tail; @@ -851,16 +856,33 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rmgr_info, XLogRecPtr RedoRecP } /* followed by toplevel XID, if not already included in previous record */ - if (curinsert_flags & XLOG_INCLUDE_XID && IsSubxactTopXidLogPending()) + if (curinsert_flags & XLOG_INCLUDE_XID) { - TransactionId xid = GetTopTransactionIdIfAny(); + xid = GetCurrentTransactionIdIfAny(); 
+ + if (IsSubxactTopXidLogPending()) + { + TransactionId txid = GetTopTransactionIdIfAny(); + + xlr_flags |= XLR_HAS_XID; + Assert(TransactionIdIsValid(xid)); - /* Set the flag that the top xid is included in the WAL */ - *topxid_included = true; + /* Set the flag that the top xid is included in the WAL */ + *topxid_included = true; - *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; - memcpy(scratch, &xid, sizeof(TransactionId)); - scratch += sizeof(TransactionId); + *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; + memcpy(scratch, &txid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + else if (TransactionIdIsValid(xid)) + { + xlr_flags |= XLR_HAS_XID; + } + } + else + { + xid = InvalidTransactionId; + Assert((xlr_flags & XLR_HAS_XID) == 0); } /* followed by main data, if any */ @@ -889,12 +911,64 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rmgr_info, XLogRecPtr RedoRecP } rdt_datas_last->next = mainrdata_head; rdt_datas_last = mainrdata_last; - total_len += mainrdata_len; + payload_len += mainrdata_len; } rdt_datas_last->next = NULL; - hdr_rdt.len = (scratch - hdr_scratch); - total_len += hdr_rdt.len; + /* Add the block headers section's length to the payload */ + payload_len += scratch - (hdr_scratch + XLogRecordMaxHdrSize); + + /* + * Fill in the fields in the record header. Prev-link is filled in later, + * once we know where in the WAL the record will be inserted. The CRC does + * not include the record header yet. + */ + payload_sizeclass = XLogLengthToSizeClass(payload_len, XLS_UINT32); + + xlr_flags |= payload_sizeclass; + + if (rmgr_info != 0) + xlr_flags |= XLR_HAS_RMGRINFO; + + /* Set up the fixed part of the xlog header: 
xl_info and xl_rmid */ + rechdr->xl_info = xlr_flags; + rechdr->xl_rmid = rmid; + + rec_hdr_len = 0; + + /* next, xl_payload_len */ + rec_hdr_len += XLogWriteLength(payload_len, payload_sizeclass, + XLS_UINT32, + &rechdr->xl_hdrdata[rec_hdr_len]); + + if (xlr_flags & XLR_HAS_RMGRINFO) + rechdr->xl_hdrdata[rec_hdr_len++] = (char) rmgr_info; + + if (xlr_flags & XLR_HAS_XID) + { + Assert(curinsert_flags & XLOG_INCLUDE_XID); + Assert(TransactionIdIsValid(xid)); + + memcpy(&rechdr->xl_hdrdata[rec_hdr_len], &xid, sizeof(TransactionId)); + rec_hdr_len += sizeof(TransactionId); + } + + /* reserve space for XLogRecPtr and checksum */ + rec_hdr_len += sizeof(XLogRecPtr); + rec_hdr_len += sizeof(pg_crc32c); + /* Add static header length */ + rec_hdr_len += offsetof(XLogRecHdrData, xl_hdrdata); + + Assert(rec_hdr_len == XLogRecordHdrLen(rechdr->xl_info)); + + /* move the header so that it directly precedes the rest of the payload */ + memmove(hdr_scratch + XLogRecordMaxHdrSize - rec_hdr_len, + hdr_scratch, rec_hdr_len); + + rechdr = (XLogRecHdr) (hdr_scratch + XLogRecordMaxHdrSize - rec_hdr_len); + + hdr_rdt.data = (char *) rechdr; + hdr_rdt.len = (scratch - hdr_rdt.data); /* * Calculate CRC of the data @@ -905,10 +979,14 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rmgr_info, XLogRecPtr RedoRecP * header. */ INIT_CRC32C(rdata_crc); - COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); + COMP_CRC32C(rdata_crc, hdr_scratch + XLogRecordMaxHdrSize, + hdr_rdt.len - rec_hdr_len); for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) COMP_CRC32C(rdata_crc, rdt->data, rdt->len); + memcpy(hdr_rdt.data + rec_hdr_len - sizeof(pg_crc32c), + &rdata_crc, sizeof(pg_crc32c)); + /* * Ensure that the XLogRecord is not too large. * @@ -917,28 +995,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rmgr_info, XLogRecPtr RedoRecP * not emit records larger than the sizes advertised to be supported. This * cap is based on DecodeXLogRecordRequiredSpace(). 
*/ - if (total_len >= XLogRecordMaxSize) + if (payload_len + rec_hdr_len >= XLogRecordMaxSize) ereport(ERROR, (errmsg_internal("oversized WAL record"), errdetail_internal("WAL record would be %llu bytes (of maximum %u bytes); rmid %u flags %u.", - (unsigned long long) total_len, XLogRecordMaxSize, rmid, info))); - - /* - * Fill in the fields in the record header. Prev-link is filled in later, - * once we know where in the WAL the record will be inserted. The CRC does - * not include the record header yet. - */ - if (curinsert_flags & XLOG_INCLUDE_XID) - rechdr->xl_xid = GetCurrentTransactionIdIfAny(); - else - rechdr->xl_xid = InvalidTransactionId; - - rechdr->xl_tot_len = (uint32) total_len; - rechdr->xl_info = info; - rechdr->xl_rmid = rmid; - rechdr->xl_rmgrinfo = rmgr_info; - rechdr->xl_prev = InvalidXLogRecPtr; - rechdr->xl_crc = rdata_crc; + (unsigned long long) payload_len, XLogRecordMaxSize, rmid, info))); return &hdr_rdt; } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 733de8a8dc..13da1a11c9 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -49,8 +49,9 @@ static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, static void XLogReaderInvalReadState(XLogReaderState *state); static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); -static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, + XLogRecPtr PrevRecPtr, XLogRecHdr rechdr, + bool randAccess); +static bool ValidXLogRecord(XLogReaderState *state, XLogRecHdr record, XLogRecPtr recptr); static void ResetDecoder(XLogReaderState *state); static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, @@ -535,7 +536,7 @@ static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState 
*state, bool nonblocking) { XLogRecPtr RecPtr; - XLogRecord *record; + XLogRecHdr record; XLogRecPtr targetPagePtr; bool randAccess; uint32 len, @@ -602,7 +603,7 @@ restart: * fits on the same page. */ readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); + Min(targetRecOff + XLogRecordMaxHdrSize, XLOG_BLCKSZ)); if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readOff < 0) @@ -644,13 +645,13 @@ restart: * Read the record length. * * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of the - * struct, so it must be on this page (the records are MAXALIGNed), but we - * cannot access any other fields until we've verified that we got the - * whole header. + * header might not fit on this page. xl_payload_len is the third field of + * the data and starts at the third byte, so it must be on this page (the + * records are aligned to 8 bytes), but we cannot access most other fields + * until we've verified that we got the whole header. */ - record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; + record = (XLogRecHdr) (state->readBuf + (RecPtr % XLOG_BLCKSZ)); + total_len = XLogRecordTotalLength(record); /* * If the whole record header is on this page, validate it immediately. @@ -660,7 +661,7 @@ restart: * record" code path below; otherwise we might fail to apply * ValidXLogRecordHeader at all. 
*/ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + if (targetRecOff <= XLOG_BLCKSZ - XLogRecordHdrLen(record->xl_info)) { if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, randAccess)) @@ -670,12 +671,12 @@ restart: else { /* XXX: more validation should be done here */ - if (total_len < SizeOfXLogRecord) + if (total_len < XLogRecordMinHdrSize) { report_invalid_record(state, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), - (uint32) SizeOfXLogRecord, total_len); + (uint32) XLogRecordMinHdrSize, total_len); goto err; } gotheader = false; @@ -818,7 +819,7 @@ restart: /* If we just reassembled the record header, validate it. */ if (!gotheader) { - record = (XLogRecord *) state->readRecordBuf; + record = (XLogRecHdr) state->readRecordBuf; if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, randAccess)) goto err; @@ -828,7 +829,7 @@ restart: Assert(gotheader); - record = (XLogRecord *) state->readRecordBuf; + record = (XLogRecHdr) state->readRecordBuf; if (!ValidXLogRecord(state, record, RecPtr)) goto err; @@ -860,7 +861,7 @@ restart: * Special processing if it's an XLOG SWITCH record */ if (record->xl_rmid == RM_XLOG_ID && - record->xl_rmgrinfo == XLOG_SWITCH) + XLRHdrGetRmgrInfo(record) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ state->NextRecPtr += state->segcxt.ws_segsize - 1; @@ -1114,22 +1115,25 @@ XLogReaderInvalReadState(XLogReaderState *state) */ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, + XLogRecPtr PrevRecPtr, XLogRecHdr rechdr, bool randAccess) { - if (record->xl_tot_len < SizeOfXLogRecord) + XLogRecord record = {0}; + XLogReadRecHdrInto(rechdr, XLogRecordHdrLen(rechdr->xl_info), &record); + + if (XLogRecordTotalLength(rechdr) < XLogRecordMinHdrSize) { report_invalid_record(state, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), - 
(uint32) SizeOfXLogRecord, record->xl_tot_len); + (uint32) XLogRecordMinHdrSize, record.xl_payload_len); return false; } - if (!RmgrIdIsValid(record->xl_rmid)) + if (!RmgrIdIsValid(rechdr->xl_rmid)) { report_invalid_record(state, "invalid resource manager ID %u at %X/%X", - record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); + rechdr->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); return false; } if (randAccess) @@ -1138,12 +1142,14 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, * We can't exactly verify the prev-link, but surely it should be less * than the record's own address. */ - if (!(record->xl_prev < RecPtr)) + if (!(record.xl_prev < RecPtr)) { report_invalid_record(state, - "record with incorrect prev-link %X/%X at %X/%X", - LSN_FORMAT_ARGS(record->xl_prev), - LSN_FORMAT_ARGS(RecPtr)); + "record with incorrect prev-link %X/%X at %X/%X, info %u, rmgr %u", + LSN_FORMAT_ARGS(record.xl_prev), + LSN_FORMAT_ARGS(RecPtr), + (uint32) record.xl_info, + (uint32) record.xl_rmid); return false; } } @@ -1154,12 +1160,13 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, * check guards against torn WAL pages where a stale but valid-looking * WAL record starts on a sector boundary. */ - if (record->xl_prev != PrevRecPtr) + if (record.xl_prev != PrevRecPtr) { report_invalid_record(state, - "record with incorrect prev-link %X/%X at %X/%X", - LSN_FORMAT_ARGS(record->xl_prev), - LSN_FORMAT_ARGS(RecPtr)); + "record with incorrect prev-link %X/%X at %X/%X, expected %X/%X", + LSN_FORMAT_ARGS(record.xl_prev), + LSN_FORMAT_ARGS(RecPtr), + LSN_FORMAT_ARGS(PrevRecPtr)); return false; } } @@ -1179,19 +1186,26 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, * SizeOfXLogRecord. 
*/ static bool -ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) +ValidXLogRecord(XLogReaderState *state, XLogRecHdr record, XLogRecPtr recptr) { - pg_crc32c crc; + pg_crc32c crc, + hdr_crc; + Size hdr_len = XLogRecordHdrLen(record->xl_info); + Size rec_length = XLogRecordTotalLength(record); /* Calculate the CRC */ INIT_CRC32C(crc); - COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); + COMP_CRC32C(crc, ((char *) record) + hdr_len, rec_length - hdr_len); /* include the record header last */ - COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); + COMP_CRC32C(crc, (char *) record, hdr_len - sizeof(pg_crc32c)); FIN_CRC32C(crc); - if (!EQ_CRC32C(record->xl_crc, crc)) + memcpy(&hdr_crc, ((char *) record) + hdr_len - sizeof(pg_crc32c), + sizeof(pg_crc32c)); + + if (!EQ_CRC32C(hdr_crc, crc)) { + Assert(false); report_invalid_record(state, "incorrect resource manager data checksum in record at %X/%X", LSN_FORMAT_ARGS(recptr)); @@ -1651,7 +1665,7 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len) bool DecodeXLogRecord(XLogReaderState *state, DecodedXLogRecord *decoded, - XLogRecord *record, + XLogRecHdr record, XLogRecPtr lsn, char **errormsg) { @@ -1674,7 +1688,7 @@ DecodeXLogRecord(XLogReaderState *state, RelFileLocator *rlocator = NULL; uint8 block_id; - decoded->header = *record; + XLogReadRecHdrInto(record, XLogRecordHdrLen(record->xl_info), &decoded->header); decoded->lsn = lsn; decoded->next = NULL; decoded->record_origin = InvalidRepOriginId; @@ -1683,8 +1697,8 @@ DecodeXLogRecord(XLogReaderState *state, decoded->main_data_len = 0; decoded->max_block_id = -1; ptr = (char *) record; - ptr += SizeOfXLogRecord; - remaining = record->xl_tot_len - SizeOfXLogRecord; + ptr += XLogRecordHdrLen(record->xl_info); + remaining = decoded->header.xl_payload_len; /* Decode the headers */ datatotal = 0; @@ -1960,7 +1974,7 @@ DecodeXLogRecord(XLogReaderState *state, /* Report the actual size we 
used. */ decoded->size = MAXALIGN(out - (char *) decoded); - Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >= + Assert(DecodeXLogRecordRequiredSpace(decoded->header.xl_payload_len) >= decoded->size); return true; @@ -1989,6 +2003,7 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, if (!XLogRecGetBlockTagExtended(record, block_id, rlocator, forknum, blknum, NULL)) { + Assert(false); #ifndef FRONTEND elog(ERROR, "could not locate backup block with ID %d in WAL record", block_id); diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 56fa2f74a2..0362ba747f 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -4000,10 +4000,10 @@ ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr, (errmsg("invalid xl_rmgrinfo in checkpoint record"))); return NULL; } - if (record->xl_tot_len != SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint)) + if (record->xl_payload_len != SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint)) { ereport(LOG, - (errmsg("invalid length of checkpoint record"))); + (errmsg("invalid payload length of checkpoint record"))); return NULL; } return record; diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c index 80cb9a5bc4..4e171eed1f 100644 --- a/src/bin/pg_resetwal/pg_resetwal.c +++ b/src/bin/pg_resetwal/pg_resetwal.c @@ -1040,12 +1040,15 @@ WriteEmptyXLOG(void) PGAlignedXLogBlock buffer; XLogPageHeader page; XLogLongPageHeader longpage; - XLogRecord *record; pg_crc32c crc; char path[MAXPGPATH]; int fd; int nbytes; char *recptr; + char *baserecptr; + const int rec_hdr_len = offsetof(XLogRecHdrData, xl_hdrdata) + + sizeof(uint8) + sizeof(XLogRecPtr) + sizeof(pg_crc32c); + const int rec_payload_len = sizeof(uint8) * 2 + sizeof(CheckPoint); memset(buffer.data, 0, XLOG_BLCKSZ); @@ -1061,26 +1064,33 @@ WriteEmptyXLOG(void) longpage->xlp_xlog_blcksz = XLOG_BLCKSZ; 
/* Insert the initial checkpoint record */ - recptr = (char *) page + SizeOfXLogLongPHD; - record = (XLogRecord *) recptr; - record->xl_prev = 0; - record->xl_xid = InvalidTransactionId; - record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint); - record->xl_info = 0; - record->xl_rmgrinfo = XLOG_CHECKPOINT_SHUTDOWN; - record->xl_rmid = RM_XLOG_ID; - - recptr += SizeOfXLogRecord; + baserecptr = recptr = (char *) page + SizeOfXLogLongPHD; + *(recptr++) = (char) XLS_UINT8; + *(recptr++) = (char) RM_XLOG_ID; + recptr += XLogWriteLength(SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint), + XLS_UINT8, XLS_UINT8, recptr); + + /* include prevptr */ + { + XLogRecPtr prevptr = 0; + memcpy(recptr, &prevptr, sizeof(XLogRecPtr)); + recptr += sizeof(XLogRecPtr); + } + /* reserve location of crc */ + recptr += sizeof(pg_crc32c); + *(recptr++) = (char) XLR_BLOCK_ID_DATA_SHORT; *(recptr++) = sizeof(CheckPoint); memcpy(recptr, &ControlFile.checkPointCopy, sizeof(CheckPoint)); INIT_CRC32C(crc); - COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); - COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); + COMP_CRC32C(crc, baserecptr + rec_hdr_len, rec_payload_len); + COMP_CRC32C(crc, baserecptr, rec_hdr_len - sizeof(pg_crc32c)); FIN_CRC32C(crc); - record->xl_crc = crc; + + memcpy(baserecptr + rec_hdr_len - sizeof(pg_crc32c), + &crc, sizeof(pg_crc32c)); /* Write the first page */ XLogFilePath(path, ControlFile.checkPointCopy.ThisTimeLineID, diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index 3b6a497e1b..7038ba0833 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -41,7 +41,7 @@ typedef enum RmgrIds static inline bool RmgrIdIsBuiltin(int rmid) { - return rmid <= RM_MAX_BUILTIN_ID; + return rmid <= RM_MAX_BUILTIN_ID && rmid != RM_INVALID_ID; } static inline bool diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 
463bcb67c5..b53d731f56 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,6 +25,7 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */ +PG_RMGR(RM_INVALID_ID, "invalid", NULL, NULL, NULL, NULL, NULL, NULL, NULL) PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 876e2790f9..ea32a9ba1b 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -26,12 +26,20 @@ #include "pgtime.h" #include "storage/block.h" #include "storage/relfilelocator.h" +#include "transam.h" +/* + * WAL records (and other XLogPage page-level content) are aligned to 8 bytes. + * + * Note that the contents of the records is not aligned (!) 
+ */ +#define ALIGNOF_XLP_CONTENT 8 +#define XLP_ALIGN(LEN) TYPEALIGN64(ALIGNOF_XLP_CONTENT, LEN) /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD113 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD114 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { @@ -49,7 +57,7 @@ typedef struct XLogPageHeaderData uint32 xlp_rem_len; /* total len of remaining data for record */ } XLogPageHeaderData; -#define SizeOfXLogShortPHD MAXALIGN(sizeof(XLogPageHeaderData)) +#define SizeOfXLogShortPHD XLP_ALIGN(sizeof(XLogPageHeaderData)) typedef XLogPageHeaderData *XLogPageHeader; @@ -66,7 +74,7 @@ typedef struct XLogLongPageHeaderData uint32 xlp_xlog_blcksz; /* just as a cross-check */ } XLogLongPageHeaderData; -#define SizeOfXLogLongPHD MAXALIGN(sizeof(XLogLongPageHeaderData)) +#define SizeOfXLogLongPHD XLP_ALIGN(sizeof(XLogLongPageHeaderData)) typedef XLogLongPageHeaderData *XLogLongPageHeader; @@ -381,7 +389,7 @@ static inline Size XLogWriteLength(uint32 length, XLogSizeClass sizeClass, case caseSizeClass: \ if ((caseSizeClass) <= maxSizeClass) \ { \ - field_type typedLength = length; \ + field_type typedLength = (field_type) length; \ memcpy(output, &typedLength, sizeof(field_type)); \ written = sizeof(field_type); \ } \ @@ -394,6 +402,9 @@ static inline Size XLogWriteLength(uint32 length, XLogSizeClass sizeClass, WRITE_OP(XLS_UINT8, uint8); WRITE_OP(XLS_UINT16, uint16); WRITE_OP(XLS_UINT32, uint32); + default: + Assert(false); + pg_unreachable(); } #undef WRITE_OP @@ -436,12 +447,95 @@ static inline Size XLogReadLength(uint32 *length, XLogSizeClass sizeClass, READ_OP(XLS_UINT8, uint8); READ_OP(XLS_UINT16, uint16); READ_OP(XLS_UINT32, uint32); + default: + Assert(false); + pg_unreachable(); } #undef READ_OP return readSize; } + +inline static uint8 XLRHdrGetRmgrInfo(XLogRecHdr record) +{ + XLogSizeClass recSizeClass; + int offset; + + if (!(record->xl_info & XLR_HAS_RMGRINFO)) + return 0; + + 
recSizeClass = XLR_SIZECLASS(record->xl_info); + /* xl_rmgrinfo is located immediately behind the xl_payload_len field */ + offset = XLogSizeClassToByteLength(recSizeClass); + + return (uint8) record->xl_hdrdata[offset]; +} + + +/* Works on any partial record */ +inline static Size XLogRecordTotalLength(XLogRecHdr record) +{ + uint8 xl_info = record->xl_info; + XLogSizeClass sizeClass; + uint32 length = 0; + sizeClass = XLR_SIZECLASS(xl_info); + + XLogReadLength(&length, sizeClass, XLS_UINT32, + &record->xl_hdrdata[0], 6); + + return (Size) length + XLogRecordHdrLen(xl_info); +} + +inline static void XLogReadRecHdrInto(XLogRecHdr recdata, Size length, + XLogRecord *record) +{ + Size offset = 0; + Size hdr_size PG_USED_FOR_ASSERTS_ONLY = 0; + XLogSizeClass sizeClass; + + Assert(length >= XLogRecordMinHdrSize); + + record->xl_info = recdata->xl_info; + + hdr_size = XLogRecordHdrLen(record->xl_info); + Assert(length >= hdr_size); + + record->xl_rmid = recdata->xl_rmid; + + sizeClass = XLR_SIZECLASS(record->xl_info); + offset += XLogReadLength(&record->xl_payload_len, sizeClass, + XLS_UINT32, &recdata->xl_hdrdata[offset], length - offset); + + if (record->xl_info & XLR_HAS_RMGRINFO) + { + record->xl_rmgrinfo = recdata->xl_hdrdata[offset]; + offset += sizeof(uint8); + } + else + { + record->xl_rmgrinfo = 0; + } + + if (record->xl_info & XLR_HAS_XID) + { + memcpy(&record->xl_xid, &recdata->xl_hdrdata[offset], sizeof(TransactionId)); + offset += sizeof(TransactionId); + } + else + { + record->xl_xid = InvalidTransactionId; + } + + memcpy(&record->xl_prev, &recdata->xl_hdrdata[offset], sizeof(XLogRecPtr)); + offset += sizeof(XLogRecPtr); + + memcpy(&record->xl_crc, &recdata->xl_hdrdata[offset], sizeof(pg_crc32c)); + offset += sizeof(pg_crc32c); + + Assert(hdr_size - 2 == offset); +} + /* * Method table for resource managers. 
* diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index a1d0216404..bb64f89dba 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -397,7 +397,7 @@ extern bool WALRead(XLogReaderState *state, extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len); extern bool DecodeXLogRecord(XLogReaderState *state, DecodedXLogRecord *decoded, - XLogRecord *record, + XLogRecHdr record, XLogRecPtr lsn, char **errormsg); @@ -405,7 +405,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, * Macros that provide access to parts of the record most recently returned by * XLogReadRecord() or XLogNextRecord(). */ -#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len) +#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_payload_len) #define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev) #define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info) #define XLogRecGetRmgrInfo(decoder) ((decoder)->record->header.xl_rmgrinfo) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index 42b06f163e..af615e2866 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -38,9 +38,38 @@ typedef enum XLogSizeClass { XLS_UINT32 = 3 /* length <= UINT32_MAX; stored in uint32 (4B) */ } XLogSizeClass; +static inline int XLogSizeClassToByteLength(XLogSizeClass sz) +{ + switch (sz) { + case XLS_EMPTY: + return 0; + case XLS_UINT8: + return sizeof(uint8); + case XLS_UINT16: + return sizeof(uint16); + case XLS_UINT32: + return sizeof(uint32); + default: + pg_unreachable(); + } +} + +typedef struct XLogRecord +{ + uint8 xl_info; /* flag bits, see below */ + RmgrId xl_rmid; /* resource manager for this record */ + uint32 xl_payload_len; /* len of record payload, excluding header */ + uint8 xl_rmgrinfo; /* rmgr flag bits, see below */ + TransactionId xl_xid; /* xact id */ + XLogRecPtr xl_prev; /* ptr to previous record in log */ + pg_crc32c 
xl_crc; /* CRC for this record */ +} XLogRecord; + +#define SizeOfXLogRecord (offsetof(XLogRecord, xl_crc) + sizeof(pg_crc32c)) + /* * The overall layout of an XLOG record is: - * Fixed-size header (XLogRecord struct) + * Variable-size header (containing the data of the XLogRecord struct) * XLogRecordBlockHeader struct * XLogRecordBlockHeader struct * ... @@ -50,6 +79,15 @@ typedef enum XLogSizeClass { * ... * main data * + * Different xlog record headers need to store different header fields, so + * depending on flags in the xl_info field the layout may change. + * + * Note that for records with a payload larger than 0xFFFF, the size field + * will be 4 bytes long, which can be larger than the minimum MAXALIGN quantum. + * To still be able to read the record correctly, we always align WAL to 8 + * bytes, so that the length bytes are always present in the first aligned + * quantum of data. + * * There can be zero or more XLogRecordBlockHeaders, and 0 or more bytes of * rmgr-specific data not associated with a block. XLogRecord structs * always start on MAXALIGN boundaries in the WAL files, but the rest of @@ -59,22 +97,25 @@ typedef enum XLogSizeClass { * XLogRecordDataHeaderLong structs all begin with a single 'id' byte. It's * used to distinguish between block references, and the main data structs. 
*/ -typedef struct XLogRecord +typedef struct XLogRecHdrData { - uint32 xl_tot_len; /* total len of entire record */ - TransactionId xl_xid; /* xact id */ - XLogRecPtr xl_prev; /* ptr to previous record in log */ uint8 xl_info; /* flag bits, see below */ RmgrId xl_rmid; /* resource manager for this record */ - uint8 xl_rmgrinfo; /* rmgr flag bits, see below */ - /* 1 byte of padding here, initialize to zero */ - pg_crc32c xl_crc; /* CRC for this record */ - - /* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */ + char xl_hdrdata[FLEXIBLE_ARRAY_MEMBER]; /* variable length record data */ -} XLogRecord; + /* + * Payload is as follows, in order: + * XLogRecord data encoded + * xl_payload_len; 0, 1, 2 or 4 bytes + * xl_rmgrinfo; 0 or 1 byte + * xl_xid; 0 or 4 bytes + * xl_prev; 8 bytes + * xl_crc; 4 bytes + * record payload, if present. + */ +} XLogRecHdrData; -#define SizeOfXLogRecord (offsetof(XLogRecord, xl_crc) + sizeof(pg_crc32c)) +typedef XLogRecHdrData *XLogRecHdr; /* * XLogReader needs to allocate all the data of a WAL record in a single @@ -87,13 +128,28 @@ typedef struct XLogRecord */ #define XLogRecordMaxSize (1020 * 1024 * 1024) +#define XLR_SIZECLASS_MASK 0x03 +#define XLR_SIZECLASS(xl_info) ((XLogSizeClass) ((xl_info) & XLR_SIZECLASS_MASK)) + +/* + * There are several rmgrs which don't use info bits, so we omit that + * byte whenever possible. + */ +#define XLR_HAS_RMGRINFO 0x04 + +/* + * If a WAL record uses the current transaction ID, that will be included + * in the record header. In all other cases we omit the XID to save bytes. + */ +#define XLR_HAS_XID 0x08 + /* * If a WAL record modifies any relation files, in ways not covered by the * usual block references, this flag is set. This is not used for anything * by PostgreSQL itself, but it allows external tools that read WAL and keep * track of modified blocks to recognize such special record types. 
*/ -#define XLR_SPECIAL_REL_UPDATE 0x01 +#define XLR_SPECIAL_REL_UPDATE 0x10 /* * Enforces consistency checks of replayed WAL at recovery. If enabled, @@ -102,7 +158,48 @@ typedef struct XLogRecord * of XLogInsert can use this value if necessary, but if * wal_consistency_checking is enabled for a rmgr this is set unconditionally. */ -#define XLR_CHECK_CONSISTENCY 0x02 +#define XLR_CHECK_CONSISTENCY (0x20) + +#define XLogRecordMaxHdrSize ( \ + sizeof(uint8) /* xl_info */ + \ + sizeof(uint8) /* xl_rmid */ + \ + sizeof(uint32) /* xl_payload_len */ + \ + sizeof(uint8) /* xl_rmgrinfo */ + \ + sizeof(TransactionId) /* xl_xid */ + \ + sizeof(XLogRecPtr) /* xl_prev */ + \ + sizeof(pg_crc32c) /* xl_crc */ \ +) + +#define XLogRecordMinHdrSize ( \ + sizeof(uint8) /* xl_info */ + \ + sizeof(uint8) /* xl_rmid */ + \ + 0 /* xl_payload_len */ + \ + 0 /* xl_rmgrinfo */ + \ + 0 /* xl_xid */ + \ + sizeof(XLogRecPtr) /* xl_prev */ + \ + sizeof(pg_crc32c) /* xl_crc */ \ +) + +static inline Size XLogRecordHdrLen(uint8 info) +{ + Size size = 0; + /* xl_info */ + size += sizeof(uint8); + /* xl_rmid */ + size += sizeof(uint8); + /* xl_payload_len */ + size += XLogSizeClassToByteLength(XLR_SIZECLASS(info)); + /* xl_rmgrinfo */ + size += ((info) & XLR_HAS_RMGRINFO) ? sizeof(uint8) : 0; + /* xl_xid */ + size += ((info) & XLR_HAS_XID) ? sizeof(TransactionId) : 0; + /* xl_prev */ + size += sizeof(XLogRecPtr); + /* xl_crc */ + size += sizeof(pg_crc32c); + + return size; +} /* * Header info for block data appended to an XLOG record. -- 2.40.1