diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a39a98f..887946c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -60,6 +60,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/ipc.h" +#include "storage/pmem.h" #include "storage/pmsignal.h" #include "storage/procarray.h" #include "utils/builtins.h" @@ -90,6 +91,7 @@ static int recvFile = -1; static TimeLineID recvFileTLI = 0; static XLogSegNo recvSegNo = 0; static uint32 recvOff = 0; +void *mappedFileAddr = NULL; /* * Flags set by interrupt handlers of walreceiver for later service in the @@ -604,12 +606,12 @@ WalReceiverMain(void) * End of WAL reached on the requested timeline. Close the last * segment, and await for new orders from the startup process. */ - if (recvFile >= 0) + if (recvFile >= 0 || mappedFileAddr != NULL) { char xlogfname[MAXFNAMELEN]; XLogWalRcvFlush(false); - if (close(recvFile) != 0) + if (do_XLogFileClose(recvFile, mappedFileAddr) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log segment %s: %m", @@ -626,6 +628,7 @@ WalReceiverMain(void) XLogArchiveNotify(xlogfname); } recvFile = -1; + mappedFileAddr = NULL; elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); WalRcvWaitForStartPosition(&startpoint, &startpointTLI); @@ -949,7 +952,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int segbytes; - if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) + if ((recvFile < 0 && mappedFileAddr == NULL) || + !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) { bool use_existent; @@ -957,7 +961,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) * fsync() and close current file before we switch to next one. We * would otherwise have to reopen this file to fsync it later */ - if (recvFile >= 0) + if (recvFile >= 0 || mappedFileAddr != NULL) { char xlogfname[MAXFNAMELEN]; @@ -968,7 +972,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) * process soon, so we don't advise the OS to release cache * pages associated with the file like XLogFileClose() does. */ - if (close(recvFile) != 0) + if (do_XLogFileClose(recvFile, mappedFileAddr) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log segment %s: %m", @@ -985,11 +989,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) XLogArchiveNotify(xlogfname); } recvFile = -1; + mappedFileAddr = NULL; /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo, wal_segment_size); use_existent = true; - recvFile = XLogFileInit(recvSegNo, &use_existent, true); + recvFile = XLogFileInit(recvSegNo, &use_existent, true, &mappedFileAddr); recvFileTLI = ThisTimeLineID; recvOff = 0; } @@ -1005,30 +1010,39 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) /* Need to seek in the file? */ if (recvOff != startoff) { - if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(recvFileTLI, recvSegNo), - startoff))); + if (!mappedFileAddr) + if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not seek in log segment %s to offset %u: %m", + XLogFileNameP(recvFileTLI, recvSegNo), + startoff))); recvOff = startoff; } - /* OK to write the logs */ - errno = 0; - - byteswritten = write(recvFile, buf, segbytes); - if (byteswritten <= 0) + if (mappedFileAddr) { - /* if write didn't set errno, assume no disk space */ - if (errno == 0) - errno = ENOSPC; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write to log segment %s " - "at offset %u, length %lu: %m", - XLogFileNameP(recvFileTLI, recvSegNo), - recvOff, (unsigned long) segbytes))); + PmemFileWrite((char *)mappedFileAddr+startoff, buf, segbytes); + byteswritten = segbytes; + } + else + { + /* OK to write the logs */ + errno = 0; + + byteswritten = write(recvFile, buf, segbytes); + if (byteswritten <= 0) + { + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log segment %s " + "at offset %u, length %lu: %m", + XLogFileNameP(recvFileTLI, recvSegNo), + recvOff, (unsigned long) segbytes))); + } } /* Update state for write */