WIP(!) Double Writes

Started by David Fetter about 14 years ago, 3 messages
#1 David Fetter
david@fetter.org
2 attachment(s)

Folks,

Please find attached two patches, each under the PostgreSQL license,
one which implements page checksums vs. REL9_0_STABLE, the other which
depends on the first (i.e. requires that it be applied first) and
implements double writes. They're vs. REL9_0_STABLE because they're
extracted from vPostgres 1.0, a proprietary product currently based on
PostgreSQL 9.0.

I had wanted the first patch set to be:

- Against git head, and
- Based on feedback from Simon's patch.

The checksum part does the wrong thing, namely changes the page format
and has some race conditions that Simon's latest page checksum patch
removes. There are doubtless other warts, but I decided not to let
the perfect be the enemy of the good. If that's a mistake, it's all
mine.

I tested with "make check," which I realize isn't the most thorough,
but again, this is mostly to get out the general ideas of the patches
so people have actual code to poke at.

Dan Scales <scales@vmware.com> wrote the double write part and
extracted the page checksums from previous work by Ganesh
Venkitachalam, who's written here before. Dan will be answering
questions if I can't :) Jignesh Shah may be able to answer
performance questions, as he has been doing yeoman work on vPostgres
in that arena.

Let the brickbats begin!

Cheers,
David.

Caveats (from Dan):

The attached patch implements a "double_write" option. The idea of
this option (as has been discussed) is to handle the problem of torn
writes for buffer pages by writing (almost) all buffers twice, once to
a double-write file and once to the data file. If a crash occurs,
then a buffer should always have a correct copy either in the
double-write file or in the data file, so the double-write file can be
used to correct any torn writes to the data files. The "double_write"
option can therefore be used in place of "full_page_writes", and can
not only improve performance, but also reduce the size of the WAL.

The patch currently makes use of checksums on the data pages. As has
been pointed out, double writes only strictly require that the pages
in the double write file be checksummed, and we can fairly easily make
data checksums optional. However, if data checksums are used, then
Postgres can provide more useful messages on exactly when torn pages
have occurred. It is very likely that a torn page happened if, during
recovery, the checksum of a data page is incorrect, but a copy of the
page with a valid checksum is in the double-write file.
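
A minimal sketch of the recovery decision described above, not the patch's code. The page layout, the toy checksum, and every demo_* name are assumptions made for illustration only; the patch uses its own checksum and page header.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_PAGE_SIZE 8192     /* assumed page size for this sketch */

/* Hypothetical page layout: a checksum followed by the payload. */
typedef struct {
    uint32_t cksum;
    uint8_t  data[DEMO_PAGE_SIZE - sizeof(uint32_t)];
} DemoPage;

/* Toy checksum over the payload (an assumption, not the patch's algorithm). */
static uint32_t demo_checksum(const DemoPage *p)
{
    uint32_t sum = 17;
    for (size_t i = 0; i < sizeof p->data; i++)
        sum = sum * 31 + p->data[i];
    return sum;
}

static bool demo_page_valid(const DemoPage *p)
{
    return p->cksum == demo_checksum(p);
}

typedef enum { PAGE_OK, PAGE_RESTORED, PAGE_UNRECOVERABLE } DemoRecoverResult;

/*
 * If the data page fails its checksum but the double-write copy is valid,
 * a torn write very likely happened: restore the page from the copy.
 */
static DemoRecoverResult demo_recover_page(DemoPage *data_page,
                                           const DemoPage *dw_copy)
{
    assert(data_page != NULL && dw_copy != NULL);
    if (demo_page_valid(data_page))
        return PAGE_OK;
    if (demo_page_valid(dw_copy)) {
        memcpy(data_page, dw_copy, sizeof *data_page);
        return PAGE_RESTORED;
    }
    return PAGE_UNRECOVERABLE;  /* neither copy is usable */
}
```

The PAGE_RESTORED case is the one where a "torn page detected" message could be reported, since both checksums were available to compare.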

To achieve efficiency, the checkpoint writer and bgwriter should batch
writes to multiple pages together. Currently, there is an option
"batched_buffer_writes" that specifies how many buffers to batch at a
time. However, we may want to remove that option from view, and just
force batched_buffer_writes to a default (32) if double_writes is
enabled.

In order to batch, the checkpoint writer must acquire multiple buffer
locks simultaneously as it is building up the batch. The patch does
simple deadlock detection that ends a batch early if the lock for the
next buffer that it wants to include in the batch is held. This
situation almost never happens.
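
The "end the batch early" rule can be sketched as below. This is a stand-alone illustration under stated assumptions: DemoLock is a hypothetical try-lock built on C11 atomics standing in for the buffer content locks, and the demo_* functions are not part of the patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_MAX_BATCH 32

/* Hypothetical stand-in for a buffer content lock. */
typedef struct { atomic_bool held; } DemoLock;

/* Returns true if the lock was free and is now held by the caller. */
static bool demo_try_lock(DemoLock *l)
{
    bool expected = false;
    return atomic_compare_exchange_strong(&l->held, &expected, true);
}

static void demo_unlock(DemoLock *l)
{
    atomic_store(&l->held, false);
}

/*
 * Collect locks starting at locks[start]. Never wait for a lock while
 * already holding others: if the next lock is held, end the batch early.
 * Returns the number of locks acquired; their indexes go into batch[].
 */
static size_t demo_build_batch(DemoLock *locks, size_t nlocks, size_t start,
                               size_t *batch)
{
    size_t n = 0;

    assert(locks != NULL && batch != NULL);
    for (size_t i = start; i < nlocks && n < DEMO_MAX_BATCH; i++) {
        if (!demo_try_lock(&locks[i]))
            break;              /* lock is held: flush what we have */
        batch[n++] = i;
    }
    return n;
}

/* Release in reverse order of acquisition. */
static void demo_release_batch(DemoLock *locks, const size_t *batch, size_t n)
{
    while (n > 0)
        demo_unlock(&locks[batch[--n]]);
}
```

Because the batch builder only ever uses a conditional acquire once it holds at least one lock, it cannot be part of a wait cycle; the cost is an occasional short batch.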

Given the batching functionality, double writes by the checkpoint
writer (and bgwriter) are implemented efficiently by writing a batch of
pages to the double-write file and fsyncing, and then writing the
pages to the appropriate data files, and fsyncing all the necessary
data files. While the data fsyncing might be viewed as expensive, it
does help eliminate a lot of the fsync overhead at the end of
checkpoints. FlushRelationBuffers() and FlushDatabaseBuffers() can be
similarly batched.
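
The write ordering in the paragraph above can be sketched with plain POSIX calls. This is an illustration under assumptions, not the patch's smgrbwrite(): DemoWrite and demo_flush_batch are hypothetical names, and the header that the patch keeps at the start of the double-write file is omitted.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

#define DEMO_BLCKSZ 8192        /* assumed page size */

/* One pending page write (hypothetical structure for this sketch). */
typedef struct {
    int         data_fd;        /* data file the page belongs to */
    off_t       offset;         /* byte offset of the page in that file */
    const char *page;           /* DEMO_BLCKSZ bytes of page contents */
} DemoWrite;

/*
 * Flush a batch with double writes:
 *   1. write every page to the double-write file, then fsync it;
 *   2. write every page to its data file;
 *   3. fsync the data files.
 * A crash during step 2 leaves a valid copy from step 1 on disk.
 */
static bool demo_flush_batch(int dw_fd, const DemoWrite *w, int n)
{
    assert(w != NULL && n > 0);

    for (int i = 0; i < n; i++)
        if (pwrite(dw_fd, w[i].page, DEMO_BLCKSZ,
                   (off_t) i * DEMO_BLCKSZ) != DEMO_BLCKSZ)
            goto fail;
    if (fsync(dw_fd) != 0)
        goto fail;

    for (int i = 0; i < n; i++)
        if (pwrite(w[i].data_fd, w[i].page, DEMO_BLCKSZ,
                   w[i].offset) != DEMO_BLCKSZ)
            goto fail;

    /* A real implementation would fsync each distinct file only once. */
    for (int i = 0; i < n; i++)
        if (fsync(w[i].data_fd) != 0)
            goto fail;
    return true;

fail:
    perror("demo_flush_batch");
    return false;
}
```

The double-write slots can be reused for the next batch only after step 3 has completed, which is why the data files are fsynced per batch rather than at the end of the checkpoint.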

We have some other code (not included) that sorts buffers to be
checkpointed in file/block order -- this can reduce fsync overhead
further by ensuring that each batch writes to only one or a few data
files.
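
Since the sorting code is not included, here is only a rough sketch of the idea, assuming a hypothetical DemoDirtyBuf record with a numeric file identity. It sorts dirty buffers by (file, block) and counts how many distinct files, and hence fsyncs, a batch touches.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical record for one dirty buffer awaiting checkpoint. */
typedef struct {
    uint32_t file_id;           /* stand-in for the relation file identity */
    uint32_t block;             /* block number within that file */
    int      buf_id;            /* buffer pool slot */
} DemoDirtyBuf;

/* Order by file first, then by block within the file. */
static int demo_cmp(const void *a, const void *b)
{
    const DemoDirtyBuf *x = a;
    const DemoDirtyBuf *y = b;

    if (x->file_id != y->file_id)
        return x->file_id < y->file_id ? -1 : 1;
    if (x->block != y->block)
        return x->block < y->block ? -1 : 1;
    return 0;
}

static void demo_sort_for_checkpoint(DemoDirtyBuf *bufs, size_t n)
{
    qsort(bufs, n, sizeof bufs[0], demo_cmp);
}

/*
 * Count runs of equal file_id in a batch. For a sorted batch this is the
 * number of distinct data files, i.e. the number of fsyncs it needs.
 */
static size_t demo_files_in_batch(const DemoDirtyBuf *bufs, size_t n)
{
    size_t files = 0;

    assert(bufs != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
        if (i == 0 || bufs[i].file_id != bufs[i - 1].file_id)
            files++;
    return files;
}
```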

The actual batch writes are done using writev(), which might have to
be replaced with equivalent code, if this is a portability issue. A
struct iocb structure is currently used for bookkeeping during the
low-level batching, since it is compatible with an async IO approach
as well (not included).
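
For readers unfamiliar with writev(), a small sketch of a gathered write of several pages in one system call follows. demo_write_pages and the DEMO_* constants are assumptions for illustration; the patch's own bookkeeping (struct iocb) is not reproduced here.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#define DEMO_BLCKSZ    8192     /* assumed page size */
#define DEMO_MAX_BATCH 32       /* matches the suggested default batch size */

/*
 * Write n pages, which may live at unrelated addresses in memory, to
 * consecutive positions in the file starting at its current offset.
 * Returns the number of bytes written, or -1 on error. writev() may
 * write fewer bytes than requested; callers must handle a short write.
 */
static ssize_t demo_write_pages(int fd, char *const pages[], int n)
{
    struct iovec iov[DEMO_MAX_BATCH];
    ssize_t      written;

    assert(n > 0 && n <= DEMO_MAX_BATCH);
    for (int i = 0; i < n; i++) {
        iov[i].iov_base = pages[i];
        iov[i].iov_len = DEMO_BLCKSZ;
    }
    written = writev(fd, iov, n);
    if (written < 0)
        perror("writev");
    return written;
}
```

On a platform without writev(), the same effect could be had by copying the pages into one contiguous buffer and calling write() once, at the price of an extra memory copy.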

We do have to do the same double write for dirty buffer evictions by
individual backends (in BufferAlloc). This could be expensive if
there are a lot of dirty buffer evictions (i.e. where the
checkpoint/bgwriter cannot generate enough clean pages for the backends).

Double writes must be done for any page which might be used after
recovery even if there was a full crash while writing the page. This
includes all writes to such pages in a checkpoint, not just the first,
since Postgres cannot do correct WAL recovery on a torn page (I
believe). Pages in temporary tables and some unlogged operations do
not require double writes. Feedback is especially welcome on whether
we have missed some kinds of pages that do/do not require double
writes.

As Jignesh has mentioned on this list, we see significant performance
gains when enabling double writes & disabling full_page_writes for
OLTP runs with sufficient buffer cache size. We are now trying to
measure some runs where the dirty buffer eviction rate by the backends
is high.

--
David Fetter <david@fetter.org> http://fetter.org/
Phone: +1 415 235 3778 AIM: dfetter666 Yahoo!: dfetter
Skype: davidfetter XMPP: david.fetter@gmail.com
iCal: webcal://www.tripit.com/feed/ical/people/david74/tripit.ics

Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate

Attachments:

checksum_90.diff (text/plain; charset=us-ascii)
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 312,317 **** extern char *optarg;
--- 312,320 ----
  extern int	optind,
  			opterr;
  
+ extern int page_checksum;
+ extern bool fullPageWrites;
+ 
  #ifdef HAVE_INT_OPTRESET
  extern int	optreset;			/* might not be declared by system headers */
  #endif
***************
*** 736,741 **** PostmasterMain(int argc, char *argv[])
--- 739,765 ----
  				(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\"")));
  
  	/*
+ 	 * The idea here is that there will be checksum mismatches if there
+ 	 * are partial writes to pages during hardware crashes.  So, the user
+ 	 * should have full_page_writes enabled if page_checksum is enabled,
+ 	 * so that these pages are automatically fixed, else Postgres may
+ 	 * often get checksum errors after crashes (on pages that are in fact
+ 	 * partially written and hence corrupted).  With full_page_writes
+ 	 * enabled, Postgres will replace each page without ever looking at
+ 	 * the partially-written page and seeing an incorrect checksum.
+ 	 * Hence, checksums will detect only real disk corruptions (where the
+ 	 * disk reported a successful write but the data was still corrupted
+ 	 * at some point).
+ 	 *
+ 	 * Alternatively, we may want to leave this check out, for those
+ 	 * sophisticated users who know that their hardware/software setup
+ 	 * can never produce partial writes during crashes.
+ 	 */
+ 	if (page_checksum && !fullPageWrites)
+ 		ereport(ERROR,
+ 				(errmsg("full_page_writes must be enabled if page_checksum is enabled")));
+ 
+ 	/*
  	 * Other one-time internal sanity checks can go here, if they are fast.
  	 * (Put any slow processing further down, after postmaster.pid creation.)
  	 */
*** a/src/backend/storage/smgr/smgr.c
--- b/src/backend/storage/smgr/smgr.c
***************
*** 81,86 **** static const int NSmgr = lengthof(smgrsw);
--- 81,92 ----
   */
  static HTAB *SMgrRelationHash = NULL;
  
+ /* Page checksumming. */
+ static uint64 tempbuf[BLCKSZ/sizeof(uint64)];
+ extern bool page_checksum;
+ 
+ #define INVALID_CKSUM 0x1b0af034
+ 
  /* local function prototypes */
  static void smgrshutdown(int code, Datum arg);
  static void smgr_internal_unlink(RelFileNode rnode, ForkNumber forknum,
***************
*** 375,380 **** smgr_internal_unlink(RelFileNode rnode, ForkNumber forknum,
--- 381,439 ----
  }
  
  /*
+  * The initial value when computing the checksum for a data page.
+  */
+ static inline uint64
+ ChecksumInit(SMgrRelation reln, ForkNumber f, BlockNumber b)
+ {
+ 	return b + f;
+ }
+ 
+ /*
+  * Compute a checksum of buffer (with length len), using initial value
+  * cksum.  We use a relatively simple checksum calculation to avoid
+  * overhead, but could replace with some kind of CRC calculation.
+  */
+ static inline uint32 
+ ComputeChecksum(uint64 *buffer, uint32 len, uint64 cksum)
+ {
+ 	int i;
+ 
+ 	for (i = 0; i < len/sizeof(uint64); i += 4) {
+ 		cksum += (cksum << 5) + *buffer;
+ 		cksum += (cksum << 5) + *(buffer+1);
+ 		cksum += (cksum << 5) + *(buffer+2);
+ 		cksum += (cksum << 5) + *(buffer+3);
+ 		buffer += 4;
+ 	}
+ 	cksum = (cksum & 0xFFFFFFFF) + (cksum >> 32);
+ 	return cksum;
+ }
+ 
+ /*
+  * Copy buffer to dst and compute the checksum during the copy (so that
+  * the checksum is correct for the final contents of dst).
+  */
+ static inline uint32 
+ CopyAndComputeChecksum(uint64 *dst, volatile uint64 *buffer,
+ 					   uint32 len, uint64 cksum)
+ {
+ 	int i;
+ 
+ 	for (i = 0; i < len/sizeof(uint64); i += 4) {
+ 		cksum += (cksum << 5) + (*dst = *buffer);
+ 		cksum += (cksum << 5) + (*(dst+1) = *(buffer+1));
+ 		cksum += (cksum << 5) + (*(dst+2) = *(buffer+2));
+ 		cksum += (cksum << 5) + (*(dst+3) = *(buffer+3));
+ 		dst += 4;
+ 		buffer += 4;
+ 	}
+ 	cksum = (cksum & 0xFFFFFFFF) + (cksum >> 32);
+ 	return cksum;
+ }
+ 
+ 
+ /*
   *	smgrextend() -- Add a new block to a file.
   *
   *		The semantics are nearly the same as smgrwrite(): write at the
***************
*** 387,394 **** void
  smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  		   char *buffer, bool isTemp)
  {
! 	(*(smgrsw[reln->smgr_which].smgr_extend)) (reln, forknum, blocknum,
! 											   buffer, isTemp);
  }
  
  /*
--- 446,470 ----
  smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  		   char *buffer, bool isTemp)
  {
! 	PageHeader p;
! 	Assert(PageGetPageLayoutVersion(((PageHeader)buffer)) == PG_PAGE_LAYOUT_VERSION ||
! 		   PageIsNew(buffer));
! 	if (page_checksum) {
! 		p = (PageHeader)tempbuf;
! 		((PageHeader)buffer)->cksum = 0;
! 		/*
! 		 * We copy and compute the checksum, and then write out the data
! 		 * from the copy, so that we avoid any problem with hint bits
! 		 * changing after we compute the checksum.
! 		 */
! 		p->cksum = CopyAndComputeChecksum(tempbuf, (uint64 *)buffer, BLCKSZ,
! 										  ChecksumInit(reln, forknum, blocknum));
! 	} else {
! 		p = (PageHeader)buffer;
! 		p->cksum = INVALID_CKSUM;
! 	}
! 	(*(smgrsw[reln->smgr_which].smgr_extend))(reln, forknum, blocknum,
! 											  (char *)p, isTemp);
  }
  
  /*
***************
*** 412,418 **** void
--- 488,516 ----
  smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  		 char *buffer)
  {
+ 	PageHeader p = (PageHeader) buffer;
+ 
  	(*(smgrsw[reln->smgr_which].smgr_read)) (reln, forknum, blocknum, buffer);
+ 	Assert(PageIsNew(p) || PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION);
+ 	if (page_checksum && p->cksum != INVALID_CKSUM) {
+ 		const uint32 diskCksum = p->cksum;
+ 		uint32 cksum;
+ 
+ 		p->cksum = 0;
+ 		cksum = ComputeChecksum((uint64 *)buffer, BLCKSZ,
+ 								ChecksumInit(reln, forknum, blocknum));
+ 		if (cksum != diskCksum) {
+ 			ereport(PANIC, (0, errmsg("checksum mismatch: disk has %#x, should be %#x\n"
+ 									  "filename %s, BlockNum %u, block specifier %d/%d/%d/%d/%u",
+ 									  diskCksum, (uint32)cksum,
+ 									  relpath(reln->smgr_rnode, forknum),
+ 									  blocknum,
+ 									  reln->smgr_rnode.spcNode,
+ 									  reln->smgr_rnode.dbNode,
+ 									  reln->smgr_rnode.relNode,
+ 									  forknum, blocknum)));
+ 		}
+ 	}
  }
  
  /*
***************
*** 434,441 **** void
  smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  		  char *buffer, bool isTemp)
  {
! 	(*(smgrsw[reln->smgr_which].smgr_write)) (reln, forknum, blocknum,
! 											  buffer, isTemp);
  }
  
  /*
--- 532,556 ----
  smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  		  char *buffer, bool isTemp)
  {
! 	PageHeader p;
! 
! 	if (page_checksum) {
! 		p = (PageHeader)tempbuf;
! 		((PageHeader)buffer)->cksum = 0;
! 		/*
! 		 * We copy and compute the checksum, and then write out the data
! 		 * from the copy, so that we avoid any problem with hint bits
! 		 * changing after we compute the checksum.
! 		 */
! 		p->cksum = CopyAndComputeChecksum(tempbuf, (uint64 *)buffer, BLCKSZ,
! 										  ChecksumInit(reln, forknum, blocknum));
! 	} else {
! 		p = (PageHeader)buffer;
! 		p->cksum = INVALID_CKSUM;
! 	}
! 	Assert(PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION);
! 	(*(smgrsw[reln->smgr_which].smgr_write))(reln, forknum, blocknum,
! 											 (char *)p, isTemp);
  }
  
  /*
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 367,372 **** bool		default_with_oids = false;
--- 367,374 ----
  bool		SQL_inheritance = true;
  
  bool		Password_encryption = true;
+ bool		page_checksum = true;
+ 
  
  int			log_min_error_statement = ERROR;
  int			log_min_messages = WARNING;
***************
*** 1270,1275 **** static struct config_bool ConfigureNamesBool[] =
--- 1272,1285 ----
  		false, NULL, NULL
  	},
  
+ 	{
+ 		{"page_checksum", PGC_POSTMASTER, CUSTOM_OPTIONS,
+ 			gettext_noop("enable disk page checksumming"),
+ 			NULL
+ 		},
+ 		&page_checksum, true, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 526,528 ****
--- 526,529 ----
  #------------------------------------------------------------------------------
  
  #custom_variable_classes = ''		# list of custom variable class names
+ #page_cksum = on
*** a/src/include/storage/bufpage.h
--- b/src/include/storage/bufpage.h
***************
*** 132,137 **** typedef struct PageHeaderData
--- 132,138 ----
  	LocationIndex pd_special;	/* offset to start of special space */
  	uint16		pd_pagesize_version;
  	TransactionId pd_prune_xid; /* oldest prunable XID, or zero if none */
+ 	uint32		cksum;			/* page checksum */
  	ItemIdData	pd_linp[1];		/* beginning of line pointer array */
  } PageHeaderData;
  
***************
*** 154,160 **** typedef PageHeaderData *PageHeader;
  										 * tuple? */
  #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
  										 * everyone */
- 
  #define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
  
  /*
--- 155,160 ----
***************
*** 165,172 **** typedef PageHeaderData *PageHeader;
   * Release 8.3 uses 4; it changed the HeapTupleHeader layout again, and
   *		added the pd_flags field (by stealing some bits from pd_tli),
   *		as well as adding the pd_prune_xid field (which enlarges the header).
   */
! #define PG_PAGE_LAYOUT_VERSION		4
  
  
  /* ----------------------------------------------------------------
--- 165,173 ----
   * Release 8.3 uses 4; it changed the HeapTupleHeader layout again, and
   *		added the pd_flags field (by stealing some bits from pd_tli),
   *		as well as adding the pd_prune_xid field (which enlarges the header).
+  * Release x.y uses 5; we added checksums to heap/index/fsm files.
   */
! #define PG_PAGE_LAYOUT_VERSION		5
  
  
  /* ----------------------------------------------------------------
double_writes_90.diff (text/plain; charset=us-ascii)
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 5008,5021 **** heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
   * to be done here.)
   */
  void
! heap_sync(Relation rel)
  {
  	/* temp tables never need fsync */
  	if (rel->rd_istemp)
  		return;
  
  	/* main heap */
! 	FlushRelationBuffers(rel);
  	/* FlushRelationBuffers will have opened rd_smgr */
  	smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM);
  
--- 5008,5021 ----
   * to be done here.)
   */
  void
! heap_sync(Relation rel, bool wasLogged)
  {
  	/* temp tables never need fsync */
  	if (rel->rd_istemp)
  		return;
  
  	/* main heap */
! 	FlushRelationBuffers(rel, wasLogged);
  	/* FlushRelationBuffers will have opened rd_smgr */
  	smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM);
  
***************
*** 5027,5033 **** heap_sync(Relation rel)
  		Relation	toastrel;
  
  		toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock);
! 		FlushRelationBuffers(toastrel);
  		smgrimmedsync(toastrel->rd_smgr, MAIN_FORKNUM);
  		heap_close(toastrel, AccessShareLock);
  	}
--- 5027,5033 ----
  		Relation	toastrel;
  
  		toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock);
! 		FlushRelationBuffers(toastrel, wasLogged);
  		smgrimmedsync(toastrel->rd_smgr, MAIN_FORKNUM);
  		heap_close(toastrel, AccessShareLock);
  	}
*** a/src/backend/access/heap/rewriteheap.c
--- b/src/backend/access/heap/rewriteheap.c
***************
*** 290,296 **** end_heap_rewrite(RewriteState state)
  	 * wrote before the checkpoint.
  	 */
  	if (!state->rs_new_rel->rd_istemp)
! 		heap_sync(state->rs_new_rel);
  
  	/* Deleting the context frees everything */
  	MemoryContextDelete(state->rs_cxt);
--- 290,296 ----
  	 * wrote before the checkpoint.
  	 */
  	if (!state->rs_new_rel->rd_istemp)
! 		heap_sync(state->rs_new_rel, state->rs_use_wal);
  
  	/* Deleting the context frees everything */
  	MemoryContextDelete(state->rs_cxt);
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 5980,5985 **** StartupXLOG(void)
--- 5980,5991 ----
  		InRecovery = true;
  	}
  
+ 	/*
+ 	 * If double write file exists, see if there are any pages to
+ 	 * be recovered because of torn writes.
+ 	 */
+ 	smgr_recoverdoublewritefile();
+ 
  	/* REDO */
  	if (InRecovery)
  	{
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 2223,2229 **** CopyFrom(CopyState cstate)
  	 * indexes since those use WAL anyway)
  	 */
  	if (hi_options & HEAP_INSERT_SKIP_WAL)
! 		heap_sync(cstate->rel);
  }
  
  
--- 2223,2229 ----
  	 * indexes since those use WAL anyway)
  	 */
  	if (hi_options & HEAP_INSERT_SKIP_WAL)
! 		heap_sync(cstate->rel, false);
  }
  
  
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 3294,3300 **** ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
  
  		/* If we skipped writing WAL, then we need to sync the heap. */
  		if (hi_options & HEAP_INSERT_SKIP_WAL)
! 			heap_sync(newrel);
  
  		heap_close(newrel, NoLock);
  	}
--- 3294,3300 ----
  
  		/* If we skipped writing WAL, then we need to sync the heap. */
  		if (hi_options & HEAP_INSERT_SKIP_WAL)
! 			heap_sync(newrel, false);
  
  		heap_close(newrel, NoLock);
  	}
***************
*** 7114,7120 **** ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
  	 * in shared buffers.  We assume no new changes will be made while we are
  	 * holding exclusive lock on the rel.
  	 */
! 	FlushRelationBuffers(rel);
  
  	/*
  	 * Relfilenodes are not unique across tablespaces, so we need to allocate
--- 7114,7120 ----
  	 * in shared buffers.  We assume no new changes will be made while we are
  	 * holding exclusive lock on the rel.
  	 */
! 	FlushRelationBuffers(rel, true);
  
  	/*
  	 * Relfilenodes are not unique across tablespaces, so we need to allocate
***************
*** 7213,7219 **** copy_relation_data(SMgrRelation src, SMgrRelation dst,
  		/* If we got a cancel signal during the copy of the data, quit */
  		CHECK_FOR_INTERRUPTS();
  
! 		smgrread(src, forkNum, blkno, buf);
  
  		/* XLOG stuff */
  		if (use_wal)
--- 7213,7219 ----
  		/* If we got a cancel signal during the copy of the data, quit */
  		CHECK_FOR_INTERRUPTS();
  
! 		smgrread(src, forkNum, blkno, buf, NULL);
  
  		/* XLOG stuff */
  		if (use_wal)
*** a/src/backend/executor/execMain.c
--- b/src/backend/executor/execMain.c
***************
*** 2351,2357 **** CloseIntoRel(QueryDesc *queryDesc)
  
  		/* If we skipped using WAL, must heap_sync before commit */
  		if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
! 			heap_sync(myState->rel);
  
  		/* close rel, but keep lock until commit */
  		heap_close(myState->rel, NoLock);
--- 2351,2357 ----
  
  		/* If we skipped using WAL, must heap_sync before commit */
  		if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
! 			heap_sync(myState->rel, false);
  
  		/* close rel, but keep lock until commit */
  		heap_close(myState->rel, NoLock);
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 314,319 **** extern int	optind,
--- 314,320 ----
  
  extern int page_checksum;
  extern bool fullPageWrites;
+ extern int doubleWrites;
  
  #ifdef HAVE_INT_OPTRESET
  extern int	optreset;			/* might not be declared by system headers */
***************
*** 737,763 **** PostmasterMain(int argc, char *argv[])
  	if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
  		ereport(ERROR,
  				(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\"")));
! 
! 	/*
! 	 * The idea here is that there will be checksum mismatches if there
! 	 * are partial writes to pages during hardware crashes.  So, the user
! 	 * should have full_page_writes enabled if page_checksum is enabled,
! 	 * so that these pages are automatically fixed, else Postgres may
! 	 * often get checksum errors after crashes (on pages that are in fact
! 	 * partially written and hence corrupted).  With full_page_writes
! 	 * enabled, Postgres will replace each page without ever looking at
! 	 * the partially-written page and seeing an incorrect checksum.
! 	 * Hence, checksums will detect only real disk corruptions (where the
! 	 * disk reported a successful write but the data was still corrupted
! 	 * at some point).
! 	 *
! 	 * Alternatively, we may want to leave this check out, for those
! 	 * sophisticated users who know that their hardware/software setup
! 	 * can never produce partial writes during crashes.
! 	 */
! 	if (page_checksum && !fullPageWrites)
  		ereport(ERROR,
! 				(errmsg("full_page_writes must be enabled if page_checksum is enabled")));
  
  	/*
  	 * Other one-time internal sanity checks can go here, if they are fast.
--- 738,747 ----
  	if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
  		ereport(ERROR,
  				(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\"")));
! 	if (doubleWrites && !page_checksum) {
  		ereport(ERROR,
! 				(errmsg("page_checksum must be enabled if double_writes is enabled")));
! 	}
  
  	/*
  	 * Other one-time internal sanity checks can go here, if they are fast.
*** a/src/backend/storage/buffer/buf_init.c
--- b/src/backend/storage/buffer/buf_init.c
***************
*** 127,132 **** InitBufferPool(void)
--- 127,133 ----
  
  	/* Init other shared buffer-management stuff */
  	StrategyInitialize(!foundDescs);
+ 	Bufmgr_DoubleWriteInit();
  }
  
  /*
*** a/src/backend/storage/buffer/bufmgr.c
--- b/src/backend/storage/buffer/bufmgr.c
***************
*** 41,46 ****
--- 41,47 ----
  #include "postmaster/bgwriter.h"
  #include "storage/buf_internals.h"
  #include "storage/bufmgr.h"
+ #include "storage/fd.h"
  #include "storage/ipc.h"
  #include "storage/proc.h"
  #include "storage/smgr.h"
***************
*** 60,65 ****
--- 61,67 ----
  /* Bits in SyncOneBuffer's return value */
  #define BUF_WRITTEN				0x01
  #define BUF_REUSABLE			0x02
+ #define BUF_RETRY_AFTER_FLUSH   0x20
  
  
  /* GUC variables */
***************
*** 74,81 **** double		bgwriter_lru_multiplier = 2.0;
   */
  int			target_prefetch_pages = 0;
  
  /* local state for StartBufferIO and related functions */
! static volatile BufferDesc *InProgressBuf = NULL;
  static bool IsForInput;
  
  /* local state for LockBufferForCleanup */
--- 76,86 ----
   */
  int			target_prefetch_pages = 0;
  
+ extern bool doubleWrites;
+ 
  /* local state for StartBufferIO and related functions */
! static volatile BufferDesc *InProgressBuf[MAX_BATCHED_WRITES];
! static int InProgressIndex = 0;
  static bool IsForInput;
  
  /* local state for LockBufferForCleanup */
***************
*** 90,96 **** static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
--- 95,106 ----
  static void PinBuffer_Locked(volatile BufferDesc *buf);
  static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
  static void BufferSync(int flags);
+ static int	SyncOneBufferPrepare(int buf_id, bool skip_recently_used);
  static int	SyncOneBuffer(int buf_id, bool skip_recently_used);
+ static int WriteOneBufferPrepare(int buf_id, char *callerBuf,
+ 							   bool skip_recently_used);
+ static void WriteOneBufferDone(int buf_id, bool doUnpin);
+ 
  static void WaitIO(volatile BufferDesc *buf);
  static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
  static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
***************
*** 102,107 **** static volatile BufferDesc *BufferAlloc(SMgrRelation smgr, ForkNumber forkNum,
--- 112,122 ----
  			bool *foundPtr);
  static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
  static void AtProcExit_Buffers(int code, Datum arg);
+ static bool FlushBufferPrepare(int buf_id, volatile BufferDesc *buf,
+ 							   char *callerBuf,
+ 							   SMgrRelation reln);
+ static void CheckDoubleWriteFile(void);
+ static void FlushBatch(bool doUnpin);
  
  
  /*
***************
*** 426,432 **** ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
  			MemSet((char *) bufBlock, 0, BLCKSZ);
  		else
  		{
! 			smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
  
  			/* check for garbage data */
  			if (!PageHeaderIsValid((PageHeader) bufBlock))
--- 441,447 ----
  			MemSet((char *) bufBlock, 0, BLCKSZ);
  		else
  		{
! 			smgrread(smgr, forkNum, blockNum, (char *) bufBlock, NULL);
  
  			/* check for garbage data */
  			if (!PageHeaderIsValid((PageHeader) bufBlock))
***************
*** 639,646 **** BufferAlloc(SMgrRelation smgr, ForkNumber forkNum,
  													 smgr->smgr_rnode.dbNode,
  												   smgr->smgr_rnode.relNode);
  
! 				FlushBuffer(buf, NULL);
! 				LWLockRelease(buf->content_lock);
  
  				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
  													smgr->smgr_rnode.spcNode,
--- 654,676 ----
  													 smgr->smgr_rnode.dbNode,
  												   smgr->smgr_rnode.relNode);
  
! 				if (doubleWrites) {
! 					CheckDoubleWriteFile();
! 					/*
! 					 * Make a one-element batch, so we can do the double
! 					 * writes.  We are serializing on the double write
! 					 * file here.  We could also have a small double
! 					 * write file for each backend.
! 					 */
! 					if (FlushBufferPrepare(buf->buf_id, buf, NULL, NULL)) {
! 						FlushBatch(false);
! 					} else {
! 						LWLockRelease(buf->content_lock);
! 					}
! 				} else {
! 					FlushBuffer(buf, NULL);
! 					LWLockRelease(buf->content_lock);
! 				}
  
  				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
  													smgr->smgr_rnode.spcNode,
***************
*** 1146,1151 **** UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
--- 1176,1343 ----
  	}
  }
  
+ extern bool page_checksum;
+ extern int batched_buffer_writes;
+ 
+ /* File to which double writes are going */
+ File doubleWriteFile = 0;
+ LWLockId doubleWriteLock;
+ /* Buffers in to which to do checksumming of batched buffers */
+ char *origCksumBuf, *cksumBuf;
+ 
+ /* Array of buffer writes that are being batched. */
+ struct SMgrWriteList writeList[MAX_BATCHED_WRITES];
+ int writeIndex;
+ 
+ 
+ 
+ /*
+  * Unlock the buffers in the writeList[] (called after all IO is
+  * complete).  Release them in reverse order, so InProgressBuf[] is
+  * managed efficiently.
+  */
+ static void
+ UnlockBufs(bool doUnpin)
+ {
+ 	int j;
+ 
+ 	for (j = writeIndex-1; j >= 0; j--) {
+ 		WriteOneBufferDone(writeList[j].buf_id, doUnpin);
+ 	}
+ 	writeIndex = 0;
+ }
+ 
+ /*
+  * Initialize the lock on the double write file (run once in postmaster).
+  */
+ void
+ Bufmgr_DoubleWriteInit()
+ {
+ 	doubleWriteLock = LWLockAssign();
+ }
+ 
+ /*
+  * Pre-allocate file with zeros in BLOCKSZ pages to at least len.
+  */
+ static void
+ ZeroFile(FileName name, int len)
+ {
+ 	char *zbuffer;
+ 	int nbytes;
+ 	
+ 	int fd = BasicOpenFile(name, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
+ 	if (fd < 0) {
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not create file \"%s\": %m", name)));
+ 	}
+ 	zbuffer = (char *) palloc0(BLCKSZ);
+ 	for (nbytes = 0; nbytes < len; nbytes += BLCKSZ) {
+ 		errno = 0;
+ 		if ((int) write(fd, zbuffer, BLCKSZ) != (int) BLCKSZ)
+ 		{
+ 			int			save_errno = errno;
+ 
+ 			/*
+ 			 * If we fail to make the file, delete it to release disk space
+ 			 */
+ 			unlink(name);
+ 			/* if write didn't set errno, assume problem is no disk space */
+ 			errno = save_errno ? save_errno : ENOSPC;
+ 
+ 			ereport(ERROR,
+ 					(errcode_for_file_access(),
+ 					 errmsg("could not write to file \"%s\": %m", name)));
+ 		}
+ 	}
+ 	pfree(zbuffer);
+ 
+ 	if (pg_fsync(fd) != 0)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not fsync file \"%s\": %m", name)));
+ 
+ 	if (close(fd))
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not close file \"%s\": %m", name)));
+ }
+ 
+ /*
+  * Allocate the buffers used for checksumming a batch of writes.
+  */
+ static bool
+ AllocCksumBuf()
+ {
+ 	if (batched_buffer_writes > 0 && page_checksum && cksumBuf == NULL) {
+ 		origCksumBuf = (char *)palloc(batched_buffer_writes * BLCKSZ + ALIGNOF_BUFFER);
+ 		cksumBuf = (char *)TYPEALIGN(ALIGNOF_BUFFER, origCksumBuf);
+ 		return true;
+ 	}
+ 	return false;
+ }
+ 
+ static void
+ FreeCksumBuf()
+ {
+ 	if (cksumBuf != NULL) {
+ 		pfree(origCksumBuf);
+ 		cksumBuf = origCksumBuf = NULL;
+ 	}
+ }
+ 
+ /* Round up a to next multiple of b */
+ #define ROUNDUP(a, b)  ((((a)+(b)-1)/(b))*(b))
+ 
+ /*
+  * Open up the doubleWrite file and zero it out, if necessary.
+  */
+ static void
+ CheckDoubleWriteFile()
+ {
+ 	if (doubleWrites) {
+ 		if (doubleWriteFile == 0) {
+ 			struct stat stat_buf;
+ 			char *name = smgr_doublewritefilename();
+ 			int len = ROUNDUP(batched_buffer_writes * BLCKSZ + DOUBLE_WRITE_HEADER_SIZE,
+ 							  BLCKSZ);
+ 
+ 			/* Make sure that the file is completely allocated */
+ 			LWLockAcquire(doubleWriteLock, LW_EXCLUSIVE);
+ 			if (stat(name, &stat_buf) == -1 || stat_buf.st_size < len) {
+ 				/* Fill out file if it is not big enough */
+ 				ZeroFile(name, len);
+ 			}
+ 			LWLockRelease(doubleWriteLock);
+ 
+ 			doubleWriteFile = PathNameOpenFile(name,
+ 											   O_RDWR | O_CREAT,
+ 											   S_IRUSR | S_IWUSR);
+ 			if (doubleWriteFile < 0) {
+ 				elog(PANIC, "Couldn't open double-write file %s", name);
+ 			}
+ 			pfree(name);
+ 		}
+ 	}
+ }
+ 
+ /*
+  * Write out the current batch of buffers, doing a double write if
+  * required and releasing the buffer locks.
+  */
+ static void
+ FlushBatch(bool doUnpin)
+ {
+ 	if (doubleWrites) {
+ 		LWLockAcquire(doubleWriteLock, LW_EXCLUSIVE);
+ 	}
+ 	smgrbwrite(writeIndex, writeList, doubleWriteFile, false);
+ 	if (doubleWrites) {
+ 		LWLockRelease(doubleWriteLock);
+ 	}
+ 	UnlockBufs(doUnpin);
+ }
+ 
  /*
   * BufferSync -- Write out all dirty buffers in the pool.
   *
***************
*** 1160,1165 **** BufferSync(int flags)
--- 1352,1363 ----
  	int			num_to_scan;
  	int			num_to_write;
  	int			num_written;
+ 	int64		targetTime, now, start;
+ 	int			cur_num_written;
+ 	bool		batched;
+ 
+ 	AllocCksumBuf();
+ 	CheckDoubleWriteFile();
  
  	/* Make sure we can handle the pin inside SyncOneBuffer */
  	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
***************
*** 1205,1210 **** BufferSync(int flags)
--- 1403,1411 ----
  
  	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
  
+ 	if (log_checkpoints) {
+ 		elog(LOG, "checkpoint: %d buffers to write", num_to_write);
+ 	}
  	/*
  	 * Loop over all buffers again, and write the ones (still) marked with
  	 * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
***************
*** 1213,1218 **** BufferSync(int flags)
--- 1414,1426 ----
  	 * Note that we don't read the buffer alloc count here --- that should be
  	 * left untouched till the next BgBufferSync() call.
  	 */
+ 	if (writeIndex != 0) {
+ 		elog(LOG, "Incomplete batch");
+ 	}
+ 	start = GetCurrentTimestamp();
+ 	targetTime = start + 30000000;
+ 	cur_num_written = 0;
+ 	batched = (batched_buffer_writes > 0);
  	buf_id = StrategySyncStart(NULL, NULL);
  	num_to_scan = NBuffers;
  	num_written = 0;
***************
*** 1234,1245 **** BufferSync(int flags)
  		 */
  		if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
  		{
! 			if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
  			{
  				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
  				BgWriterStats.m_buf_written_checkpoints++;
  				num_written++;
  
  				/*
  				 * We know there are at most num_to_write buffers with
  				 * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
--- 1442,1489 ----
  		 */
  		if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
  		{
! 			int r;
! 			if (!batched) {
! 				r = SyncOneBuffer(buf_id, false);
! 			} else {
! retry:
! 				r = WriteOneBufferPrepare(buf_id,
! 										  cksumBuf + writeIndex * BLCKSZ,
! 										  false);
! 				if (r & BUF_RETRY_AFTER_FLUSH) {
! 					if (writeIndex > 0) {
! 						elog(LOG, "Couldn't get content_lock, flushing");
! 						FlushBatch(true);
! 					} else {
! 						elog(LOG, "Couldn't get content_lock, sleeping");
! 						pg_usleep(10000);
! 					}
! 					goto retry;
! 				}
! 				if (0) elog(LOG, "async io %d", buf_id);
! 					
! 			}
! 			
! 			if (r & BUF_WRITTEN)
  			{
  				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
  				BgWriterStats.m_buf_written_checkpoints++;
  				num_written++;
  
+ 				if (!batched) {
+ 					/*
+ 					 * Perform normal bgwriter duties and sleep to throttle our
+ 					 * I/O rate.
+ 					 */
+ 					CheckpointWriteDelay(flags,
+ 										 (double)num_written / num_to_write);
+ 				}
+ 			}
+ 		}
+ 		if (batched) {
+ 			if (writeIndex == batched_buffer_writes ||
+ 				(writeIndex > 0 && num_to_scan == 0)) {
+ 				FlushBatch(true);
  				/*
  				 * We know there are at most num_to_write buffers with
  				 * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
***************
*** 1262,1267 **** BufferSync(int flags)
--- 1506,1523 ----
  									 (double) num_written / num_to_write);
  			}
  		}
+ 		if (log_checkpoints &&
+ 			((now = GetCurrentTimestamp()) >= targetTime ||
+ 			 num_to_scan == 0)) {
+ 			/* Print out a checkpoint progress message */
+ 			int progress = num_written * 100 / num_to_write;
+ 			int target = (now - start) * 100 / (int64) (1000000 * CheckPointTimeout * CheckPointCompletionTarget);
+ 			elog(LOG, "%d written/s, %d%%, target %d%%",
+ 				 (num_written - cur_num_written) / 30,
+ 				 progress, target);
+ 			cur_num_written  = num_written;
+ 			targetTime = now + 30000000;
+ 		}
  
  		if (++buf_id >= NBuffers)
  			buf_id = 0;
***************
*** 1274,1279 **** BufferSync(int flags)
--- 1530,1536 ----
  	CheckpointStats.ckpt_bufs_written += num_written;
  
  	TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
+ 	FreeCksumBuf();
  }
  
  /*
***************
*** 1320,1325 **** BgBufferSync(void)
--- 1577,1584 ----
  	int			num_to_scan;
  	int			num_written;
  	int			reusable_buffers;
+ 	bool		batched;
+ 	bool		alloc;
  
  	/*
  	 * Find out where the freelist clock sweep currently is, and how many
***************
*** 1500,1509 **** BgBufferSync(void)
  	num_written = 0;
  	reusable_buffers = reusable_buffers_est;
  
  	/* Execute the LRU scan */
  	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
  	{
! 		int			buffer_state = SyncOneBuffer(next_to_clean, true);
  
  		if (++next_to_clean >= NBuffers)
  		{
--- 1759,1792 ----
  	num_written = 0;
  	reusable_buffers = reusable_buffers_est;
  
+ 	alloc = AllocCksumBuf();
+ 	CheckDoubleWriteFile();
+ 
+ 	batched = (batched_buffer_writes > 0);
  	/* Execute the LRU scan */
  	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
  	{
! 		int   buffer_state;
! 
! 		if (!batched) {
! 			buffer_state = SyncOneBuffer(next_to_clean, true);
! 		} else {
! retry:
! 			buffer_state = WriteOneBufferPrepare(next_to_clean,
! 												 cksumBuf + writeIndex * BLCKSZ,
! 												 false);
! 			if (buffer_state & BUF_RETRY_AFTER_FLUSH) {
! 				if (writeIndex > 0) {
! 					elog(LOG, "Couldn't get content_lock, flushing");
! 					FlushBatch(true);
! 				} else {
! 					elog(LOG, "Couldn't get content_lock, sleeping");
! 					pg_usleep(10000);
! 				}
! 				goto retry;
! 			}
! 		}
! 				
  
  		if (++next_to_clean >= NBuffers)
  		{
***************
*** 1523,1528 **** BgBufferSync(void)
--- 1806,1823 ----
  		}
  		else if (buffer_state & BUF_REUSABLE)
  			reusable_buffers++;
+ 
+ 		/* If we've filled a batch, write it out */
+ 		if (batched && writeIndex == batched_buffer_writes) {
+ 			FlushBatch(true);
+ 		}
+ 	}
+ 	/* If we have an incomplete batch, write it out. */
+ 	if (batched && writeIndex > 0) {
+ 		FlushBatch(true);
+ 	}
+ 	if (alloc) {
+ 		FreeCksumBuf();
  	}
  
  	BgWriterStats.m_buf_written_clean += num_written;
***************
*** 1560,1582 **** BgBufferSync(void)
  }
  
  /*
!  * SyncOneBuffer -- process a single buffer during syncing.
!  *
!  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
!  * buffers marked recently used, as these are not replacement candidates.
!  *
!  * Returns a bitmask containing the following flag bits:
!  *	BUF_WRITTEN: we wrote the buffer.
!  *	BUF_REUSABLE: buffer is available for replacement, ie, it has
!  *		pin count 0 and usage count 0.
!  *
!  * (BUF_WRITTEN could be set in error if FlushBuffers finds the buffer clean
!  * after locking it, but we don't care all that much.)
!  *
!  * Note: caller must have done ResourceOwnerEnlargeBuffers.
   */
  static int
! SyncOneBuffer(int buf_id, bool skip_recently_used)
  {
  	volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
  	int			result = 0;
--- 1855,1864 ----
  }
  
  /*
!  * Prepare for syncing out one buffer.
   */
  static int
! SyncOneBufferPrepare(int buf_id, bool skip_recently_used)
  {
  	volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
  	int			result = 0;
***************
*** 1613,1619 **** SyncOneBuffer(int buf_id, bool skip_recently_used)
  	 * buffer is clean by the time we've locked it.)
  	 */
  	PinBuffer_Locked(bufHdr);
! 	LWLockAcquire(bufHdr->content_lock, LW_SHARED);
  
  	FlushBuffer(bufHdr, NULL);
  
--- 1895,1942 ----
  	 * buffer is clean by the time we've locked it.)
  	 */
  	PinBuffer_Locked(bufHdr);
! 
! 	/*
! 	 * Since we are acquiring multiple buffers, we must do a
! 	 * conditional-acquire.  If we can't get the lock (very infrequent),
! 	 * then we force a flush of the current batch of buffers, so we can
! 	 * release our current set of locks before trying to get this one
! 	 * again.
! 	 */
! 	if (!LWLockConditionalAcquire(bufHdr->content_lock, LW_SHARED)) {
! 		UnpinBuffer(bufHdr, true);
! 		return result | BUF_RETRY_AFTER_FLUSH;
! 	}
! 	return result | BUF_WRITTEN;
! }	
! 
! /*
!  * SyncOneBuffer -- process a single buffer during syncing.
!  *
!  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
!  * buffers marked recently used, as these are not replacement candidates.
!  *
!  * Returns a bitmask containing the following flag bits:
!  *	BUF_WRITTEN: we wrote the buffer.
!  *	BUF_REUSABLE: buffer is available for replacement, ie, it has
!  *	         pin count 0 and usage count 0.
!  *	BUF_CHECKPOINT: buffer has BM_CHECKPOINT_NEEDED set.
!  *
!  * (BUF_WRITTEN could be set in error if FlushBuffers finds the buffer clean
!  * after locking it, but we don't care all that much.)
!  *
!  * Note: caller must have done ResourceOwnerEnlargeBuffers.
!  */
! static int
! SyncOneBuffer(int buf_id, bool skip_recently_used)
! {
! 	volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
! 	int result;
! 
! 	result = SyncOneBufferPrepare(buf_id, skip_recently_used);
! 	if ((result & BUF_WRITTEN) == 0) {
! 		return result;
! 	}
  
  	FlushBuffer(bufHdr, NULL);
  
***************
*** 1625,1630 **** SyncOneBuffer(int buf_id, bool skip_recently_used)
--- 1948,1991 ----
  
  
  /*
+  * Prepare for writing out one buffer.
+  */
+ static int
+ WriteOneBufferPrepare(int buf_id, char *callerBuf, bool skip_recently_used)
+ {
+ 	volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
+ 	int result;
+ 	
+ 	result = SyncOneBufferPrepare(buf_id, skip_recently_used);
+ 	if ((result & BUF_WRITTEN) == 0) {
+ 		return result;
+ 	}
+ 	if (!FlushBufferPrepare(buf_id, bufHdr, callerBuf, NULL)) {
+ 		LWLockRelease(bufHdr->content_lock);
+ 		UnpinBuffer(bufHdr, true);
+ 		elog(LOG, "Somebody else flushed...");
+ 		result &= ~BUF_WRITTEN;
+ 	}
+ 	return result;
+ }
+ 
+ /*
+  * Release locks after writing out a buffer.
+  */
+ static void
+ WriteOneBufferDone(int buf_id, bool doUnpin)
+ {
+ 	volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
+ 
+ 	pgBufferUsage.shared_blks_written++;
+ 	TerminateBufferIO(bufHdr, true, 0);
+ 	LWLockRelease(bufHdr->content_lock);
+ 	if (doUnpin) {
+ 		UnpinBuffer(bufHdr, true);
+ 	}
+ }
+ 
+ /*
   *		AtEOXact_Buffers - clean up at end of transaction.
   *
   *		As of PostgreSQL 8.0, buffer pins should get released by the
***************
*** 1901,1906 **** FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
--- 2262,2319 ----
  }
  
  /*
+  * Prepare to flush out a buffer and record it on the writeList.
+  */
+ static bool
+ FlushBufferPrepare(int buf_id, volatile BufferDesc *buf, char *callerBuf,
+ 				   SMgrRelation reln)
+ {
+ 	XLogRecPtr	recptr;
+ 
+ 	Assert(LWLockHeldByMe(buf->content_lock));
+ 	/*
+ 	 * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
+ 	 * false, then someone else flushed the buffer before we could, so we need
+ 	 * not do anything.
+ 	 */
+ 	if (!StartBufferIO(buf, false))
+ 		return FALSE;
+ 
+ 	/* Find smgr relation for buffer */
+ 	if (reln == NULL)
+ 		reln = smgropen(buf->tag.rnode);
+ 
+ 	/*
+ 	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
+ 	 * rule that log updates must hit disk before any of the data-file changes
+ 	 * they describe do.
+ 	 */
+ 	recptr = BufferGetLSN(buf);
+ 	XLogFlush(recptr);
+ 
+ 	/*
+ 	 * Now it's safe to write buffer to disk. Note that no one else should
+ 	 * have been able to write it while we were busy with log flushing because
+ 	 * we have the io_in_progress lock.
+ 	 */
+ 
+ 	/* To check if block content changes while flushing. - vadim 01/17/97 */
+ 	LockBufHdr(buf);
+ 	buf->flags &= ~BM_JUST_DIRTIED;
+ 	UnlockBufHdr(buf);
+ 
+ 	writeList[writeIndex].buf_id = buf_id;
+ 	writeList[writeIndex].reln = reln;
+ 	writeList[writeIndex].forkNum = buf->tag.forkNum;
+ 	writeList[writeIndex].blockNum = buf->tag.blockNum;
+ 	writeList[writeIndex].buffer = (char *)BufHdrGetBlock(buf);
+ 	writeList[writeIndex].callerBuf = callerBuf;
+ 	writeIndex++;
+ 	return TRUE;
+ }
+ 
+ 
+ /*
   * RelationGetNumberOfBlocks
   *		Determines the current number of pages in the relation.
   */
***************
*** 2050,2055 **** PrintPinnedBufs(void)
--- 2463,2504 ----
  }
  #endif
  
+ /*
+  * Flush a buffer, doing it in a batch if doBatch is true.  Requires that
+  * bufHdr is already locked.
+  */
+ static void
+ FlushBufferBatched(volatile BufferDesc *bufHdr, SMgrRelation reln, bool doBatch)
+ {
+ 	PinBuffer_Locked(bufHdr);
+ 	if (doBatch) {
+ retry:
+ 		Assert(batched_buffer_writes > 0);
+ 		if (writeIndex == 0) {
+ 			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
+ 		} else {
+ 			if (!LWLockConditionalAcquire(bufHdr->content_lock,
+ 										  LW_SHARED)) {
+ 				FlushBatch(true);
+ 				goto retry;
+ 			}
+ 		}
+ 		if (!FlushBufferPrepare(bufHdr->buf_id, bufHdr,
+ 								cksumBuf + writeIndex * BLCKSZ, reln)) {
+ 			LWLockRelease(bufHdr->content_lock);
+ 			UnpinBuffer(bufHdr, true);
+ 		}
+ 		if (writeIndex == batched_buffer_writes) {
+ 			FlushBatch(true);
+ 		}
+ 	} else {
+ 		LWLockAcquire(bufHdr->content_lock, LW_SHARED);
+ 		FlushBuffer(bufHdr, reln);
+ 		LWLockRelease(bufHdr->content_lock);
+ 		UnpinBuffer(bufHdr, true);
+ 	}
+ }
+ 
  /* ---------------------------------------------------------------------
   *		FlushRelationBuffers
   *
***************
*** 2070,2076 **** PrintPinnedBufs(void)
   * --------------------------------------------------------------------
   */
  void
! FlushRelationBuffers(Relation rel)
  {
  	int			i;
  	volatile BufferDesc *bufHdr;
--- 2519,2525 ----
   * --------------------------------------------------------------------
   */
  void
! FlushRelationBuffers(Relation rel, bool wasLogged)
  {
  	int			i;
  	volatile BufferDesc *bufHdr;
***************
*** 2110,2115 **** FlushRelationBuffers(Relation rel)
--- 2559,2566 ----
  		return;
  	}
  
+ 	AllocCksumBuf();
+ 	CheckDoubleWriteFile();
  	/* Make sure we can handle the pin inside the loop */
  	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
  
***************
*** 2120,2134 **** FlushRelationBuffers(Relation rel)
  		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
  			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
  		{
! 			PinBuffer_Locked(bufHdr);
! 			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
! 			FlushBuffer(bufHdr, rel->rd_smgr);
! 			LWLockRelease(bufHdr->content_lock);
! 			UnpinBuffer(bufHdr, true);
  		}
  		else
  			UnlockBufHdr(bufHdr);
  	}
  }
  
  /* ---------------------------------------------------------------------
--- 2571,2591 ----
  		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
  			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
  		{
! 			/*
! 			 * Don't batch if the pages were not logged, since in
! 			 * that case we don't want to do double writes, even if
! 			 * the double write option is on.
! 			 */
! 			FlushBufferBatched(bufHdr, rel->rd_smgr, wasLogged && doubleWrites);
  		}
  		else
  			UnlockBufHdr(bufHdr);
  	}
+ 	if (wasLogged && doubleWrites && writeIndex > 0) {
+ 		/* Write out any incomplete batch */
+ 		FlushBatch(true);
+ 	}
+ 	FreeCksumBuf();
  }
  
  /* ---------------------------------------------------------------------
***************
*** 2152,2157 **** FlushDatabaseBuffers(Oid dbid)
--- 2609,2616 ----
  	int			i;
  	volatile BufferDesc *bufHdr;
  
+ 	AllocCksumBuf();
+ 	CheckDoubleWriteFile();
  	/* Make sure we can handle the pin inside the loop */
  	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
  
***************
*** 2162,2176 **** FlushDatabaseBuffers(Oid dbid)
  		if (bufHdr->tag.rnode.dbNode == dbid &&
  			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
  		{
! 			PinBuffer_Locked(bufHdr);
! 			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
! 			FlushBuffer(bufHdr, NULL);
! 			LWLockRelease(bufHdr->content_lock);
! 			UnpinBuffer(bufHdr, true);
  		}
  		else
  			UnlockBufHdr(bufHdr);
  	}
  }
  
  /*
--- 2621,2636 ----
  		if (bufHdr->tag.rnode.dbNode == dbid &&
  			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
  		{
! 			FlushBufferBatched(bufHdr, NULL, doubleWrites);
  		}
  		else
  			UnlockBufHdr(bufHdr);
  	}
+ 	if (doubleWrites && writeIndex > 0) {
+ 		/* Write out any incomplete batch */
+ 		FlushBatch(true);
+ 	}
+ 	FreeCksumBuf();
  }
  
  /*
***************
*** 2583,2590 **** WaitIO(volatile BufferDesc *buf)
  static bool
  StartBufferIO(volatile BufferDesc *buf, bool forInput)
  {
- 	Assert(!InProgressBuf);
- 
  	for (;;)
  	{
  		/*
--- 3043,3048 ----
***************
*** 2623,2629 **** StartBufferIO(volatile BufferDesc *buf, bool forInput)
  
  	UnlockBufHdr(buf);
  
! 	InProgressBuf = buf;
  	IsForInput = forInput;
  
  	return true;
--- 3081,3087 ----
  
  	UnlockBufHdr(buf);
  
! 	InProgressBuf[InProgressIndex++] = buf;
  	IsForInput = forInput;
  
  	return true;
***************
*** 2650,2656 **** static void
  TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
  				  int set_flag_bits)
  {
! 	Assert(buf == InProgressBuf);
  
  	LockBufHdr(buf);
  
--- 3108,3115 ----
  TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
  				  int set_flag_bits)
  {
! 	/* BufferSync() will release batched buffers in reverse order */
! 	Assert(InProgressBuf[InProgressIndex-1] == buf);
  
  	LockBufHdr(buf);
  
***************
*** 2662,2669 **** TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
  
  	UnlockBufHdr(buf);
  
! 	InProgressBuf = NULL;
! 
  	LWLockRelease(buf->io_in_progress_lock);
  }
  
--- 3121,3128 ----
  
  	UnlockBufHdr(buf);
  
! 	InProgressIndex--;
! 	InProgressBuf[InProgressIndex] = NULL;
  	LWLockRelease(buf->io_in_progress_lock);
  }
  
***************
*** 2679,2688 **** TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
  void
  AbortBufferIO(void)
  {
! 	volatile BufferDesc *buf = InProgressBuf;
  
! 	if (buf)
! 	{
  		/*
  		 * Since LWLockReleaseAll has already been called, we're not holding
  		 * the buffer's io_in_progress_lock. We have to re-acquire it so that
--- 3138,3148 ----
  void
  AbortBufferIO(void)
  {
! 	volatile BufferDesc *buf;
! 	int i;
  
! 	for (i = InProgressIndex - 1; i >= 0; i--) {
! 		buf = InProgressBuf[i];
  		/*
  		 * Since LWLockReleaseAll has already been called, we're not holding
  		 * the buffer's io_in_progress_lock. We have to re-acquire it so that
***************
*** 2723,2728 **** AbortBufferIO(void)
--- 3183,3189 ----
  		}
  		TerminateBufferIO(buf, false, BM_IO_ERROR);
  	}
+ 	InProgressIndex = 0;
  }
  
  /*
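A note on sizing, since CheckDoubleWriteFile() and the recovery code both depend on it: the double-write file holds one header followed by one BLCKSZ slot per batched buffer, rounded up to a whole number of blocks. Below is a minimal sketch of that arithmetic. The constants are assumptions for illustration (8K blocks and a 4K header); the patch takes BLCKSZ and DOUBLE_WRITE_HEADER_SIZE from its own headers, and the function names are hypothetical.

```c
#include <assert.h>

/* Round up a to the next multiple of b (same formula as the patch's ROUNDUP). */
#define ROUNDUP(a, b)  ((((a) + (b) - 1) / (b)) * (b))

/* Assumed values for illustration only; not taken from the patch. */
enum { SKETCH_BLCKSZ = 8192, SKETCH_HEADER_SIZE = 4096 };

/* Bytes the double-write file must hold for a batch of nbuffers pages. */
static long
double_write_file_len(long nbuffers)
{
	assert(nbuffers >= 0);
	return ROUNDUP(nbuffers * SKETCH_BLCKSZ + SKETCH_HEADER_SIZE,
				   (long) SKETCH_BLCKSZ);
}

/* Byte offset of a slot in the file; recovery seeks to header + slot * block. */
static long
double_write_slot_offset(long slot)
{
	assert(slot >= 0);
	return SKETCH_HEADER_SIZE + slot * SKETCH_BLCKSZ;
}
```

With these assumed values a batch of 32 buffers needs 33 blocks, because the 4K header spills into an extra block.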
*** a/src/backend/storage/file/fd.c
--- b/src/backend/storage/file/fd.c
***************
*** 1260,1265 **** retry:
--- 1260,1454 ----
  	return returnCode;
  }
  
+ struct io_iocb_common {
+ 	void *buf;
+ 	unsigned nbytes;
+ 	long long offset;
+ };
+ 	
+ struct iocb {
+ 	void *data;
+ 	int aio_fildes;
+     union {
+ 		struct io_iocb_common  c;
+ 	} u;
+ };
+ 
+ #if 0
+ struct iovec {
+ 	void *iov_base;
+ 	size_t iov_len;
+ };
+ #endif
+ 
+ /* Array of iocbs used for io_submit */
+ struct iocb ioc[MAX_BATCHED_WRITES];
+ int iocIndex = 0;
+ 
+ /* Array of iovecs used for iocbs and for writev.  One extra entry for
+  * the double-write header. */
+ struct iovec iov[MAX_BATCHED_WRITES+1];
+ int iovIndex = 0;
+ 
+ static int
+ iovlen(struct iocb *ioc)
+ {
+ 	int i, len;
+ 	struct iovec *iov = (struct iovec *)ioc->u.c.buf;
+ 
+ 	len = 0;
+ 	for (i = 0; i < ioc->u.c.nbytes; i++) {
+ 		len += iov->iov_len;
+ 		iov++;
+ 	}
+ 	return len;
+ }
+ 
+ /*
+  * Flush existing batched IOs, doing a write to a temp area first if
+  * double_writes option is turned on.
+  */
+ static int
+ FlushIOs(File doubleWriteFile)
+ {
+ 	int returnCode;
+ 	int i, j;
+ 
+ 	if (doubleWriteFile > 0) {
+ 		returnCode = FileAccess(doubleWriteFile);
+ 		if (returnCode < 0)
+ 			return returnCode;
+ 		returnCode = lseek(VfdCache[doubleWriteFile].fd, 0, SEEK_SET);
+ 		if (returnCode < 0) {
+ 			elog(LOG, "lseek error errno %d, rc %d",
+ 				 errno, returnCode);
+ 			abort();
+ 			return returnCode;
+ 		}
+ 		/* Write out all the blocks sequentially to the double-write file,
+ 		 * starting with the double-write file header describing all the
+ 		 * buffers.  The double-write file should be opened with O_DIRECT. */
+ 		returnCode = writev(VfdCache[doubleWriteFile].fd, &iov[0], iovIndex);
+ 		if (returnCode < 0) {
+ 			elog(LOG, "writev to tmp, errno %d, rc %d, iovIndex %d",
+ 				 errno, returnCode, iovIndex);
+ 			abort();
+ 			return returnCode;
+ 		}
+ 		returnCode = fdatasync(VfdCache[doubleWriteFile].fd);
+ 		if (returnCode < 0) {
+ 			elog(LOG, "fdatasync error errno %d, rc %d", errno,
+ 				 returnCode);
+ 			abort();
+ 			return returnCode;
+ 		}
+ 	}
+ 
+ 	for (i = 0; i < iocIndex; i++) {
+ 		returnCode = lseek(ioc[i].aio_fildes, ioc[i].u.c.offset, SEEK_SET);
+ 		if (returnCode < 0) {
+ 			elog(LOG, "lseek error errno %d, rc %d, offset %lld",
+ 				 errno, returnCode, ioc[i].u.c.offset);
+ 			abort();
+ 			return returnCode;
+ 		}
+ 		/* Use writev to issue one I/O per run of consecutive blocks. */
+ 		returnCode = writev(ioc[i].aio_fildes, ioc[i].u.c.buf,
+ 							ioc[i].u.c.nbytes);
+ 		if (returnCode < 0) {
+ 			elog(LOG, "writev error errno %d, rc %d", errno, returnCode);
+ 			abort();
+ 			return returnCode;
+ 		}
+ 	}
+ 	if (doubleWriteFile > 0) {
+ 		/* If we are doing double writes, we must make sure that the
+ 		 * blocks we just wrote are forced to disk, so we can re-use the
+ 		 * double-write buffer next time. */
+ 		for (i = 0; i < iocIndex; i++) {
+ 			for (j = 0; j < i; j++) {
+ 				if (ioc[j].aio_fildes == ioc[i].aio_fildes) {
+ 					break;
+ 				}
+ 			}
+ 			if (j == i) {
+ 				returnCode = fdatasync(ioc[i].aio_fildes);
+ 				if (returnCode < 0) {
+ 					elog(LOG, "fdatasync error errno %d, rc %d", errno,
+ 						 returnCode);
+ 					abort();
+ 					return returnCode;
+ 				}
+ 			}
+ 		}
+ 	}
+ 	return 0;
+ }	
+ 
+ #include "storage/smgr.h"
+ 
+ /*
+  * Write out a list of memory regions to locations in files, as specified
+  * in writeList.  If doubleWriteFile is > 0, then also do double writes
+  * to the specified file (so full_page_writes can be avoided).
+  */
+ int
+ FileBwrite(int writeLen, struct SMgrWriteList *writeList,
+ 		   File doubleWriteFile, char *doubleBuf)
+ {
+ 	int			returnCode;
+ 	int i;
+ 
+ 	for (i = 0; i < writeLen; i++) {
+ 		struct SMgrWriteList *w = &(writeList[i]);
+ 
+ 		Assert(FileIsValid(w->fd));
+ 		DO_DB(elog(LOG, "FileBwrite: %d (%s) " INT64_FORMAT " %d %p",
+ 				   w->fd, VfdCache[w->fd].fileName,
+ 				   (int64) VfdCache[w->fd].seekPos,
+ 				   w->len, w->callerBuf));
+ 
+ 		returnCode = FileAccess(w->fd);
+ 		if (returnCode < 0)
+ 			return returnCode;
+ 	}
+ 
+ 	iovIndex = 0;
+ 	iocIndex = 0;
+ 
+ 	if (doubleWriteFile > 0) {
+ 		/* Write out the block directory as a header chunk at the beginning */
+ 		iov[iovIndex].iov_base = doubleBuf;
+ 		iov[iovIndex].iov_len = DOUBLE_WRITE_HEADER_SIZE;
+ 		iovIndex++;
+ 	}
+ 
+ 	for (i = 0; i < writeLen; i++) {
+ 		struct SMgrWriteList *w = &(writeList[i]);
+ 		struct iocb *last;
+ 
+ 		iov[iovIndex].iov_base = w->callerBuf;
+ 		iov[iovIndex].iov_len = w->len;
+ 		last = (iocIndex > 0) ? &(ioc[iocIndex - 1]) : NULL;
+ 		if (iocIndex > 0 && w->seekPos == last->u.c.offset + iovlen(last) &&
+ 			last->aio_fildes == VfdCache[w->fd].fd) {
+ 			last->u.c.nbytes++;
+ 		} else {
+ 			ioc[iocIndex].aio_fildes = VfdCache[w->fd].fd;
+ 			ioc[iocIndex].u.c.buf = (void *)&iov[iovIndex];
+ 			ioc[iocIndex].u.c.nbytes = 1;
+ 			ioc[iocIndex].u.c.offset = w->seekPos;
+ 			iocIndex++;
+ 		}
+ 		iovIndex++;
+ 		VfdCache[w->fd].seekPos = FileUnknownPos;
+ 	}
+ 
+ 	returnCode = FlushIOs(doubleWriteFile);
+ 
+ 	return returnCode;
+ }
+ 
  int
  FileSync(File file)
  {
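For reviewers following FileBwrite(): the loop merges list entries that target the same file descriptor and are contiguous on disk, so each run goes out in a single writev(). Below is a simplified sketch of just that merging step. The struct and function names are hypothetical stand-ins, not the patch's SMgrWriteList or iocb, and the sketch tracks byte lengths directly instead of walking an iovec array.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the patch's write list and iocb. */
struct sketch_write { int fd; long offset; long len; };
struct sketch_run   { int fd; long offset; long total_len; int count; };

/*
 * Merge writes that are adjacent in the list, target the same fd, and are
 * contiguous on disk.  Returns the number of runs stored in runs[], which
 * must have room for n entries.
 */
static int
coalesce_writes(const struct sketch_write *w, int n, struct sketch_run *runs)
{
	int		nruns = 0;
	int		i;

	assert(n >= 0);
	for (i = 0; i < n; i++) {
		struct sketch_run *last = (nruns > 0) ? &runs[nruns - 1] : NULL;

		if (last != NULL && last->fd == w[i].fd &&
			w[i].offset == last->offset + last->total_len) {
			/* Extends the previous run: one more buffer in the same I/O. */
			last->total_len += w[i].len;
			last->count++;
		} else {
			/* Starts a new run, and therefore a separate I/O. */
			runs[nruns].fd = w[i].fd;
			runs[nruns].offset = w[i].offset;
			runs[nruns].total_len = w[i].len;
			runs[nruns].count = 1;
			nruns++;
		}
	}
	return nruns;
}
```

Only neighbouring entries are merged, so how much coalescing happens depends on the order in which buffers were added to the batch.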
*** a/src/backend/storage/lmgr/lwlock.c
--- b/src/backend/storage/lmgr/lwlock.c
***************
*** 80,87 **** NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
   * during error recovery.  The maximum size could be determined at runtime
   * if necessary, but it seems unlikely that more than a few locks could
   * ever be held simultaneously.
   */
! #define MAX_SIMUL_LWLOCKS	100
  
  static int	num_held_lwlocks = 0;
  static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
--- 80,90 ----
   * during error recovery.  The maximum size could be determined at runtime
   * if necessary, but it seems unlikely that more than a few locks could
   * ever be held simultaneously.
+  *
+  * Must be at least 2 * MAX_BATCHED_WRITES, since bgwriter holds 2
+  * lwlocks for each buffer that it is batching.
   */
! #define MAX_SIMUL_LWLOCKS	150
  
  static int	num_held_lwlocks = 0;
  static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
*** a/src/backend/storage/smgr/md.c
--- b/src/backend/storage/smgr/md.c
***************
*** 727,732 **** mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
--- 727,774 ----
  }
  
  /*
+  * Write out a list of buffers, as specified in writeList.  If
+  * doubleWriteFile is > 0, then also do double writes to the specified
+  * file (so full_page_writes can be avoided).
+  */
+ void
+ mdbwrite(int writeLen, struct SMgrWriteList *writeList,
+ 		 File doubleWriteFile, char *doubleBuf, bool isTemp)
+ {
+ 	off_t		seekpos;
+ 	int			nbytes;
+ 	MdfdVec    *v;
+ 	File fd;
+ 	int i;
+ 
+ 	for (i = 0; i < writeLen; i++) {
+ 		struct SMgrWriteList *w = &(writeList[i]);
+ 
+ 		/* This assert is too expensive to have on normally ... */
+ #ifdef CHECK_WRITE_VS_EXTEND
+ 		Assert(w->blockNum < mdnblocks(w->reln, w->forkNum));
+ #endif
+ 		v = _mdfd_getseg(w->reln, w->forkNum, w->blockNum, isTemp, EXTENSION_FAIL);
+ 		fd = v->mdfd_vfd;
+ 
+ 		seekpos = (off_t) BLCKSZ *(w->blockNum % ((BlockNumber) RELSEG_SIZE));
+ 
+ 		Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+ 		w->fd = fd;
+ 		w->seekPos = seekpos;
+ 		w->len = BLCKSZ;
+ 	}
+ 
+ 	nbytes = FileBwrite(writeLen, writeList, doubleWriteFile, doubleBuf);
+ 	if (nbytes < 0) {
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("FileBwrite error %d", errno)));
+ 	}
+ }
+ 
+ 
+ /*
   *	mdnblocks() -- Get the number of blocks stored in a relation.
   *
   *		Important side effect: all active segments of the relation are opened
*** a/src/backend/storage/smgr/smgr.c
--- b/src/backend/storage/smgr/smgr.c
***************
*** 55,60 **** typedef struct f_smgr
--- 55,63 ----
  										  BlockNumber blocknum, char *buffer);
  	void		(*smgr_write) (SMgrRelation reln, ForkNumber forknum,
  							BlockNumber blocknum, char *buffer, bool isTemp);
+ 	void		(*smgr_bwrite) (int writeLen, struct SMgrWriteList *writeList,
+ 								File doubleWriteFile, char *doubleBuf,
+ 								bool isTemp);
  	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
  	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
  										   BlockNumber nblocks, bool isTemp);
***************
*** 68,74 **** typedef struct f_smgr
  static const f_smgr smgrsw[] = {
  	/* magnetic disk */
  	{mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend,
! 		mdprefetch, mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync,
  		mdpreckpt, mdsync, mdpostckpt
  	}
  };
--- 71,77 ----
  static const f_smgr smgrsw[] = {
  	/* magnetic disk */
  	{mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend,
! 	 mdprefetch, mdread, mdwrite, mdbwrite, mdnblocks, mdtruncate, mdimmedsync,
  		mdpreckpt, mdsync, mdpostckpt
  	}
  };
***************
*** 92,97 **** static void smgrshutdown(int code, Datum arg);
--- 95,102 ----
  static void smgr_internal_unlink(RelFileNode rnode, ForkNumber forknum,
  					 int which, bool isTemp, bool isRedo);
  
+ /* Buffer used to write the double-write header */
+ static char doubleBuf[DOUBLE_WRITE_HEADER_SIZE];
  
  /*
   *	smgrinit(), smgrshutdown() -- Initialize or shut down storage
***************
*** 486,492 **** smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
   */
  void
  smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
! 		 char *buffer)
  {
  	PageHeader p = (PageHeader) buffer;
  
--- 491,497 ----
   */
  void
  smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
! 		 char *buffer, bool *cksumMismatch)
  {
  	PageHeader p = (PageHeader) buffer;
  
***************
*** 554,559 **** smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
--- 559,636 ----
  }
  
  /*
+  * Write out a list of buffers, as specified in writeList.  If
+  * doubleWriteFile is > 0, then also do double writes to the specified
+  * file (so full_page_writes can be avoided).
+  */
+ void
+ smgrbwrite(int writeLen, struct SMgrWriteList *writeList,
+ 		   File doubleWriteFile, bool isTemp)
+ {
+ 	PageHeader p = NULL;
+ 	int i;
+ 
+ 	if (page_checksum) {
+ 		for (i = 0; i < writeLen; i++) {
+ 			struct SMgrWriteList *w = &(writeList[i]);
+ 			p = (PageHeader)w->callerBuf;
+ 			if (p == NULL) {
+ 				/* Use tempbuf in the 1-page case (BufferAlloc) */
+ 				Assert(writeLen == 1);
+ 				p = (PageHeader) tempbuf;
+ 				w->callerBuf = (char *)tempbuf;
+ 			}
+ 			((PageHeader)w->buffer)->cksum = 0;
+ 			p->cksum = CopyAndComputeChecksum((uint64 *)p,
+ 											  (uint64 *)w->buffer,
+ 											  BLCKSZ,
+ 											  ChecksumInit(w->reln,
+ 														   w->forkNum,
+ 														   w->blockNum));
+ 			Assert(PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION);
+ 		}
+ 	} else {
+ 		for (i = 0; i < writeLen; i++) {
+ 			p = (PageHeader)writeList[i].buffer;
+ 			writeList[i].callerBuf = (char *)p;
+ 			p->cksum = INVALID_CKSUM;
+ 			Assert(PageGetPageLayoutVersion(p) == PG_PAGE_LAYOUT_VERSION);
+ 		}
+ 	}
+ 
+ 	if (doubleWriteFile > 0) {
+ 		/*
+ 		 * Set up the initial double-write page that lists all the
+ 		 * buffers that will be written to the double-write file.
+ 		 * This list includes the checksums of all the buffers and is
+ 		 * checksummed itself.
+ 		 */
+ 		struct DoubleBufHeader *hdr = (struct DoubleBufHeader *)doubleBuf;
+ 		struct DoubleBufItem *item = hdr->items;
+ 		for (i = 0; i < writeLen; i++) {
+ 			item->rnode = writeList[i].reln->smgr_rnode;
+ 			item->forkNum = writeList[i].forkNum;
+ 			item->blockNum = writeList[i].blockNum;
+ 			p = (PageHeader)writeList[i].callerBuf;
+ 			item->cksum = p->cksum;
+ 			item->pd_lsn = p->pd_lsn;
+ 			item++;
+ 		}
+ 		hdr->writeLen = writeLen;
+ 		hdr->cksum = ComputeChecksum((uint64 *)hdr, DOUBLE_WRITE_HEADER_SIZE, 0);
+ 	}
+ 
+ 	(*(smgrsw[writeList[0].reln->smgr_which].smgr_bwrite))(writeLen, writeList,
+ 														   doubleWriteFile, doubleBuf,
+ 														   isTemp);
+ 	if (doubleWriteFile > 0) {
+ 		/* Zero out part of header that we filled in. */
+ 		memset(doubleBuf, 0,
+ 			   (char *)&(((struct DoubleBufHeader *) doubleBuf)->items[writeLen]) - doubleBuf);
+ 	}
+ }
+ 
+ /*
   *	smgrnblocks() -- Calculate the number of blocks in the
   *					 supplied relation.
   */
***************
*** 672,674 **** smgrpostckpt(void)
--- 749,865 ----
  			(*(smgrsw[i].smgr_post_ckpt)) ();
  	}
  }
+ 
+ extern char *double_write_directory;
+ 
+ /*
+  * Return the name of the double-write file.  Caller must pfree() the result.
+  */
+ char *
+ smgr_doublewritefilename()
+ {
+ 	char *name;
+ 	if (double_write_directory != NULL) {
+ 		name = palloc(strlen(double_write_directory) + strlen("/double") + 1);
+ 		sprintf(name, "%s/double", double_write_directory);
+ 	} else {
+ 		name = pstrdup("base/double");
+ 	}
+ 	return name;
+ }
+ 
+ /*
+  * Called by postmaster at startup during recovery to read double-write
+  * file and see if there are any data blocks to be recovered.
+  */
+ void
+ smgr_recoverdoublewritefile()
+ {
+ 	char *name;
+ 	struct stat stat_buf;
+ 	File fd;
+ 	int r;
+ 	struct DoubleBufHeader *hdr;
+ 	int i;
+ 	uint32 savedCksum;
+ 
+ 	name = smgr_doublewritefilename();
+ 	if (stat(name, &stat_buf) == -1) {
+ 		elog(LOG, "No double-write file");
+ 		pfree(name);
+ 		return;
+ 	}
+ 	fd = PathNameOpenFile(name, O_RDONLY, S_IRUSR | S_IWUSR);
+ 	if (fd < 0) {
+ 		elog(PANIC, "Double-write file %s exists but can't be opened", name);
+ 	}
+ 	r = FileRead(fd, doubleBuf, DOUBLE_WRITE_HEADER_SIZE);
+ 	if (r != DOUBLE_WRITE_HEADER_SIZE) {
+ 		goto badformat;
+ 	}
+ 	hdr = (struct DoubleBufHeader *)doubleBuf;
+ 	if (hdr->writeLen == 0 && hdr->cksum == 0) {
+ 		elog(LOG, "Double-write file %s is zeroed out", name);
+ 		FileClose(fd);
+ 		pfree(name);
+ 		return;
+ 	}
+ 	if (hdr->writeLen < 1 || hdr->writeLen > MAX_BATCHED_WRITES) {
+ 		goto badformat;
+ 	}
+ 	savedCksum = hdr->cksum;
+ 	hdr->cksum = 0;
+ 	if (savedCksum !=
+ 		ComputeChecksum((uint64 *)hdr, DOUBLE_WRITE_HEADER_SIZE, 0)) {
+ 		goto badformat;
+ 	}
+ 
+ 	for (i = 0; i < hdr->writeLen; i++) {
+ 		struct DoubleBufItem *it = &(hdr->items[i]);
+ 		SMgrRelation smgr;
+ 		bool mismatch;
+ 		PageHeader p;
+ 
+ 		/*
+ 		 * For each block described in double-write file header, see if
+ 		 * the block in the database file has a checksum mismatch.  If
+ 		 * so, restore the block from the double-write file if that entry
+ 		 * has correct checksum.
+ 		 */
+ 		smgr = smgropen(it->rnode);
+ 		mismatch = false;
+ 		smgrread(smgr, it->forkNum, it->blockNum, (char *)tempbuf, &mismatch);
+ 		if (mismatch) {
+ 			FileSeek(fd, DOUBLE_WRITE_HEADER_SIZE + i * BLCKSZ, SEEK_SET);
+ 			FileRead(fd, (char *)tempbuf, BLCKSZ);
+ 			p = (PageHeader)tempbuf;
+ 			savedCksum = p->cksum;
+ 			p->cksum = 0;
+ 			if (savedCksum != it->cksum ||
+ 				savedCksum != ComputeChecksum((uint64 *)tempbuf, BLCKSZ,
+ 											  ChecksumInit(smgr, it->forkNum, 
+ 														 it->blockNum))) {
+ 				elog(LOG, "Block %s/%u has checksum error, but can't be fixed, because slot %d of double-write file %s looks invalid",
+ 					 relpath(it->rnode, it->forkNum), it->blockNum, i, name);
+ 			} else {
+ 				Assert(XLByteEQ(p->pd_lsn, it->pd_lsn));
+ 				smgrwrite(smgr, it->forkNum, it->blockNum, (char *)tempbuf, false);
+ 				elog(LOG, "Fixed block %s/%u (which had checksum error) from double-write file %s slot %d",
+ 					 relpath(it->rnode, it->forkNum), it->blockNum, name, i);
+ 			}
+ 		} else {
+ 			elog(DEBUG1, "Skipping slot %d of double-write file because block %s/%u is correct",
+ 				 i, relpath(it->rnode, it->forkNum), it->blockNum);
+ 		}
+ 		smgrclose(smgr);
+ 	}
+ 	FileClose(fd);
+ 	pfree(name);
+ 	return;
+ 
+ badformat:
+ 	elog(LOG, "Double-write file %s has bad format", name);
+ 	FileClose(fd);
+ 	pfree(name);
+ 	return;
+ }
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 116,121 **** extern int	CommitSiblings;
--- 116,123 ----
  extern char *default_tablespace;
  extern char *temp_tablespaces;
  extern bool synchronize_seqscans;
+ bool doubleWrites;
+ char *double_write_directory;
  extern bool fullPageWrites;
  extern int	ssl_renegotiation_limit;
  
***************
*** 392,397 **** int			tcp_keepalives_idle;
--- 394,401 ----
  int			tcp_keepalives_interval;
  int			tcp_keepalives_count;
  
+ int			batched_buffer_writes = 0;
+ 
  /*
   * These variables are all dummies that don't do anything, except in some
   * cases provide the value for SHOW to display.  The real state is elsewhere
***************
*** 767,772 **** static struct config_bool ConfigureNamesBool[] =
--- 771,784 ----
  		true, NULL, NULL
  	},
  	{
+ 		{"double_writes", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+ 			gettext_noop("Pages are written first to a double-write file before being written to the data file."),
+ 			NULL
+ 		},
+ 		&doubleWrites,
+ 		false, NULL, NULL
+ 	},
+ 	{
  		{"silent_mode", PGC_POSTMASTER, LOGGING_WHERE,
  			gettext_noop("Runs the server silently."),
  			gettext_noop("If this parameter is set, the server will automatically run in the "
***************
*** 2088,2093 **** static struct config_int ConfigureNamesInt[] =
--- 2100,2115 ----
  		1024, 100, 102400, NULL, NULL
  	},
  
+ 	{
+ 		{"batched_buffer_writes", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+ 			gettext_noop("Sets the number of buffers that bgwriter writes in one batch."),
+ 			NULL,
+ 			GUC_NOT_IN_SAMPLE
+ 		},
+ 		&batched_buffer_writes,
+ 		0, 0, MAX_BATCHED_WRITES, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL
***************
*** 2668,2673 **** static struct config_string ConfigureNamesString[] =
--- 2690,2705 ----
  		"", assign_application_name, NULL
  	},
  
+ 	{
+ 		{"double_write_directory", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+ 			gettext_noop("Location of the double write file."),
+ 			NULL,
+ 			GUC_REPORT | GUC_NOT_IN_SAMPLE
+ 		},
+ 		&double_write_directory,
+ 		NULL, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL
*** a/src/backend/utils/resowner/resowner.c
--- b/src/backend/utils/resowner/resowner.c
***************
*** 515,521 **** ResourceOwnerEnlargeBuffers(ResourceOwner owner)
  
  	if (owner->buffers == NULL)
  	{
! 		newmax = 16;
  		owner->buffers = (Buffer *)
  			MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Buffer));
  		owner->maxbuffers = newmax;
--- 515,523 ----
  
  	if (owner->buffers == NULL)
  	{
! 		/* This should be at least MAX_BATCHED_WRITES, since bgwriter can
! 		 * lock that many buffers at one time. */
! 		newmax = 64;
  		owner->buffers = (Buffer *)
  			MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Buffer));
  		owner->maxbuffers = newmax;
*** a/src/include/access/heapam.h
--- b/src/include/access/heapam.h
***************
*** 119,125 **** extern void simple_heap_update(Relation relation, ItemPointer otid,
  extern void heap_markpos(HeapScanDesc scan);
  extern void heap_restrpos(HeapScanDesc scan);
  
! extern void heap_sync(Relation relation);
  
  extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr);
  extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec);
--- 119,125 ----
  extern void heap_markpos(HeapScanDesc scan);
  extern void heap_restrpos(HeapScanDesc scan);
  
! extern void heap_sync(Relation relation, bool wasLogged);
  
  extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr);
  extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec);
*** a/src/include/storage/bufmgr.h
--- b/src/include/storage/bufmgr.h
***************
*** 178,184 **** extern void PrintBufferLeakWarning(Buffer buffer);
  extern void CheckPointBuffers(int flags);
  extern BlockNumber BufferGetBlockNumber(Buffer buffer);
  extern BlockNumber RelationGetNumberOfBlocks(Relation relation);
! extern void FlushRelationBuffers(Relation rel);
  extern void FlushDatabaseBuffers(Oid dbid);
  extern void DropRelFileNodeBuffers(RelFileNode rnode, ForkNumber forkNum,
  					   bool istemp, BlockNumber firstDelBlock);
--- 178,184 ----
  extern void CheckPointBuffers(int flags);
  extern BlockNumber BufferGetBlockNumber(Buffer buffer);
  extern BlockNumber RelationGetNumberOfBlocks(Relation relation);
! extern void FlushRelationBuffers(Relation rel, bool wasLogged);
  extern void FlushDatabaseBuffers(Oid dbid);
  extern void DropRelFileNodeBuffers(RelFileNode rnode, ForkNumber forkNum,
  					   bool istemp, BlockNumber firstDelBlock);
***************
*** 211,214 **** extern void AtProcExit_LocalBuffers(void);
--- 211,216 ----
  extern BufferAccessStrategy GetAccessStrategy(BufferAccessStrategyType btype);
  extern void FreeAccessStrategy(BufferAccessStrategy strategy);
  
+ extern void Bufmgr_DoubleWriteInit(void);
+ 
  #endif
*** a/src/include/storage/fd.h
--- b/src/include/storage/fd.h
***************
*** 53,63 **** typedef int File;
--- 53,67 ----
  /* GUC parameter */
  extern int	max_files_per_process;
  
+ #define MAX_BATCHED_WRITES		64
+ #define DOUBLE_WRITE_HEADER_SIZE 4096
  
  /*
   * prototypes for functions in fd.c
   */
  
+ struct SMgrWriteList;
+ 
  /* Operations on virtual Files --- equivalent to Unix kernel file ops */
  extern File PathNameOpenFile(FileName fileName, int fileFlags, int fileMode);
  extern File OpenTemporaryFile(bool interXact);
***************
*** 66,71 **** extern int	FilePrefetch(File file, off_t offset, int amount);
--- 70,77 ----
  extern int	FileRead(File file, char *buffer, int amount);
  extern int	FileWrite(File file, char *buffer, int amount);
  extern int	FileSync(File file);
+ extern int FileBwrite(int writeLen, struct SMgrWriteList *writeList,
+                       File doubleWriteFile, char *doubleBuf);
  extern off_t FileSeek(File file, off_t offset, int whence);
  extern int	FileTruncate(File file, off_t offset);
  extern char *FilePathName(File file);
*** a/src/include/storage/smgr.h
--- b/src/include/storage/smgr.h
***************
*** 16,21 ****
--- 16,22 ----
  
  #include "access/xlog.h"
  #include "fmgr.h"
+ #include "fd.h"
  #include "storage/block.h"
  #include "storage/relfilenode.h"
  
***************
*** 68,73 **** typedef struct SMgrRelationData
--- 69,115 ----
  
  typedef SMgrRelationData *SMgrRelation;
  
+ /*
+  * Element of an array specifying a buffer write in a list of buffer
+  * writes being batched.
+  */
+ struct SMgrWriteList {
+ 	int buf_id;
+ 	SMgrRelation reln;
+ 	ForkNumber forkNum;
+ 	BlockNumber blockNum;
+ 	char *buffer;
+ 	char *callerBuf;
+ 
+ 	/* Filled in by mdawrite */
+ 	File fd;
+ 	off_t seekPos;
+ 	int len;
+ };
+ 
+ /*
+  * Description of one buffer in the double-write file, as listed in the
+  * header.
+  */
+ struct DoubleBufItem {
+ 	/* Specification of where this buffer is in the database */
+ 	RelFileNode rnode;			/* physical relation identifier */
+ 	ForkNumber	forkNum;
+ 	BlockNumber blockNum;		/* blknum relative to begin of reln */
+ 	/* Checksum of the buffer. */
+ 	int32 cksum;
+ 	/* LSN of the buffer */
+ 	XLogRecPtr pd_lsn;
+ };
+ 
+ /* Format of the header of the double-write file */
+ struct DoubleBufHeader {
+ 	uint32 cksum;
+ 	int32 writeLen;
+ 	uint32 pad1;
+ 	uint32 pad2;
+ 	struct DoubleBufItem items[0];
+ };
  
  extern void smgrinit(void);
  extern SMgrRelation smgropen(RelFileNode rnode);
***************
*** 84,92 **** extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
  extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum,
  			 BlockNumber blocknum);
  extern void smgrread(SMgrRelation reln, ForkNumber forknum,
! 		 BlockNumber blocknum, char *buffer);
  extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
  		  BlockNumber blocknum, char *buffer, bool isTemp);
  extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
  extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum,
  			 BlockNumber nblocks, bool isTemp);
--- 126,136 ----
  extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum,
  			 BlockNumber blocknum);
  extern void smgrread(SMgrRelation reln, ForkNumber forknum,
!                      BlockNumber blocknum, char *buffer, bool *cksumMismatch);
  extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
  		  BlockNumber blocknum, char *buffer, bool isTemp);
+ extern void smgrbwrite(int writeLen, struct SMgrWriteList *writeList,
+ 					   File doubleWriteFile, bool isTemp);
  extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
  extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum,
  			 BlockNumber nblocks, bool isTemp);
***************
*** 94,99 **** extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
--- 138,145 ----
  extern void smgrpreckpt(void);
  extern void smgrsync(void);
  extern void smgrpostckpt(void);
+ extern char *smgr_doublewritefilename(void);
+ extern void smgr_recoverdoublewritefile(void);
  
  
  /* internals: move me elsewhere -- ay 7/94 */
***************
*** 112,117 **** extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
--- 158,165 ----
  	   char *buffer);
  extern void mdwrite(SMgrRelation reln, ForkNumber forknum,
  		BlockNumber blocknum, char *buffer, bool isTemp);
+ extern void mdbwrite(int writeLen, struct SMgrWriteList *writeList,
+ 					 File doubleWriteFile, char *doublebuf, bool isTemp);
  extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
  extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
  		   BlockNumber nblocks, bool isTemp);
#2David Fetter
david@fetter.org
In reply to: David Fetter (#1)
Re: WIP(!) Double Writes

On Wed, Jan 04, 2012 at 10:19:16PM -0800, David Fetter wrote:

Folks,

Please find attached two patches, each under the PostgreSQL license,
one which implements page checksums vs. REL9_0_STABLE, the other which
depends on the first (i.e. requires that it be applied first) and
implements double writes. They're vs. REL9_0_STABLE because they're
extracted from vPostgres 1.0, a proprietary product currently based on
PostgreSQL 9.0.

I had wanted the first patch set to be:

- Against git head, and
- Based on feedback from Simon's patch.

Simon's now given some feedback during a fruitful discussion, and has
sent an updated checksum patch which will be the basis for the
double-write stuff Dan's working on.

Stay tuned!

Cheers,
David.
--
David Fetter <david@fetter.org> http://fetter.org/

#3Greg Smith
greg@2ndQuadrant.com
In reply to: David Fetter (#1)
Re: WIP(!) Double Writes

On 1/5/12 1:19 AM, David Fetter wrote:

To achieve efficiency, the checkpoint writer and bgwriter should batch
writes to multiple pages together. Currently, there is an option
"batched_buffer_writes" that specifies how many buffers to batch at a
time. However, we may want to remove that option from view, and just
force batched_buffer_writes to a default (32) if double_writes is
enabled.

The idea that PostgreSQL has better information about how to batch
writes than the layers below it is controversial, and has failed to
match expectations altogether for me in many cases. The nastiest
regressions here I ran into were in VACUUM, where the ring buffer
implementation means the database has extremely limited room to work.
Just dumping the whole write mess of that into a large OS cache as
quickly as possible, and letting it sort things out, was dramatically
faster in some of my test cases. If you don't have one already, I'd
recommend adding a performance test that dirties a lot of pages and then
runs VACUUM against them to your test suite. Since you're not crippling
the OS cache to the same extent I was, the problem may not be so bad, but
it's something worth checking.

I scribbled some notes on this problem area at
http://blog.2ndquadrant.com/en/2011/01/tuning-linux-for-low-postgresq.html
The links that are broken due to our web site being rearranged are now
at http://highperfpostgres.com/pgbench-results/index.htm (test summary)
and http://www.highperfpostgres.com/pgbench-results/435/index.html
(really bad latency spike example).

Given the batching functionality, double writes by the checkpoint
writer (and bgwriter) are implemented efficiently by writing a batch of
pages to the double-write file and fsyncing, and then writing the
pages to the appropriate data files, and fsyncing all the necessary
data files. While the data fsyncing might be viewed as expensive, it
does help eliminate a lot of the fsync overhead at the end of
checkpoints. FlushRelationBuffers() and FlushDatabaseBuffers() can be
similarly batched.
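
The write ordering described above can be sketched in a few lines of POSIX C. This is only an illustration of the ordering, not the patch's code: `double_write_batch`, `struct page_write`, `PAGE_SIZE`, and `MAX_BATCH` are hypothetical names, the header that the patch keeps at the start of the double-write file is left out, and error handling is reduced to returning -1.

```c
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700		/* for pwrite() and fsync() */
#endif

#include <assert.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

#define PAGE_SIZE	8192		/* assumed page size */
#define MAX_BATCH	64			/* assumed upper bound on batch size */

/* Hypothetical description of one page in a batch. */
struct page_write
{
	const char *path;			/* data file the page belongs to */
	off_t		offset;			/* byte offset of the page in that file */
	const char *page;			/* PAGE_SIZE bytes of page contents */
};

/* Close the first n descriptors in fds[]. */
static void
close_all(const int *fds, int n)
{
	while (n-- > 0)
		close(fds[n]);
}

/*
 * Write a batch twice: first into consecutive slots of the double-write
 * file, then to each page's home location.  Returns 0 on success, -1 on
 * any I/O failure.
 */
int
double_write_batch(const char *dw_path, const struct page_write *batch, int n)
{
	int			fds[MAX_BATCH];
	int			dw_fd;
	int			i;
	int			rc = 0;

	assert(batch != NULL && n >= 0 && n <= MAX_BATCH);

	/* Step 1: copy every page into the double-write file and fsync it. */
	dw_fd = open(dw_path, O_WRONLY | O_CREAT, 0600);
	if (dw_fd < 0)
		return -1;
	for (i = 0; i < n && rc == 0; i++)
	{
		if (pwrite(dw_fd, batch[i].page, PAGE_SIZE,
				   (off_t) i * PAGE_SIZE) != PAGE_SIZE)
			rc = -1;
	}
	if (rc == 0 && fsync(dw_fd) != 0)
		rc = -1;
	close(dw_fd);
	if (rc != 0)
		return -1;

	/* Step 2: only now write each page to its real location. */
	for (i = 0; i < n; i++)
	{
		fds[i] = open(batch[i].path, O_WRONLY | O_CREAT, 0600);
		if (fds[i] < 0)
		{
			close_all(fds, i);
			return -1;
		}
		if (pwrite(fds[i], batch[i].page, PAGE_SIZE,
				   batch[i].offset) != PAGE_SIZE)
		{
			close_all(fds, i + 1);
			return -1;
		}
	}

	/*
	 * Step 3: fsync the data files.  A file that appears several times in
	 * the batch is synced once per entry here; a real implementation would
	 * sync each distinct file once.
	 */
	for (i = 0; i < n; i++)
	{
		if (fsync(fds[i]) != 0)
			rc = -1;
	}
	close_all(fds, n);
	return rc;
}
```

The point of the ordering is that the double-write copies are durable before any data file is touched, so a torn data page always has an intact copy to restore from; the slots can be reused only after step 3 completes.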

There's a fundamental struggle here between latency and throughput. The
longer you delay between writes and their subsequent sync, the more the
OS gets a chance to reorder and combine them for better throughput.
Ditto for any storage level optimizations, controller write caches and
the like. All that increases throughput, and more batching helps move
in that direction. But when you overload those caches and writes won't
squeeze into them anymore...now there's a latency spike. And as
throughput increases, with it goes the amount of dirty cache that needs
to be cleared per unit of time.

Eventually, all this disk I/O turns into a series of random writes. You
can postpone those in various ways, resequence them in ways that help
some tests. But if they're the true bottleneck, eventually all caches
will fill, and clients will be stuck waiting for them. And it's hard to
imagine how anything that increases the amount of data written could
ever move that problem in the right direction for the worst case.
Adjusting the sync sequence just moves the problem to somewhere else.
If you get lucky, that's a better place most of the time; how that bet
turns out will be very workload dependent though. I've lost a lot of
those bets when trying to resequence syncs in the last two years, where
benefits were extremely test dependent.

We have some other code (not included) that sorts buffers to be
checkpointed in file/block order -- this can reduce fsync overhead
further by ensuring that each batch writes to only one or a few data
files.
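
Since that sorting code is not included in the patch, here is a minimal sketch of what file/block ordering could look like, using the standard `qsort()`. The `struct buf_tag` type and both function names are hypothetical stand-ins; the real code would presumably compare the full relation file identifier and fork number as well.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical tag identifying a dirty buffer's on-disk location. */
struct buf_tag
{
	unsigned int rel_file;		/* stand-in for the relation file identifier */
	unsigned int block;			/* block number within that file */
};

/* qsort() comparator: order by file first, then by block within the file. */
static int
buf_tag_cmp(const void *a, const void *b)
{
	const struct buf_tag *x = a;
	const struct buf_tag *y = b;

	if (x->rel_file != y->rel_file)
		return (x->rel_file < y->rel_file) ? -1 : 1;
	if (x->block != y->block)
		return (x->block < y->block) ? -1 : 1;
	return 0;
}

/* Sort the buffers to be written into file/block order. */
void
sort_checkpoint_buffers(struct buf_tag *tags, size_t n)
{
	assert(tags != NULL || n == 0);
	if (n > 1)
		qsort(tags, n, sizeof(struct buf_tag), buf_tag_cmp);
}

/*
 * Count the distinct files touched by the first n tags of an array that
 * is already sorted by file, i.e. how many fsyncs that batch would need.
 */
size_t
count_files_in_batch(const struct buf_tag *tags, size_t n)
{
	size_t		i;
	size_t		files = (n > 0) ? 1 : 0;

	for (i = 1; i < n; i++)
	{
		if (tags[i].rel_file != tags[i - 1].rel_file)
			files++;
	}
	return files;
}
```

After sorting, consecutive entries that share a file end up in the same batch, which is what keeps the number of data files to fsync per batch small.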

Again, the database doesn't necessarily have the information to make
this level of decision better than the underlying layers do. We've been
through two runs at this idea already that ended inconclusively. The
one I did last year you can see at
http://highperfpostgres.com/pgbench-results/index.htm ; sets 9 and 11 are
the same test without (9) and with (11) write sorting. If there's
really a difference there, it's below the noise floor as far as I could
see. Whether sorting helps or hurts is both workload and hardware
dependent.

As Jignesh has mentioned on this list, we see significant performance
gains when enabling double writes and disabling full_page_writes for
OLTP runs with sufficient buffer cache size. We are now trying to
measure some runs where the dirty buffer eviction rate by the backends
is high.

We'd need to have positive results published along with a publicly
reproducible benchmark to go at this usefully. I aimed for a much
smaller goal than this in a similar area, around this same time last
year. I didn't get very far down that path before 9.1 development
closed; it just takes too long to run enough benchmarks to really
validate performance code in the write path. This is a pretty obtrusive
change to drop into the codebase for 9.2 at this point in the
development cycle.

P.S. I got the impression you're testing these changes primarily against
a modified 9.0. One of the things that came out of the 9.1 performance
testing was the "compact fsync queue" modification. That significant
improvement rippled out enough that several things that used to matter
in my tests didn't anymore, once it was committed. If your baseline
doesn't include that feature already, you may have an uphill battle to
prove any performance gains you've been seeing will still happen in the
current 9.2 code. Performance for that version has advanced even
further forward in ways 9.0 can't emulate.

--
Greg Smith 2ndQuadrant US greg@2ndQuadrant.com Baltimore, MD
PostgreSQL Training, Services, and 24x7 Support www.2ndQuadrant.com