diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e729180..d233ead 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -516,24 +516,19 @@ static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE; */ typedef struct XLogCtlInsert { - slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */ - /* * CurrBytePos is the end of reserved WAL. The next record will be - * inserted at that position. PrevBytePos is the start position of the - * previously inserted (or rather, reserved) record - it is copied to the - * prev-link of the next record. These are stored as "usable byte + * inserted at that position.This is stored as "usable byte * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()). */ - uint64 CurrBytePos; - uint64 PrevBytePos; + pg_atomic_uint64 CurrBytePos; /* * Make sure the above heavily-contended spinlock and byte positions are * on their own cache line. In particular, the RedoRecPtr and full page * write variables below should be on a different cache line. They are * read on every WAL insertion, but updated rarely, and we don't want - * those reads to steal the cache line containing Curr/PrevBytePos. + * those reads to steal the cache line containing CurrBytePos. */ char pad[PG_CACHE_LINE_SIZE]; @@ -869,6 +864,7 @@ static bool XLogCheckpointNeeded(XLogSegNo new_segno); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, bool find_free, XLogSegNo max_segno, + bool avoid_conflicting_walid, bool use_lock); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notfoundOk); @@ -915,9 +911,8 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos); static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, - XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); -static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, - XLogRecPtr *PrevPtr); + XLogRecPtr *EndPos); +static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos); static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); static char *GetXLogBuffer(XLogRecPtr ptr); static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); @@ -946,7 +941,7 @@ static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt); * XLogSetRecordFlags() for details. * * The first XLogRecData in the chain must be for the record header, and its - * data must be MAXALIGNed. XLogInsertRecord fills in the xl_prev and + * data must be MAXALIGNed. XLogInsertRecord fills in the xl_walid and * xl_crc fields in the header, the rest of the header must already be filled * by the caller. * @@ -986,7 +981,7 @@ XLogInsertRecord(XLogRecData *rdata, * * 1. Reserve the right amount of space from the WAL. The current head of * reserved space is kept in Insert->CurrBytePos, and is protected by - * insertpos_lck. + * atomic operations. * * 2. Copy the record to the reserved WAL space. This involves finding the * correct WAL buffer containing the reserved space, and copying the @@ -1047,22 +1042,23 @@ XLogInsertRecord(XLogRecData *rdata, } /* - * Reserve space for the record in the WAL. This also sets the xl_prev - * pointer. + * Reserve space for the record in the WAL. */ if (isLogSwitch) - inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev); + inserted = ReserveXLogSwitch(&StartPos, &EndPos); else { - ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos, - &rechdr->xl_prev); + ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos); inserted = true; } + /* Store the WAL segment identifier in the record. */ + rechdr->xl_walid = XLByteToSegID(StartPos, wal_segment_size); + if (inserted) { /* - * Now that xl_prev has been filled in, calculate CRC of the record + * Now that xl_walid has been filled in, calculate CRC of the record * header. */ rdata_crc = rechdr->xl_crc; @@ -1213,25 +1209,21 @@ XLogInsertRecord(XLogRecData *rdata, /* * Reserves the right amount of space for a record of given size from the WAL. * *StartPos is set to the beginning of the reserved section, *EndPos to - * its end+1. *PrevPtr is set to the beginning of the previous record; it is - * used to set the xl_prev of this record. + * its end+1. * * This is the performance critical part of XLogInsert that must be serialized * across backends. The rest can happen mostly in parallel. Try to keep this - * section as short as possible, insertpos_lck can be heavily contended on a - * busy system. + * section as short as possible. * * NB: The space calculation here must match the code in CopyXLogRecordToWAL, * where we actually copy the record to the reserved space. */ static void -ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, - XLogRecPtr *PrevPtr) +ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos) { XLogCtlInsert *Insert = &XLogCtl->Insert; uint64 startbytepos; uint64 endbytepos; - uint64 prevbytepos; size = MAXALIGN(size); @@ -1248,19 +1240,11 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, * because the usable byte position doesn't include any headers, reserving * X bytes from WAL is almost as simple as "CurrBytePos += X". */ - SpinLockAcquire(&Insert->insertpos_lck); - - startbytepos = Insert->CurrBytePos; + startbytepos = pg_atomic_fetch_add_u64(&Insert->CurrBytePos, size); endbytepos = startbytepos + size; - prevbytepos = Insert->PrevBytePos; - Insert->CurrBytePos = endbytepos; - Insert->PrevBytePos = startbytepos; - - SpinLockRelease(&Insert->insertpos_lck); *StartPos = XLogBytePosToRecPtr(startbytepos); *EndPos = XLogBytePosToEndRecPtr(endbytepos); - *PrevPtr = XLogBytePosToRecPtr(prevbytepos); /* * Check that the conversions between "usable byte positions" and @@ -1268,7 +1252,6 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, */ Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos); Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos); - Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos); } /* @@ -1281,36 +1264,32 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, * reserving any space, and the function returns false. */ static bool -ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) +ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos) { XLogCtlInsert *Insert = &XLogCtl->Insert; uint64 startbytepos; uint64 endbytepos; - uint64 prevbytepos; uint32 size = MAXALIGN(SizeOfXLogRecord); XLogRecPtr ptr; uint32 segleft; + uint32 initial_seg = 0; + uint64 compare_startbytepos; - /* - * These calculations are a bit heavy-weight to be done while holding a - * spinlock, but since we're holding all the WAL insertion locks, there - * are no other inserters competing for it. GetXLogInsertRecPtr() does - * compete for it, but that's not called very frequently. - */ - SpinLockAcquire(&Insert->insertpos_lck); - - startbytepos = Insert->CurrBytePos; - +start: + startbytepos = pg_atomic_read_u64(&Insert->CurrBytePos); ptr = XLogBytePosToEndRecPtr(startbytepos); if (XLogSegmentOffset(ptr, wal_segment_size) == 0) { - SpinLockRelease(&Insert->insertpos_lck); *EndPos = *StartPos = ptr; return false; } + /* Save the current WAL segment number */ + if (initial_seg == 0) + XLByteToSeg(XLogBytePosToEndRecPtr(startbytepos), initial_seg, + wal_segment_size); + endbytepos = startbytepos + size; - prevbytepos = Insert->PrevBytePos; *StartPos = XLogBytePosToRecPtr(startbytepos); *EndPos = XLogBytePosToEndRecPtr(endbytepos); @@ -1322,17 +1301,29 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) *EndPos += segleft; endbytepos = XLogRecPtrToBytePos(*EndPos); } - Insert->CurrBytePos = endbytepos; - Insert->PrevBytePos = startbytepos; - SpinLockRelease(&Insert->insertpos_lck); - - *PrevPtr = XLogBytePosToRecPtr(prevbytepos); + /* + * Atomically check if the WAL insert position is still the same as we had + * fetched above. If it's the same, then we just move the pointer to the + * end of current segment and we are done. If it has changed, then we must + * repeat the entire exercise. But if we have already switched to the next + * segment then no work is needed. + */ + compare_startbytepos = startbytepos; + if (!pg_atomic_compare_exchange_u64(&Insert->CurrBytePos, + &compare_startbytepos, + endbytepos)) + { + uint32 new_seg; + XLByteToSeg(XLogBytePosToEndRecPtr(compare_startbytepos), new_seg, + wal_segment_size); + if (new_seg == initial_seg) + goto start; + } Assert(XLogSegmentOffset(*EndPos, wal_segment_size) == 0); Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos); Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos); - Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos); return true; } @@ -1719,9 +1710,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) elog(PANIC, "cannot wait without a PGPROC structure"); /* Read the current insert position */ - SpinLockAcquire(&Insert->insertpos_lck); - bytepos = Insert->CurrBytePos; - SpinLockRelease(&Insert->insertpos_lck); + bytepos = pg_atomic_read_u64(&Insert->CurrBytePos); reservedUpto = XLogBytePosToEndRecPtr(bytepos); /* @@ -3281,7 +3270,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock) */ max_segno = logsegno + CheckPointSegments; if (!InstallXLogFileSegment(&installed_segno, tmppath, - *use_existent, max_segno, + *use_existent, max_segno, false, use_lock)) { /* @@ -3430,7 +3419,7 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno, /* * Now move the segment into place with its final name. */ - if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, false)) + if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, false, false)) elog(ERROR, "InstallXLogFileSegment should not have failed"); } @@ -3454,6 +3443,10 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno, * free slot is found between *segno and max_segno. (Ignored when find_free * is false.) * + * avoid_conflicting_walid: if true, ensure that the old and the new recycled + * segment do not have matching 2 byte walid. (16 low order bits of segno.) + * This is only used when find_free is true. + * * use_lock: if true, acquire ControlFileLock while moving file into * place. This should be true except during bootstrap log creation. The * caller must *not* hold the lock at call. @@ -3465,6 +3458,7 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno, static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, bool find_free, XLogSegNo max_segno, + bool avoid_conflicting_walid, bool use_lock) { char path[MAXPGPATH]; @@ -3485,6 +3479,11 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, } else { + TimeLineID tmptli; + XLogSegNo tmpsegno; + + XLogFromFileName(tmppath, &tmptli, &tmpsegno, wal_segment_size); + /* Find a free slot to put it in */ while (stat(path, &stat_buf) == 0) { @@ -3496,6 +3495,16 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, return false; } (*segno)++; + + /* + * If avoid_conflicting_walid is true, then we must not recycle the + * WAL files so that the old and the recycled WAL segnos have the + * same lower order 16-bits. + */ + if (avoid_conflicting_walid && + XLogSegNoToSegID(tmpsegno) == XLogSegNoToSegID(*segno)) + (*segno)++; + XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size); } } @@ -3994,7 +4003,7 @@ RemoveXlogFile(const char *segname, XLogRecPtr PriorRedoPtr, XLogRecPtr endptr) if (endlogSegNo <= recycleSegNo && lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) && InstallXLogFileSegment(&endlogSegNo, path, - true, recycleSegNo, true)) + true, recycleSegNo, true, true)) { ereport(DEBUG2, (errmsg("recycled write-ahead log file \"%s\"", @@ -4989,9 +4998,9 @@ XLOGShmemInit(void) XLogCtl->SharedHotStandbyActive = false; XLogCtl->WalWriterSleeping = false; - SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); SpinLockInit(&XLogCtl->ulsn_lck); + pg_atomic_init_u64(&XLogCtl->Insert.CurrBytePos, 0); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); } @@ -5097,7 +5106,7 @@ BootStrapXLOG(void) /* Insert the initial checkpoint record */ recptr = ((char *) page + SizeOfXLogLongPHD); record = (XLogRecord *) recptr; - record->xl_prev = 0; + record->xl_walid = 1; record->xl_xid = InvalidTransactionId; record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint); record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; @@ -7516,8 +7525,7 @@ StartupXLOG(void) * previous incarnation. */ Insert = &XLogCtl->Insert; - Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec); - Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog); + pg_atomic_write_u64(&Insert->CurrBytePos, XLogRecPtrToBytePos(EndOfLog)); /* * Tricky point here: readBuf contains the *last* block that the LastRec @@ -8664,7 +8672,7 @@ CreateCheckPoint(int flags) * determine the checkpoint REDO pointer. */ WALInsertLockAcquireExclusive(); - curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos); + curInsert = XLogBytePosToRecPtr(pg_atomic_read_u64(&Insert->CurrBytePos)); /* * If this isn't a shutdown or forced checkpoint, and if there has been no @@ -11149,9 +11157,7 @@ GetXLogInsertRecPtr(void) XLogCtlInsert *Insert = &XLogCtl->Insert; uint64 current_bytepos; - SpinLockAcquire(&Insert->insertpos_lck); - current_bytepos = Insert->CurrBytePos; - SpinLockRelease(&Insert->insertpos_lck); + current_bytepos = pg_atomic_read_u64(&Insert->CurrBytePos); return XLogBytePosToRecPtr(current_bytepos); } diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 2a41667..5154a56 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -471,7 +471,7 @@ XLogInsert(RmgrId rmid, uint8 info) * Assemble a WAL record from the registered data and buffers into an * XLogRecData chain, ready for insertion with XLogInsertRecord(). * - * The record header fields are filled in, except for the xl_prev field. The + * The record header fields are filled in, except for the xl_walid field. The * calculated CRC does not include the record header yet. * * If there are any registered buffers, and a full-page image was not taken @@ -780,7 +780,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, COMP_CRC32C(rdata_crc, rdt->data, rdt->len); /* - * Fill in the fields in the record header. Prev-link is filled in later, + * Fill in the fields in the record header. xl_walid is filled in later, * once we know where in the WAL the record will be inserted. The CRC does * not include the record header yet. */ @@ -788,7 +788,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, rechdr->xl_tot_len = total_len; rechdr->xl_info = info; rechdr->xl_rmid = rmid; - rechdr->xl_prev = InvalidXLogRecPtr; + rechdr->xl_walid = 0; rechdr->xl_crc = rdata_crc; return &hdr_rdt; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index aeaafed..ac40357 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -637,38 +637,30 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) RecPtr); return false; } - if (randAccess) - { - /* - * We can't exactly verify the prev-link, but surely it should be less - * than the record's own address. - */ - if (!(record->xl_prev < RecPtr)) - { - report_invalid_record(state, - "record with incorrect prev-link %X/%X at %X/%X", - (uint32) (record->xl_prev >> 32), - (uint32) record->xl_prev, - (uint32) (RecPtr >> 32), (uint32) RecPtr); - return false; - } - } - else + + /* + * Previously, we guarded against stale but valid-looking WAL records that + * start on sector boundaries by storing a pointer to the previous WAL + * record (xl_prev). xl_prev was 8 bytes. Storing the whole pointer is + * unnecessary, since we only need to store enough information to prevent + * reading stale WAL records, so we do that now by storing only a 2 byte + * value corresponding to the number of the WAL file. Even though 2 byte + * value cycles every 32768 values, this is still enough as long as we + * ensure that the old segment and the new segment do not share the same + * value and thus we can distinguish between the WAL records belonging to + * the old segment and the new segment. + * + * Reducing the value to 2 bytes means we save 8 bytes per WAL + * record because of alignment issues. + */ + if (XLByteToSegID(RecPtr, state->wal_segment_size) != record->xl_walid) { - /* - * Record's prev-link should exactly match our previous location. This - * check guards against torn WAL pages where a stale but valid-looking - * WAL record starts on a sector boundary. - */ - if (record->xl_prev != PrevRecPtr) - { - report_invalid_record(state, - "record with incorrect prev-link %X/%X at %X/%X", - (uint32) (record->xl_prev >> 32), - (uint32) record->xl_prev, - (uint32) (RecPtr >> 32), (uint32) RecPtr); - return false; - } + report_invalid_record(state, + "record with incorrect walid %u at %X/%X, expected walid %u", + record->xl_walid, + (uint32) (RecPtr >> 32), (uint32) RecPtr, + XLByteToSegID(RecPtr, state->wal_segment_size)); + return false; } return true; diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c index 9f93385..af0cab5 100644 --- a/src/bin/pg_resetwal/pg_resetwal.c +++ b/src/bin/pg_resetwal/pg_resetwal.c @@ -1176,7 +1176,7 @@ WriteEmptyXLOG(void) /* Insert the initial checkpoint record */ recptr = (char *) page + SizeOfXLogLongPHD; record = (XLogRecord *) recptr; - record->xl_prev = 0; + record->xl_walid = newXlogSegNo & XLogSegNoLowOrderMask; record->xl_xid = InvalidTransactionId; record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint); record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 0fc71d2..4349728 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -164,6 +164,9 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, XLogReaderState *xlogreader; char *errormsg; XLogPageReadPrivate private; + bool first_call; + XLogSegNo current_segno; + bool checkpoint_found = false; /* * The given fork pointer points to the end of the last common record, @@ -186,21 +189,46 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, if (xlogreader == NULL) pg_fatal("out of memory\n"); - searchptr = forkptr; + /* + * Previously, we used to store pointer to the previous WAL record. But + * that has been removed in PG11. So we must work harder to find the + * previous checkpoint record. We start from the start of the WAL segment + * and read forward until we either we reach "forkptr" or end of the WAL + * segment. If the checkpoint record is found in the same segment as + * "forkptr", then we are done. Otherwise we must back off and search in + * the previous segment. + */ + XLByteToSeg(forkptr, current_segno, WalSegSz); + + first_call = true; + for (;;) { uint8 info; - record = XLogReadRecord(xlogreader, searchptr, &errormsg); + /* + * If this is the first call for the current WAL segment, we must use + * XLogFindNextRecord to get the first valid WAL record in this + * segment. Otherwise we continue to fetch the next WAL record. + */ + if (first_call) + { + XLogSegNoOffsetToRecPtr(current_segno, 0, searchptr, WalSegSz); + searchptr = XLogFindNextRecord(xlogreader, searchptr); + record = XLogReadRecord(xlogreader, searchptr, &errormsg); + first_call = false; + } + else + record = XLogReadRecord(xlogreader, InvalidXLogRecPtr, &errormsg); if (record == NULL) { if (errormsg) - pg_fatal("could not find previous WAL record at %X/%X: %s\n", + pg_fatal("could not find WAL record at %X/%X: %s\n", (uint32) (searchptr >> 32), (uint32) (searchptr), errormsg); else - pg_fatal("could not find previous WAL record at %X/%X\n", + pg_fatal("could not find WAL record at %X/%X\n", (uint32) (searchptr >> 32), (uint32) (searchptr)); } @@ -210,7 +238,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, * where the master has been stopped to be rewinded. */ info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; - if (searchptr < forkptr && + if (xlogreader->ReadRecPtr < forkptr && XLogRecGetRmid(xlogreader) == RM_XLOG_ID && (info == XLOG_CHECKPOINT_SHUTDOWN || info == XLOG_CHECKPOINT_ONLINE)) @@ -218,14 +246,41 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, CheckPoint checkPoint; memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint)); - *lastchkptrec = searchptr; + *lastchkptrec = xlogreader->ReadRecPtr; *lastchkpttli = checkPoint.ThisTimeLineID; *lastchkptredo = checkPoint.redo; - break; + checkpoint_found = true; + + /* + * Continue to search since there might be another checkpoint + * record in the WAL segment. + */ } - /* Walk backwards to previous record. */ - searchptr = record->xl_prev; + /* + * If we've reached the forkptr without seeing a checkpoint then we + * must back off and search in the previous WAL segment. Similarly, + * while searching in any previous WAL segment, if we reach the end of + * the segment, we must continue in the segment prior to that. + */ + if (xlogreader->ReadRecPtr >= forkptr || + !XLByteInSeg(xlogreader->ReadRecPtr, current_segno, WalSegSz)) + { + if (checkpoint_found) + break; + else + current_segno = current_segno - 1; + + /* + * We must always find a checkpoint record. + */ + Assert(current_segno > 0); + + /* + * Must find the first valid WAL record in the previous segment. + */ + first_call = true; + } } XLogReaderFree(xlogreader); diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 6443eda..489c103 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -536,7 +536,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) BlockNumber blk; int block_id; uint8 info = XLogRecGetInfo(record); - XLogRecPtr xl_prev = XLogRecGetPrev(record); + uint16 xl_walid = XLogRecGetWALId(record); XLogDumpRecordLen(record, &rec_len, &fpi_len); @@ -544,12 +544,12 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) if (id == NULL) id = psprintf("UNKNOWN (%x)", info & ~XLR_INFO_MASK); - printf("rmgr: %-11s len (rec/tot): %6u/%6u, tx: %10u, lsn: %X/%08X, prev %X/%08X, ", + printf("rmgr: %-11s len (rec/tot): %6u/%6u, tx: %10u, lsn: %X/%08X, prev %u, ", desc->rm_name, rec_len, XLogRecGetTotalLen(record), XLogRecGetXid(record), (uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr, - (uint32) (xl_prev >> 32), (uint32) xl_prev); + xl_walid); printf("desc: %s ", id); /* the desc routine will printf the description directly to stdout */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 7805c3c..8cdc51c 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -122,6 +122,18 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader; logSegNo = ((xlrp) - 1) / (wal_segsz_bytes) /* + * Get the WAL segment identifier given the XLogRecPtr. + * + * The identifier is just the lower order 16-bits of the WAL segno. So the + * value will be repeated after 2^16 WAL segments. But we use this identifier + * just to detect invalid WAL records and hence can live with that limitation. + */ +#define XLogSegNoLowOrderMask 0xFFFF +#define XLogSegNoToSegID(segno) ((uint16) ((segno) & XLogSegNoLowOrderMask)) +#define XLByteToSegID(xlrp, wal_segsz_bytes) \ + (XLogSegNoToSegID((xlrp) / (wal_segsz_bytes))) + +/* * Is an XLogRecPtr within a particular XLOG segment? * * For XLByteInSeg, do the computation at face value. For XLByteInPrevSeg, diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 3a9ebd4..3f32961 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -218,7 +218,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errmsg); #define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len) -#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev) +#define XLogRecGetWALId(decoder) ((decoder)->decoded_record->xl_walid) #define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index b53960e..a8ade92 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -42,7 +42,8 @@ typedef struct XLogRecord { uint32 xl_tot_len; /* total len of entire record */ TransactionId xl_xid; /* xact id */ - XLogRecPtr xl_prev; /* ptr to previous record in log */ + uint16 xl_walid; /* lowest 16 bits of the WAL file to which this + record belongs */ uint8 xl_info; /* flag bits, see below */ RmgrId xl_rmid; /* resource manager for this record */ /* 2 bytes of padding here, initialize to zero */