*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 42,47 ****
--- 42,48 ----
  #include "postmaster/startup.h"
  #include "replication/walreceiver.h"
  #include "replication/walsender.h"
+ #include "storage/barrier.h"
  #include "storage/bufmgr.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
***************
*** 272,277 **** static XLogRecPtr RedoRecPtr;
--- 273,290 ----
   */
  static XLogRecPtr RedoStartLSN = {0, 0};
  
+ /*
+  * We have one of these for each backend, plus one that is shared by all
+  * auxiliary processes. WALAuxSlotLock is used to coordinate access to the
+  * shared slot.
+  */
+ typedef struct
+ {
+ 	XLogRecPtr	CurrPos;	/* current position this process is inserting to */
+ } BackendXLogInsertSlot;
+ 
+ #define NumXLogInsertSlots	(MaxBackends + 1)
+ 
  /*----------
   * Shared-memory data structures for XLOG control
   *
***************
*** 282,292 **** static XLogRecPtr RedoStartLSN = {0, 0};
   * slightly different functions.
   *
   * We do a lot of pushups to minimize the amount of access to lockable
!  * shared memory values.  There are actually three shared-memory copies of
   * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
   *		XLogCtl->LogwrtResult is protected by info_lck
   *		XLogCtl->Write.LogwrtResult is protected by WALWriteLock
-  *		XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
   * One must hold the associated lock to read or write any of these, but
   * of course no lock is needed to read/write the unshared LogwrtResult.
   *
--- 295,304 ----
   * slightly different functions.
   *
   * We do a lot of pushups to minimize the amount of access to lockable
!  * shared memory values.  There are actually two shared-memory copies of
   * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
   *		XLogCtl->LogwrtResult is protected by info_lck
   *		XLogCtl->Write.LogwrtResult is protected by WALWriteLock
   * One must hold the associated lock to read or write any of these, but
   * of course no lock is needed to read/write the unshared LogwrtResult.
   *
***************
*** 296,307 **** static XLogRecPtr RedoStartLSN = {0, 0};
   * is that it can be examined/modified by code that already holds WALWriteLock
   * without needing to grab info_lck as well.
   *
!  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
!  * but is updated when convenient.	Again, it exists for the convenience of
!  * code that is already holding WALInsertLock but not the other locks.
!  *
!  * The unshared LogwrtResult may lag behind any or all of these, and again
!  * is updated when convenient.
   *
   * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
   * (protected by info_lck), but we don't need to cache any copies of it.
--- 308,315 ----
   * is that it can be examined/modified by code that already holds WALWriteLock
   * without needing to grab info_lck as well.
   *
!  * The unshared LogwrtResult may lag behind either or both of these, and is
!  * updated when convenient.
   *
   * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
   * (protected by info_lck), but we don't need to cache any copies of it.
***************
*** 311,320 **** static XLogRecPtr RedoStartLSN = {0, 0};
   * values is "more up to date".
   *
   * info_lck is only held long enough to read/update the protected variables,
!  * so it's a plain spinlock.  The other locks are held longer (potentially
!  * over I/O operations), so we use LWLocks for them.  These locks are:
   *
!  * WALInsertLock: must be held to insert a record into the WAL buffers.
   *
   * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
   * XLogFlush).
--- 319,333 ----
   * values is "more up to date".
   *
   * info_lck is only held long enough to read/update the protected variables,
!  * so it's a plain spinlock.  insertpos_lck protects the current logical
!  * insert location, ie. the head of reserved WAL space.  The other locks are
!  * held longer (potentially over I/O operations), so we use LWLocks for them.
!  * These locks are:
   *
!  * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
!  * This is only held while initializing and changing the mapping. If the
!  * contents of the buffer being replaced haven't been written yet, the mapping
!  * lock is released while the write is done, and reacquired afterwards.
   *
   * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
   * XLogFlush).
***************
*** 326,331 **** static XLogRecPtr RedoStartLSN = {0, 0};
--- 339,392 ----
   * only one checkpointer at a time; currently, with all checkpoints done by
   * the checkpointer, this is just pro forma).
   *
+  *
+  *
+  * Inserting a new WAL record is a two-step process:
+  *
+  * 1. Reserve the right amount of space from the WAL. The current head
+  *    of reserved space is kept in Insert->CurrPos, and is protected by
+  *    insertpos_lck. Try to keep this section as short as possible, because
+  *    insertpos_lck can be heavily contended on a busy system.
+  *
+  * 2. Copy the record to the reserved WAL space. This involves finding the
+  *    correct WAL buffer containing the reserved space, and copying the
+  *    record in place. This can be done concurrently in multiple processes.
+  *
+  * To allow as much parallelism as possible for step 2, we try hard to avoid
+  * lock contention in that code path. Each backend has its own "XLog insertion
+  * slot", which is used to indicate the position the backend is writing to.
+  * The slot is marked as in-use in step 1, while holding insertpos_lck, by
+  * setting the position field in the slot. When the backend is finished with
+  * the insertion, it can clear its slot without a lock.
+  *
+  * Before 9.2, WAL insertion was serialized on one big lock, so that once
+  * you finished inserting your record you knew that all the previous records
+  * were inserted too. That is no longer true, there can be insertions to
+  * earlier positions still in-progress when your insertion finishes. To wait
+  * for them to finish, use WaitXLogInsertionsToFinish(). It polls (FIXME:
+  * busy-waiting is bad) the array of per-backend XLog insertion slots until
+  * it sees that all of the insertions to earlier locations have finished.
+  *
+  *
+  * Deadlock analysis
+  * -----------------
+  *
+  * It's important to call WaitXLogInsertionsToFinish() *before* acquiring
+  * WALWriteLock. Otherwise you might get stuck waiting for a backend to finish
+  * (or at least advance to next uninitialized page), while you're holding
+  * WALWriteLock. That would be bad, because the backend you're waiting for
+  * might need to acquire WALWriteLock, too, to evict an old buffer, so you'd
+  * get deadlock.
+  *
+  * WaitXLogInsertionsToFinish() will not get stuck indefinitely, as long as
+  * it's called with a location that's known to be already allocated in the WAL
+  * buffers. Calling it with the position of a record you've already inserted
+  * satisfies that condition. It can't get stuck, because an insertion to a
+  * WAL page that's already initialized in cache can always proceed without
+  * waiting on a lock. However, if the page has *just* been initialized, the
+  * insertion might still briefly acquire WALBufMappingLock to observe that
+  * fact.
+  *
   *----------
   */
  
***************
*** 346,356 **** typedef struct XLogwrtResult
   */
  typedef struct XLogCtlInsert
  {
! 	XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
! 	XLogRecPtr	PrevRecord;		/* start of previously-inserted record */
! 	int			curridx;		/* current block index in cache */
! 	XLogPageHeader currpage;	/* points to header of block in cache */
! 	char	   *currpos;		/* current insertion point in cache */
  	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
  	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
  
--- 407,431 ----
   */
  typedef struct XLogCtlInsert
  {
! 	slock_t		insertpos_lck;	/* protects all the fields in this struct. */
! 
! 	/*
! 	 * CurrPos is the very tip of the reserved WAL space at the moment. The
! 	 * next record will be inserted there (or somewhere after it if there's
! 	 * not enough space on the current page). PrevRecord points to the
! 	 * beginning of the last record already reserved. It might not be fully
! 	 * copied into place yet, but we know its exact location.
! 	 */
! 	XLogRecPtr	CurrPos;
! 	XLogRecPtr	PrevRecord;
! 
! 	/*
! 	 * padding to push RedoRecPtr and forcePageWrites, which rarely change,
! 	 * to a different cache line than the rapidly-changing CurrPos and
! 	 * PrevRecord values.
! 	 */
! 	char		padding[128];
! 
  	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
  	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
  
***************
*** 381,389 **** typedef struct XLogCtlWrite
   */
  typedef struct XLogCtlData
  {
! 	/* Protected by WALInsertLock: */
  	XLogCtlInsert Insert;
  
  	/* Protected by info_lck: */
  	XLogwrtRqst LogwrtRqst;
  	XLogwrtResult LogwrtResult;
--- 456,466 ----
   */
  typedef struct XLogCtlData
  {
! 	/* Protected by insertpos_lck: */
  	XLogCtlInsert Insert;
  
+ 	BackendXLogInsertSlot *BackendXLogInsertSlots;
+ 
  	/* Protected by info_lck: */
  	XLogwrtRqst LogwrtRqst;
  	XLogwrtResult LogwrtResult;
***************
*** 397,405 **** typedef struct XLogCtlData
  	XLogCtlWrite Write;
  
  	/*
  	 * These values do not change after startup, although the pointed-to pages
  	 * and xlblocks values certainly do.  Permission to read/write the pages
! 	 * and xlblocks values depends on WALInsertLock and WALWriteLock.
  	 */
  	char	   *pages;			/* buffers for unwritten XLOG pages */
  	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
--- 474,492 ----
  	XLogCtlWrite Write;
  
  	/*
+ 	 * To change curridx and the identity of a buffer, you need to hold
+ 	 * WALBufMappingLock. To change the identity of a buffer that's
+ 	 * still dirty, the old page needs to be written out first, and for that
+ 	 * you need WALWriteLock, and you need to ensure that there are no
+ 	 * in-progress insertions to the page by calling
+ 	 * WaitXLogInsertionsToFinish().
+ 	 */
+ 	int			curridx;		/* current (latest) block index in cache */
+ 
+ 	/*
  	 * These values do not change after startup, although the pointed-to pages
  	 * and xlblocks values certainly do.  Permission to read/write the pages
! 	 * and xlblocks values depends on WALBufMappingLock and WALWriteLock.
  	 */
  	char	   *pages;			/* buffers for unwritten XLOG pages */
  	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
***************
*** 468,497 **** static XLogCtlData *XLogCtl = NULL;
  static ControlFileData *ControlFile = NULL;
  
  /*
!  * Macros for managing XLogInsert state.  In most cases, the calling routine
!  * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
!  * so these are passed as parameters instead of being fetched via XLogCtl.
   */
! 
! /* Free space remaining in the current xlog page buffer */
! #define INSERT_FREESPACE(Insert)  \
! 	(XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
! 
! /* Construct XLogRecPtr value for current insertion point */
! #define INSERT_RECPTR(recptr,Insert,curridx)  \
! 	( \
! 	  (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
! 	  (recptr).xrecoff = \
! 		XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
! 	)
! 
! #define PrevBufIdx(idx)		\
! 		(((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
  
  #define NextBufIdx(idx)		\
  		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
  
  /*
   * Private, possibly out-of-date copy of shared LogwrtResult.
   * See discussion above.
   */
--- 555,583 ----
  static ControlFileData *ControlFile = NULL;
  
  /*
!  * Calculate the amount of space left on the page after 'endptr'.
!  * Beware multiple evaluation!
   */
! #define INSERT_FREESPACE(endptr)	\
! 	(((endptr).xrecoff % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr).xrecoff % XLOG_BLCKSZ))
  
  #define NextBufIdx(idx)		\
  		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
  
  /*
+  * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
+  * would hold if it was in cache, the page containing 'recptr'.
+  *
+  * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
+  * page is taken to mean the previous page.
+  */
+ #define XLogRecPtrToBufIdx(recptr)	\
+ 	((((((uint64) (recptr).xlogid * (uint64) XLogSegsPerFile * XLogSegSize) + (recptr).xrecoff)) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
+ 
+ #define XLogRecEndPtrToBufIdx(recptr)	\
+ 	((((((uint64) (recptr).xlogid * (uint64) XLogSegsPerFile * XLogSegSize) + (recptr).xrecoff - 1)) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
+ 
+ /*
   * Private, possibly out-of-date copy of shared LogwrtResult.
   * See discussion above.
   */
***************
*** 614,620 **** static void KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg);
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static bool AdvanceXLInsertBuffer(bool new_segment);
  static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
  static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
  static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
--- 700,706 ----
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static void AdvanceXLInsertBuffer(bool new_segment, XLogRecPtr upto, bool opportunistic);
  static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
  static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
  static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
***************
*** 663,668 **** static bool read_backup_label(XLogRecPtr *checkPointLoc,
--- 749,766 ----
  static void rm_redo_error_callback(void *arg);
  static int	get_sync_bit(int method);
  
+ static XLogRecPtr PerformXLogInsert(int write_len, XLogRecord *rechdr,
+ 				  XLogRecData *rdata, pg_crc32 rdata_crc,
+ 				  bool forcePageWrites);
+ static bool ReserveXLogInsertLocation(int reqsize, bool forcePageWrites,
+ 						  XLogRecPtr *PrevRecord, XLogRecPtr *StartPos,
+ 						  XLogRecPtr *EndPos,
+ 						  volatile BackendXLogInsertSlot *myslot);
+ static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto,
+ 						   XLogRecPtr CurrPos);
+ static char *GetXLogBuffer(XLogRecPtr ptr);
+ static XLogRecPtr AdvanceXLogRecPtrToNextPage(XLogRecPtr ptr);
+ 
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 683,695 **** XLogRecPtr
  XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
- 	XLogRecord *record;
- 	XLogContRecord *contrecord;
  	XLogRecPtr	RecPtr;
- 	XLogRecPtr	WriteRqst;
- 	uint32		freespace;
- 	int			curridx;
  	XLogRecData *rdt;
  	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
  	bool		dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
  	BkpBlock	dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
--- 781,789 ----
  XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogRecPtr	RecPtr;
  	XLogRecData *rdt;
+ 	XLogRecData *rdt_lastnormal;
  	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
  	bool		dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
  	BkpBlock	dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
***************
*** 701,709 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  	uint32		len,
  				write_len;
  	unsigned	i;
- 	bool		updrqst;
  	bool		doPageWrites;
  	bool		isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
  
  	/* cross-check on whether we should be here or not */
  	if (!XLogInsertAllowed())
--- 795,805 ----
  	uint32		len,
  				write_len;
  	unsigned	i;
  	bool		doPageWrites;
+ 	bool		forcePageWrites;
  	bool		isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+ 	uint8		info_final;
+ 	XLogRecord	rechdr;
  
  	/* cross-check on whether we should be here or not */
  	if (!XLogInsertAllowed())
***************
*** 726,751 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  		return RecPtr;
  	}
  
  	/*
  	 * Here we scan the rdata chain, determine which buffers must be backed
! 	 * up, and compute the CRC values for the data.  Note that the record
! 	 * header isn't added into the CRC initially since we don't know the final
! 	 * length or info bits quite yet.  Thus, the CRC will represent the CRC of
! 	 * the whole record in the order "rdata, then backup blocks, then record
! 	 * header".
  	 *
  	 * We may have to loop back to here if a race condition is detected below.
  	 * We could prevent the race by doing all this work while holding the
  	 * insert lock, but it seems better to avoid doing CRC calculations while
! 	 * holding the lock.  This means we have to be careful about modifying the
! 	 * rdata chain until we know we aren't going to loop back again.  The only
! 	 * change we allow ourselves to make earlier is to set rdt->data = NULL in
! 	 * chain items we have decided we will have to back up the whole buffer
! 	 * for.  This is OK because we will certainly decide the same thing again
! 	 * for those items if we do it over; doing it here saves an extra pass
! 	 * over the chain later.
  	 */
  begin:;
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
  		dtbuf[i] = InvalidBuffer;
--- 822,850 ----
  		return RecPtr;
  	}
  
+ 	/* TODO */
+ 	if (isLogSwitch)
+ 	{
+ 		elog(LOG, "log switch not implemented");
+ 		return InvalidXLogRecPtr;
+ 	}
+ 
  	/*
  	 * Here we scan the rdata chain, determine which buffers must be backed
! 	 * up.
  	 *
  	 * We may have to loop back to here if a race condition is detected below.
  	 * We could prevent the race by doing all this work while holding the
  	 * insert lock, but it seems better to avoid doing CRC calculations while
! 	 * holding the lock.
! 	 *
! 	 * We remember the last regular rdata entry in rdt_lastnormal. Later we
! 	 * append entries for the backup blocks after it, so that they don't need
! 	 * any special treatment when the chunks are copied into the WAL buffers;
! 	 * if we have to loop back, they are unlinked again to restore the chain.
  	 */
  begin:;
+ 	info_final = info;
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
  		dtbuf[i] = InvalidBuffer;
***************
*** 758,766 **** begin:;
  	 * don't yet have the insert lock, forcePageWrites could change under us,
  	 * but we'll recheck it once we have the lock.
  	 */
! 	doPageWrites = fullPageWrites || Insert->forcePageWrites;
  
- 	INIT_CRC32(rdata_crc);
  	len = 0;
  	for (rdt = rdata;;)
  	{
--- 857,865 ----
  	 * don't yet have the insert lock, forcePageWrites could change under us,
  	 * but we'll recheck it once we have the lock.
  	 */
! 	forcePageWrites = Insert->forcePageWrites;
! 	doPageWrites = fullPageWrites || forcePageWrites;
  
  	len = 0;
  	for (rdt = rdata;;)
  	{
***************
*** 768,774 **** begin:;
  		{
  			/* Simple data, just include it */
  			len += rdt->len;
- 			COMP_CRC32(rdata_crc, rdt->data, rdt->len);
  		}
  		else
  		{
--- 867,872 ----
***************
*** 779,790 **** begin:;
  				{
  					/* Buffer already referenced by earlier chain item */
  					if (dtbuf_bkp[i])
  						rdt->data = NULL;
  					else if (rdt->data)
- 					{
  						len += rdt->len;
- 						COMP_CRC32(rdata_crc, rdt->data, rdt->len);
- 					}
  					break;
  				}
  				if (dtbuf[i] == InvalidBuffer)
--- 877,888 ----
  				{
  					/* Buffer already referenced by earlier chain item */
  					if (dtbuf_bkp[i])
+ 					{
  						rdt->data = NULL;
+ 						rdt->len = 0;
+ 					}
  					else if (rdt->data)
  						len += rdt->len;
  					break;
  				}
  				if (dtbuf[i] == InvalidBuffer)
***************
*** 796,807 **** begin:;
  					{
  						dtbuf_bkp[i] = true;
  						rdt->data = NULL;
  					}
  					else if (rdt->data)
- 					{
  						len += rdt->len;
- 						COMP_CRC32(rdata_crc, rdt->data, rdt->len);
- 					}
  					break;
  				}
  			}
--- 894,903 ----
  					{
  						dtbuf_bkp[i] = true;
  						rdt->data = NULL;
+ 						rdt->len = 0;
  					}
  					else if (rdt->data)
  						len += rdt->len;
  					break;
  				}
  			}
***************
*** 814,852 **** begin:;
  			break;
  		rdt = rdt->next;
  	}
! 
! 	/*
! 	 * Now add the backup block headers and data into the CRC
! 	 */
! 	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
! 	{
! 		if (dtbuf_bkp[i])
! 		{
! 			BkpBlock   *bkpb = &(dtbuf_xlg[i]);
! 			char	   *page;
! 
! 			COMP_CRC32(rdata_crc,
! 					   (char *) bkpb,
! 					   sizeof(BkpBlock));
! 			page = (char *) BufferGetBlock(dtbuf[i]);
! 			if (bkpb->hole_length == 0)
! 			{
! 				COMP_CRC32(rdata_crc,
! 						   page,
! 						   BLCKSZ);
! 			}
! 			else
! 			{
! 				/* must skip the hole */
! 				COMP_CRC32(rdata_crc,
! 						   page,
! 						   bkpb->hole_offset);
! 				COMP_CRC32(rdata_crc,
! 						   page + (bkpb->hole_offset + bkpb->hole_length),
! 						   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
! 			}
! 		}
! 	}
  
  	/*
  	 * NOTE: We disallow len == 0 because it provides a useful bit of extra
--- 910,917 ----
  			break;
  		rdt = rdt->next;
  	}
! 	/* Remember that this was the last regular rdata entry */
! 	rdt_lastnormal = rdt;
  
  	/*
  	 * NOTE: We disallow len == 0 because it provides a useful bit of extra
***************
*** 858,922 **** begin:;
  	if (len == 0 && !isLogSwitch)
  		elog(PANIC, "invalid xlog record length %u", len);
  
- 	START_CRIT_SECTION();
- 
- 	/* Now wait to get insert lock */
- 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
- 
- 	/*
- 	 * Check to see if my RedoRecPtr is out of date.  If so, may have to go
- 	 * back and recompute everything.  This can only happen just after a
- 	 * checkpoint, so it's better to be slow in this case and fast otherwise.
- 	 *
- 	 * If we aren't doing full-page writes then RedoRecPtr doesn't actually
- 	 * affect the contents of the XLOG record, so we'll update our local copy
- 	 * but not force a recomputation.
- 	 */
- 	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
- 	{
- 		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
- 		RedoRecPtr = Insert->RedoRecPtr;
- 
- 		if (doPageWrites)
- 		{
- 			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- 			{
- 				if (dtbuf[i] == InvalidBuffer)
- 					continue;
- 				if (dtbuf_bkp[i] == false &&
- 					XLByteLE(dtbuf_lsn[i], RedoRecPtr))
- 				{
- 					/*
- 					 * Oops, this buffer now needs to be backed up, but we
- 					 * didn't think so above.  Start over.
- 					 */
- 					LWLockRelease(WALInsertLock);
- 					END_CRIT_SECTION();
- 					goto begin;
- 				}
- 			}
- 		}
- 	}
- 
- 	/*
- 	 * Also check to see if forcePageWrites was just turned on; if we weren't
- 	 * already doing full-page writes then go back and recompute. (If it was
- 	 * just turned off, we could recompute the record without full pages, but
- 	 * we choose not to bother.)
- 	 */
- 	if (Insert->forcePageWrites && !doPageWrites)
- 	{
- 		/* Oops, must redo it with full-page data */
- 		LWLockRelease(WALInsertLock);
- 		END_CRIT_SECTION();
- 		goto begin;
- 	}
- 
  	/*
  	 * Make additional rdata chain entries for the backup blocks, so that we
! 	 * don't need to special-case them in the write loop.  Note that we have
! 	 * now irrevocably changed the input rdata chain.  At the exit of this
! 	 * loop, write_len includes the backup block data.
  	 *
  	 * Also set the appropriate info bits to show which buffers were backed
  	 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
--- 923,936 ----
  	if (len == 0 && !isLogSwitch)
  		elog(PANIC, "invalid xlog record length %u", len);
  
  	/*
  	 * Make additional rdata chain entries for the backup blocks, so that we
! 	 * don't need to special-case them in the write loop.  We have now
! 	 * modified the original rdata chain, but we remembered the last regular
! 	 * entry in rdt_lastnormal, so we can undo this if we have to loop back
! 	 * to the beginning.
! 	 *
! 	 * At the exit of this loop, write_len includes the backup block data.
  	 *
  	 * Also set the appropriate info bits to show which buffers were backed
  	 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
***************
*** 931,943 **** begin:;
  		if (!dtbuf_bkp[i])
  			continue;
  
! 		info |= XLR_SET_BKP_BLOCK(i);
  
  		bkpb = &(dtbuf_xlg[i]);
  		page = (char *) BufferGetBlock(dtbuf[i]);
  
  		rdt->next = &(dtbuf_rdt1[i]);
! 		rdt = rdt->next;
  
  		rdt->data = (char *) bkpb;
  		rdt->len = sizeof(BkpBlock);
--- 945,957 ----
  		if (!dtbuf_bkp[i])
  			continue;
  
! 		info_final |= XLR_SET_BKP_BLOCK(i);
  
  		bkpb = &(dtbuf_xlg[i]);
  		page = (char *) BufferGetBlock(dtbuf[i]);
  
  		rdt->next = &(dtbuf_rdt1[i]);
! 		rdt = &(dtbuf_rdt1[i]);
  
  		rdt->data = (char *) bkpb;
  		rdt->len = sizeof(BkpBlock);
***************
*** 971,1044 **** begin:;
  	}
  
  	/*
! 	 * If there isn't enough space on the current XLOG page for a record
! 	 * header, advance to the next page (leaving the unused space as zeroes).
  	 */
! 	updrqst = false;
! 	freespace = INSERT_FREESPACE(Insert);
! 	if (freespace < SizeOfXLogRecord)
! 	{
! 		updrqst = AdvanceXLInsertBuffer(false);
! 		freespace = INSERT_FREESPACE(Insert);
! 	}
! 
! 	/* Compute record's XLOG location */
! 	curridx = Insert->curridx;
! 	INSERT_RECPTR(RecPtr, Insert, curridx);
  
  	/*
! 	 * If the record is an XLOG_SWITCH, and we are exactly at the start of a
! 	 * segment, we need not insert it (and don't want to because we'd like
! 	 * consecutive switch requests to be no-ops).  Instead, make sure
! 	 * everything is written and flushed through the end of the prior segment,
! 	 * and return the prior segment's end address.
  	 */
! 	if (isLogSwitch &&
! 		(RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
! 	{
! 		/* We can release insert lock immediately */
! 		LWLockRelease(WALInsertLock);
! 
! 		RecPtr.xrecoff -= SizeOfXLogLongPHD;
! 		if (RecPtr.xrecoff == 0)
! 		{
! 			/* crossing a logid boundary */
! 			RecPtr.xlogid -= 1;
! 			RecPtr.xrecoff = XLogFileSize;
! 		}
! 
! 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
! 		LogwrtResult = XLogCtl->Write.LogwrtResult;
! 		if (!XLByteLE(RecPtr, LogwrtResult.Flush))
! 		{
! 			XLogwrtRqst FlushRqst;
! 
! 			FlushRqst.Write = RecPtr;
! 			FlushRqst.Flush = RecPtr;
! 			XLogWrite(FlushRqst, false, false);
! 		}
! 		LWLockRelease(WALWriteLock);
! 
! 		END_CRIT_SECTION();
! 
! 		return RecPtr;
! 	}
! 
! 	/* Insert record header */
! 
! 	record = (XLogRecord *) Insert->currpos;
! 	record->xl_prev = Insert->PrevRecord;
! 	record->xl_xid = GetCurrentTransactionIdIfAny();
! 	record->xl_tot_len = SizeOfXLogRecord + write_len;
! 	record->xl_len = len;		/* doesn't include backup blocks */
! 	record->xl_info = info;
! 	record->xl_rmid = rmid;
! 
! 	/* Now we can finish computing the record's CRC */
! 	COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
! 			   SizeOfXLogRecord - sizeof(pg_crc32));
! 	FIN_CRC32(rdata_crc);
! 	record->xl_crc = rdata_crc;
  
  #ifdef WAL_DEBUG
  	if (XLOG_DEBUG)
--- 985,1013 ----
  	}
  
  	/*
! 	 * Calculate CRC of the data, including all the backup blocks
! 	 *
! 	 * Note that the record header isn't added into the CRC initially since
! 	 * we don't know the prev-link yet.  Thus, the CRC will represent the CRC
! 	 * of the whole record in the order "rdata, then backup blocks, then
! 	 * record header".
  	 */
! 	INIT_CRC32(rdata_crc);
! 	for (rdt = rdata; rdt != NULL; rdt = rdt->next)
! 		COMP_CRC32(rdata_crc, rdt->data, rdt->len);
  
  	/*
! 	 * Construct record header. We can't CRC it yet, because the prev-link
! 	 * needs to be covered by the CRC and we don't know that yet. We will
! 	 * finish computing the CRC when we do.
  	 */
! 	MemSet(&rechdr, 0, sizeof(rechdr));
! 	rechdr.xl_prev = InvalidXLogRecPtr; /* TO BE DETERMINED */
! 	rechdr.xl_xid = GetCurrentTransactionIdIfAny();
! 	rechdr.xl_tot_len = SizeOfXLogRecord + write_len;
! 	rechdr.xl_len = len;		/* doesn't include backup blocks */
! 	rechdr.xl_info = info_final;	/* caller's info plus XLR_SET_BKP_BLOCK bits */
! 	rechdr.xl_rmid = rmid;
  
  #ifdef WAL_DEBUG
  	if (XLOG_DEBUG)
***************
*** 1059,1231 **** begin:;
  	}
  #endif
  
! 	/* Record begin of record in appropriate places */
! 	ProcLastRecPtr = RecPtr;
! 	Insert->PrevRecord = RecPtr;
! 
! 	Insert->currpos += SizeOfXLogRecord;
! 	freespace -= SizeOfXLogRecord;
  
  	/*
! 	 * Append the data, including backup blocks if any
  	 */
! 	while (write_len)
! 	{
! 		while (rdata->data == NULL)
! 			rdata = rdata->next;
  
! 		if (freespace > 0)
! 		{
! 			if (rdata->len > freespace)
! 			{
! 				memcpy(Insert->currpos, rdata->data, freespace);
! 				rdata->data += freespace;
! 				rdata->len -= freespace;
! 				write_len -= freespace;
! 			}
! 			else
! 			{
! 				memcpy(Insert->currpos, rdata->data, rdata->len);
! 				freespace -= rdata->len;
! 				write_len -= rdata->len;
! 				Insert->currpos += rdata->len;
! 				rdata = rdata->next;
! 				continue;
! 			}
! 		}
  
! 		/* Use next buffer */
! 		updrqst = AdvanceXLInsertBuffer(false);
! 		curridx = Insert->curridx;
! 		/* Insert cont-record header */
! 		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
! 		contrecord = (XLogContRecord *) Insert->currpos;
! 		contrecord->xl_rem_len = write_len;
! 		Insert->currpos += SizeOfXLogContRecord;
! 		freespace = INSERT_FREESPACE(Insert);
  	}
  
- 	/* Ensure next record will be properly aligned */
- 	Insert->currpos = (char *) Insert->currpage +
- 		MAXALIGN(Insert->currpos - (char *) Insert->currpage);
- 	freespace = INSERT_FREESPACE(Insert);
- 
  	/*
  	 * The recptr I return is the beginning of the *next* record. This will be
  	 * stored as LSN for changed data pages...
  	 */
! 	INSERT_RECPTR(RecPtr, Insert, curridx);
  
  	/*
! 	 * If the record is an XLOG_SWITCH, we must now write and flush all the
! 	 * existing data, and then forcibly advance to the start of the next
! 	 * segment.  It's not good to do this I/O while holding the insert lock,
! 	 * but there seems too much risk of confusion if we try to release the
! 	 * lock sooner.  Fortunately xlog switch needn't be a high-performance
! 	 * operation anyway...
  	 */
! 	if (isLogSwitch)
  	{
! 		XLogCtlWrite *Write = &XLogCtl->Write;
! 		XLogwrtRqst FlushRqst;
! 		XLogRecPtr	OldSegEnd;
  
! 		TRACE_POSTGRESQL_XLOG_SWITCH();
  
! 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  
! 		/*
! 		 * Flush through the end of the page containing XLOG_SWITCH, and
! 		 * perform end-of-segment actions (eg, notifying archiver).
! 		 */
! 		WriteRqst = XLogCtl->xlblocks[curridx];
! 		FlushRqst.Write = WriteRqst;
! 		FlushRqst.Flush = WriteRqst;
! 		XLogWrite(FlushRqst, false, true);
! 
! 		/* Set up the next buffer as first page of next segment */
! 		/* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
! 		(void) AdvanceXLInsertBuffer(true);
! 
! 		/* There should be no unwritten data */
! 		curridx = Insert->curridx;
! 		Assert(curridx == Write->curridx);
! 
! 		/* Compute end address of old segment */
! 		OldSegEnd = XLogCtl->xlblocks[curridx];
! 		OldSegEnd.xrecoff -= XLOG_BLCKSZ;
! 		if (OldSegEnd.xrecoff == 0)
! 		{
! 			/* crossing a logid boundary */
! 			OldSegEnd.xlogid -= 1;
! 			OldSegEnd.xrecoff = XLogFileSize;
! 		}
  
! 		/* Make it look like we've written and synced all of old segment */
! 		LogwrtResult.Write = OldSegEnd;
! 		LogwrtResult.Flush = OldSegEnd;
  
! 		/*
! 		 * Update shared-memory status --- this code should match XLogWrite
! 		 */
  		{
! 			/* use volatile pointer to prevent code rearrangement */
! 			volatile XLogCtlData *xlogctl = XLogCtl;
  
! 			SpinLockAcquire(&xlogctl->info_lck);
! 			xlogctl->LogwrtResult = LogwrtResult;
! 			if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
! 				xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
! 			if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
! 				xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
! 			SpinLockRelease(&xlogctl->info_lck);
! 		}
  
! 		Write->LogwrtResult = LogwrtResult;
  
! 		LWLockRelease(WALWriteLock);
  
! 		updrqst = false;		/* done already */
! 	}
! 	else
! 	{
! 		/* normal case, ie not xlog switch */
  
! 		/* Need to update shared LogwrtRqst if some block was filled up */
! 		if (freespace < SizeOfXLogRecord)
! 		{
! 			/* curridx is filled and available for writing out */
! 			updrqst = true;
! 		}
! 		else
! 		{
! 			/* if updrqst already set, write through end of previous buf */
! 			curridx = PrevBufIdx(curridx);
  		}
! 		WriteRqst = XLogCtl->xlblocks[curridx];
  	}
  
! 	LWLockRelease(WALInsertLock);
  
! 	if (updrqst)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
  
  		SpinLockAcquire(&xlogctl->info_lck);
  		/* advance global request to include new block(s) */
! 		if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
! 			xlogctl->LogwrtRqst.Write = WriteRqst;
  		/* update local result copy while I have the chance */
  		LogwrtResult = xlogctl->LogwrtResult;
  		SpinLockRelease(&xlogctl->info_lck);
  	}
  
! 	XactLastRecEnd = RecPtr;
  
- 	END_CRIT_SECTION();
  
! 	return RecPtr;
  }
  
  /*
--- 1028,1505 ----
  	}
  #endif
  
! 	START_CRIT_SECTION();
  
  	/*
! 	 * Try to do the insertion.
  	 */
! 	RecPtr = PerformXLogInsert(write_len, &rechdr, rdata, rdata_crc,
! 							forcePageWrites);
! 	END_CRIT_SECTION();
  
! 	if (XLogRecPtrIsInvalid(RecPtr))
! 	{
! 		/*
! 		 * Oops, have to retry. Unlink the backup blocks from the chain,
! 		 * restoring it to its original state.
! 		 */
! 		rdt_lastnormal->next = NULL;
  
! 		goto begin;
  	}
  
  	/*
  	 * The recptr I return is the beginning of the *next* record. This will be
  	 * stored as LSN for changed data pages...
  	 */
! 	return RecPtr;
! }
! 
! /*
!  * Subroutine of XLogInsert. All the changes to shared state are done here,
!  * XLogInsert only prepares the record for insertion.
!  *
!  * On success, returns pointer to end of inserted record like XLogInsert().
!  * If RedoRecPtr or forcePageWrites had changed, returns InvalidXLogRecPtr,
!  * and the caller must recalculate full-page-images and retry.
!  */
! static XLogRecPtr
! PerformXLogInsert(int write_len, XLogRecord *rechdr,
! 				  XLogRecData *rdata, pg_crc32 rdata_crc,
! 				  bool forcePageWrites)
! {
! 	volatile BackendXLogInsertSlot *myslot;
! 	char	   *currpos;
! 	XLogRecord *record;
! 	int			tot_len;
! 	int			freespace;
! 	int			tot_left;
! 	XLogRecPtr	PrevRecord;
! 	XLogRecPtr	StartPos;
! 	XLogRecPtr	EndPos;
! 	XLogRecPtr	CurrPos;
  
  	/*
! 	 * Our fast insertion method requires each process to have its own
! 	 * "slot", to tell others that it's still busy doing the insertion. Each
! 	 * regular backend has a dedicated slot: backend IDs count from 1, so
! 	 * backend N owns index N - 1. Auxiliary processes, which don't write a
! 	 * lot of WAL, share the one extra slot at index MaxBackends; access to
! 	 * it is serialized between aux processes by WALAuxSlotLock.
  	 */
! 	if (MyBackendId == InvalidBackendId)
  	{
! 		LWLockAcquire(WALAuxSlotLock, LW_EXCLUSIVE);
! 		myslot = &XLogCtl->BackendXLogInsertSlots[MaxBackends];
! 	}
! 	else
! 		myslot = &XLogCtl->BackendXLogInsertSlots[MyBackendId - 1];
  
! 	/* Get an insert location */
! 	tot_len = SizeOfXLogRecord + write_len;
! 	if (!ReserveXLogInsertLocation(tot_len, forcePageWrites,
! 								   &PrevRecord, &StartPos, &EndPos, myslot))
! 	{
! 		if (MyBackendId == InvalidBackendId)
! 			LWLockRelease(WALAuxSlotLock);
! 		return InvalidXLogRecPtr;
! 	}
  
! 	/*
! 	 * Got an insertion location! Now that we know the prev-link, we can
! 	 * finish computing the record's CRC
! 	 */
! 	rechdr->xl_prev = PrevRecord;
! 	COMP_CRC32(rdata_crc, ((char *) rechdr) + sizeof(pg_crc32),
! 			   SizeOfXLogRecord - sizeof(pg_crc32));
! 	FIN_CRC32(rdata_crc);
! 	rechdr->xl_crc = rdata_crc;
  
! 	/* Get the right WAL page to start inserting to */
! 	CurrPos = StartPos;
! 	currpos = GetXLogBuffer(CurrPos);
! 	freespace = XLOG_BLCKSZ - CurrPos.xrecoff % XLOG_BLCKSZ;
  
! 	/* Copy the record header and data */
! 	record = (XLogRecord *) currpos;
  
! 	memcpy(record, rechdr, sizeof(XLogRecord));
! 	currpos += SizeOfXLogRecord;
! 	CurrPos.xrecoff += SizeOfXLogRecord;
! 	freespace -= SizeOfXLogRecord;
! 
! 	tot_left = write_len;
! 	while (rdata != NULL)
! 	{
! 		while (rdata->len > freespace)
  		{
! 			/*
! 			 * Write what fits on this page, then write the continuation
! 			 * record, and continue.
! 			 */
! 			XLogContRecord *contrecord;
  
! 			memcpy(currpos, rdata->data, freespace);
! 			rdata->data += freespace;
! 			rdata->len -= freespace;
! 			tot_left -= freespace;
  
! 			CurrPos = AdvanceXLogRecPtrToNextPage(CurrPos);
  
! 			/*
! 			 * Make sure the memcpy is visible to others before we claim
! 			 * it to be done. It's important to update CurrPos before calling
! 			 * GetXLogBuffer(), because GetXLogBuffer() might need to wait
! 			 * for some insertions to finish so that it can write out a
! 			 * buffer to make room for the new page. Updating CurrPos before
! 			 * waiting for a new buffer ensures that we don't deadlock with
! 			 * ourselves if we run out of clean buffers.
! 			 */
! 			pg_write_barrier();
! 			myslot->CurrPos = CurrPos;
  
! 			currpos = GetXLogBuffer(CurrPos);
  
! 			contrecord = (XLogContRecord *) currpos;
! 			contrecord->xl_rem_len = tot_left;	/* bytes still to be copied */
! 
! 			currpos += SizeOfXLogContRecord;
! 			CurrPos.xrecoff += SizeOfXLogContRecord;
! 
! 			freespace = XLOG_BLCKSZ - CurrPos.xrecoff % XLOG_BLCKSZ;
  		}
! 
! 		memcpy(currpos, rdata->data, rdata->len);
! 		currpos += rdata->len;
! 		CurrPos.xrecoff += rdata->len;
! 		freespace -= rdata->len;
! 		tot_left -= rdata->len;
! 
! 		rdata = rdata->next;
  	}
+ 	Assert(tot_left == 0);
+ 
+ 	CurrPos.xrecoff = MAXALIGN(CurrPos.xrecoff);
+ 	Assert(XLByteEQ(CurrPos, EndPos));
+ 
+ 	/*
+ 	 * Done! Clear CurrPos in our slot to let others know that we're finished,
+ 	 * but first make sure the changes we made to the WAL pages are visible
+ 	 * to everyone.
+ 	 */
+ 	pg_write_barrier();
+ 	myslot->CurrPos = InvalidXLogRecPtr;
+ 	if (MyBackendId == InvalidBackendId)
+ 		LWLockRelease(WALAuxSlotLock);
  
! 	/* update our global variables */
! 	ProcLastRecPtr = StartPos;
! 	XactLastRecEnd = EndPos;
  
! 	/* update shared LogwrtRqst.Write, if we crossed page boundary */
! 	if (StartPos.xrecoff / XLOG_BLCKSZ != EndPos.xrecoff / XLOG_BLCKSZ)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
  
  		SpinLockAcquire(&xlogctl->info_lck);
  		/* advance global request to include new block(s) */
! 		if (XLByteLT(xlogctl->LogwrtRqst.Write, EndPos))
! 			xlogctl->LogwrtRqst.Write = EndPos;
  		/* update local result copy while I have the chance */
  		LogwrtResult = xlogctl->LogwrtResult;
  		SpinLockRelease(&xlogctl->info_lck);
  	}
  
! 	return EndPos;
! }
  
  
! /*
!  * Reserves the right amount of space for a record of the given size from
!  * the WAL. *StartPos_p is set to the beginning of the reserved section,
!  * *EndPos_p to its end, and *PrevRecord_p points to the beginning of the
!  * previous record to set to the prev-link of the record header.
!  *
!  * While holding insertpos_lck, sets myslot->CurrPos to the starting position,
!  * to let others know that we're busy inserting to the reserved area. The
!  * caller must clear it when the insertion is finished.
!  *
!  * Returns true on success, or false if RedoRecPtr or forcePageWrites was
!  * changed. On failure, the shared state is not modified.
!  *
!  * This is the performance critical part of XLogInsert that must be
!  * serialized across backends. The rest can happen mostly in parallel.
!  *
!  * NB: The space calculation here must match the code in PerformXLogInsert,
!  * where we actually copy the record to the reserved space.
!  */
! static bool
! ReserveXLogInsertLocation(int size, bool forcePageWrites,
! 						  XLogRecPtr *PrevRecord_p, XLogRecPtr *StartPos_p,
! 						  XLogRecPtr *EndPos_p,
! 						  volatile BackendXLogInsertSlot *myslot)
! {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
! 	int			freespace;
! 	XLogRecPtr	ptr;
! 	XLogRecPtr	StartPos;
! 	int			sizeleft;
! 
! 	sizeleft = size;
! 
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 
! 	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr) ||
! 		Insert->forcePageWrites != forcePageWrites)
! 	{
! 		/*
! 		 * Oops, forcePageWrites was just turned on, or a checkpoint
! 		 * just happened. Loop back to the beginning, because we might have
! 		 * to include more full-page images in the record
! 		 */
! 		RedoRecPtr = Insert->RedoRecPtr;
! 		SpinLockRelease(&Insert->insertpos_lck);
! 		return false;
! 	}
! 
! 	/*
! 	 * Now reserve the right amount of space from the WAL for our record.
! 	 */
! 	ptr = Insert->CurrPos;
! 	*PrevRecord_p = Insert->PrevRecord;
! 
! 	/*
! 	 * If there isn't enough space on the current XLOG page for a record
! 	 * header, advance to the next page (leaving the unused space as zeroes).
! 	 */
! 	freespace = INSERT_FREESPACE(ptr);
! 	if (freespace < SizeOfXLogRecord)
! 	{
! 		ptr = AdvanceXLogRecPtrToNextPage(ptr);
! 		freespace = INSERT_FREESPACE(ptr);
! 	}
! 	StartPos = ptr;
! 
! 	/*
! 	 * Set our slot's CurrPos to the starting position, to let others know
! 	 * that we're busy inserting to this area.
! 	 */
! 	myslot->CurrPos = StartPos;
! 
! 	while (freespace < sizeleft)
! 	{
! 		/* fill this page, and continue on next page */
! 		sizeleft -= freespace;
! 		ptr = AdvanceXLogRecPtrToNextPage(ptr);
! 
! 		/* account for continuation record header */
! 		ptr.xrecoff += SizeOfXLogContRecord;
! 		freespace = INSERT_FREESPACE(ptr);
! 	}
! 	/* the rest fits on this page */
! 	ptr.xrecoff += sizeleft;
! 	sizeleft = 0;
! 
! 	/* Align the end position, so that the next record starts aligned */
! 	ptr.xrecoff = MAXALIGN(ptr.xrecoff);
! 
! 	/* Publish the new insert position and prev-link before unlocking */
! 	Insert->CurrPos = ptr;
! 	Insert->PrevRecord = StartPos;
! 
! 	SpinLockRelease(&Insert->insertpos_lck);
! 
! #ifdef RESERVE_XLOGINSERT_LOCATION_DEBUG
! 	elog(DEBUG2, "reserved xlog: prev %X/%X, start %X/%X, end %X/%X (len %d)",
! 		 PrevRecord_p->xlogid, PrevRecord_p->xrecoff,
! 		 StartPos.xlogid, StartPos.xrecoff,
! 		 ptr.xlogid, ptr.xrecoff,
! 		 size);
! #endif
! 
! 	*EndPos_p = ptr;
! 	*StartPos_p = StartPos;
! 
! 	return true;
! }
! 
! /*
!  * Get a pointer to the right location in the WAL buffer corresponding to a
!  * given XLogRecPtr.
!  *
!  * If the page is not initialized yet, it is initialized. That might also
!  * require evicting an old dirty buffer from the buffer cache, which means I/O.
!  *
!  * The caller must ensure that the page containing the requested location
!  * isn't evicted yet, and won't be evicted, by holding onto a
!  * BackendXLogInsertSlot with CurrPos set to 'ptr'. Setting it to some value
!  * less than 'ptr' would suffice for GetXLogBuffer(), but risks deadlock:
!  * if we have to evict a buffer, we might have to wait for someone else to
!  * finish a write. And that someone else might not be able to finish the write
!  * if our CurrPos points to a buffer that's still in the buffer cache.
!  */
! static char *
! GetXLogBuffer(XLogRecPtr ptr)
! {
! 	int			idx;
! 	XLogRecPtr	endptr;
! 
! 	/*
! 	 * The XLog buffer cache is organized so that we can easily calculate the
! 	 * buffer a given page must be loaded into from the XLogRecPtr alone.
! 	 * A page must always be loaded to a particular buffer.
! 	 */
! 	idx = XLogRecPtrToBufIdx(ptr);
! 
! 	/*
! 	 * See what page is loaded in the buffer at the moment. It could be the
! 	 * page we're looking for, or something older. It can't be anything
! 	 * newer - that would imply the page we're looking for has already
! 	 * been written out to disk, which shouldn't happen as long as the caller
! 	 * has set its slot's CurrPos correctly.
! 	 *
! 	 * However, we don't hold a lock while we read the value. If someone has
! 	 * just initialized the page, it's possible that we get a "torn read",
! 	 * and read a bogus value. That's ok, we'll grab the mapping lock (in
! 	 * AdvanceXLInsertBuffer) and retry if we see anything other than the page
! 	 * we're looking for. But it means that when we do this unlocked read, we
! 	 * might see a value that *is* ahead of the page we're looking for. So
! 	 * don't PANIC on that, until we've verified the value while holding the
! 	 * lock.
! 	 */
! 	endptr = XLogCtl->xlblocks[idx];
! 	if (ptr.xlogid != endptr.xlogid ||
! 		!(ptr.xrecoff < endptr.xrecoff &&
! 		  ptr.xrecoff >= endptr.xrecoff - XLOG_BLCKSZ))
! 	{
! 		AdvanceXLInsertBuffer(false, ptr, false);
! 		endptr = XLogCtl->xlblocks[idx];
! 
! 		if (ptr.xlogid != endptr.xlogid ||
! 			!(ptr.xrecoff < endptr.xrecoff &&
! 			  ptr.xrecoff >= endptr.xrecoff - XLOG_BLCKSZ))
! 		{
! 			elog(PANIC, "could not find WAL buffer for %X/%X", ptr.xlogid, ptr.xrecoff);
! 		}
! 	}
! 
! 	/*
! 	 * Found the buffer holding this page. Return a pointer to the right
! 	 * offset within the page.
! 	 */
! 	return (char *) XLogCtl->pages + idx * (Size) XLOG_BLCKSZ +
! 		ptr.xrecoff % XLOG_BLCKSZ;
! }
! 
! /*
!  * Advance an XLogRecPtr to the first valid insertion location on the next
!  * page, right after the page header. An XLogRecPtr pointing to a boundary,
!  * ie. the first byte of a page, is taken to belong to the previous page.
!  */
! static XLogRecPtr
! AdvanceXLogRecPtrToNextPage(XLogRecPtr ptr)
! {
! 	int			freespace;
! 
! 	freespace = INSERT_FREESPACE(ptr);
! 	XLByteAdvance(ptr, freespace);
! 	if (ptr.xrecoff % XLogSegSize == 0)
! 		ptr.xrecoff += SizeOfXLogLongPHD;
! 	else
! 		ptr.xrecoff += SizeOfXLogShortPHD;
! 
! 	return ptr;
! }
! 
! /*
!  * Wait for any insertions < upto to finish.
!  *
!  * Returns a value >= upto, which indicates the oldest in-progress insertion
!  * that we saw in the array. All insertions up to that point are guaranteed to
!  * be finished. Note that it is just a conservative guess, there are race
!  * conditions where we return a bogus, too-low, value. If you care about the
!  * return value, you must get the current Insert->CurrPos value *before*
!  * calling this function, and pass that as the CurrPos argument.
!  */
! static XLogRecPtr
! WaitXLogInsertionsToFinish(XLogRecPtr upto, XLogRecPtr CurrPos)
! {
! 	int			i;
! 	int			nbusyslots = 0;
! 	/* FIXME: it's not safe to palloc here, would PANIC on OOM */
! 	int		   *busyslots = palloc0(sizeof(int) * NumXLogInsertSlots);
! 	int			cycles = 0;
! 
! 	/*
! 	 * Get a list of backend slots that are still inserting to a point earlier
! 	 * than 'upto'.
! 	 *
! 	 * This is a bit sloppy because we don't do any locking here. A slot's
! 	 * CurrPos that we read might get split if 8-byte loads are not atomic,
! 	 * but that's harmless. All slots with values <= upto, which we really do
! 	 * have to wait for, must be in the array before this function is called.
! 	 * That's because the 'upto' value must've been obtained by reading the
! 	 * current insert position, either directly or indirectly. It can never
! 	 * be > Insert->CurrPos. So we shouldn't miss anything that we genuinely
! 	 * need to wait for. OTOH, if someone is just storing an XLogRecPtr in a
! 	 * slot while we read it, we might incorrectly think that we have to wait
! 	 * for it. But that's OK, because in the loop that follows, we'll retry
! 	 * and see that it's actually > upto.
! 	 *
! 	 * XXX: that's bogus. You might see a too-new value if a slot's CurrPos is
! 	 * advanced at the same instant.
! 	 */
! 	for (i = 0; i < NumXLogInsertSlots; i++)
! 	{
! 		volatile BackendXLogInsertSlot *slot = &XLogCtl->BackendXLogInsertSlots[i];
! 		XLogRecPtr slotptr = slot->CurrPos;
! 
! 		if (XLogRecPtrIsInvalid(slotptr))
! 			continue;
! 
! 		if (XLByteLT(slotptr, upto))
! 			busyslots[nbusyslots++] = i;
! 		else if (XLByteLT(slotptr, CurrPos))
! 			CurrPos = slotptr;
! 	}
! 
! 	/*
! 	 * Busy-wait until the insertion is done.
! 	 *
! 	 * TODO: This needs to be replaced with something smarter. I don't think
! 	 * it's possible that we'd have to wait for I/O here, though. As the code
! 	 * stands, the caller never passes an 'upto' pointer that points to an
! 	 * uninitialized page. It always points to an already inserted record, in
! 	 * which case the page must already be initialized in the WAL buffer
! 	 * cache. Nevertheless, busy-waiting is not good.
! 	 */
! 	while (nbusyslots > 0)
! 	{
! 		pg_read_barrier();
! 		for (i = 0; i < nbusyslots; i++)
! 		{
! 			volatile BackendXLogInsertSlot *slot = &XLogCtl->BackendXLogInsertSlots[busyslots[i]];
! 			XLogRecPtr slotptr = slot->CurrPos;	/* one snapshot per pass */
! 
! 			if (XLogRecPtrIsInvalid(slotptr) || !XLByteLT(slotptr, upto))
! 			{
! 				if (nbusyslots > 1)
! 				{
! 					busyslots[i] = busyslots[nbusyslots - 1];
! 					i--;
! 				}
! 				nbusyslots--;
! 			}
! 		}
! 
! 		/* a debugging aid */
! 		if (++cycles == 1000000)
! 			elog(LOG, "stuck waiting upto %X/%X", upto.xlogid, upto.xrecoff);
! 	}
! 	pfree(busyslots);
! 
! 	return CurrPos;
  }
  
  /*
***************
*** 1458,1486 **** XLogArchiveCleanup(const char *xlog)
   * If new_segment is TRUE then we set up the next buffer page as the first
   * page of the next xlog segment file, possibly but not usually the next
   * consecutive file page.
-  *
-  * The global LogwrtRqst.Write pointer needs to be advanced to include the
-  * just-filled page.  If we can do this for free (without an extra lock),
-  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
-  * request update still needs to be done, FALSE if we did it internally.
-  *
-  * Must be called with WALInsertLock held.
   */
! static bool
! AdvanceXLInsertBuffer(bool new_segment)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogCtlWrite *Write = &XLogCtl->Write;
! 	int			nextidx = NextBufIdx(Insert->curridx);
! 	bool		update_needed = true;
  	XLogRecPtr	OldPageRqstPtr;
  	XLogwrtRqst WriteRqst;
  	XLogRecPtr	NewPageEndPtr;
  	XLogPageHeader NewPage;
  
! 	/* Use Insert->LogwrtResult copy if it's more fresh */
! 	if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
! 		LogwrtResult = Insert->LogwrtResult;
  
  	/*
  	 * Get ending-offset of the buffer page we need to replace (this may be
--- 1732,1763 ----
   * If new_segment is TRUE then we set up the next buffer page as the first
   * page of the next xlog segment file, possibly but not usually the next
   * consecutive file page.
   */
! static void
! AdvanceXLInsertBuffer(bool new_segment, XLogRecPtr upto, bool opportunistic)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogCtlWrite *Write = &XLogCtl->Write;
! 	int			nextidx;
  	XLogRecPtr	OldPageRqstPtr;
  	XLogwrtRqst WriteRqst;
  	XLogRecPtr	NewPageEndPtr;
  	XLogPageHeader NewPage;
+ 	bool		needflush;
+ 	int			npages = 0;
+ 	XLogRecPtr	EvictedPtr;
+ 
+ 	Assert(!new_segment); /* FIXME: not implemented */
+ 
+ 	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
  
! 	/*
! 	 * Now that we have the lock, check if someone initialized the page
! 	 * already.
! 	 */
! while (!XLByteLT(upto, XLogCtl->xlblocks[XLogCtl->curridx]) || opportunistic)
! {
! 	nextidx = NextBufIdx(XLogCtl->curridx);
  
  	/*
  	 * Get ending-offset of the buffer page we need to replace (this may be
***************
*** 1488,1499 **** AdvanceXLInsertBuffer(bool new_segment)
  	 * written out.
  	 */
  	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
! 	if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  	{
  		/* nope, got work to do... */
  		XLogRecPtr	FinishedPageRqstPtr;
  
! 		FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
  
  		/* Before waiting, get info_lck and update LogwrtResult */
  		{
--- 1765,1779 ----
  	 * written out.
  	 */
  	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
! 
! 	needflush = !XLByteLE(OldPageRqstPtr, LogwrtResult.Write);
! 
! 	if (needflush)
  	{
  		/* nope, got work to do... */
  		XLogRecPtr	FinishedPageRqstPtr;
  
! 		FinishedPageRqstPtr = XLogCtl->xlblocks[XLogCtl->curridx];
  
  		/* Before waiting, get info_lck and update LogwrtResult */
  		{
***************
*** 1502,1534 **** AdvanceXLInsertBuffer(bool new_segment)
  
  			SpinLockAcquire(&xlogctl->info_lck);
  			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
  				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
  			LogwrtResult = xlogctl->LogwrtResult;
  			SpinLockRelease(&xlogctl->info_lck);
  		}
  
! 		update_needed = false;	/* Did the shared-request update */
! 
! 		if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
! 		{
! 			/* OK, someone wrote it already */
! 			Insert->LogwrtResult = LogwrtResult;
! 		}
! 		else
  		{
! 			/* Must acquire write lock */
  			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  			LogwrtResult = Write->LogwrtResult;
  			if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  			{
  				/* OK, someone wrote it already */
  				LWLockRelease(WALWriteLock);
- 				Insert->LogwrtResult = LogwrtResult;
  			}
  			else
  			{
  				/*
! 				 * Have to write buffers while holding insert lock. This is
  				 * not good, so only write as much as we absolutely must.
  				 */
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
--- 1782,1824 ----
  
  			SpinLockAcquire(&xlogctl->info_lck);
  			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
+ 			{
+ 				Assert(XLByteLE(FinishedPageRqstPtr, XLogCtl->Insert.CurrPos));
  				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
+ 			}
  			LogwrtResult = xlogctl->LogwrtResult;
  			SpinLockRelease(&xlogctl->info_lck);
  		}
  
! 		if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  		{
! 			/*
! 			 * If we just want to pre-initialize as much as we can without
! 			 * flushing, give up now.
! 			 */
! 			if (opportunistic)
! 				break;
! 
! 			/*
! 			 * Must acquire write lock. Release WALBufMappingLock first, to
! 			 * make sure that all insertions that we need to wait for can
! 			 * finish (up to this same position). Otherwise we risk deadlock.
! 			 */
! 			LWLockRelease(WALBufMappingLock);
! 
! 			WaitXLogInsertionsToFinish(OldPageRqstPtr, InvalidXLogRecPtr);
! 
  			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  			LogwrtResult = Write->LogwrtResult;
  			if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  			{
  				/* OK, someone wrote it already */
  				LWLockRelease(WALWriteLock);
  			}
  			else
  			{
  				/*
! 				 * Have to write buffers while holding mapping lock. This is
  				 * not good, so only write as much as we absolutely must.
  				 */
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
***************
*** 1537,1560 **** AdvanceXLInsertBuffer(bool new_segment)
  				WriteRqst.Flush.xrecoff = 0;
  				XLogWrite(WriteRqst, false, false);
  				LWLockRelease(WALWriteLock);
- 				Insert->LogwrtResult = LogwrtResult;
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
  			}
  		}
  	}
  
  	/*
  	 * Now the next buffer slot is free and we can set it up to be the next
  	 * output page.
  	 */
! 	NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
  
  	if (new_segment)
  	{
  		/* force it to a segment start point */
  		NewPageEndPtr.xrecoff += XLogSegSize - 1;
  		NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
  	}
  
  	if (NewPageEndPtr.xrecoff >= XLogFileSize)
  	{
--- 1827,1856 ----
  				WriteRqst.Flush.xrecoff = 0;
  				XLogWrite(WriteRqst, false, false);
  				LWLockRelease(WALWriteLock);
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
  			}
+ 			/* Re-acquire WALBufMappingLock and retry */
+ 			LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+ 			continue;
  		}
  	}
  
+ 	EvictedPtr = OldPageRqstPtr;
+ 
  	/*
  	 * Now the next buffer slot is free and we can set it up to be the next
  	 * output page.
  	 */
! 	NewPageEndPtr = XLogCtl->xlblocks[XLogCtl->curridx];
  
+ #ifdef BROKEN
  	if (new_segment)
  	{
  		/* force it to a segment start point */
  		NewPageEndPtr.xrecoff += XLogSegSize - 1;
  		NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
  	}
+ #endif
  
  	if (NewPageEndPtr.xrecoff >= XLogFileSize)
  	{
***************
*** 1564,1577 **** AdvanceXLInsertBuffer(bool new_segment)
  	}
  	else
  		NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
  	XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
  	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
  
- 	Insert->curridx = nextidx;
- 	Insert->currpage = NewPage;
- 
- 	Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
- 
  	/*
  	 * Be sure to re-zero the buffer so that bytes beyond what we've written
  	 * will look like zeroes and not valid XLOG records...
--- 1860,1871 ----
  	}
  	else
  		NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
+ 	Assert(NewPageEndPtr.xrecoff % XLOG_BLCKSZ == 0);
+ 	Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
+ 
  	XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
  	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
  
  	/*
  	 * Be sure to re-zero the buffer so that bytes beyond what we've written
  	 * will look like zeroes and not valid XLOG records...
***************
*** 1614,1624 **** AdvanceXLInsertBuffer(bool new_segment)
  		NewLongPage->xlp_seg_size = XLogSegSize;
  		NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
  		NewPage   ->xlp_info |= XLP_LONG_HEADER;
- 
- 		Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
  	}
  
! 	return update_needed;
  }
  
  /*
--- 1908,1938 ----
  		NewLongPage->xlp_seg_size = XLogSegSize;
  		NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
  		NewPage   ->xlp_info |= XLP_LONG_HEADER;
  	}
  
! 	/*
! 	 * make sure the xlblocks update becomes visible to others before the
! 	 * curridx update.
! 	 */
! 	pg_write_barrier();
! 
! 	XLogCtl->curridx = nextidx;
! 
! 	npages++;
! }
! 
! 	Assert(opportunistic || XLByteLT(upto, XLogCtl->xlblocks[XLogCtl->curridx]));
! 	LWLockRelease(WALBufMappingLock);
! 
! 
! #ifdef WAL_DEBUG
! 	if (npages > 0)
! 		elog(LOG, "initialized %d pages, upto %X/%X (evicted upto %X/%X) in slot %d (backend %d)",
! 			 npages,
! 			 NewPageEndPtr.xlogid, NewPageEndPtr.xrecoff,
! 			 EvictedPtr.xlogid, EvictedPtr.xrecoff,
! 			 nextidx, MyBackendId);
! #endif
  }
  
  /*
***************
*** 1669,1675 **** XLogCheckpointNeeded(uint32 logid, uint32 logseg)
   * only if caller specifies WriteRqst == page-end and flexible == false,
   * and there is some data to write.)
   *
!  * Must be called with WALWriteLock held.
   */
  static void
  XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
--- 1983,1991 ----
   * only if caller specifies WriteRqst == page-end and flexible == false,
   * and there is some data to write.)
   *
!  * Must be called with WALWriteLock held. And you must've called
!  * WaitXLogInsertionsToFinish(WriteRqst) to make sure the data is ready to
!  * write.
   */
  static void
  XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
***************
*** 1722,1731 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  		 * last page that's been initialized by AdvanceXLInsertBuffer.
  		 */
  		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
! 			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
  				 XLogCtl->xlblocks[curridx].xlogid,
! 				 XLogCtl->xlblocks[curridx].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
  		LogwrtResult.Write = XLogCtl->xlblocks[curridx];
--- 2038,2047 ----
  		 * last page that's been initialized by AdvanceXLInsertBuffer.
  		 */
  		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
! 			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X (slot %d)",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
  				 XLogCtl->xlblocks[curridx].xlogid,
! 				 XLogCtl->xlblocks[curridx].xrecoff, curridx);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
  		LogwrtResult.Write = XLogCtl->xlblocks[curridx];
***************
*** 2097,2129 **** XLogFlush(XLogRecPtr record)
  	/* done already? */
  	if (!XLByteLE(record, LogwrtResult.Flush))
  	{
  		/* now wait for the write lock */
  		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  		LogwrtResult = XLogCtl->Write.LogwrtResult;
  		if (!XLByteLE(record, LogwrtResult.Flush))
  		{
! 			/* try to write/flush later additions to XLOG as well */
! 			if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
! 			{
! 				XLogCtlInsert *Insert = &XLogCtl->Insert;
! 				uint32		freespace = INSERT_FREESPACE(Insert);
  
- 				if (freespace < SizeOfXLogRecord)		/* buffer is full */
- 					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- 				else
- 				{
- 					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- 					WriteRqstPtr.xrecoff -= freespace;
- 				}
- 				LWLockRelease(WALInsertLock);
- 				WriteRqst.Write = WriteRqstPtr;
- 				WriteRqst.Flush = WriteRqstPtr;
- 			}
- 			else
- 			{
- 				WriteRqst.Write = WriteRqstPtr;
- 				WriteRqst.Flush = record;
- 			}
  			XLogWrite(WriteRqst, false, false);
  		}
  		LWLockRelease(WALWriteLock);
--- 2413,2460 ----
  	/* done already? */
  	if (!XLByteLE(record, LogwrtResult.Flush))
  	{
+ 		/* try to write/flush later additions to XLOG as well */
+ 		volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 		uint32		freespace;
+ 		XLogRecPtr	insertpos;
+ 
+ 		/*
+ 		 * Get the current insert position.
+ 		 *
+ 		 * XXX: This used to use LWLockConditionalAcquire, and fall back
+ 		 * to writing just up to 'record' if we couldn't get the lock. I
+ 		 * wonder if it would be a good idea to have a
+ 		 * SpinLockConditionalAcquire function and use that? On one hand,
+ 		 * it would be good to not cause more contention on the lock if it's
+ 		 * busy, but on the other hand, this spinlock is much more lightweight
+ 		 * than the WALInsertLock was, so maybe it's better to just grab the
+ 		 * spinlock. Also note that if we stored the XLogRecPtr as one 64-bit
+ 		 * integer, we could just read it with no lock on platforms where
+ 		 * 64-bit integer accesses are atomic, which covers many common
+ 		 * platforms nowadays.
+ 		 */
+ 		SpinLockAcquire(&Insert->insertpos_lck);
+ 		insertpos = Insert->CurrPos;
+ 		SpinLockRelease(&Insert->insertpos_lck);
+ 
+ 		freespace = INSERT_FREESPACE(insertpos);
+ 		if (freespace < SizeOfXLogRecord)		/* buffer is full */
+ 			insertpos.xrecoff += freespace;
+ 
+ 		/*
+ 		 * Before actually performing the write, wait for all in-flight
+ 		 * insertions to the pages we're about to write to finish.
+ 		 */
+ 		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr, insertpos);
+ 
  		/* now wait for the write lock */
  		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  		LogwrtResult = XLogCtl->Write.LogwrtResult;
  		if (!XLByteLE(record, LogwrtResult.Flush))
  		{
! 			WriteRqst.Write = insertpos;
! 			WriteRqst.Flush = insertpos;
  
  			XLogWrite(WriteRqst, false, false);
  		}
  		LWLockRelease(WALWriteLock);
***************
*** 2234,2243 **** XLogBackgroundFlush(void)
  			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
  			 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
  #endif
- 
  	START_CRIT_SECTION();
  
  	/* now wait for the write lock */
  	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  	LogwrtResult = XLogCtl->Write.LogwrtResult;
  	if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
--- 2565,2576 ----
  			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
  			 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
  #endif
  	START_CRIT_SECTION();
  
  	/* now wait for the write lock */
+ 
+ 	WaitXLogInsertionsToFinish(WriteRqstPtr, InvalidXLogRecPtr);
+ 
  	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  	LogwrtResult = XLogCtl->Write.LogwrtResult;
  	if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
***************
*** 2248,2256 **** XLogBackgroundFlush(void)
  		WriteRqst.Flush = WriteRqstPtr;
  		XLogWrite(WriteRqst, flexible, false);
  	}
! 	LWLockRelease(WALWriteLock);
  
  	END_CRIT_SECTION();
  }
  
  /*
--- 2581,2600 ----
  		WriteRqst.Flush = WriteRqstPtr;
  		XLogWrite(WriteRqst, flexible, false);
  	}
! 	LogwrtResult = XLogCtl->Write.LogwrtResult;
  
  	END_CRIT_SECTION();
+ 
+ 	LWLockRelease(WALWriteLock);
+ 
+ 	/*
+ 	 * Great, done. To take some work off the critical path, try to initialize
+ 	 * as many of the no-longer-needed WAL buffers for future use as we can.
+ 	 *
+ 	 * (WALWriteLock was already released above; the shared LogwrtResult was
+ 	 * copied into our unshared copy while the lock was still held.)
+ 	 */
+ 	AdvanceXLInsertBuffer(false, InvalidXLogRecPtr, true);
  }
  
  /*
***************
*** 5044,5049 **** XLOGShmemSize(void)
--- 5388,5396 ----
  	/* and the buffers themselves */
  	size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
  
+ 	/* XLog insertion slots */
+ 	size = add_size(size, mul_size(sizeof(BackendXLogInsertSlot), NumXLogInsertSlots));
+ 
  	/*
  	 * Note: we don't count ControlFileData, it comes out of the "slop factor"
  	 * added by CreateSharedMemoryAndSemaphores.  This lets us use this
***************
*** 5059,5064 **** XLOGShmemInit(void)
--- 5406,5412 ----
  	bool		foundCFile,
  				foundXLog;
  	char	   *allocptr;
+ 	int			i;
  
  	ControlFile = (ControlFileData *)
  		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
***************
*** 5084,5089 **** XLOGShmemInit(void)
--- 5432,5445 ----
  	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
  	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
  
+ 	/* Initialize the XLog insertion slots */
+ 	XLogCtl->BackendXLogInsertSlots = (BackendXLogInsertSlot *) allocptr;
+ 	for (i = 0; i < NumXLogInsertSlots; i++)
+ 	{
+ 		XLogCtl->BackendXLogInsertSlots[i].CurrPos = InvalidXLogRecPtr;
+ 	}
+ 	allocptr += sizeof(BackendXLogInsertSlot) * NumXLogInsertSlots;
+ 
  	/*
  	 * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
  	 */
***************
*** 5098,5108 **** XLOGShmemInit(void)
  	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
  	XLogCtl->SharedRecoveryInProgress = true;
  	XLogCtl->SharedHotStandbyActive = false;
- 	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
  	SpinLockInit(&XLogCtl->info_lck);
  	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
  	InitSharedLatch(&XLogCtl->WALWriterLatch);
  
  	/*
  	 * If we are not in bootstrap mode, pg_control should already exist. Read
  	 * and validate it immediately (see comments in ReadControlFile() for the
--- 5454,5465 ----
  	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
  	XLogCtl->SharedRecoveryInProgress = true;
  	XLogCtl->SharedHotStandbyActive = false;
  	SpinLockInit(&XLogCtl->info_lck);
  	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
  	InitSharedLatch(&XLogCtl->WALWriterLatch);
  
+ 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
+ 
  	/*
  	 * If we are not in bootstrap mode, pg_control should already exist. Read
  	 * and validate it immediately (see comments in ReadControlFile() for the
***************
*** 5942,5947 **** StartupXLOG(void)
--- 6299,6305 ----
  	uint32		freespace;
  	TransactionId oldestActiveXID;
  	bool		backupEndRequired = false;
+ 	int			firstIdx;
  
  	/*
  	 * Read control file and check XLOG status looks valid.
***************
*** 6697,6704 **** StartupXLOG(void)
  	openLogOff = 0;
  	Insert = &XLogCtl->Insert;
  	Insert->PrevRecord = LastRec;
! 	XLogCtl->xlblocks[0].xlogid = openLogId;
! 	XLogCtl->xlblocks[0].xrecoff =
  		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
  
  	/*
--- 7055,7066 ----
  	openLogOff = 0;
  	Insert = &XLogCtl->Insert;
  	Insert->PrevRecord = LastRec;
! 
! 	firstIdx = XLogRecPtrToBufIdx(EndOfLog);
! 	XLogCtl->curridx = firstIdx;
! 
! 	XLogCtl->xlblocks[firstIdx].xlogid = openLogId;
! 	XLogCtl->xlblocks[firstIdx].xrecoff =
  		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
  
  	/*
***************
*** 6706,6731 **** StartupXLOG(void)
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
  	 */
! 	Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
! 	memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
! 	Insert->currpos = (char *) Insert->currpage +
! 		(EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
  
  	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
  
  	XLogCtl->Write.LogwrtResult = LogwrtResult;
- 	Insert->LogwrtResult = LogwrtResult;
  	XLogCtl->LogwrtResult = LogwrtResult;
  
  	XLogCtl->LogwrtRqst.Write = EndOfLog;
  	XLogCtl->LogwrtRqst.Flush = EndOfLog;
  
! 	freespace = INSERT_FREESPACE(Insert);
  	if (freespace > 0)
  	{
  		/* Make sure rest of page is zero */
! 		MemSet(Insert->currpos, 0, freespace);
! 		XLogCtl->Write.curridx = 0;
  	}
  	else
  	{
--- 7068,7091 ----
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
  	 */
! 	Assert(readOff == (XLogCtl->xlblocks[firstIdx].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
! 	memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], readBuf, XLOG_BLCKSZ);
! 	Insert->CurrPos = EndOfLog;
  
  	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
  
  	XLogCtl->Write.LogwrtResult = LogwrtResult;
  	XLogCtl->LogwrtResult = LogwrtResult;
  
  	XLogCtl->LogwrtRqst.Write = EndOfLog;
  	XLogCtl->LogwrtRqst.Flush = EndOfLog;
  
! 	freespace = XLOG_BLCKSZ - EndRecPtr.xrecoff % XLOG_BLCKSZ;
  	if (freespace > 0)
  	{
  		/* Make sure rest of page is zero */
! 		MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndRecPtr.xrecoff % XLOG_BLCKSZ, 0, freespace);
! 		XLogCtl->Write.curridx = firstIdx;
  	}
  	else
  	{
***************
*** 6737,6743 **** StartupXLOG(void)
  		 * this is sufficient.	The first actual attempt to insert a log
  		 * record will advance the insert state.
  		 */
! 		XLogCtl->Write.curridx = NextBufIdx(0);
  	}
  
  	/* Pre-scan prepared transactions to find out the range of XIDs present */
--- 7097,7103 ----
  		 * this is sufficient.	The first actual attempt to insert a log
  		 * record will advance the insert state.
  		 */
! 		XLogCtl->Write.curridx = NextBufIdx(firstIdx);
  	}
  
  	/* Pre-scan prepared transactions to find out the range of XIDs present */
***************
*** 7231,7239 **** GetRedoRecPtr(void)
   *
   * NOTE: The value *actually* returned is the position of the last full
   * xlog page. It lags behind the real insert position by at most 1 page.
!  * For that, we don't need to acquire WALInsertLock which can be quite
   * heavily contended, and an approximation is enough for the current
   * usage of this function.
   */
  XLogRecPtr
  GetInsertRecPtr(void)
--- 7591,7603 ----
   *
   * NOTE: The value *actually* returned is the position of the last full
   * xlog page. It lags behind the real insert position by at most 1 page.
!  * For that, we don't need to acquire insertpos_lck which can be quite
   * heavily contended, and an approximation is enough for the current
   * usage of this function.
+  *
+  * XXX: now that there can be several insertions "in-flight", what should
+  * this return? The position a new insertion would go to? Or the oldest
+  * still in-progress insertion, perhaps?
   */
  XLogRecPtr
  GetInsertRecPtr(void)
***************
*** 7507,7512 **** CreateCheckPoint(int flags)
--- 7871,7877 ----
  	uint32		insert_logSeg;
  	TransactionId *inCommitXids;
  	int			nInCommit;
+ 	XLogRecPtr	curInsert;
  
  	/*
  	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
***************
*** 7574,7584 **** CreateCheckPoint(int flags)
  	else
  		checkPoint.oldestActiveXid = InvalidTransactionId;
  
  	/*
! 	 * We must hold WALInsertLock while examining insert state to determine
! 	 * the checkpoint REDO pointer.
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  
  	/*
  	 * If this isn't a shutdown or forced checkpoint, and we have not switched
--- 7939,7950 ----
  	else
  		checkPoint.oldestActiveXid = InvalidTransactionId;
  
+ 
  	/*
! 	 * Determine the checkpoint REDO pointer.
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	curInsert = Insert->CurrPos;
  
  	/*
  	 * If this isn't a shutdown or forced checkpoint, and we have not switched
***************
*** 7590,7596 **** CreateCheckPoint(int flags)
  	 * (Perhaps it'd make even more sense to checkpoint only when the previous
  	 * checkpoint record is in a different xlog page?)
  	 *
! 	 * While holding the WALInsertLock we find the current WAL insertion point
  	 * and compare that with the starting point of the last checkpoint, which
  	 * is the redo pointer. We use the redo pointer because the start and end
  	 * points of a checkpoint can be hundreds of files apart on large systems
--- 7956,7962 ----
  	 * (Perhaps it'd make even more sense to checkpoint only when the previous
  	 * checkpoint record is in a different xlog page?)
  	 *
! 	 * While holding insertpos_lck we find the current WAL insertion point
  	 * and compare that with the starting point of the last checkpoint, which
  	 * is the redo pointer. We use the redo pointer because the start and end
  	 * points of a checkpoint can be hundreds of files apart on large systems
***************
*** 7599,7613 **** CreateCheckPoint(int flags)
  	if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
  				  CHECKPOINT_FORCE)) == 0)
  	{
- 		XLogRecPtr	curInsert;
- 
- 		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
  		XLByteToSeg(curInsert, insert_logId, insert_logSeg);
  		XLByteToSeg(ControlFile->checkPointCopy.redo, redo_logId, redo_logSeg);
  		if (insert_logId == redo_logId &&
  			insert_logSeg == redo_logSeg)
  		{
! 			LWLockRelease(WALInsertLock);
  			LWLockRelease(CheckpointLock);
  			END_CRIT_SECTION();
  			return;
--- 7965,7976 ----
  	if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
  				  CHECKPOINT_FORCE)) == 0)
  	{
  		XLByteToSeg(curInsert, insert_logId, insert_logSeg);
  		XLByteToSeg(ControlFile->checkPointCopy.redo, redo_logId, redo_logSeg);
  		if (insert_logId == redo_logId &&
  			insert_logSeg == redo_logSeg)
  		{
! 			SpinLockRelease(&Insert->insertpos_lck);
  			LWLockRelease(CheckpointLock);
  			END_CRIT_SECTION();
  			return;
***************
*** 7633,7646 **** CreateCheckPoint(int flags)
  	 * the buffer flush work.  Those XLOG records are logically after the
  	 * checkpoint, even though physically before it.  Got that?
  	 */
! 	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
! 	{
! 		(void) AdvanceXLInsertBuffer(false);
! 		/* OK to ignore update return flag, since we will do flush anyway */
! 		freespace = INSERT_FREESPACE(Insert);
! 	}
! 	INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
  
  	/*
  	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
--- 7996,8005 ----
  	 * the buffer flush work.  Those XLOG records are logically after the
  	 * checkpoint, even though physically before it.  Got that?
  	 */
! 	freespace = XLOG_BLCKSZ - curInsert.xrecoff % XLOG_BLCKSZ;
  	if (freespace < SizeOfXLogRecord)
! 		curInsert = AdvanceXLogRecPtrToNextPage(curInsert);
! 	checkPoint.redo = curInsert;
  
  	/*
  	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
***************
*** 7666,7672 **** CreateCheckPoint(int flags)
  	 * Now we can release WAL insert lock, allowing other xacts to proceed
  	 * while we are flushing disk buffers.
  	 */
! 	LWLockRelease(WALInsertLock);
  
  	/*
  	 * If enabled, log checkpoint start.  We postpone this until now so as not
--- 8025,8031 ----
  	 * Now we can release WAL insert lock, allowing other xacts to proceed
  	 * while we are flushing disk buffers.
  	 */
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	/*
  	 * If enabled, log checkpoint start.  We postpone this until now so as not
***************
*** 7686,7692 **** CreateCheckPoint(int flags)
  	 * we wait till he's out of his commit critical section before proceeding.
  	 * See notes in RecordTransactionCommit().
  	 *
! 	 * Because we've already released WALInsertLock, this test is a bit fuzzy:
  	 * it is possible that we will wait for xacts we didn't really need to
  	 * wait for.  But the delay should be short and it seems better to make
  	 * checkpoint take a bit longer than to hold locks longer than necessary.
--- 8045,8051 ----
  	 * we wait till he's out of his commit critical section before proceeding.
  	 * See notes in RecordTransactionCommit().
  	 *
! 	 * Because we've already released insertpos_lck, this test is a bit fuzzy:
  	 * it is possible that we will wait for xacts we didn't really need to
  	 * wait for.  But the delay should be short and it seems better to make
  	 * checkpoint take a bit longer than to hold locks longer than necessary.
***************
*** 7798,7804 **** CreateCheckPoint(int flags)
  	 */
  	if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
  		ereport(PANIC,
! 				(errmsg("concurrent transaction log activity while database system is shutting down")));
  
  	/*
  	 * Select point at which we can truncate the log, which we base on the
--- 8157,8165 ----
  	 */
  	if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
  		ereport(PANIC,
! 				(errmsg("concurrent transaction log activity while database system is shutting down (redo %X/%X, ProcLastRecPtr %X/%X)",
! 						checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
! 						ProcLastRecPtr.xlogid, ProcLastRecPtr.xrecoff)));
  
  	/*
  	 * Select point at which we can truncate the log, which we base on the
***************
*** 8053,8067 **** CreateRestartPoint(int flags)
  	 * the number of segments replayed since last restartpoint, and request a
  	 * restartpoint if it exceeds checkpoint_segments.
  	 *
! 	 * You need to hold WALInsertLock and info_lck to update it, although
! 	 * during recovery acquiring WALInsertLock is just pro forma, because
! 	 * there is no other processes updating Insert.RedoRecPtr.
  	 */
- 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	SpinLockAcquire(&xlogctl->info_lck);
  	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
  	SpinLockRelease(&xlogctl->info_lck);
- 	LWLockRelease(WALInsertLock);
  
  	/*
  	 * Prepare to accumulate statistics.
--- 8414,8425 ----
  	 * the number of segments replayed since last restartpoint, and request a
  	 * restartpoint if it exceeds checkpoint_segments.
  	 *
! 	 * You need to hold info_lck to update it. There are no other processes
! 	 * updating Insert.RedoRecPtr, so we don't need a lock to protect that.
  	 */
  	SpinLockAcquire(&xlogctl->info_lck);
  	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
  	SpinLockRelease(&xlogctl->info_lck);
  
  	/*
  	 * Prepare to accumulate statistics.
***************
*** 8816,8821 **** issue_xlog_fsync(int fd, uint32 log, uint32 seg)
--- 9174,9180 ----
  XLogRecPtr
  do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = (labelfile == NULL);
  	XLogRecPtr	checkpointloc;
  	XLogRecPtr	startpoint;
***************
*** 8865,8890 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  	 * since we expect that any pages not modified during the backup interval
  	 * must have been correctly captured by the backup.)
  	 *
! 	 * We must hold WALInsertLock to change the value of forcePageWrites, to
  	 * ensure adequate interlocking against XLogInsert().
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
  	{
! 		if (XLogCtl->Insert.exclusiveBackup)
  		{
! 			LWLockRelease(WALInsertLock);
  			ereport(ERROR,
  					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  					 errmsg("a backup is already in progress"),
  					 errhint("Run pg_stop_backup() and try again.")));
  		}
! 		XLogCtl->Insert.exclusiveBackup = true;
  	}
  	else
! 		XLogCtl->Insert.nonExclusiveBackups++;
! 	XLogCtl->Insert.forcePageWrites = true;
! 	LWLockRelease(WALInsertLock);
  
  	/* Ensure we release forcePageWrites if fail below */
  	PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
--- 9224,9249 ----
  	 * since we expect that any pages not modified during the backup interval
  	 * must have been correctly captured by the backup.)
  	 *
! 	 * We must hold insertpos_lck to change the value of forcePageWrites, to
  	 * ensure adequate interlocking against XLogInsert().
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
  	{
! 		if (Insert->exclusiveBackup)
  		{
! 			SpinLockRelease(&Insert->insertpos_lck);
  			ereport(ERROR,
  					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  					 errmsg("a backup is already in progress"),
  					 errhint("Run pg_stop_backup() and try again.")));
  		}
! 		Insert->exclusiveBackup = true;
  	}
  	else
! 		Insert->nonExclusiveBackups++;
! 	Insert->forcePageWrites = true;
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	/* Ensure we release forcePageWrites if fail below */
  	PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
***************
*** 8946,8958 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  			 * taking a checkpoint right after another is not that expensive
  			 * either because only few buffers have been dirtied yet.
  			 */
! 			LWLockAcquire(WALInsertLock, LW_SHARED);
! 			if (XLByteLT(XLogCtl->Insert.lastBackupStart, startpoint))
  			{
! 				XLogCtl->Insert.lastBackupStart = startpoint;
  				gotUniqueStartpoint = true;
  			}
! 			LWLockRelease(WALInsertLock);
  		} while (!gotUniqueStartpoint);
  
  		XLByteToSeg(startpoint, _logId, _logSeg);
--- 9305,9317 ----
  			 * taking a checkpoint right after another is not that expensive
  			 * either because only few buffers have been dirtied yet.
  			 */
! 			SpinLockAcquire(&Insert->insertpos_lck);
! 			if (XLByteLT(Insert->lastBackupStart, startpoint))
  			{
! 				Insert->lastBackupStart = startpoint;
  				gotUniqueStartpoint = true;
  			}
! 			SpinLockRelease(&Insert->insertpos_lck);
  		} while (!gotUniqueStartpoint);
  
  		XLByteToSeg(startpoint, _logId, _logSeg);
***************
*** 9034,9043 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  static void
  pg_start_backup_callback(int code, Datum arg)
  {
  	bool		exclusive = DatumGetBool(arg);
  
  	/* Update backup counters and forcePageWrites on failure */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
  	{
  		Assert(XLogCtl->Insert.exclusiveBackup);
--- 9393,9403 ----
  static void
  pg_start_backup_callback(int code, Datum arg)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = DatumGetBool(arg);
  
  	/* Update backup counters and forcePageWrites on failure */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
  	{
  		Assert(XLogCtl->Insert.exclusiveBackup);
***************
*** 9054,9060 **** pg_start_backup_callback(int code, Datum arg)
  	{
  		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  }
  
  /*
--- 9414,9420 ----
  	{
  		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  }
  
  /*
***************
*** 9067,9072 **** pg_start_backup_callback(int code, Datum arg)
--- 9427,9433 ----
  XLogRecPtr
  do_pg_stop_backup(char *labelfile, bool waitforarchive)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = (labelfile == NULL);
  	XLogRecPtr	startpoint;
  	XLogRecPtr	stoppoint;
***************
*** 9108,9116 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  	/*
  	 * OK to update backup counters and forcePageWrites
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
! 		XLogCtl->Insert.exclusiveBackup = false;
  	else
  	{
  		/*
--- 9469,9477 ----
  	/*
  	 * OK to update backup counters and forcePageWrites
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
! 		Insert->exclusiveBackup = false;
  	else
  	{
  		/*
***************
*** 9119,9134 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  		 * backups, it is expected that each do_pg_start_backup() call is
  		 * matched by exactly one do_pg_stop_backup() call.
  		 */
! 		Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
! 		XLogCtl->Insert.nonExclusiveBackups--;
  	}
  
! 	if (!XLogCtl->Insert.exclusiveBackup &&
! 		XLogCtl->Insert.nonExclusiveBackups == 0)
  	{
! 		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  
  	if (exclusive)
  	{
--- 9480,9495 ----
  		 * backups, it is expected that each do_pg_start_backup() call is
  		 * matched by exactly one do_pg_stop_backup() call.
  		 */
! 		Assert(Insert->nonExclusiveBackups > 0);
! 		Insert->nonExclusiveBackups--;
  	}
  
! 	if (!Insert->exclusiveBackup &&
! 		Insert->nonExclusiveBackups == 0)
  	{
! 		Insert->forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	if (exclusive)
  	{
***************
*** 9330,9345 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  void
  do_pg_abort_backup(void)
  {
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
! 	Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
! 	XLogCtl->Insert.nonExclusiveBackups--;
  
! 	if (!XLogCtl->Insert.exclusiveBackup &&
! 		XLogCtl->Insert.nonExclusiveBackups == 0)
  	{
! 		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  }
  
  /*
--- 9691,9708 ----
  void
  do_pg_abort_backup(void)
  {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	Assert(Insert->nonExclusiveBackups > 0);
! 	Insert->nonExclusiveBackups--;
! 
! 	if (!Insert->exclusiveBackup &&
! 		Insert->nonExclusiveBackups == 0)
  	{
! 		Insert->forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  }
  
  /*
***************
*** 9391,9406 **** GetStandbyFlushRecPtr(void)
   * Get latest WAL insert pointer
   */
  XLogRecPtr
! GetXLogInsertRecPtr(bool needlock)
  {
! 	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogRecPtr	current_recptr;
  
! 	if (needlock)
! 		LWLockAcquire(WALInsertLock, LW_SHARED);
! 	INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
! 	if (needlock)
! 		LWLockRelease(WALInsertLock);
  
  	return current_recptr;
  }
--- 9754,9767 ----
   * Get latest WAL insert pointer
   */
  XLogRecPtr
! GetXLogInsertRecPtr(void)
  {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogRecPtr	current_recptr;
  
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	current_recptr = Insert->CurrPos;
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	return current_recptr;
  }
*** a/src/backend/access/transam/xlogfuncs.c
--- b/src/backend/access/transam/xlogfuncs.c
***************
*** 200,206 **** pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
  				 errmsg("recovery is in progress"),
  				 errhint("WAL control functions cannot be executed during recovery.")));
  
! 	current_recptr = GetXLogInsertRecPtr(true);
  
  	snprintf(location, sizeof(location), "%X/%X",
  			 current_recptr.xlogid, current_recptr.xrecoff);
--- 200,206 ----
  				 errmsg("recovery is in progress"),
  				 errhint("WAL control functions cannot be executed during recovery.")));
  
! 	current_recptr = GetXLogInsertRecPtr();
  
  	snprintf(location, sizeof(location), "%X/%X",
  			 current_recptr.xlogid, current_recptr.xrecoff);
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 288,294 **** extern bool XLogInsertAllowed(void);
  extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
  extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
  extern XLogRecPtr GetStandbyFlushRecPtr(void);
! extern XLogRecPtr GetXLogInsertRecPtr(bool needlock);
  extern XLogRecPtr GetXLogWriteRecPtr(void);
  extern bool RecoveryIsPaused(void);
  extern void SetRecoveryPause(bool recoveryPause);
--- 288,294 ----
  extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
  extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
  extern XLogRecPtr GetStandbyFlushRecPtr(void);
! extern XLogRecPtr GetXLogInsertRecPtr(void);
  extern XLogRecPtr GetXLogWriteRecPtr(void);
  extern bool RecoveryIsPaused(void);
  extern void SetRecoveryPause(bool recoveryPause);
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 53,59 **** typedef enum LWLockId
  	ProcArrayLock,
  	SInvalReadLock,
  	SInvalWriteLock,
! 	WALInsertLock,
  	WALWriteLock,
  	ControlFileLock,
  	CheckpointLock,
--- 53,59 ----
  	ProcArrayLock,
  	SInvalReadLock,
  	SInvalWriteLock,
! 	WALBufMappingLock,
  	WALWriteLock,
  	ControlFileLock,
  	CheckpointLock,
***************
*** 79,84 **** typedef enum LWLockId
--- 79,85 ----
  	SerializablePredicateLockListLock,
  	OldSerXidLock,
  	SyncRepLock,
+ 	WALAuxSlotLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
