diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a1256a1..520bcd2 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -644,6 +644,16 @@ typedef struct XLogCtlData
 	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
 	int			XLogCacheBlck;	/* highest allocated xlog buffer index */
 
+	/*
+	 * One slot per WAL buffer page, indexed by XLogRecPtrToBufIdx().  A slot
+	 * holds the end LSN of a record that starts in one segment and ends in
+	 * another, or InvalidXLogRecPtr when no such record is pending.
+	 */
+	XLogRecPtr *CrossBoundaryEndRecPtrs;
+
+	/* Highest segment number already covered by archive notification. */
+	XLogSegNo	latestArchiveNotifiedSegNo;
+
 	/*
 	 * Shared copy of ThisTimeLineID. Does not change after end-of-recovery.
 	 * If we created a new timeline when the system was started up,
@@ -968,6 +978,11 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static void XLogMarkEndRecPtrIfNeeded(XLogRecPtr start, XLogRecPtr end);
+static void XLogArchiveNotifySegmentsInPrimary(void);
+static void XLogArchiveNotifySegInStandby(XLogSegNo segno);
+static void XLogArchiveNotifySegmentsInStandby(XLogRecPtr start, XLogRecPtr end);
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1641,6 +1656,9 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 		CurrPos = MAXALIGN64(CurrPos);
 	}
 
+	if (XLogArchivingActive())
+		XLogMarkEndRecPtrIfNeeded(StartPos, EndPos);
+
 	if (CurrPos != EndPos)
 		elog(PANIC, "space reserved for WAL record does not match what was written");
 }
@@ -2567,11 +2585,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			 * later. Doing it here ensures that one and only one backend will
 			 * perform this fsync.
 			 *
-			 * This is also the right place to notify the Archiver that the
-			 * segment is ready to copy to archival storage, and to update the
-			 * timer for archive_timeout, and to signal for a checkpoint if
-			 * too many logfile segments have been used since the last
-			 * checkpoint.
+			 * This is also the right place to update the timer for
+			 * archive_timeout, and to signal for a checkpoint if too many
+			 * logfile segments have been used since the last checkpoint.
 			 */
 			if (finishing_seg)
 			{
@@ -2583,7 +2599,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
 
 				if (XLogArchivingActive())
-					XLogArchiveNotifySeg(openLogSegNo);
+					XLogArchiveNotifySegmentsInPrimary();
 
 				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2653,6 +2669,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		WalSndWakeupRequest();
 
 		LogwrtResult.Flush = LogwrtResult.Write;
+
+		if (XLogArchivingActive())
+			XLogArchiveNotifySegmentsInPrimary();
 	}
 
 	/*
@@ -2673,6 +2692,122 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	}
 }
 
+/*
+ * Remember the end LSN of a WAL record that starts in one segment and ends
+ * in another.
+ *
+ * The value is stored in the CrossBoundaryEndRecPtrs slot selected by
+ * XLogRecPtrToBufIdx(end).  Records whose start and end map to the same
+ * segment leave the array untouched.
+ */
+static void
+XLogMarkEndRecPtrIfNeeded(XLogRecPtr start, XLogRecPtr end)
+{
+	XLogSegNo	start_segno;
+	XLogSegNo	end_segno;
+
+	XLByteToSeg(start, start_segno, wal_segment_size);
+	XLByteToSeg(end, end_segno, wal_segment_size);
+	if (start_segno != end_segno)
+		XLogCtl->CrossBoundaryEndRecPtrs[XLogRecPtrToBufIdx(end)] = end;
+}
+
+/*
+ * Notify the archiver about every segment after latestArchiveNotifiedSegNo
+ * up to the one preceding the segment that contains LogwrtResult.Flush.
+ *
+ * This is a no-op unless the CrossBoundaryEndRecPtrs slot for the current
+ * flush position holds an end LSN that the flush pointer has reached.
+ */
+static void
+XLogArchiveNotifySegmentsInPrimary(void)
+{
+	XLogSegNo	target_segno;
+	XLogSegNo	flushing_segno;
+	XLogSegNo	latest_target_segno;
+	int			idx;
+
+	idx = XLogRecPtrToBufIdx(LogwrtResult.Flush);
+
+	/* Is WAL record crossing from previous segment flushed completely? */
+	if (XLogRecPtrIsInvalid(XLogCtl->CrossBoundaryEndRecPtrs[idx]) ||
+		LogwrtResult.Flush < XLogCtl->CrossBoundaryEndRecPtrs[idx])
+		return;		/* No, the WAL record is not flushed completely. */
+
+	/* OK. Notification against previous segment is safe. */
+	XLByteToSeg(LogwrtResult.Flush, flushing_segno, wal_segment_size);
+	latest_target_segno = flushing_segno - 1;
+
+	/* These segments have to be notified. */
+	for (target_segno = XLogCtl->latestArchiveNotifiedSegNo + 1;
+		 target_segno <= latest_target_segno; ++target_segno)
+		XLogArchiveNotifySeg(target_segno);
+
+	XLogCtl->CrossBoundaryEndRecPtrs[idx] = InvalidXLogRecPtr;
+	XLogCtl->latestArchiveNotifiedSegNo = latest_target_segno;
+}
+
+/*
+ * Set the archive status of one segment of ThisTimeLineID on a standby.
+ *
+ * Returns without doing anything if the segment file cannot be stat()ed or
+ * XLogArchiveIsReadyOrDone() is already true for it.  Otherwise calls
+ * XLogArchiveNotify() when XLogArchivingAlways() holds and
+ * XLogArchiveForceDone() when it does not.
+ */
+static void
+XLogArchiveNotifySegInStandby(XLogSegNo segno)
+{
+	char		xlogfname[MAXFNAMELEN];
+	char		xlogPath[MAXPGPATH];
+	struct stat stat_buf;
+
+	XLogFileName(xlogfname, ThisTimeLineID, segno, wal_segment_size);
+	snprintf(xlogPath, MAXPGPATH, XLOGDIR "/%s", xlogfname);
+
+	/*
+	 * In case of switching TLI, the last segment file in previous TLI
+	 * may have been deleted because it is not needed. (Same segment
+	 * file in new TLI is read, but one in previous TLI is not read.)
+	 */
+	if (stat(xlogPath, &stat_buf) != 0)
+		return;
+
+	if (XLogArchiveIsReadyOrDone(xlogfname))
+		return;
+
+	if (!XLogArchivingAlways())
+		XLogArchiveForceDone(xlogfname);
+	else
+		XLogArchiveNotify(xlogfname);
+}
+
+/*
+ * Call XLogArchiveNotifySegInStandby() for each segment from the one
+ * containing 'start' up to, but not including, the one containing 'end',
+ * then record the segment preceding end's as latestArchiveNotifiedSegNo.
+ */
+static void
+XLogArchiveNotifySegmentsInStandby(XLogRecPtr start, XLogRecPtr end)
+{
+	XLogSegNo	target_segno;
+	XLogSegNo	start_segno;
+	XLogSegNo	end_segno;
+
+	XLByteToSeg(start, start_segno, wal_segment_size);
+	XLByteToSeg(end, end_segno, wal_segment_size);
+
+	for (target_segno = start_segno; target_segno < end_segno; ++target_segno)
+		XLogArchiveNotifySegInStandby(target_segno);
+
+	/*
+	 * The loop stops before end_segno, so the newest segment handled is
+	 * end_segno - 1.  This matches the value StartupXLOG() stores at the
+	 * end of recovery.
+	 */
+	XLogCtl->latestArchiveNotifiedSegNo = end_segno - 1;
+}
+
 /*
  * Record the LSN for an asynchronous transaction commit/abort
  * and nudge the WALWriter if there is work for it to do.
@@ -5062,6 +5197,8 @@ XLOGShmemSize(void)
 
 	/* WAL insertion locks, plus alignment */
 	size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
+	/* CrossBoundaryEndRecPtrs array */
+	size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
 	/* xlblocks array */
 	size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
 	/* extra alignment padding for XLOG I/O buffers */
@@ -5145,6 +5282,9 @@ XLOGShmemInit(void)
 	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
 	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
 
+	XLogCtl->CrossBoundaryEndRecPtrs = (XLogRecPtr *) allocptr;
+	memset(XLogCtl->CrossBoundaryEndRecPtrs, 0, sizeof(XLogRecPtr) * XLOGbuffers);
+	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
 
 	/* WAL insertion locks. Ensure they're aligned to the full padded size */
 	allocptr += sizeof(WALInsertLockPadded) -
@@ -7296,6 +7436,18 @@ StartupXLOG(void)
 						/* Check that it's OK to switch to this TLI */
 						checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
 
+						/*
+						 * The last segment in current TLI has to be notified
+						 * before TLI switching.
+						 */
+						if (StandbyMode)
+						{
+							XLogSegNo	lastSegNo;
+
+							XLByteToPrevSeg(ReadRecPtr, lastSegNo, wal_segment_size);
+							XLogArchiveNotifySegInStandby(lastSegNo);
+						}
+
 						/* Following WAL records should be run with new TLI */
 						ThisTimeLineID = newTLI;
 						switchedTLI = true;
@@ -7303,6 +7455,13 @@ StartupXLOG(void)
 				}
 
 				/*
+				 * If the record is a cross-segment-boundary, we can notify to
+				 * archive it.
+				 */
+				if (StandbyMode)
+					XLogArchiveNotifySegmentsInStandby(ReadRecPtr, EndRecPtr);
+
+				/*
 				 * Update shared replayEndRecPtr before replaying this record,
 				 * so that XLogFlush will update minRecoveryPoint correctly.
 				 */
@@ -7703,6 +7862,13 @@ StartupXLOG(void)
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
+	if (XLogArchivingActive())
+	{
+		XLogSegNo	endLogSegNo;
+		XLByteToSeg(EndOfLog, endLogSegNo, wal_segment_size);
+		XLogCtl->latestArchiveNotifiedSegNo = endLogSegNo - 1;
+	}
+
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
 	 * record before resource manager writes cleanup WAL records or checkpoint
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index d1ad75d..cb6b3bf 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -621,15 +621,6 @@ WalReceiverMain(void)
 						(errcode_for_file_access(),
 						 errmsg("could not close log segment %s: %m",
 								xlogfname)));
-
-			/*
-			 * Create .done file forcibly to prevent the streamed segment from
-			 * being archived later.
-			 */
-			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
-				XLogArchiveForceDone(xlogfname);
-			else
-				XLogArchiveNotify(xlogfname);
 		}
 		recvFile = -1;
 
@@ -928,15 +919,6 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 							(errcode_for_file_access(),
 							 errmsg("could not close log segment %s: %m",
 									xlogfname)));
-
-				/*
-				 * Create .done file forcibly to prevent the streamed segment
-				 * from being archived later.
-				 */
-				if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
-					XLogArchiveForceDone(xlogfname);
-				else
-					XLogArchiveNotify(xlogfname);
 			}
 			recvFile = -1;
 