Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.210
retrieving revision 1.211
diff -c -r1.210 -r1.211
*** src/backend/access/transam/xlog.c	23 Jul 2005 15:31:16 -0000	1.210
--- src/backend/access/transam/xlog.c	29 Jul 2005 03:22:33 -0000	1.211
***************
*** 48,77 ****
  
  
  /*
   * This chunk of hackery attempts to determine which file sync methods
   * are available on the current platform, and to choose an appropriate
   * default method.	We assume that fsync() is always available, and that
   * configure determined whether fdatasync() is.
   */
  #if defined(O_SYNC)
! #define OPEN_SYNC_FLAG			O_SYNC
  #else
  #if defined(O_FSYNC)
! #define OPEN_SYNC_FLAG			O_FSYNC
  #endif
  #endif
  
  #if defined(O_DSYNC)
  #if defined(OPEN_SYNC_FLAG)
! #if O_DSYNC != OPEN_SYNC_FLAG
! #define OPEN_DATASYNC_FLAG		O_DSYNC
  #endif
  #else /* !defined(OPEN_SYNC_FLAG) */
  /* Win32 only has O_DSYNC */
! #define OPEN_DATASYNC_FLAG		O_DSYNC
  #endif
  #endif
  
  #if defined(OPEN_DATASYNC_FLAG)
  #define DEFAULT_SYNC_METHOD_STR	"open_datasync"
  #define DEFAULT_SYNC_METHOD		SYNC_METHOD_OPEN
--- 48,117 ----
  
  
  /*
+  *	Becauase O_DIRECT bypasses the kernel buffers, and because we never
+  *	read those buffers except during crash recovery, it is a win to use
+  *	it in all cases where we sync on each write().  We could allow O_DIRECT
+  *	with fsync(), but because skipping the kernel buffer forces writes out
+  *	quickly, it seems best just to use it for O_SYNC.  It is hard to imagine
+  *	how fsync() could be a win for O_DIRECT compared to O_SYNC and O_DIRECT.
+  */
+ #ifdef O_DIRECT
+ #define PG_O_DIRECT				O_DIRECT
+ #else
+ #define PG_O_DIRECT				0
+ #endif
+ 
+ /*
   * This chunk of hackery attempts to determine which file sync methods
   * are available on the current platform, and to choose an appropriate
   * default method.	We assume that fsync() is always available, and that
   * configure determined whether fdatasync() is.
   */
  #if defined(O_SYNC)
! #define CMP_OPEN_SYNC_FLAG		O_SYNC
  #else
  #if defined(O_FSYNC)
! #define CMP_OPEN_SYNC_FLAG		O_FSYNC
  #endif
  #endif
+ #define OPEN_SYNC_FLAG			(CMP_OPEN_SYNC_FLAG | PG_O_DIRECT)
  
  #if defined(O_DSYNC)
  #if defined(OPEN_SYNC_FLAG)
! #if O_DSYNC != CMP_OPEN_SYNC_FLAG
! #define OPEN_DATASYNC_FLAG		(O_DSYNC | PG_O_DIRECT)
  #endif
  #else /* !defined(OPEN_SYNC_FLAG) */
  /* Win32 only has O_DSYNC */
! #define OPEN_DATASYNC_FLAG		(O_DSYNC | PG_O_DIRECT)
  #endif
  #endif
  
+ /*
+  * Limitation of buffer-alignment for direct io depend on OS and filesystem,
+  * but BLCKSZ is assumed to be enough for it. 
+  */
+ #ifdef O_DIRECT
+ #define ALIGNOF_XLOG_BUFFER		BLCKSZ
+ #else
+ #define ALIGNOF_XLOG_BUFFER		MAXIMUM_ALIGNOF
+ #endif
+ 
+ /*
+  * Switch the alignment routine because ShmemAlloc() returns a max-aligned
+  * buffer and ALIGNOF_XLOG_BUFFER may be greater than MAXIMUM_ALIGNOF.
+  */
+ #if ALIGNOF_XLOG_BUFFER <= MAXIMUM_ALIGNOF
+ #define XLOG_BUFFER_ALIGN(LEN)	MAXALIGN((LEN))
+ #else
+ #define XLOG_BUFFER_ALIGN(LEN)	((LEN) + (ALIGNOF_XLOG_BUFFER))
+ #endif
+ /* assume sizeof(ptrdiff_t) == sizeof(void*) */
+ #define POINTERALIGN(ALIGNVAL,PTR)	\
+ 	((char *)(((ptrdiff_t) (PTR) + (ALIGNVAL-1)) & ~((ptrdiff_t) (ALIGNVAL-1))))
+ #define XLOG_BUFFER_POINTERALIGN(PTR)	\
+ 	POINTERALIGN((ALIGNOF_XLOG_BUFFER), (PTR))
+ 
  #if defined(OPEN_DATASYNC_FLAG)
  #define DEFAULT_SYNC_METHOD_STR	"open_datasync"
  #define DEFAULT_SYNC_METHOD		SYNC_METHOD_OPEN
***************
*** 469,474 ****
--- 509,525 ----
  static char *str_time(time_t tnow);
  static void issue_xlog_fsync(void);
  
+ /* XLog gather-write staffs */
+ typedef struct XLogPages
+ {
+ 	char	*head;		/* Head of first page */
+ 	int		 size;		/* Total bytes of pages == count(pages) * BLCKSZ */
+ 	int		 offset;	/* Offset in xlog segment file  */
+ } XLogPages;
+ static void XLogPageReset(XLogPages *pages);
+ static void XLogPageWrite(XLogPages *pages, int index);
+ static void XLogPageFlush(XLogPages *pages, int index);
+ 
  #ifdef WAL_DEBUG
  static void xlog_outrec(char *buf, XLogRecord *record);
  #endif
***************
*** 1245,1253 ****
  XLogWrite(XLogwrtRqst WriteRqst)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
- 	char	   *from;
  	bool		ispartialpage;
  	bool		use_existent;
  
  	/* We should always be inside a critical section here */
  	Assert(CritSectionCount > 0);
--- 1296,1305 ----
  XLogWrite(XLogwrtRqst WriteRqst)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
  	bool		ispartialpage;
  	bool		use_existent;
+ 	int			currentIndex = Write->curridx;
+ 	XLogPages	pages;
  
  	/* We should always be inside a critical section here */
  	Assert(CritSectionCount > 0);
***************
*** 1258,1263 ****
--- 1310,1317 ----
  	 */
  	LogwrtResult = Write->LogwrtResult;
  
+ 	XLogPageReset(&pages);
+ 
  	while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
  	{
  		/*
***************
*** 1266,1279 ****
  		 * end of the last page that's been initialized by
  		 * AdvanceXLInsertBuffer.
  		 */
! 		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 XLogCtl->xlblocks[Write->curridx].xlogid,
! 				 XLogCtl->xlblocks[Write->curridx].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
--- 1320,1333 ----
  		 * end of the last page that's been initialized by
  		 * AdvanceXLInsertBuffer.
  		 */
! 		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 XLogCtl->xlblocks[currentIndex].xlogid,
! 				 XLogCtl->xlblocks[currentIndex].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = XLogCtl->xlblocks[currentIndex];
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
***************
*** 1281,1286 ****
--- 1335,1341 ----
  			/*
  			 * Switch to new logfile segment.
  			 */
+ 			XLogPageFlush(&pages, currentIndex);
  			if (openLogFile >= 0)
  			{
  				if (close(openLogFile))
***************
*** 1354,1384 ****
  			openLogOff = 0;
  		}
  
! 		/* Need to seek in the file? */
! 		if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
! 		{
! 			openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
! 			if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
! 				ereport(PANIC,
! 						(errcode_for_file_access(),
! 						 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
! 								openLogId, openLogSeg, openLogOff)));
! 		}
! 
! 		/* OK to write the page */
! 		from = XLogCtl->pages + Write->curridx * BLCKSZ;
! 		errno = 0;
! 		if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
! 		{
! 			/* if write didn't set errno, assume problem is no disk space */
! 			if (errno == 0)
! 				errno = ENOSPC;
! 			ereport(PANIC,
! 					(errcode_for_file_access(),
! 					 errmsg("could not write to log file %u, segment %u at offset %u: %m",
! 							openLogId, openLogSeg, openLogOff)));
! 		}
! 		openLogOff += BLCKSZ;
  
  		/*
  		 * If we just wrote the whole last page of a logfile segment,
--- 1409,1416 ----
  			openLogOff = 0;
  		}
  
! 		/* Add a page to buffer */
! 		XLogPageWrite(&pages, currentIndex);
  
  		/*
  		 * If we just wrote the whole last page of a logfile segment,
***************
*** 1390,1397 ****
  		 * This is also the right place to notify the Archiver that the
  		 * segment is ready to copy to archival storage.
  		 */
! 		if (openLogOff >= XLogSegSize && !ispartialpage)
  		{
  			issue_xlog_fsync();
  			LogwrtResult.Flush = LogwrtResult.Write;	/* end of current page */
  
--- 1422,1430 ----
  		 * This is also the right place to notify the Archiver that the
  		 * segment is ready to copy to archival storage.
  		 */
! 		if (openLogOff + pages.size >= XLogSegSize && !ispartialpage)
  		{
+ 			XLogPageFlush(&pages, currentIndex);
  			issue_xlog_fsync();
  			LogwrtResult.Flush = LogwrtResult.Write;	/* end of current page */
  
***************
*** 1405,1412 ****
  			LogwrtResult.Write = WriteRqst.Write;
  			break;
  		}
! 		Write->curridx = NextBufIdx(Write->curridx);
  	}
  
  	/*
  	 * If asked to flush, do so
--- 1438,1446 ----
  			LogwrtResult.Write = WriteRqst.Write;
  			break;
  		}
! 		currentIndex = NextBufIdx(currentIndex);
  	}
+ 	XLogPageFlush(&pages, currentIndex);
  
  	/*
  	 * If asked to flush, do so
***************
*** 3584,3590 ****
  	if (XLOGbuffers < MinXLOGbuffers)
  		XLOGbuffers = MinXLOGbuffers;
  
! 	return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
  		+ BLCKSZ * XLOGbuffers +
  		MAXALIGN(sizeof(ControlFileData));
  }
--- 3618,3624 ----
  	if (XLOGbuffers < MinXLOGbuffers)
  		XLOGbuffers = MinXLOGbuffers;
  
! 	return XLOG_BUFFER_ALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
  		+ BLCKSZ * XLOGbuffers +
  		MAXALIGN(sizeof(ControlFileData));
  }
***************
*** 3601,3607 ****
  
  	XLogCtl = (XLogCtlData *)
  		ShmemInitStruct("XLOG Ctl",
! 						MAXALIGN(sizeof(XLogCtlData) +
  								 sizeof(XLogRecPtr) * XLOGbuffers)
  						+ BLCKSZ * XLOGbuffers,
  						&foundXLog);
--- 3635,3641 ----
  
  	XLogCtl = (XLogCtlData *)
  		ShmemInitStruct("XLOG Ctl",
! 						XLOG_BUFFER_ALIGN(sizeof(XLogCtlData) +
  								 sizeof(XLogRecPtr) * XLOGbuffers)
  						+ BLCKSZ * XLOGbuffers,
  						&foundXLog);
***************
*** 3630,3638 ****
  	 * Here, on the other hand, we must MAXALIGN to ensure the page
  	 * buffers have worst-case alignment.
  	 */
! 	XLogCtl->pages =
! 		((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
! 									  sizeof(XLogRecPtr) * XLOGbuffers);
  	memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
  
  	/*
--- 3664,3672 ----
  	 * Here, on the other hand, we must MAXALIGN to ensure the page
  	 * buffers have worst-case alignment.
  	 */
! 	XLogCtl->pages = XLOG_BUFFER_POINTERALIGN(
! 		((char *) XLogCtl)
! 		+ sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers);
  	memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
  
  	/*
***************
*** 3690,3699 ****
  	/* First timeline ID is always 1 */
  	ThisTimeLineID = 1;
  
! 	/* Use malloc() to ensure buffer is MAXALIGNED */
! 	buffer = (char *) malloc(BLCKSZ);
! 	page = (XLogPageHeader) buffer;
! 	memset(buffer, 0, BLCKSZ);
  
  	/* Set up information for the initial checkpoint record */
  	checkPoint.redo.xlogid = 0;
--- 3724,3732 ----
  	/* First timeline ID is always 1 */
  	ThisTimeLineID = 1;
  
!  	buffer = (char *) malloc(BLCKSZ + ALIGNOF_XLOG_BUFFER);
! 	page = (XLogPageHeader) XLOG_BUFFER_POINTERALIGN(buffer);
! 	memset(page, 0, BLCKSZ);
  
  	/* Set up information for the initial checkpoint record */
  	checkPoint.redo.xlogid = 0;
***************
*** 3745,3751 ****
  
  	/* Write the first page with the initial record */
  	errno = 0;
! 	if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
  	{
  		/* if write didn't set errno, assume problem is no disk space */
  		if (errno == 0)
--- 3778,3784 ----
  
  	/* Write the first page with the initial record */
  	errno = 0;
! 	if (write(openLogFile, page, BLCKSZ) != BLCKSZ)
  	{
  		/* if write didn't set errno, assume problem is no disk space */
  		if (errno == 0)
***************
*** 5837,5839 ****
--- 5870,5940 ----
  					 errmsg("could not remove file \"%s\": %m",
  							BACKUP_LABEL_FILE)));
  }
+ 
+ 
+ /* XLog gather-write staffs */
+ 
+ static void
+ XLogPageReset(XLogPages *pages)
+ {
+ 	memset(pages, 0, sizeof(*pages));
+ }
+ 
+ static void
+ XLogPageWrite(XLogPages *pages, int index)
+ {
+ 	char *page = XLogCtl->pages + index * BLCKSZ;
+ 	int size = BLCKSZ;
+ 	int offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
+ 
+ 	if (pages->head + pages->size == page
+ 		&& pages->offset + pages->size == offset)
+ 	{	/* Pages are continuous. Append new page. */
+ 		pages->size += size;
+ 	}
+ 	else
+ 	{	/* Pages are not continuous. Flush and clear. */
+ 		XLogPageFlush(pages, PrevBufIdx(index));
+ 		pages->head = page;
+ 		pages->size = size;
+ 		pages->offset = offset;
+ 	}
+ }
+ 
+ static void
+ XLogPageFlush(XLogPages *pages, int index)
+ {
+ 	if (!pages->head)
+ 	{	/* No needs to write pages. */
+ 		XLogCtl->Write.curridx = index;
+ 		return;
+ 	}
+ 	
+ 	/* Need to seek in the file? */
+ 	if (openLogOff != pages->offset)
+ 	{
+ 		openLogOff = pages->offset;
+ 		if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
+ 			ereport(PANIC,
+ 					(errcode_for_file_access(),
+ 					 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+ 							openLogId, openLogSeg, openLogOff)));
+ 	}
+ 
+ 	/* OK to write the page */
+ 	errno = 0;
+ 	if (write(openLogFile, pages->head, pages->size) != pages->size)
+ 	{
+ 		/* if write didn't set errno, assume problem is no disk space */
+ 		if (errno == 0)
+ 			errno = ENOSPC;
+ 		ereport(PANIC,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write to log file %u, segment %u at offset %u: %m",
+ 						openLogId, openLogSeg, openLogOff)));
+ 	}
+ 
+ 	openLogOff += pages->size;
+ 	XLogCtl->Write.curridx = index;
+ 	XLogPageReset(pages);
+ }
