*** xlog.org.c Sat Jan 1 06:59:30 2005 --- xlog.c Tue Feb 1 18:28:16 2005 *************** *** 43,48 **** --- 43,82 ---- #include "utils/guc.h" #include "utils/relcache.h" + /*-------------------------------------------------------------------------*/ + + #ifdef O_DIRECT + # define OPEN_DIRECT_FLAG O_DIRECT + #endif + + #define XLOG_MULTIPAGE_WRITER_DEBUG + + #define ISSUE_BOOTSTRAP_MEMORYLEAK /* XXX: memory leak? */ + + /*-------------------------------------------------------------------------*/ + + /* O_DIRECT : BEGIN */ + + /* TODO: Alignment depends on OS and filesystem. */ + #define O_DIRECT_BUFFER_ALIGN 4096 + + /* Round PTR up to a multiple of ALIGNVAL (must be a power of two); assumes sizeof(ptrdiff_t) == sizeof(void*) */ + #define POINTERALIGN(ALIGNVAL,PTR) \ + (((ptrdiff_t) (PTR) + ((ALIGNVAL) - 1)) & ~((ptrdiff_t) ((ALIGNVAL) - 1))) + + #ifdef XLOG_MULTIPAGE_WRITER_DEBUG + static char STATIC_ASSERT_POINTERSIZE[((int)(sizeof(ptrdiff_t) == sizeof(void*))) - 1]; + #endif + + #ifdef OPEN_DIRECT_FLAG + # define XLOG_EXTRA_BUFFERS O_DIRECT_BUFFER_ALIGN + # define XLOG_BUFFERS_ALIGN(PTR) POINTERALIGN(XLOG_EXTRA_BUFFERS, (PTR)) + #else + # define XLOG_EXTRA_BUFFERS 0 + # define XLOG_BUFFERS_ALIGN(PTR) POINTERALIGN(MAXIMUM_ALIGNOF, (PTR)) + #endif + + /* O_DIRECT : END */ /* * This chunk of hackery attempts to determine which file sync methods *************** *** 465,470 **** --- 499,583 ---- static bool read_backup_label(XLogRecPtr *checkPointLoc); static void remove_backup_label(void); + /* BEGIN : XLOG_MULTIPAGE_WRITER -- gathers contiguous XLOG pages so they go out in a single write() */ + + static struct XLogMultipageData + { + char *pages; /* Start of the first pending page, or NULL if nothing is pending */ + int size; /* Total bytes of pending pages == count(pages) * BLCKSZ */ + int offset; /* Offset of the first pending page in the xlog segment file */ + } XLogMultipage; + + static void XLogMultipageFlush(int index) + { + if (!XLogMultipage.pages) + { /* No pages are pending, so nothing to write; just record the write index. 
*/ + XLogCtl->Write.curridx = index; + return; + } + + #ifdef XLOG_MULTIPAGE_WRITER_DEBUG + { + int i = (XLogCtl->Write.curridx + XLogMultipage.size / BLCKSZ + XLOGbuffers - index) % XLOGbuffers; + if (i != 0 && i != 1) + elog(PANIC, "XLogMultipageFlush (%d)", __LINE__); + } + #endif + + /* Need to seek in the file? NOTE(review): this trusts openLogOff to match the real file position -- confirm it is still reset to 0 wherever the log file is (re)opened. */ + if (openLogOff != XLogMultipage.offset) + { + openLogOff = XLogMultipage.offset; + if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, segment %u to offset %u: %m", + openLogId, openLogSeg, openLogOff))); + } + + /* OK to write all pending pages with one write() */ + errno = 0; + if (write(openLogFile, XLogMultipage.pages, XLogMultipage.size) != XLogMultipage.size) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log file %u, segment %u at offset %u: %m", + openLogId, openLogSeg, openLogOff))); + } + + #ifdef XLOG_MULTIPAGE_WRITER_DEBUG + elog(LOG, "XLogMultipageFlush writes %d pages.", XLogMultipage.size / BLCKSZ); + #endif + + openLogOff += XLogMultipage.size; + XLogCtl->Write.curridx = index; + memset(&XLogMultipage, 0, sizeof(XLogMultipage)); + } + + static void XLogMultipageWrite(int index) + { + char *page = XLogCtl->pages + index * BLCKSZ; + int size = BLCKSZ; + int offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize; + + if (XLogMultipage.pages + XLogMultipage.size == page + && XLogMultipage.offset + XLogMultipage.size == offset) + { /* Page is contiguous with the pending run, both in memory and in the file. Append it. */ + XLogMultipage.size += size; + } + else + { /* Page is not contiguous. Flush what is pending and start a new run. 
*/ + XLogMultipageFlush(index); + XLogMultipage.pages = page; + XLogMultipage.size = size; + XLogMultipage.offset = offset; + } + } + + /* END : XLOG_MULTIPAGE_WRITER */ /* * Insert an XLOG record having the specified RMID and info bytes, *************** *** 1139,1147 **** XLogWrite(XLogwrtRqst WriteRqst) { XLogCtlWrite *Write = &XLogCtl->Write; - char *from; bool ispartialpage; bool use_existent; /* * Update local LogwrtResult (caller probably did this already, --- 1252,1265 ---- XLogWrite(XLogwrtRqst WriteRqst) { XLogCtlWrite *Write = &XLogCtl->Write; bool ispartialpage; bool use_existent; + int currentIndex = Write->curridx; + + #ifdef XLOG_MULTIPAGE_WRITER_DEBUG + if (XLogMultipage.pages) + elog(PANIC, "XLogMultipage.pages not null (%d) : size=%d", __LINE__, XLogMultipage.size); + #endif /* * Update local LogwrtResult (caller probably did this already, *************** *** 1157,1170 **** * end of the last page that's been initialized by * AdvanceXLInsertBuffer. */ ! if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx])) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, ! XLogCtl->xlblocks[Write->curridx].xlogid, ! XLogCtl->xlblocks[Write->curridx].xrecoff); /* Advance LogwrtResult.Write to end of current buffer page */ ! LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx]; ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write); if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) --- 1275,1288 ---- * end of the last page that's been initialized by * AdvanceXLInsertBuffer. */ ! if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex])) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, ! XLogCtl->xlblocks[currentIndex].xlogid, ! XLogCtl->xlblocks[currentIndex].xrecoff); /* Advance LogwrtResult.Write to end of current buffer page */ ! 
LogwrtResult.Write = XLogCtl->xlblocks[currentIndex]; ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write); if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) *************** *** 1172,1177 **** --- 1290,1296 ---- /* * Switch to new logfile segment. */ + XLogMultipageFlush(currentIndex); if (openLogFile >= 0) { if (close(openLogFile)) *************** *** 1242,1275 **** { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); openLogFile = XLogFileOpen(openLogId, openLogSeg); - openLogOff = 0; } ! /* Need to seek in the file? */ ! if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize) ! { ! openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize; ! if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0) ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not seek in log file %u, segment %u to offset %u: %m", ! openLogId, openLogSeg, openLogOff))); ! } ! ! /* OK to write the page */ ! from = XLogCtl->pages + Write->curridx * BLCKSZ; ! errno = 0; ! if (write(openLogFile, from, BLCKSZ) != BLCKSZ) ! { ! /* if write didn't set errno, assume problem is no disk space */ ! if (errno == 0) ! errno = ENOSPC; ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not write to log file %u, segment %u at offset %u: %m", ! openLogId, openLogSeg, openLogOff))); ! } ! openLogOff += BLCKSZ; /* * If we just wrote the whole last page of a logfile segment, --- 1361,1370 ---- { XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); openLogFile = XLogFileOpen(openLogId, openLogSeg); } ! /* Add the page to the pending multipage write */ ! XLogMultipageWrite(currentIndex); /* * If we just wrote the whole last page of a logfile segment, *************** *** 1281,1288 **** * This is also the right place to notify the Archiver that the * segment is ready to copy to archival storage. */ ! 
if (openLogOff >= XLogSegSize && !ispartialpage) { issue_xlog_fsync(); LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */ --- 1376,1384 ---- * This is also the right place to notify the Archiver that the * segment is ready to copy to archival storage. */ ! if (XLogMultipage.offset + XLogMultipage.size >= XLogSegSize && !ispartialpage) { + XLogMultipageFlush(currentIndex); issue_xlog_fsync(); LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */ *************** *** 1296,1303 **** LogwrtResult.Write = WriteRqst.Write; break; } ! Write->curridx = NextBufIdx(Write->curridx); } /* * If asked to flush, do so --- 1392,1400 ---- LogwrtResult.Write = WriteRqst.Write; break; } ! currentIndex = NextBufIdx(currentIndex); } + XLogMultipageFlush(currentIndex); /* * If asked to flush, do so *************** *** 1333,1338 **** --- 1430,1440 ---- LogwrtResult.Flush = LogwrtResult.Write; } + #ifdef XLOG_MULTIPAGE_WRITER_DEBUG + if (Write->curridx != currentIndex) + elog(PANIC, "Write->curridx != currentIndex (%d) : %d != %d", __LINE__, Write->curridx, currentIndex); + #endif + /* * Update shared-memory status * *************** *** 1354,1359 **** --- 1456,1466 ---- } Write->LogwrtResult = LogwrtResult; + + #ifdef XLOG_MULTIPAGE_WRITER_DEBUG + if (XLogMultipage.pages) + elog(PANIC, "XLogMultipage.pages not null (%d) : size=%d", __LINE__, XLogMultipage.size); + #endif } /* *************** *** 1476,1481 **** --- 1583,1591 ---- "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X", record.xlogid, record.xrecoff, LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff); + + if (XLogMultipage.pages) + elog(PANIC, "xlog multipage-write usage error at XLogFlush"); } /* *************** *** 3380,3386 **** XLOGbuffers = MinXLOGbuffers; return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) ! 
+ BLCKSZ * XLOGbuffers + MAXALIGN(sizeof(ControlFileData)); } --- 3490,3496 ---- XLOGbuffers = MinXLOGbuffers; return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) ! + XLOG_EXTRA_BUFFERS + BLCKSZ * XLOGbuffers + MAXALIGN(sizeof(ControlFileData)); } *************** *** 3398,3404 **** ShmemInitStruct("XLOG Ctl", MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) ! + BLCKSZ * XLOGbuffers, &foundXLog); ControlFile = (ControlFileData *) ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile); --- 3508,3514 ---- ShmemInitStruct("XLOG Ctl", MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) ! + XLOG_EXTRA_BUFFERS + BLCKSZ * XLOGbuffers, &foundXLog); ControlFile = (ControlFileData *) ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile); *************** *** 3426,3433 **** * buffers have worst-case alignment. */ XLogCtl->pages = ! ((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) + ! sizeof(XLogRecPtr) * XLOGbuffers); memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers); /* --- 3536,3543 ---- * buffers have worst-case alignment. */ XLogCtl->pages = ! (char*)XLOG_BUFFERS_ALIGN(((char *) XLogCtl) ! + sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers); memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers); /* *************** *** 3465,3470 **** --- 3575,3584 ---- struct timeval tv; crc64 crc; + #ifdef ISSUE_BOOTSTRAP_MEMORYLEAK + void* buffer0; + #endif + /* * Select a hopefully-unique system identifier code for this * installation. We use the result of gettimeofday(), including the *************** *** 3485,3492 **** --- 3599,3616 ---- /* First timeline ID is always 1 */ ThisTimeLineID = 1; + /* XXX: Does buffer leak? 
*/ + #ifdef ISSUE_BOOTSTRAP_MEMORYLEAK + buffer0 = malloc(BLCKSZ + XLOG_EXTRA_BUFFERS); + buffer = (char *) XLOG_BUFFERS_ALIGN(buffer0); + #elif 1 + buffer = (char *) XLOG_BUFFERS_ALIGN( + malloc(BLCKSZ + XLOG_EXTRA_BUFFERS) ); + #else /* Use malloc() to ensure buffer is MAXALIGNED */ buffer = (char *) malloc(BLCKSZ); + #endif + page = (XLogPageHeader) buffer; memset(buffer, 0, BLCKSZ); *************** *** 3576,3581 **** --- 3700,3709 ---- /* Bootstrap the commit log, too */ BootStrapCLOG(); BootStrapSUBTRANS(); + + #ifdef ISSUE_BOOTSTRAP_MEMORYLEAK + free(buffer0); + #endif } static char * *************** *** 5180,5185 **** --- 5308,5320 ---- new_sync_bit = OPEN_DATASYNC_FLAG; } #endif + #ifdef OPEN_DIRECT_FLAG + else if (pg_strcasecmp(method, "open_direct") == 0) + { + new_sync_method = SYNC_METHOD_OPEN; + new_sync_bit = OPEN_DIRECT_FLAG; + } + #endif else return NULL;