*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 42,47 ****
--- 42,48 ----
  #include "postmaster/startup.h"
  #include "replication/walreceiver.h"
  #include "replication/walsender.h"
+ #include "storage/barrier.h"
  #include "storage/bufmgr.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
***************
*** 262,268 **** XLogRecPtr	XactLastRecEnd = {0, 0};
   * CHECKPOINT record).	We update this from the shared-memory copy,
   * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
   * hold the Insert lock).  See XLogInsert for details.	We are also allowed
!  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
   * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
   * InitXLOGAccess.
   */
--- 263,269 ----
   * CHECKPOINT record).	We update this from the shared-memory copy,
   * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
   * hold the Insert lock).  See XLogInsert for details.	We are also allowed
!  * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
   * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
   * InitXLOGAccess.
   */
***************
*** 300,309 **** static XLogRecPtr RedoStartLSN = {0, 0};
   * (protected by info_lck), but we don't need to cache any copies of it.
   *
   * info_lck is only held long enough to read/update the protected variables,
!  * so it's a plain spinlock.  The other locks are held longer (potentially
!  * over I/O operations), so we use LWLocks for them.  These locks are:
   *
!  * WALInsertLock: must be held to insert a record into the WAL buffers.
   *
   * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
   * XLogFlush).
--- 301,315 ----
   * (protected by info_lck), but we don't need to cache any copies of it.
   *
   * info_lck is only held long enough to read/update the protected variables,
!  * so it's a plain spinlock.  insertpos_lck protects the current logical
!  * insert location, ie. the head of reserved WAL space.  The other locks are
!  * held longer (potentially over I/O operations), so we use LWLocks for them.
!  * These locks are:
   *
!  * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
!  * This is only held while initializing and changing the mapping. If the
!  * contents of the buffer being replaced haven't been written yet, the mapping
!  * lock is released while the write is done, and reacquired afterwards.
   *
   * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
   * XLogFlush).
***************
*** 315,320 **** static XLogRecPtr RedoStartLSN = {0, 0};
--- 321,393 ----
   * only one checkpointer at a time; currently, with all checkpoints done by
   * the checkpointer, this is just pro forma).
   *
+  *
+  * Inserting a new WAL record is a two-step process:
+  *
+  * 1. Reserve the right amount of space from the WAL, and the next insertion
+  *    slot to advertise that the insertion is in progress. The current head
+  *    of reserved space is kept in Insert->CurrPos, and is protected by
+  *    insertpos_lck. Try to keep this section as short as possible;
+  *    insertpos_lck can be heavily contended on a busy system.
+  *
+  * 2. Copy the record to the reserved WAL space. This involves finding the
+  *    correct WAL buffer containing the reserved space, and copying the
+  *    record in place. This can be done concurrently in multiple processes.
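+  *
+  * (In the code, step 1 is done in ReserveXLogInsertLocation() and step 2
+  * in CopyXLogRecordToWAL(); XLogInsert() performs them in that order.)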
+  *
+  * To allow as much parallelism as possible for step 2, we try hard to avoid
+  * lock contention in that code path. Each insertion is assigned its own
+  * "XLog insertion slot", which is used to advertise the position the backend
+  * is writing to. The slot is marked as in-use in step 1, while holding
+  * insertpos_lck, by setting the position field in the slot. When the backend
+  * is finished with the insertion, it clears its slot. Each slot is protected
+  * by a separate spinlock, to keep contention minimal.
+  *
+  * The insertion slots also provide a mechanism to wait for an insertion to
+  * finish. This is important when an XLOG page is written out - any
+  * in-progress insertions must finish copying data to the page first, or the
+  * on-disk copy will be incomplete. Waiting is done by the
+  * WaitXLogInsertionsToFinish() function. It adds the current process to the
+  * waiting queue in the slot it needs to wait for, and when that insertion
+  * finishes (or proceeds to the next page, at least), the inserter wakes up
+  * the process.
+  *
+  * The insertion slots form a ring. Insert->nextslot points to the next free
+  * slot, and Insert->lastslot points to the last slot that's still in use.
+  * lastslot can lag behind reality by any number of slots, as long as nextslot
+  * doesn't catch up with it. lastslot is advanced by
+  * WaitXLogInsertionsToFinish(), and is protected by WALInsertTailLock.
+  * nextslot is advanced in ReserveXLogInsertLocation() and is protected by
+  * insertpos_lck. Both slot variables are 32-bit integers, so that they can
+  * be read atomically without holding a lock. nextslot == lastslot means that
+  * all the slots are empty.
+  *
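+  * As an illustration, consider a hypothetical ring of 8 slots (the real
+  * size is NumXLogInsertSlots).  lastslot = 1 and nextslot = 5 means that
+  * slots 1-4 have been handed out to insertions (some of which might have
+  * finished already, since lastslot can lag), while slots 5-7 and 0 are
+  * free:
+  *
+  *    slot:  0   1   2   3   4   5   6   7
+  *           .   #   #   #   #   .   .   .
+  *               ^lastslot       ^nextslot
+  *
+  * The ring counts as full when NextSlotNo(nextslot) == lastslot, which
+  * here happens when nextslot wraps around to 0.
+  *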
+  * Whenever the ring fills up, ie. when nextslot wraps around and catches up
+  * with lastslot, ReserveXLogInsertLocation() has to wait for the oldest
+  * insertion to finish and advance lastslot, to make room for the new
+  * insertion. This is also handled by WaitXLogInsertionsToFinish().
+  *
+  *
+  * Deadlock analysis
+  * -----------------
+  *
+  * It's important to call WaitXLogInsertionsToFinish() *before* acquiring
+  * WALWriteLock. Otherwise you might get stuck waiting for an insertion to
+  * finish (or at least advance to the next uninitialized page), while you're
+  * holding WALWriteLock. That would be bad, because the backend you're waiting
+  * for might need to acquire WALWriteLock, too, to evict an old buffer, so
+  * you'd get deadlock.
+  *
+  * WaitXLogInsertionsToFinish() will not get stuck indefinitely, as long as
+  * it's called with a location that's known to be already allocated in the WAL
+  * buffers. Calling it with the position of a record you've already inserted
+  * satisfies that condition, so the common pattern:
+  *
+  *   recptr = XLogInsert(...)
+  *   XLogFlush(recptr)
+  *
+  * is safe. It can't get stuck, because an insertion to a WAL page that's
+  * already initialized in cache can always proceed without waiting on a lock.
+  *
   *----------
   */
  
***************
*** 335,344 **** typedef struct XLogwrtResult
   */
  typedef struct XLogCtlInsert
  {
! 	XLogRecPtr	PrevRecord;		/* start of previously-inserted record */
! 	int			curridx;		/* current block index in cache */
! 	XLogPageHeader currpage;	/* points to header of block in cache */
! 	char	   *currpos;		/* current insertion point in cache */
  	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
  	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
  
--- 408,429 ----
   */
  typedef struct XLogCtlInsert
  {
! 	slock_t		insertpos_lck;	/* protects all the fields in this struct
! 								 * (except lastslot). */
! 
! 	int32		nextslot;		/* next insertion slot to use */
! 	int32		lastslot;		/* last in-use insertion slot (protected by
! 								 * WALInsertTailLock) */
! 
! 	/*
! 	 * CurrPos is the very tip of the reserved WAL space at the moment.
! 	 * The next record will be inserted there (or somewhere after it if
! 	 * there's not enough space on the current page).  PrevRecord points to
! 	 * the beginning of the last record already reserved.  It might not be
! 	 * fully copied into place yet, but we know its exact location already.
! 	 */
! 	XLogRecPtr	CurrPos;
! 	XLogRecPtr	PrevRecord;
  	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
  	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
  
***************
*** 372,387 **** typedef struct XLogCtlWrite
  	pg_time_t	lastSegSwitchTime;		/* time of last xlog segment switch */
  } XLogCtlWrite;
  
  /*
   * Total shared-memory state for XLOG.
   */
  typedef struct XLogCtlData
  {
! 	/* Protected by WALInsertLock: */
  	XLogCtlInsert Insert;
  
  	/* Protected by info_lck: */
  	XLogwrtRqst LogwrtRqst;
  	uint32		ckptXidEpoch;	/* nextXID & epoch of latest checkpoint */
  	TransactionId ckptXid;
  	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
--- 457,489 ----
  	pg_time_t	lastSegSwitchTime;		/* time of last xlog segment switch */
  } XLogCtlWrite;
  
+ 
+ /*
+  * Slots for in-progress WAL insertions.
+  */
+ typedef struct
+ {
+ 	slock_t		lck;
+ 	XLogRecPtr	CurrPos;	/* current position this process is inserting to */
+ 	PGPROC	   *head;		/* head of list of waiting PGPROCs */
+ 	PGPROC	   *tail;		/* tail of list of waiting PGPROCs */
+ } XLogInsertSlot;
+ 
+ #define NumXLogInsertSlots	512
+ 
  /*
   * Total shared-memory state for XLOG.
   */
  typedef struct XLogCtlData
  {
! 	/* Protected by insertpos_lck: */
  	XLogCtlInsert Insert;
  
+ 	XLogInsertSlot XLogInsertSlots[NumXLogInsertSlots];
+ 
  	/* Protected by info_lck: */
  	XLogwrtRqst LogwrtRqst;
+ 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
  	uint32		ckptXidEpoch;	/* nextXID & epoch of latest checkpoint */
  	TransactionId ckptXid;
  	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
***************
*** 398,406 **** typedef struct XLogCtlData
  	XLogwrtResult LogwrtResult;
  
  	/*
  	 * These values do not change after startup, although the pointed-to pages
  	 * and xlblocks values certainly do.  Permission to read/write the pages
! 	 * and xlblocks values depends on WALInsertLock and WALWriteLock.
  	 */
  	char	   *pages;			/* buffers for unwritten XLOG pages */
  	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
--- 500,517 ----
  	XLogwrtResult LogwrtResult;
  
  	/*
+ 	 * To change curridx and the identity of a buffer, you need to hold
+ 	 * WALBufMappingLock.  To change the identity of a buffer that's still
+ 	 * dirty, the old page needs to be written out first, and for that you
+ 	 * need WALWriteLock, and you need to ensure that there are no in-progress
+ 	 * insertions to the page by calling WaitXLogInsertionsToFinish().
+ 	 */
+ 	int			curridx;		/* latest initialized block index in cache */
+ 
+ 	/*
  	 * These values do not change after startup, although the pointed-to pages
  	 * and xlblocks values certainly do.  Permission to read/write the pages
! 	 * and xlblocks values depends on WALBufMappingLock and WALWriteLock.
  	 */
  	char	   *pages;			/* buffers for unwritten XLOG pages */
  	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
***************
*** 478,505 **** static XLogCtlData *XLogCtl = NULL;
  static ControlFileData *ControlFile = NULL;
  
  /*
!  * Macros for managing XLogInsert state.  In most cases, the calling routine
!  * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
!  * so these are passed as parameters instead of being fetched via XLogCtl.
   */
  
! /* Free space remaining in the current xlog page buffer */
! #define INSERT_FREESPACE(Insert)  \
! 	(XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
  
! /* Construct XLogRecPtr value for current insertion point */
! #define INSERT_RECPTR(recptr,Insert,curridx)  \
! 	( \
! 	  (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
! 	  (recptr).xrecoff = \
! 		XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
! 	)
  
! #define PrevBufIdx(idx)		\
! 		(((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
  
! #define NextBufIdx(idx)		\
! 		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
  
  /*
   * Private, possibly out-of-date copy of shared LogwrtResult.
--- 589,620 ----
  static ControlFileData *ControlFile = NULL;
  
  /*
!  * Calculate the amount of space left on the page after 'endptr'.
!  * Beware multiple evaluation!
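!  *
!  * For example, if 'endptr' points 100 bytes into a page, the result is
!  * XLOG_BLCKSZ - 100; if it points exactly at a page boundary
!  * (xrecoff % XLOG_BLCKSZ == 0), the page counts as full and the result is 0.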
   */
+ #define INSERT_FREESPACE(endptr)	\
+ 	(((endptr).xrecoff % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr).xrecoff % XLOG_BLCKSZ))
  
! /*
!  * Macros to advance to next buffer index and insertion slot.
!  */
! #define NextBufIdx(idx)		\
! 		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
  
! #define NextSlotNo(idx)		(((idx) + 1) % NumXLogInsertSlots)
  
! /*
!  * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
!  * would hold if it was in cache, the page containing 'recptr'.
!  *
!  * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
!  * page is taken to mean the previous page.
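!  *
!  * For example, with XLOG_BLCKSZ = 8192 and a hypothetical cache of 16
!  * buffers (XLogCacheBlck = 15), a pointer into page 20 of the log maps to
!  * buffer 20 % 16 = 4.  For a pointer to the very first byte of page 20,
!  * XLogRecPtrToBufIdx also gives buffer 4, but XLogRecEndPtrToBufIdx gives
!  * buffer 3, ie. the buffer of page 19.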
!  */
! #define XLogRecPtrToBufIdx(recptr)	\
! 	((((((uint64) (recptr).xlogid * (uint64) XLogFileSize) + (recptr).xrecoff)) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
  
! #define XLogRecEndPtrToBufIdx(recptr)	\
! 	((((((uint64) (recptr).xlogid * (uint64) XLogFileSize) + (recptr).xrecoff - 1)) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
  
  /*
   * Private, possibly out-of-date copy of shared LogwrtResult.
***************
*** 625,633 **** static void KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg);
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static bool AdvanceXLInsertBuffer(bool new_segment);
  static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
! static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
  static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock);
--- 740,748 ----
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
  static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
! static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
  static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock);
***************
*** 674,679 **** static bool read_backup_label(XLogRecPtr *checkPointLoc,
--- 789,810 ----
  static void rm_redo_error_callback(void *arg);
  static int	get_sync_bit(int method);
  
+ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+ 				  XLogRecord *rechdr,
+ 				  XLogRecData *rdata, pg_crc32 rdata_crc,
+ 				  XLogInsertSlot *myslot,
+ 				  XLogRecPtr StartPos, XLogRecPtr EndPos);
+ static bool ReserveXLogInsertLocation(int size, bool didPageWrites,
+ 						  bool isLogSwitch,
+ 						  XLogRecPtr *PrevRecord_p, XLogRecPtr *StartPos_p,
+ 						  XLogRecPtr *EndPos_p,
+ 						  XLogInsertSlot **myslot_p, bool *updrqst_p);
+ static void UpdateSlotCurrPos(volatile XLogInsertSlot *myslot,
+ 				  XLogRecPtr CurrPos);
+ static void	ReuseOldSlots(void);
+ static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+ static char *GetXLogBuffer(XLogRecPtr ptr);
+ 
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 694,705 **** XLogRecPtr
  XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
- 	XLogRecord *record;
- 	XLogContRecord *contrecord;
- 	XLogRecPtr	RecPtr;
- 	XLogRecPtr	WriteRqst;
- 	uint32		freespace;
- 	int			curridx;
  	XLogRecData *rdt;
  	XLogRecData *rdt_lastnormal;
  	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
--- 825,830 ----
***************
*** 717,722 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
--- 842,852 ----
  	bool		doPageWrites;
  	bool		isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
  	uint8		info_orig = info;
+ 	XLogRecord	rechdr;
+ 	XLogRecPtr	PrevRecord;
+ 	XLogRecPtr	StartPos;
+ 	XLogRecPtr	EndPos;
+ 	XLogInsertSlot *myslot;
  
  	/* cross-check on whether we should be here or not */
  	if (!XLogInsertAllowed())
***************
*** 734,742 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  	 */
  	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
  	{
! 		RecPtr.xlogid = 0;
! 		RecPtr.xrecoff = SizeOfXLogLongPHD;		/* start of 1st chkpt record */
! 		return RecPtr;
  	}
  
  	/*
--- 864,872 ----
  	 */
  	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
  	{
! 		EndPos.xlogid = 0;
! 		EndPos.xrecoff = SizeOfXLogLongPHD;		/* start of 1st chkpt record */
! 		return EndPos;
  	}
  
  	/*
***************
*** 903,1035 **** begin:;
  	for (rdt = rdata; rdt != NULL; rdt = rdt->next)
  		COMP_CRC32(rdata_crc, rdt->data, rdt->len);
  
! 	START_CRIT_SECTION();
  
! 	/* Now wait to get insert lock */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  
  	/*
! 	 * Check to see if my RedoRecPtr is out of date.  If so, may have to go
! 	 * back and recompute everything.  This can only happen just after a
! 	 * checkpoint, so it's better to be slow in this case and fast otherwise.
! 	 *
! 	 * If we aren't doing full-page writes then RedoRecPtr doesn't actually
! 	 * affect the contents of the XLOG record, so we'll update our local copy
! 	 * but not force a recomputation.
  	 */
! 	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
  	{
! 		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
! 		RedoRecPtr = Insert->RedoRecPtr;
  
! 		if (doPageWrites)
  		{
! 			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
! 			{
! 				if (dtbuf[i] == InvalidBuffer)
! 					continue;
! 				if (dtbuf_bkp[i] == false &&
! 					XLByteLE(dtbuf_lsn[i], RedoRecPtr))
! 				{
! 					/*
! 					 * Oops, this buffer now needs to be backed up, but we
! 					 * didn't think so above.  Start over.
! 					 */
! 					LWLockRelease(WALInsertLock);
! 					END_CRIT_SECTION();
! 					rdt_lastnormal->next = NULL;
! 					info = info_orig;
! 					goto begin;
! 				}
! 			}
  		}
  	}
! 
! 	/*
! 	 * Also check to see if fullPageWrites or forcePageWrites was just turned on;
! 	 * if we weren't already doing full-page writes then go back and recompute.
! 	 * (If it was just turned off, we could recompute the record without full pages,
! 	 * but we choose not to bother.)
! 	 */
! 	if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
  	{
! 		/* Oops, must redo it with full-page data. */
! 		LWLockRelease(WALInsertLock);
! 		END_CRIT_SECTION();
! 		rdt_lastnormal->next = NULL;
! 		info = info_orig;
! 		goto begin;
  	}
  
  	/*
! 	 * If there isn't enough space on the current XLOG page for a record
! 	 * header, advance to the next page (leaving the unused space as zeroes).
  	 */
! 	updrqst = false;
! 	freespace = INSERT_FREESPACE(Insert);
! 	if (freespace < SizeOfXLogRecord)
  	{
! 		updrqst = AdvanceXLInsertBuffer(false);
! 		freespace = INSERT_FREESPACE(Insert);
! 	}
  
! 	/* Compute record's XLOG location */
! 	curridx = Insert->curridx;
! 	INSERT_RECPTR(RecPtr, Insert, curridx);
  
  	/*
! 	 * If the record is an XLOG_SWITCH, and we are exactly at the start of a
! 	 * segment, we need not insert it (and don't want to because we'd like
! 	 * consecutive switch requests to be no-ops).  Instead, make sure
! 	 * everything is written and flushed through the end of the prior segment,
! 	 * and return the prior segment's end address.
  	 */
! 	if (isLogSwitch &&
! 		(RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
  	{
! 		/* We can release insert lock immediately */
! 		LWLockRelease(WALInsertLock);
! 
! 		RecPtr.xrecoff -= SizeOfXLogLongPHD;
! 		if (RecPtr.xrecoff == 0)
! 		{
! 			/* crossing a logid boundary */
! 			RecPtr.xlogid -= 1;
! 			RecPtr.xrecoff = XLogFileSize;
! 		}
! 
! 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
! 		LogwrtResult = XLogCtl->LogwrtResult;
! 		if (!XLByteLE(RecPtr, LogwrtResult.Flush))
! 		{
! 			XLogwrtRqst FlushRqst;
! 
! 			FlushRqst.Write = RecPtr;
! 			FlushRqst.Flush = RecPtr;
! 			XLogWrite(FlushRqst, false, false);
! 		}
! 		LWLockRelease(WALWriteLock);
  
! 		END_CRIT_SECTION();
  
! 		return RecPtr;
  	}
  
! 	/* Insert record header */
! 
! 	record = (XLogRecord *) Insert->currpos;
! 	record->xl_prev = Insert->PrevRecord;
! 	record->xl_xid = GetCurrentTransactionIdIfAny();
! 	record->xl_tot_len = SizeOfXLogRecord + write_len;
! 	record->xl_len = len;		/* doesn't include backup blocks */
! 	record->xl_info = info;
! 	record->xl_rmid = rmid;
! 
! 	/* Now we can finish computing the record's CRC */
! 	COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
! 			   SizeOfXLogRecord - sizeof(pg_crc32));
! 	FIN_CRC32(rdata_crc);
! 	record->xl_crc = rdata_crc;
  
  #ifdef WAL_DEBUG
  	if (XLOG_DEBUG)
--- 1033,1138 ----
  	for (rdt = rdata; rdt != NULL; rdt = rdt->next)
  		COMP_CRC32(rdata_crc, rdt->data, rdt->len);
  
! 	/* Construct record header. */
! 	MemSet(&rechdr, 0, sizeof(rechdr));
! 	/* rechdr.xl_prev is set later */
! 	rechdr.xl_xid = GetCurrentTransactionIdIfAny();
! 	rechdr.xl_tot_len = SizeOfXLogRecord + write_len;
! 	rechdr.xl_len = len;		/* doesn't include backup blocks */
! 	rechdr.xl_info = info;
! 	rechdr.xl_rmid = rmid;
  
! 	START_CRIT_SECTION();
  
  	/*
! 	 * Try to reserve space for the record from the WAL.
  	 */
! 	if (!ReserveXLogInsertLocation(write_len, doPageWrites, isLogSwitch,
! 								   &PrevRecord, &StartPos, &EndPos,
! 								   (XLogInsertSlot **) &myslot, &updrqst))
  	{
! 		/*
! 		 * Reservation failed. This could be because the record was an
! 		 * XLOG_SWITCH, and we're exactly at the start of a segment. In that
! 		 * case we need not insert it (and don't want to because we'd like
! 		 * consecutive switch requests to be no-ops).  Instead, make sure
! 		 * everything is written and flushed through the end of the prior
! 		 * segment, and return the prior segment's end address.
! 		 *
! 		 * The other reason for failure is that someone changed RedoRecPtr
! 		 * or forcePageWrites after we had constructed our WAL record. In
! 		 * that case we need to redo it with full-page data.
! 		 */
! 		END_CRIT_SECTION();
  
! 		if (isLogSwitch && !XLogRecPtrIsInvalid(EndPos))
  		{
! 			XLogFlush(EndPos);
! 			return EndPos;
! 		}
! 		else
! 		{
! 			rdt_lastnormal->next = NULL;
! 			info = info_orig;
! 			goto begin;
  		}
  	}
! 	else
  	{
! 		/*
! 		 * Reservation succeeded.  Finish the record header by setting
! 		 * prev-link (now that we know it), and finish computing the record's
! 		 * CRC (in CopyXLogRecordToWAL).  Then copy the record to the space
! 		 * we reserved.
! 		 */
! 		rechdr.xl_prev = PrevRecord;
! 		CopyXLogRecordToWAL(write_len, isLogSwitch, &rechdr,
! 							rdata, rdata_crc, myslot, StartPos, EndPos);
  	}
+ 	END_CRIT_SECTION();
  
  	/*
! 	 * Update shared LogwrtRqst.Write, if we crossed a page boundary.
  	 */
! 	if (updrqst)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile XLogCtlData *xlogctl = XLogCtl;
  
! 		SpinLockAcquire(&xlogctl->info_lck);
! 		/* advance global request to include new block(s) */
! 		if (XLByteLT(xlogctl->LogwrtRqst.Write, EndPos))
! 			xlogctl->LogwrtRqst.Write = EndPos;
! 		/* update local result copy while I have the chance */
! 		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease(&xlogctl->info_lck);
! 	}
  
  	/*
! 	 * If this was an XLOG_SWITCH record, flush the record and the empty
! 	 * padding space that fills the rest of the segment, and perform
! 	 * end-of-segment actions (eg, notifying archiver).
  	 */
! 	if (isLogSwitch)
  	{
! 		TRACE_POSTGRESQL_XLOG_SWITCH();
  
! 		XLogFlush(EndPos);
  
! 		/*
! 		 * Even though we reserved the rest of the segment for us, which is
! 		 * reflected in EndPos, we return a pointer to just the end of the
! 		 * xlog-switch record.
! 		 */
! 		EndPos.xlogid = StartPos.xlogid;
! 		EndPos.xrecoff = StartPos.xrecoff + SizeOfXLogRecord;
  	}
  
! 	/*
! 	 * Update our global variables
! 	 */
! 	ProcLastRecPtr = StartPos;
! 	XactLastRecEnd = EndPos;
  
  #ifdef WAL_DEBUG
  	if (XLOG_DEBUG)
***************
*** 1038,1219 **** begin:;
  
  		initStringInfo(&buf);
  		appendStringInfo(&buf, "INSERT @ %X/%X: ",
! 						 RecPtr.xlogid, RecPtr.xrecoff);
! 		xlog_outrec(&buf, record);
  		if (rdata->data != NULL)
  		{
  			appendStringInfo(&buf, " - ");
! 			RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
  		}
  		elog(LOG, "%s", buf.data);
  		pfree(buf.data);
  	}
  #endif
  
- 	/* Record begin of record in appropriate places */
- 	ProcLastRecPtr = RecPtr;
- 	Insert->PrevRecord = RecPtr;
- 
- 	Insert->currpos += SizeOfXLogRecord;
- 	freespace -= SizeOfXLogRecord;
- 
  	/*
! 	 * Append the data, including backup blocks if any
  	 */
! 	while (write_len)
! 	{
! 		while (rdata->data == NULL)
! 			rdata = rdata->next;
  
! 		if (freespace > 0)
  		{
! 			if (rdata->len > freespace)
  			{
! 				memcpy(Insert->currpos, rdata->data, freespace);
  				rdata->data += freespace;
  				rdata->len -= freespace;
! 				write_len -= freespace;
! 			}
! 			else
! 			{
! 				memcpy(Insert->currpos, rdata->data, rdata->len);
! 				freespace -= rdata->len;
! 				write_len -= rdata->len;
! 				Insert->currpos += rdata->len;
! 				rdata = rdata->next;
! 				continue;
  			}
  		}
  
! 		/* Use next buffer */
! 		updrqst = AdvanceXLInsertBuffer(false);
! 		curridx = Insert->curridx;
! 		/* Insert cont-record header */
! 		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
! 		contrecord = (XLogContRecord *) Insert->currpos;
! 		contrecord->xl_rem_len = write_len;
! 		Insert->currpos += SizeOfXLogContRecord;
! 		freespace = INSERT_FREESPACE(Insert);
  	}
  
! 	/* Ensure next record will be properly aligned */
! 	Insert->currpos = (char *) Insert->currpage +
! 		MAXALIGN(Insert->currpos - (char *) Insert->currpage);
! 	freespace = INSERT_FREESPACE(Insert);
  
  	/*
! 	 * The recptr I return is the beginning of the *next* record. This will be
! 	 * stored as LSN for changed data pages...
  	 */
! 	INSERT_RECPTR(RecPtr, Insert, curridx);
  
  	/*
! 	 * If the record is an XLOG_SWITCH, we must now write and flush all the
! 	 * existing data, and then forcibly advance to the start of the next
! 	 * segment.  It's not good to do this I/O while holding the insert lock,
! 	 * but there seems too much risk of confusion if we try to release the
! 	 * lock sooner.  Fortunately xlog switch needn't be a high-performance
! 	 * operation anyway...
  	 */
! 	if (isLogSwitch)
! 	{
! 		XLogwrtRqst FlushRqst;
! 		XLogRecPtr	OldSegEnd;
  
! 		TRACE_POSTGRESQL_XLOG_SWITCH();
  
! 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  
  		/*
! 		 * Flush through the end of the page containing XLOG_SWITCH, and
! 		 * perform end-of-segment actions (eg, notifying archiver).
  		 */
! 		WriteRqst = XLogCtl->xlblocks[curridx];
! 		FlushRqst.Write = WriteRqst;
! 		FlushRqst.Flush = WriteRqst;
! 		XLogWrite(FlushRqst, false, true);
! 
! 		/* Set up the next buffer as first page of next segment */
! 		/* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
! 		(void) AdvanceXLInsertBuffer(true);
! 
! 		/* There should be no unwritten data */
! 		curridx = Insert->curridx;
! 		Assert(curridx == XLogCtl->Write.curridx);
! 
! 		/* Compute end address of old segment */
! 		OldSegEnd = XLogCtl->xlblocks[curridx];
! 		OldSegEnd.xrecoff -= XLOG_BLCKSZ;
! 		if (OldSegEnd.xrecoff == 0)
! 		{
! 			/* crossing a logid boundary */
! 			OldSegEnd.xlogid -= 1;
! 			OldSegEnd.xrecoff = XLogFileSize;
! 		}
  
! 		/* Make it look like we've written and synced all of old segment */
! 		LogwrtResult.Write = OldSegEnd;
! 		LogwrtResult.Flush = OldSegEnd;
  
  		/*
! 		 * Update shared-memory status --- this code should match XLogWrite
  		 */
  		{
! 			/* use volatile pointer to prevent code rearrangement */
! 			volatile XLogCtlData *xlogctl = XLogCtl;
  
! 			SpinLockAcquire(&xlogctl->info_lck);
! 			xlogctl->LogwrtResult = LogwrtResult;
! 			if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
! 				xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
! 			if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
! 				xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
! 			SpinLockRelease(&xlogctl->info_lck);
! 		}
  
! 		LWLockRelease(WALWriteLock);
  
! 		updrqst = false;		/* done already */
  	}
  	else
  	{
! 		/* normal case, ie not xlog switch */
  
! 		/* Need to update shared LogwrtRqst if some block was filled up */
! 		if (freespace < SizeOfXLogRecord)
  		{
! 			/* curridx is filled and available for writing out */
  			updrqst = true;
  		}
! 		else
  		{
! 			/* if updrqst already set, write through end of previous buf */
! 			curridx = PrevBufIdx(curridx);
  		}
- 		WriteRqst = XLogCtl->xlblocks[curridx];
  	}
  
! 	LWLockRelease(WALInsertLock);
  
! 	if (updrqst)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile XLogCtlData *xlogctl = XLogCtl;
  
! 		SpinLockAcquire(&xlogctl->info_lck);
! 		/* advance global request to include new block(s) */
! 		if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
! 			xlogctl->LogwrtRqst.Write = WriteRqst;
! 		/* update local result copy while I have the chance */
! 		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease(&xlogctl->info_lck);
  	}
  
! 	XactLastRecEnd = RecPtr;
  
! 	END_CRIT_SECTION();
  
! 	return RecPtr;
  }
  
  /*
--- 1141,1898 ----
  
  		initStringInfo(&buf);
  		appendStringInfo(&buf, "INSERT @ %X/%X: ",
! 						 EndPos.xlogid, EndPos.xrecoff);
! 		xlog_outrec(&buf, &rechdr);
  		if (rdata->data != NULL)
  		{
  			appendStringInfo(&buf, " - ");
! 			RmgrTable[rmid].rm_desc(&buf, rechdr.xl_info, rdata->data);
  		}
  		elog(LOG, "%s", buf.data);
  		pfree(buf.data);
  	}
  #endif
  
  	/*
! 	 * The recptr I return is the beginning of the *next* record. This will
! 	 * be stored as LSN for changed data pages...
  	 */
! 	return EndPos;
! }
! 
! /*
!  * Subroutine of XLogInsert.  Copies a WAL record to an already-reserved
!  * area in the WAL.
!  */
! static void
! CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecord *rechdr,
! 					XLogRecData *rdata, pg_crc32 rdata_crc,
! 					XLogInsertSlot *myslot_p,
! 					XLogRecPtr StartPos, XLogRecPtr EndPos)
! {
! 	volatile XLogInsertSlot *myslot = myslot_p;
! 	char	   *currpos;
! 	XLogRecord *record;
! 	int			freespace;
! 	int			written;
! 	XLogRecPtr	CurrPos;
! 
! 	/* Get the right WAL page to start inserting to */
! 	CurrPos = StartPos;
! 	currpos = GetXLogBuffer(CurrPos);
! 
! 	/* Copy the record header in place, and finish calculating CRC */
! 	record = (XLogRecord *) currpos;
! 	memcpy(record, rechdr, sizeof(XLogRecord));
! 	COMP_CRC32(rdata_crc, currpos + sizeof(pg_crc32),
! 			   SizeOfXLogRecord - sizeof(pg_crc32));
! 	FIN_CRC32(rdata_crc);
! 	record->xl_crc = rdata_crc;
! 
! 	currpos += SizeOfXLogRecord;
! 	XLByteAdvance(CurrPos, SizeOfXLogRecord);
  
! 	freespace = INSERT_FREESPACE(CurrPos);
! 
! 	if (!isLogSwitch)
! 	{
! 		/* Copy record data */
! 		written = 0;
! 		while (rdata != NULL)
  		{
! 			while (rdata->len > freespace)
  			{
! 				/*
! 				 * Write what fits on this page, then write the continuation
! 				 * record, and continue on the next page.
! 				 */
! 				XLogContRecord *contrecord;
! 
! 				memcpy(currpos, rdata->data, freespace);
  				rdata->data += freespace;
  				rdata->len -= freespace;
! 				written += freespace;
! 				XLByteAdvance(CurrPos, freespace);
! 
! 				/*
! 				 * CurrPos now points to the page boundary, ie. the first byte
! 				 * of the next page. Advertise that position in our insertion
! 				 * slot before calling GetXLogBuffer(), because GetXLogBuffer()
! 				 * might need to wait for some insertions to finish so that it
! 				 * can write out a buffer to make room for the new page.
! 				 * Updating the slot before waiting for a new buffer ensures
! 				 * that we don't deadlock with ourselves if we run out of
! 				 * clean buffers.
! 				 *
! 				 * Note that we must not advance CurrPos past the page header
! 				 * yet, otherwise someone might try to flush up to that point,
! 				 * which would fail if the next page was not initialized yet.
! 				 */
! 				UpdateSlotCurrPos(myslot, CurrPos);
! 
! 				/*
! 				 * Get pointer to beginning of next page, and set the
! 				 * XLP_FIRST_IS_CONTRECORD flag in the page header.
! 				 *
! 				 * It's safe to set the contrecord flag without a lock on the
! 				 * page. All the other flags are set in AdvanceXLInsertBuffer,
! 				 * and we're the only backend that needs to set the contrecord
! 				 * flag.
! 				 */
! 				currpos = GetXLogBuffer(CurrPos);
! 				((XLogPageHeader) currpos)->xlp_info |= XLP_FIRST_IS_CONTRECORD;
! 
! 				/* skip over the page header, and write continuation record */
! 				if (CurrPos.xrecoff % XLogSegSize == 0)
! 				{
! 					CurrPos.xrecoff += SizeOfXLogLongPHD;
! 					currpos += SizeOfXLogLongPHD;
! 				}
! 				else
! 				{
! 					CurrPos.xrecoff += SizeOfXLogShortPHD;
! 					currpos += SizeOfXLogShortPHD;
! 				}
! 				contrecord = (XLogContRecord *) currpos;
! 				contrecord->xl_rem_len = write_len - written;
! 
! 				currpos += SizeOfXLogContRecord;
! 				CurrPos.xrecoff += SizeOfXLogContRecord;
! 
! 				freespace = INSERT_FREESPACE(CurrPos);
  			}
+ 
+ 			memcpy(currpos, rdata->data, rdata->len);
+ 			currpos += rdata->len;
+ 			XLByteAdvance(CurrPos, rdata->len);
+ 			freespace -= rdata->len;
+ 			written += rdata->len;
+ 
+ 			rdata = rdata->next;
  		}
+ 		Assert(written == write_len);
  
! 		/* Align the end position, so that the next record starts aligned */
! 		CurrPos.xrecoff = MAXALIGN(CurrPos.xrecoff);
! 		if (CurrPos.xrecoff >= XLogFileSize)
! 		{
! 			/* crossed a logid boundary */
! 			CurrPos.xlogid += 1;
! 			CurrPos.xrecoff = 0;
! 		}
! 
! 		if (!XLByteEQ(CurrPos, EndPos))
! 			elog(PANIC, "space reserved for WAL record does not match what was written");
  	}
+ 	else
+ 	{
+ 		/* An xlog-switch record doesn't contain any data besides the header */
+ 		Assert(write_len == 0);
+ 
+ 		/*
+ 		 * An xlog-switch record consumes all the remaining space on the
+ 		 * WAL segment. We have already reserved it for us, but we still need
+ 		 * to make sure it's been allocated and zeroed in the WAL buffers so
+ 		 * that when the caller (or someone else) does XLogWrite(), it can
+ 		 * really write out all the zeros.
+ 		 *
+ 		 * We do this one page at a time, to make sure we don't deadlock
+ 		 * against ourselves if wal_buffers < XLOG_SEG_SIZE.
+ 		 */
+ 		Assert(EndPos.xrecoff % XLogSegSize == 0);
  
! 		/* Use up all the remaining space on the first page */
! 		XLByteAdvance(CurrPos, freespace);
! 
! 		while (XLByteLT(CurrPos, EndPos))
! 		{
! 			/*
! 			 * Like in the non-xlog-switch codepath, let others know that
! 			 * we're done writing up to the end of this page.
! 			 */
! 			UpdateSlotCurrPos(myslot, CurrPos);
! 			/* initialize the next page (if not initialized already) */
! 			AdvanceXLInsertBuffer(CurrPos, false);
! 			XLByteAdvance(CurrPos, XLOG_BLCKSZ);
! 		}
! 	}
  
  	/*
! 	 * Done! Clear CurrPos in our slot to let others know that we're
! 	 * finished.
  	 */
! 	UpdateSlotCurrPos(myslot, InvalidXLogRecPtr);
  
  	/*
! 	 * When we run out of insertion slots, the next inserter has to grab the
! 	 * WALInsertTailLock to clean up some old slots.  That stalls all new
! 	 * insertions. The WAL writer process cleans up old slots periodically,
! 	 * but on a busy system that might not be enough. So we try to clean up
! 	 * old ones every time we've gone through 1/4 of all the slots.
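! 	 * (With NumXLogInsertSlots = 512, that means any backend whose slot
! 	 * number is a multiple of 128 attempts the cleanup.)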
  	 */
! 	if ((myslot_p - XLogCtl->XLogInsertSlots) % (NumXLogInsertSlots / 4) == 0)
! 		ReuseOldSlots();
! }
  
! /*
!  * Reserves the right amount of space for a record of given size from the WAL.
!  * *StartPos_p is set to the beginning of the reserved section, *EndPos_p to
!  * its end+1, and *PrevRecord_p to the beginning of the previous record to set
!  * to the prev-link of the record header.
!  *
!  * A log-switch record is handled slightly differently. The rest of the
!  * segment will be reserved for this insertion, as indicated by the returned
!  * *EndPos_p value. However, if we are already at the beginning of the current
!  * segment, *EndPos_p is set to the current location without reserving
!  * any space, and the function returns false.
!  *
!  * *updrqst_p is set to true if this record ends on a different page than
!  * the previous one - the caller should update the shared LogwrtRqst value
!  * after it's done inserting the record in that case, so that the WAL page
!  * that filled up gets written out at the next convenient moment.
!  *
!  * While holding insertpos_lck, sets myslot->CurrPos to the starting position
!  * (or the end of the previous record, to be exact) to let others know that we're
!  * busy inserting to the reserved area. The caller must clear it when the
!  * insertion is finished.
!  *
!  * Returns true on success, or false if RedoRecPtr or forcePageWrites was
!  * changed. On failure, the shared state is not modified.
!  *
!  * This is the performance critical part of XLogInsert that must be serialized
!  * across backends. The rest can happen mostly in parallel.
!  *
!  * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
!  * where we actually copy the record to the reserved space.
!  */
! static bool
! ReserveXLogInsertLocation(int size, bool didPageWrites,
! 						  bool isLogSwitch,
! 						  XLogRecPtr *PrevRecord_p, XLogRecPtr *StartPos_p,
! 						  XLogRecPtr *EndPos_p,
! 						  XLogInsertSlot **myslot_p, bool *updrqst_p)
! {
! 	volatile XLogInsertSlot *myslot;
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
! 	int			freespace;
! 	XLogRecPtr	ptr;
! 	XLogRecPtr	StartPos;
! 	XLogRecPtr	BeginCurrPos;
! 	int32		nextslot;
! 	int32		lastslot;
! 	bool		updrqst = false;
! 
! 	/* log-switch records should contain no data */
! 	Assert(!isLogSwitch || size == 0);
  
! 	size = SizeOfXLogRecord + size;
  
+ retry:
+ 	SpinLockAcquire(&Insert->insertpos_lck);
+ 
+ 	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr) ||
+ 		(!didPageWrites && (Insert->forcePageWrites || Insert->fullPageWrites)))
+ 	{
  		/*
! 		 * Oops, a checkpoint just happened, or forcePageWrites was just
! 		 * turned on. Start XLogInsert() all over, because we might have to
! 		 * include more full-page images in the record.
  		 */
! 		RedoRecPtr = Insert->RedoRecPtr;
! 		SpinLockRelease(&Insert->insertpos_lck);
! 		*EndPos_p = InvalidXLogRecPtr;
! 		return false;
! 	}
! 
! 	/*
! 	 * Reserve the next insertion slot for us.
! 	 *
! 	 * First check that the slot is not still in use. Modifications to
! 	 * lastslot are protected by WALInsertTailLock, but here we assume that
! 	 * reading an int32 is atomic. Another process might advance lastslot at
! 	 * the same time, but not past nextslot.
! 	 */
! 	lastslot = Insert->lastslot;
! 	nextslot = Insert->nextslot;
! 	if (NextSlotNo(nextslot) == lastslot)
! 	{
! 		/*
! 		 * Oops, we've "caught our tail" and the oldest slot is still in use.
! 		 * Have to wait for it to become vacant, and retry.
! 		 */
! 		SpinLockRelease(&Insert->insertpos_lck);
! 		WaitXLogInsertionsToFinish(InvalidXLogRecPtr);
! 		goto retry;
! 	}
! 
! 	/*
! 	 * Got the slot. Now reserve the right amount of space from the WAL for
! 	 * our record.
! 	 */
! 	ptr = Insert->CurrPos;
! 	*PrevRecord_p = Insert->PrevRecord;
! 
! 	/*
! 	 * If there isn't enough space on the current XLOG page for a record
! 	 * header, advance to the next page (leaving the unused space as zeroes).
! 	 */
! 	freespace = INSERT_FREESPACE(ptr);
! 	if (freespace < SizeOfXLogRecord)
! 	{
! 		XLByteAdvance(ptr, freespace);
! 		BeginCurrPos = ptr;
! 
! 		if (ptr.xrecoff % XLogSegSize == 0)
! 			ptr.xrecoff += SizeOfXLogLongPHD;
! 		else
! 			ptr.xrecoff += SizeOfXLogShortPHD;
! 		freespace = INSERT_FREESPACE(ptr);
! 		updrqst = true;
! 	}
! 	else
! 		BeginCurrPos = ptr;
  
! 	/*
! 	 * We are now at the starting position of our record. Now figure out how
! 	 * the data will be split across the WAL pages, to calculate where the
! 	 * record ends.
! 	 */
! 	StartPos = ptr;
  
+ 	if (isLogSwitch)
+ 	{
  		/*
! 		 * If the record is an XLOG_SWITCH, and we are exactly at the start
! 		 * of a segment, we need not insert it (and don't want to because
! 		 * we'd like consecutive switch requests to be no-ops). Otherwise the
! 		 * XLOG_SWITCH record should consume all the remaining space on the
! 		 * current segment.
  		 */
+ 		if ((ptr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
  		{
! 			/* We can release insertpos_lck immediately */
! 			SpinLockRelease(&Insert->insertpos_lck);
  
! 			ptr.xrecoff -= SizeOfXLogLongPHD;
! 			if (ptr.xrecoff == 0)
! 			{
! 				/* crossing a logid boundary */
! 				ptr.xlogid -= 1;
! 				ptr.xrecoff = XLogFileSize;
! 			}
  
! 			*EndPos_p = ptr;
! 			*StartPos_p = ptr;
! 			*myslot_p = NULL;
  
! 			return false;
! 		}
! 		else
! 		{
! 			if (ptr.xrecoff % XLOG_SEG_SIZE != 0)
! 			{
! 				int segleft = XLOG_SEG_SIZE - (ptr.xrecoff % XLOG_SEG_SIZE);
! 				XLByteAdvance(ptr, segleft);
! 			}
! 			updrqst = true;
! 		}
  	}
  	else
  	{
! 		/*
! 		 * A normal record, ie. not xlog-switch. Calculate how the record will
! 		 * be laid out across WAL pages. The straightforward way to do this
! 		 * would be a loop that fills in the WAL pages one at a time, tracking
! 		 * how much of the size is still left.  That's how
! 		 * CopyXLogRecordToWAL() works when actually copying the data.
! 		 * However, we want to avoid looping to keep this spinlock-protected
! 		 * section as short as possible, if the record spans many pages.
! 		 */
! 		int		sizeleft = size;
  
! 		if (sizeleft > freespace)
  		{
! 			int		pagesneeded;
! 			int		pagesleftonseg;
! 			int		fullpages;
! 
! 			/* First fill the first page with as much data as fits. */
! 			sizeleft -= freespace;
! 			XLByteAdvance(ptr, freespace);
! 
! 			/* We're now positioned at the beginning of the next page */
! 			Assert(ptr.xrecoff % XLOG_BLCKSZ == 0);
! 			do
! 			{
! 				/*
! 				 * If we're positioned at the beginning of a segment, take
! 				 * into account that the first page needs a long header.
! 				 */
! 				if (ptr.xrecoff % XLOG_SEG_SIZE == 0)
! 					sizeleft += (SizeOfXLogLongPHD - SizeOfXLogShortPHD);
! 
! 				/*
! 				 * Calculate the number of extra pages we need.  Each page
! 				 * will have a continuation record at the beginning.
! 				 *
! 				 * We do the calculation assuming that all the pages have a
! 				 * short header.  We don't know whether we have to cross to
! 				 * the next segment until we've calculated how many pages we
! 				 * need. If it turns out that we do, we'll fill up the current
! 				 * segment, and loop back to add the long page header to
! 				 * sizeleft, and continue calculation from there.
! 				 */
! #define SpaceOnXLogPage	(XLOG_BLCKSZ - SizeOfXLogShortPHD - SizeOfXLogContRecord)
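! 				/*
! 				 * This is ceil(sizeleft / SpaceOnXLogPage) in integer
! 				 * arithmetic: eg. two and a half pages' worth of data
! 				 * needs three pages.
! 				 */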
! 				pagesneeded = (sizeleft + SpaceOnXLogPage - 1) / SpaceOnXLogPage;
! 
! 				pagesleftonseg = (XLOG_SEG_SIZE - (ptr.xrecoff % XLOG_SEG_SIZE)) / XLOG_BLCKSZ;
! 
! 				if (pagesneeded <= pagesleftonseg)
! 				{
! 					/*
! 					 * Fits in this segment. Skip over all the full pages, to
! 					 * the last page that will (possibly) be only partially
! 					 * filled.
! 					 */
! 					fullpages = pagesneeded - 1;
! 				}
! 				else
! 				{
! 					/*
! 					 * Doesn't fit in this segment. Fit as much as does, and
! 					 * continue from next segment.
! 					 */
! 					fullpages = pagesleftonseg;
! 				}
! 
! 				sizeleft -= fullpages * SpaceOnXLogPage;
! 				XLByteAdvance(ptr, fullpages * XLOG_BLCKSZ);
! 			} while (pagesneeded > pagesleftonseg);
! 
! 			/*
! 			 * We're now positioned at the beginning of the last page this
! 			 * record spans.  The rest should fit on this page.
! 			 *
! 			 * Note: We already took into account the long header above.
! 			 */
! 			ptr.xrecoff += SizeOfXLogShortPHD;
! 			ptr.xrecoff += SizeOfXLogContRecord;
! 
! 			Assert(sizeleft <= INSERT_FREESPACE(ptr));
! 
  			updrqst = true;
  		}
! 
! 		/* the rest fits on this page */
! 		ptr.xrecoff += sizeleft;
! 
! 		/* Align the end position, so that the next record starts aligned */
! 		ptr.xrecoff = MAXALIGN(ptr.xrecoff);
! 		if (ptr.xrecoff >= XLogFileSize)
  		{
! 			/* crossed a logid boundary */
! 			ptr.xlogid += 1;
! 			ptr.xrecoff = 0;
  		}
  	}
  
! 	/* Update the shared state, and our slot, before releasing the lock */
! 	myslot = &XLogCtl->XLogInsertSlots[nextslot];
! 	myslot->CurrPos = BeginCurrPos;
  
! 	Insert->CurrPos = ptr;
! 	Insert->PrevRecord = StartPos;
! 	Insert->nextslot = NextSlotNo(nextslot);
! 
! 	SpinLockRelease(&Insert->insertpos_lck);
! 
! #ifdef RESERVE_XLOGINSERT_LOCATION_DEBUG
! 	elog(LOG, "reserved xlog: prev %X/%X, start %X/%X, end %X/%X (len %d)",
! 		 PrevRecord_p->xlogid, PrevRecord_p->xrecoff,
! 		 StartPos.xlogid, StartPos.xrecoff,
! 		 ptr.xlogid, ptr.xrecoff,
! 		 size);
! #endif
! 
! 	*EndPos_p = ptr;
! 	*StartPos_p = StartPos;
! 	*myslot_p = (XLogInsertSlot *) myslot;
! 	*updrqst_p = updrqst;
! 
! 	return true;
! }
! 
! /*
!  * Update slot's CurrPos variable, and wake up anyone waiting on it.
!  */
! static void
! UpdateSlotCurrPos(volatile XLogInsertSlot *myslot, XLogRecPtr CurrPos)
! {
! 	PGPROC	   *head;
! 
! 	/*
! 	 * The write-barrier ensures that the changes we made to the WAL pages
! 	 * are visible to everyone before the update of CurrPos.
! 	 *
! 	 * XXX: I'm not sure if this is necessary. Doesn't the spinlock
! 	 * acquire/release act as an implicit barrier?
! 	 */
! 	pg_write_barrier();
! 
! 	SpinLockAcquire(&myslot->lck);
! 	myslot->CurrPos = CurrPos;
! 	head = myslot->head;
! 	myslot->head = myslot->tail = NULL;
! 	SpinLockRelease(&myslot->lck);
! 	while (head != NULL)
  	{
! 		PGPROC *proc = head;
! 		head = proc->lwWaitLink;
! 		proc->lwWaitLink = NULL;
! 		proc->lwWaiting = false;
! 		PGSemaphoreUnlock(&proc->sem);
! 	}
! }
  
! /*
!  * Get a pointer to the right location in the WAL buffer containing the
!  * given XLogRecPtr.
!  *
!  * If the page is not initialized yet, it is initialized. That might require
!  * evicting an old dirty buffer from the buffer cache, which means I/O.
!  *
!  * The caller must ensure that the page containing the requested location
!  * isn't evicted yet, and won't be evicted, by holding onto an
!  * XLOG insertion slot with CurrPos set to 'ptr'. Setting it to some value
!  * less than 'ptr' would suffice for GetXLogBuffer(), but risks deadlock:
!  * If we have to evict a buffer, we might have to wait for someone else to
!  * finish a write. And that someone else might not be able to finish the
!  * write, if our CurrPos points to a buffer that's still in the buffer cache.
!  */
! static char *
! GetXLogBuffer(XLogRecPtr ptr)
! {
! 	int			idx;
! 	XLogRecPtr	endptr;
! 	static uint32 cachedXlogid = 0;
! 	static uint32 cachedPage = 0;
! 	static char *cachedPos = NULL;
! 	XLogRecPtr	expectedEndPtr;
! 
! 	/*
! 	 * Fast path for the common case that we need to access the same page
! 	 * as last time.
! 	 */
! 	if (cachedPos != NULL &&
! 		ptr.xlogid == cachedXlogid && ptr.xrecoff / XLOG_BLCKSZ == cachedPage)
! 		return cachedPos + ptr.xrecoff % XLOG_BLCKSZ;
! 
! 	/*
! 	 * The XLog buffer cache is organized so that a page must always be
! 	 * loaded into a particular buffer.  That way we can easily calculate
! 	 * the buffer a given page must be loaded into, from the XLogRecPtr alone.
! 	 */
! 	idx = XLogRecPtrToBufIdx(ptr);
! 
! 	/*
! 	 * See what page is loaded in the buffer at the moment. It could be the
! 	 * page we're looking for, or something older. It can't be anything
! 	 * newer - that would imply the page we're looking for has already
! 	 * been written out to disk, which shouldn't happen as long as the caller
! 	 * has set its slot's CurrPos correctly.
! 	 *
! 	 * However, we don't hold a lock while we read the value. If someone has
! 	 * just initialized the page, it's possible that we get a "torn read" of
! 	 * the XLogRecPtr, and see a bogus value. That's ok, we'll grab the
! 	 * mapping lock (in AdvanceXLInsertBuffer) and retry if we see anything
! 	 * other than the page we're looking for. But it means that when we do this
! 	 * unlocked read, we might see a value that appears to be ahead of the
! 	 * page we're looking for. Don't PANIC on that, until we've verified the
! 	 * value while holding the lock.
! 	 */
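! 	/*
! 	 * xlblocks[idx] is the position of the first byte past the page held
! 	 * in buffer 'idx' (see the definition of xlblocks), so this is the
! 	 * value we expect to find there if the buffer holds the page
! 	 * containing 'ptr'.
! 	 */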
! 	expectedEndPtr.xlogid = ptr.xlogid;
! 	expectedEndPtr.xrecoff = ptr.xrecoff - ptr.xrecoff % XLOG_BLCKSZ + XLOG_BLCKSZ;
! 
! 	endptr = XLogCtl->xlblocks[idx];
! 	if (!XLByteEQ(expectedEndPtr, endptr))
! 	{
! 		AdvanceXLInsertBuffer(ptr, false);
! 		endptr = XLogCtl->xlblocks[idx];
! 
! 		if (!XLByteEQ(expectedEndPtr, endptr))
! 			elog(PANIC, "could not find WAL buffer for %X/%X",
! 				 ptr.xlogid, ptr.xrecoff);
  	}
  
! 	/*
! 	 * Found the buffer holding this page. Update the fast-path cache now
! 	 * that the page has been verified, and return a pointer to the right
! 	 * offset within the page.
! 	 */
! 	cachedXlogid = ptr.xlogid;
! 	cachedPage = ptr.xrecoff / XLOG_BLCKSZ;
! 	cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
! 	return XLogCtl->pages + idx * (Size) XLOG_BLCKSZ +
! 		ptr.xrecoff % XLOG_BLCKSZ;
! }
  
! /*
!  * Try to mark old insertion slots as free for reuse.
!  */
! static void
! ReuseOldSlots(void)
! {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
! 	int			lastslot;
! 	int			nextslot;
  
! 	/* Give up if someone else is already doing this */
! 	if (!LWLockConditionalAcquire(WALInsertTailLock, LW_EXCLUSIVE))
! 		return;
! 
! 	lastslot = Insert->lastslot;
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	nextslot = Insert->nextslot;
! 	SpinLockRelease(&Insert->insertpos_lck);
! 
! 	while (lastslot != nextslot)
! 	{
! 		/*
! 		 * Check if the oldest slot is still in use. We don't do any locking
! 		 * here; we just give up as soon as we find a slot that's still in
! 		 * use.
! 		 */
! 		volatile XLogInsertSlot *slot;
! 		slot = &XLogCtl->XLogInsertSlots[lastslot];
! 
! 		if (!XLByteEQ(slot->CurrPos, InvalidXLogRecPtr))
! 			break;
! 
! 		lastslot = NextSlotNo(lastslot);
! 	}
! 
! 	Insert->lastslot = lastslot;
! 	LWLockRelease(WALInsertTailLock);
! }
! 
! /*
!  * Wait for any insertions < upto to finish. If upto is invalid, we wait until
!  * at least one slot is available for insertion.
!  *
!  * Returns a value >= upto, which indicates the oldest in-progress insertion
!  * that we saw in the array, or InvalidXLogRecPtr if there are no insertions
!  * in progress at exit.
!  */
! static XLogRecPtr
! WaitXLogInsertionsToFinish(XLogRecPtr upto)
! {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
! 	int			lastslot;
! 	int			nextslot;
! 	XLogRecPtr	LastPos = InvalidXLogRecPtr;
! 	int			extraWaits = 0;
! 
! 	if (MyProc == NULL)
! 		elog(PANIC, "cannot wait without a PGPROC structure");
! 
! retry:
! 	/*
! 	 * Read lastslot and nextslot. lastslot cannot change while we hold the
! 	 * tail-lock. nextslot can advance while we run, but not beyond
! 	 * lastslot - 1. We still have to acquire insertpos_lck to make sure that
! 	 * we see the CurrPos of the latest slot correctly.
! 	 */
! 	LWLockAcquire(WALInsertTailLock, LW_EXCLUSIVE);
! 	lastslot = Insert->lastslot;
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	nextslot = Insert->nextslot;
! 	SpinLockRelease(&Insert->insertpos_lck);
! 
! 	while (lastslot != nextslot)
! 	{
! 		/*
! 		 * Examine the oldest slot still in use.
! 		 */
! 		volatile XLogInsertSlot *slot;
! 		XLogRecPtr	slotptr;
! 
! 		slot = &XLogCtl->XLogInsertSlots[lastslot];
! 
! 		/* First, a quick check without the lock. */
! 		if (XLByteEQ(slot->CurrPos, InvalidXLogRecPtr))
! 		{
! 			lastslot = NextSlotNo(lastslot);
! 			continue;
! 		}
! 
! 		SpinLockAcquire(&slot->lck);
! 		slotptr = slot->CurrPos;
! 
! 		if (XLByteEQ(slotptr, InvalidXLogRecPtr))
! 		{
! 			/*
! 			 * The insertion has already finished; we just need to advance
! 			 * lastslot to make the slot available for reuse.
! 			 */
! 			SpinLockRelease(&slot->lck);
! 			lastslot = NextSlotNo(lastslot);
! 			continue;
! 		}
! 		else
! 		{
! 			/*
! 			 * The insertion is still in progress. If we just needed any
! 			 * slot to become available and there is at least one slot
! 			 * free now, or if this slot's CurrPos >= upto, we can
! 			 * stop here. Otherwise we have to wait for it to finish.
! 			 */
! 			if ((XLogRecPtrIsInvalid(upto) && NextSlotNo(nextslot) != lastslot)
! 				|| (!XLogRecPtrIsInvalid(upto) && XLByteLE(upto, slotptr)))
! 			{
! 				SpinLockRelease(&slot->lck);
! 				LastPos = slotptr;
! 				break;
! 			}
! 			else
! 			{
! 				/* Wait for this insertion to finish. */
! 				MyProc->lwWaiting = true;
! 				MyProc->lwWaitMode = 0; /* doesn't matter */
! 				MyProc->lwWaitLink = NULL;
! 				if (slot->head == NULL)
! 					slot->head = MyProc;
! 				else
! 					slot->tail->lwWaitLink = MyProc;
! 				slot->tail = MyProc;
! 				SpinLockRelease(&slot->lck);
! 
! 				Insert->lastslot = lastslot;
! 				LWLockRelease(WALInsertTailLock);
! 				for (;;)
! 				{
! 					PGSemaphoreLock(&MyProc->sem, false);
! 					if (!MyProc->lwWaiting)
! 						break;
! 					extraWaits++;
! 				}
! 
! 				/*
! 				 * The insertion has now finished. Start all over. While we
! 				 * were not holding the tail-lock, someone might've filled up
! 				 * all slots again.
! 				 */
! 				goto retry;
! 			}
! 		}
! 	}
! 
! 	/* Update lastslot before we release the lock */
! 	Insert->lastslot = lastslot;
! 	LWLockRelease(WALInsertTailLock);
! 
! 	while (extraWaits-- > 0)
! 		PGSemaphoreUnlock(&MyProc->sem);
! 
! 	return LastPos;
  }
  
  /*
***************
*** 1440,1469 **** XLogArchiveCleanup(const char *xlog)
  }
  
  /*
!  * Advance the Insert state to the next buffer page, writing out the next
!  * buffer if it still contains unwritten data.
!  *
!  * If new_segment is TRUE then we set up the next buffer page as the first
!  * page of the next xlog segment file, possibly but not usually the next
!  * consecutive file page.
!  *
!  * The global LogwrtRqst.Write pointer needs to be advanced to include the
!  * just-filled page.  If we can do this for free (without an extra lock),
!  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
!  * request update still needs to be done, FALSE if we did it internally.
!  *
!  * Must be called with WALInsertLock held.
   */
! static bool
! AdvanceXLInsertBuffer(bool new_segment)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
! 	int			nextidx = NextBufIdx(Insert->curridx);
! 	bool		update_needed = true;
  	XLogRecPtr	OldPageRqstPtr;
  	XLogwrtRqst WriteRqst;
! 	XLogRecPtr	NewPageEndPtr;
  	XLogPageHeader NewPage;
  
  	/*
  	 * Get ending-offset of the buffer page we need to replace (this may be
--- 2119,2151 ----
  }
  
  /*
!  * Initialize XLOG buffers, writing out old buffers if they still contain
!  * unwritten data, up to the page containing 'upto'. Or, if 'opportunistic' is
!  * true, initialize as many pages as we can without having to write out
!  * unwritten data. Any new pages are initialized to zeros, with page headers
!  * initialized properly.
   */
! static void
! AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
! 	int			nextidx;
  	XLogRecPtr	OldPageRqstPtr;
  	XLogwrtRqst WriteRqst;
! 	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
  	XLogPageHeader NewPage;
+ 	int			npages = 0;
+ 
+ 	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * Now that we have the lock, check if someone initialized the page
+ 	 * already.
+ 	 */
+ /* XXX: fix indentation before commit */
+ while (!XLByteLT(upto, XLogCtl->xlblocks[XLogCtl->curridx]) || opportunistic)
+ {
+ 	nextidx = NextBufIdx(XLogCtl->curridx);
  
  	/*
  	 * Get ending-offset of the buffer page we need to replace (this may be
***************
*** 1473,1482 **** AdvanceXLInsertBuffer(bool new_segment)
  	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
  	if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  	{
! 		/* nope, got work to do... */
! 		XLogRecPtr	FinishedPageRqstPtr;
! 
! 		FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
  
  		/* Before waiting, get info_lck and update LogwrtResult */
  		{
--- 2155,2166 ----
  	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
  	if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  	{
! 		/*
! 		 * Nope, got work to do. If we just want to pre-initialize as much as
! 		 * we can without flushing, give up now.
! 		 */
! 		if (opportunistic)
! 			break;
  
  		/* Before waiting, get info_lck and update LogwrtResult */
  		{
***************
*** 1484,1504 **** AdvanceXLInsertBuffer(bool new_segment)
  			volatile XLogCtlData *xlogctl = XLogCtl;
  
  			SpinLockAcquire(&xlogctl->info_lck);
! 			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
! 				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
  			LogwrtResult = xlogctl->LogwrtResult;
  			SpinLockRelease(&xlogctl->info_lck);
  		}
  
- 		update_needed = false;	/* Did the shared-request update */
- 
  		/*
  		 * Now that we have an up-to-date LogwrtResult value, see if we still
  		 * need to write it or if someone else already did.
  		 */
  		if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  		{
! 			/* Must acquire write lock */
  			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  			LogwrtResult = XLogCtl->LogwrtResult;
  			if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
--- 2168,2194 ----
  			volatile XLogCtlData *xlogctl = XLogCtl;
  
  			SpinLockAcquire(&xlogctl->info_lck);
! 			if (XLByteLT(xlogctl->LogwrtRqst.Write, OldPageRqstPtr))
! 				xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
  			LogwrtResult = xlogctl->LogwrtResult;
  			SpinLockRelease(&xlogctl->info_lck);
  		}
  
  		/*
  		 * Now that we have an up-to-date LogwrtResult value, see if we still
  		 * need to write it or if someone else already did.
  		 */
  		if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  		{
! 			/*
! 			 * Must acquire write lock. Release WALBufMappingLock first, so
! 			 * that all the insertions we are about to wait for (up to this
! 			 * same position) can finish; otherwise we risk deadlock.
! 			 */
! 			LWLockRelease(WALBufMappingLock);
! 
! 			WaitXLogInsertionsToFinish(OldPageRqstPtr);
! 
  			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  			LogwrtResult = XLogCtl->LogwrtResult;
  			if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
***************
*** 1508,1525 **** AdvanceXLInsertBuffer(bool new_segment)
  			}
  			else
  			{
! 				/*
! 				 * Have to write buffers while holding insert lock. This is
! 				 * not good, so only write as much as we absolutely must.
! 				 */
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
  				WriteRqst.Write = OldPageRqstPtr;
  				WriteRqst.Flush.xlogid = 0;
  				WriteRqst.Flush.xrecoff = 0;
! 				XLogWrite(WriteRqst, false, false);
  				LWLockRelease(WALWriteLock);
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
  			}
  		}
  	}
  
--- 2198,2215 ----
  			}
  			else
  			{
! 				/* Have to write it ourselves */
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
  				WriteRqst.Write = OldPageRqstPtr;
  				WriteRqst.Flush.xlogid = 0;
  				WriteRqst.Flush.xrecoff = 0;
! 				XLogWrite(WriteRqst, false);
  				LWLockRelease(WALWriteLock);
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
  			}
+ 			/* Re-acquire WALBufMappingLock and retry */
+ 			LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+ 			continue;
  		}
  	}
  
***************
*** 1527,1540 **** AdvanceXLInsertBuffer(bool new_segment)
  	 * Now the next buffer slot is free and we can set it up to be the next
  	 * output page.
  	 */
! 	NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
! 
! 	if (new_segment)
! 	{
! 		/* force it to a segment start point */
! 		NewPageEndPtr.xrecoff += XLogSegSize - 1;
! 		NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
! 	}
  
  	if (NewPageEndPtr.xrecoff >= XLogFileSize)
  	{
--- 2217,2223 ----
  	 * Now the next buffer slot is free and we can set it up to be the next
  	 * output page.
  	 */
! 	NewPageEndPtr = XLogCtl->xlblocks[XLogCtl->curridx];
  
  	if (NewPageEndPtr.xrecoff >= XLogFileSize)
  	{
***************
*** 1544,1556 **** AdvanceXLInsertBuffer(bool new_segment)
  	}
  	else
  		NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
! 	XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
! 	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
! 
! 	Insert->curridx = nextidx;
! 	Insert->currpage = NewPage;
  
! 	Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
  
  	/*
  	 * Be sure to re-zero the buffer so that bytes beyond what we've written
--- 2227,2236 ----
  	}
  	else
  		NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
! 	Assert(NewPageEndPtr.xrecoff % XLOG_BLCKSZ == 0);
! 	Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
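! 	/*
! 	 * (XLogRecEndPtrToBufIdx is assumed to map a page's end pointer to the
! 	 * index of the buffer slot holding that page, so pages cycle through
! 	 * the buffer array in order as the WAL advances.)
! 	 */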
  
! 	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
  
  	/*
  	 * Be sure to re-zero the buffer so that bytes beyond what we've written
***************
*** 1594,1604 **** AdvanceXLInsertBuffer(bool new_segment)
  		NewLongPage->xlp_seg_size = XLogSegSize;
  		NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
  		NewPage   ->xlp_info |= XLP_LONG_HEADER;
- 
- 		Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
  	}
  
! 	return update_needed;
  }
  
  /*
--- 2274,2301 ----
  		NewLongPage->xlp_seg_size = XLogSegSize;
  		NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
  		NewPage   ->xlp_info |= XLP_LONG_HEADER;
  	}
  
! 	/*
! 	 * Make sure the initialization of the page becomes visible to others
! 	 * before the xlblocks update. GetXLogBuffer() reads xlblocks without
! 	 * holding a lock.
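! 	 *
! 	 * (A reader such as GetXLogBuffer() is assumed to pair this with a
! 	 * read barrier, roughly:
! 	 *
! 	 *		if (XLByteEQ(XLogCtl->xlblocks[idx], expectedEndPtr))
! 	 *			pg_read_barrier();
! 	 *
! 	 * so that the page initialization done above is guaranteed to be
! 	 * visible before the page contents are used.)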
! 	 */
! 	pg_write_barrier();
! 
! 	*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
! 
! 	XLogCtl->curridx = nextidx;
! 
! 	npages++;
! }
! 	LWLockRelease(WALBufMappingLock);
! 
! #ifdef WAL_DEBUG
! 	if (npages > 0)
! 		elog(DEBUG1, "initialized %d pages, up to %X/%X",
! 			 npages, NewPageEndPtr.xlogid, NewPageEndPtr.xrecoff);
! #endif
  }
  
  /*
***************
*** 1643,1658 **** XLogCheckpointNeeded(uint32 logid, uint32 logseg)
   * This option allows us to avoid uselessly issuing multiple writes when a
   * single one would do.
   *
!  * If xlog_switch == TRUE, we are intending an xlog segment switch, so
!  * perform end-of-segment actions after writing the last page, even if
!  * it's not physically the end of its segment.  (NB: this will work properly
!  * only if caller specifies WriteRqst == page-end and flexible == false,
!  * and there is some data to write.)
!  *
!  * Must be called with WALWriteLock held.
   */
  static void
! XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
  	bool		ispartialpage;
--- 2340,2351 ----
   * This option allows us to avoid uselessly issuing multiple writes when a
   * single one would do.
   *
!  * Must be called with WALWriteLock held.  The caller must also have called
!  * WaitXLogInsertionsToFinish(WriteRqst) before acquiring the lock, to make
!  * sure the data is ready to write.
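!  *
!  * For example, a caller is assumed to follow roughly this pattern (see
!  * XLogBackgroundFlush for a real caller, which also rechecks LogwrtResult
!  * under the lock in case someone else already did the write):
!  *
!  *		WaitXLogInsertionsToFinish(WriteRqst.Write);
!  *		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
!  *		XLogWrite(WriteRqst, flexible);
!  *		LWLockRelease(WALWriteLock);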
   */
  static void
! XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
  	bool		ispartialpage;
***************
*** 1701,1714 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  		 * if we're passed a bogus WriteRqst.Write that is past the end of the
  		 * last page that's been initialized by AdvanceXLInsertBuffer.
  		 */
! 		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 XLogCtl->xlblocks[curridx].xlogid,
! 				 XLogCtl->xlblocks[curridx].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = XLogCtl->xlblocks[curridx];
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
--- 2394,2407 ----
  		 * if we're passed a bogus WriteRqst.Write that is past the end of the
  		 * last page that's been initialized by AdvanceXLInsertBuffer.
  		 */
! 		XLogRecPtr	EndPtr = XLogCtl->xlblocks[curridx];
! 
! 		if (!XLByteLT(LogwrtResult.Write, EndPtr))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 EndPtr.xlogid, EndPtr.xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = EndPtr;
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
***************
*** 1805,1820 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  			 * later. Doing it here ensures that one and only one backend will
  			 * perform this fsync.
  			 *
- 			 * We also do this if this is the last page written for an xlog
- 			 * switch.
- 			 *
  			 * This is also the right place to notify the Archiver that the
  			 * segment is ready to copy to archival storage, and to update the
  			 * timer for archive_timeout, and to signal for a checkpoint if
  			 * too many logfile segments have been used since the last
  			 * checkpoint.
  			 */
! 			if (finishing_seg || (xlog_switch && last_iteration))
  			{
  				issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
  				LogwrtResult.Flush = LogwrtResult.Write;		/* end of page */
--- 2498,2510 ----
  			 * later. Doing it here ensures that one and only one backend will
  			 * perform this fsync.
  			 *
  			 * This is also the right place to notify the Archiver that the
  			 * segment is ready to copy to archival storage, and to update the
  			 * timer for archive_timeout, and to signal for a checkpoint if
  			 * too many logfile segments have been used since the last
  			 * checkpoint.
  			 */
! 			if (finishing_seg)
  			{
  				issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
  				LogwrtResult.Flush = LogwrtResult.Write;		/* end of page */
***************
*** 2066,2073 **** XLogFlush(XLogRecPtr record)
  	 */
  	for (;;)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
  
  		/* read LogwrtResult and update local state */
  		SpinLockAcquire(&xlogctl->info_lck);
--- 2756,2767 ----
  	 */
  	for (;;)
  	{
! 		/* use volatile pointers to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
+ 		volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 		uint32		freespace;
+ 		XLogRecPtr	insertpos,
+ 					inprogresspos;
  
  		/* read LogwrtResult and update local state */
  		SpinLockAcquire(&xlogctl->info_lck);
***************
*** 2081,2086 **** XLogFlush(XLogRecPtr record)
--- 2775,2812 ----
  			break;
  
  		/*
+ 		 * Get the current insert position.
+ 		 *
+ 		 * XXX: This used to do LWLockConditionalAcquire(WALInsertLock), and
+ 		 * fall back to writing just up to 'record' if we couldn't get the
+ 		 * lock. Would it be a good idea to have a SpinLockConditionalAcquire
+ 		 * function and use that? On one hand, it would be good not to cause
+ 		 * more contention on the lock if it's busy, but on the other hand,
+ 		 * this spinlock is much more lightweight than the WALInsertLock was,
+ 		 * so maybe it's better to just grab the spinlock. In fact,
+ 		 * LWLockConditionalAcquire did a spinlock acquire + release, anyway.
+ 		 * Also note that if we stored the XLogRecPtr as
+ 		 * one 64-bit integer, we could just read it with no lock on platforms
+ 		 * where 64-bit integer accesses are atomic, which covers many common
+ 		 * platforms nowadays.
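+ 		 *
+ 		 * (Sketch only, under that single-64-bit-integer assumption: the
+ 		 * locked read below could then become a plain
+ 		 *		insertpos = *((volatile XLogRecPtr *) &Insert->CurrPos);
+ 		 * with no spinlock at all.)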
+ 		 */
+ 		SpinLockAcquire(&Insert->insertpos_lck);
+ 		insertpos = Insert->CurrPos;
+ 		SpinLockRelease(&Insert->insertpos_lck);
+ 
+ 		freespace = INSERT_FREESPACE(insertpos);
+ 		if (freespace < SizeOfXLogRecord)		/* buffer is full */
+ 			insertpos.xrecoff += freespace;
+ 
+ 		/*
+ 		 * Before actually performing the write, wait for all in-flight
+ 		 * insertions to the pages we're about to write to finish.
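+ 		 * If WaitXLogInsertionsToFinish() returns a valid position, that is
+ 		 * assumed to be as far as it is currently safe to write, and we
+ 		 * shrink our target accordingly.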
+ 		 */
+ 		inprogresspos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+ 		if (!XLogRecPtrIsInvalid(inprogresspos))
+ 			insertpos = inprogresspos;
+ 
+ 		/*
  		 * Try to get the write lock. If we can't get it immediately, wait
  		 * until it's released, and recheck if we still need to do the flush
  		 * or if the backend that held the lock did it for us already. This
***************
*** 2100,2128 **** XLogFlush(XLogRecPtr record)
  		LogwrtResult = XLogCtl->LogwrtResult;
  		if (!XLByteLE(record, LogwrtResult.Flush))
  		{
! 			/* try to write/flush later additions to XLOG as well */
! 			if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
! 			{
! 				XLogCtlInsert *Insert = &XLogCtl->Insert;
! 				uint32		freespace = INSERT_FREESPACE(Insert);
  
! 				if (freespace < SizeOfXLogRecord)		/* buffer is full */
! 					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
! 				else
! 				{
! 					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
! 					WriteRqstPtr.xrecoff -= freespace;
! 				}
! 				LWLockRelease(WALInsertLock);
! 				WriteRqst.Write = WriteRqstPtr;
! 				WriteRqst.Flush = WriteRqstPtr;
! 			}
! 			else
! 			{
! 				WriteRqst.Write = WriteRqstPtr;
! 				WriteRqst.Flush = record;
! 			}
! 			XLogWrite(WriteRqst, false, false);
  		}
  		LWLockRelease(WALWriteLock);
  		/* done */
--- 2826,2835 ----
  		LogwrtResult = XLogCtl->LogwrtResult;
  		if (!XLByteLE(record, LogwrtResult.Flush))
  		{
! 			WriteRqst.Write = insertpos;
! 			WriteRqst.Flush = insertpos;
  
! 			XLogWrite(WriteRqst, false);
  		}
  		LWLockRelease(WALWriteLock);
  		/* done */
***************
*** 2237,2243 **** XLogBackgroundFlush(void)
  
  	START_CRIT_SECTION();
  
! 	/* now wait for the write lock */
  	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  	LogwrtResult = XLogCtl->LogwrtResult;
  	if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
--- 2944,2951 ----
  
  	START_CRIT_SECTION();
  
! 	/* now wait for any in-progress insertions to finish and get write lock */
! 	WaitXLogInsertionsToFinish(WriteRqstPtr);
  	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  	LogwrtResult = XLogCtl->LogwrtResult;
  	if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
***************
*** 2246,2256 **** XLogBackgroundFlush(void)
  
  		WriteRqst.Write = WriteRqstPtr;
  		WriteRqst.Flush = WriteRqstPtr;
! 		XLogWrite(WriteRqst, flexible, false);
  	}
- 	LWLockRelease(WALWriteLock);
  
  	END_CRIT_SECTION();
  }
  
  /*
--- 2954,2971 ----
  
  		WriteRqst.Write = WriteRqstPtr;
  		WriteRqst.Flush = WriteRqstPtr;
! 		XLogWrite(WriteRqst, flexible);
  	}
  
  	END_CRIT_SECTION();
+ 
+ 	LWLockRelease(WALWriteLock);
+ 
+ 	/*
+ 	 * Great, done. To take some work off the critical path, try to initialize
+ 	 * as many of the no-longer-needed WAL buffers for future use as we can.
+ 	 */
+ 	AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
  }
  
  /*
***************
*** 5059,5064 **** XLOGShmemInit(void)
--- 5774,5780 ----
  	bool		foundCFile,
  				foundXLog;
  	char	   *allocptr;
+ 	int			i;
  
  	ControlFile = (ControlFileData *)
  		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
***************
*** 5084,5089 **** XLOGShmemInit(void)
--- 5800,5816 ----
  	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
  	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
  
+ 	/* Initialize insertion slots */
+ 	for (i = 0; i < NumXLogInsertSlots; i++)
+ 	{
+ 		XLogInsertSlot *slot = &XLogCtl->XLogInsertSlots[i];
+ 		slot->CurrPos = InvalidXLogRecPtr;
+ 		slot->head = slot->tail = NULL;
+ 		SpinLockInit(&slot->lck);
+ 	}
+ 	XLogCtl->Insert.nextslot = 0;
+ 	XLogCtl->Insert.lastslot = 0;
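+ 
+ 	/*
+ 	 * (Each slot starts out idle: an invalid CurrPos, an empty waiter
+ 	 * queue, and an initialized spinlock. The nextslot/lastslot counters
+ 	 * are assumed to track which slots are currently handed out to
+ 	 * in-progress insertions.)
+ 	 */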
+ 
  	/*
  	 * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
  	 */
***************
*** 5098,5104 **** XLOGShmemInit(void)
  	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
  	XLogCtl->SharedRecoveryInProgress = true;
  	XLogCtl->SharedHotStandbyActive = false;
! 	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
  	SpinLockInit(&XLogCtl->info_lck);
  	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
  	InitSharedLatch(&XLogCtl->WALWriterLatch);
--- 5825,5831 ----
  	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
  	XLogCtl->SharedRecoveryInProgress = true;
  	XLogCtl->SharedHotStandbyActive = false;
! 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
  	SpinLockInit(&XLogCtl->info_lck);
  	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
  	InitSharedLatch(&XLogCtl->WALWriterLatch);
***************
*** 5980,5985 **** StartupXLOG(void)
--- 6707,6713 ----
  	bool		backupEndRequired = false;
  	bool		backupFromStandby = false;
  	DBState		dbstate_at_startup;
+ 	int			firstIdx;
  
  	/*
  	 * Read control file and check XLOG status looks valid.
***************
*** 6232,6238 **** StartupXLOG(void)
  
  	lastFullPageWrites = checkPoint.fullPageWrites;
  
! 	RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
  
  	if (XLByteLT(RecPtr, checkPoint.redo))
  		ereport(PANIC,
--- 6960,6966 ----
  
  	lastFullPageWrites = checkPoint.fullPageWrites;
  
! 	RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
! 		checkPoint.redo;
  
  	if (XLByteLT(RecPtr, checkPoint.redo))
  		ereport(PANIC,
***************
*** 6786,6793 **** StartupXLOG(void)
  	openLogOff = 0;
  	Insert = &XLogCtl->Insert;
  	Insert->PrevRecord = LastRec;
! 	XLogCtl->xlblocks[0].xlogid = openLogId;
! 	XLogCtl->xlblocks[0].xrecoff =
  		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
  
  	/*
--- 7514,7525 ----
  	openLogOff = 0;
  	Insert = &XLogCtl->Insert;
  	Insert->PrevRecord = LastRec;
! 
! 	firstIdx = XLogRecEndPtrToBufIdx(EndOfLog);
! 	XLogCtl->curridx = firstIdx;
! 
! 	XLogCtl->xlblocks[firstIdx].xlogid = openLogId;
! 	XLogCtl->xlblocks[firstIdx].xrecoff =
  		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
  
  	/*
***************
*** 6795,6804 **** StartupXLOG(void)
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
  	 */
! 	Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
! 	memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
! 	Insert->currpos = (char *) Insert->currpage +
! 		(EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
  
  	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
  
--- 7527,7535 ----
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
  	 */
! 	Assert(readOff == (XLogCtl->xlblocks[firstIdx].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
! 	memcpy(XLogCtl->pages + firstIdx * (Size) XLOG_BLCKSZ, readBuf, XLOG_BLCKSZ);
! 	Insert->CurrPos = EndOfLog;
  
  	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
  
***************
*** 6807,6818 **** StartupXLOG(void)
  	XLogCtl->LogwrtRqst.Write = EndOfLog;
  	XLogCtl->LogwrtRqst.Flush = EndOfLog;
  
! 	freespace = INSERT_FREESPACE(Insert);
  	if (freespace > 0)
  	{
  		/* Make sure rest of page is zero */
! 		MemSet(Insert->currpos, 0, freespace);
! 		XLogCtl->Write.curridx = 0;
  	}
  	else
  	{
--- 7538,7549 ----
  	XLogCtl->LogwrtRqst.Write = EndOfLog;
  	XLogCtl->LogwrtRqst.Flush = EndOfLog;
  
! 	freespace = INSERT_FREESPACE(EndOfLog);
  	if (freespace > 0)
  	{
  		/* Make sure rest of page is zero */
! 		MemSet(XLogCtl->pages + firstIdx * (Size) XLOG_BLCKSZ +
! 			   EndOfLog.xrecoff % XLOG_BLCKSZ, 0, freespace);
! 		XLogCtl->Write.curridx = firstIdx;
  	}
  	else
  	{
***************
*** 6824,6830 **** StartupXLOG(void)
  		 * this is sufficient.	The first actual attempt to insert a log
  		 * record will advance the insert state.
  		 */
! 		XLogCtl->Write.curridx = NextBufIdx(0);
  	}
  
  	/* Pre-scan prepared transactions to find out the range of XIDs present */
--- 7555,7561 ----
  		 * this is sufficient.	The first actual attempt to insert a log
  		 * record will advance the insert state.
  		 */
! 		XLogCtl->Write.curridx = NextBufIdx(firstIdx);
  	}
  
  	/* Pre-scan prepared transactions to find out the range of XIDs present */
***************
*** 7307,7327 **** InitXLOGAccess(void)
  }
  
  /*
!  * Once spawned, a backend may update its local RedoRecPtr from
!  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
!  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
   */
  XLogRecPtr
  GetRedoRecPtr(void)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
  
  	SpinLockAcquire(&xlogctl->info_lck);
! 	Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
! 	RedoRecPtr = xlogctl->Insert.RedoRecPtr;
  	SpinLockRelease(&xlogctl->info_lck);
  
  	return RedoRecPtr;
  }
  
--- 8038,8066 ----
  }
  
  /*
!  * Return the current Redo pointer from shared memory.
!  *
!  * As a side-effect, the local RedoRecPtr copy is updated if it is older.
   */
  XLogRecPtr
  GetRedoRecPtr(void)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
+ 	XLogRecPtr ptr;
  
+ 	/*
+ 	 * The possibly-stale copy in XLogCtl is enough here. Even if we
+ 	 * grabbed insertpos_lck to read the master copy, someone might update
+ 	 * it just after we've released the lock.
+ 	 */
  	SpinLockAcquire(&xlogctl->info_lck);
! 	ptr = xlogctl->RedoRecPtr;
  	SpinLockRelease(&xlogctl->info_lck);
  
+ 	if (XLByteLT(RedoRecPtr, ptr))
+ 		RedoRecPtr = ptr;
+ 
  	return RedoRecPtr;
  }
  
***************
*** 7330,7336 **** GetRedoRecPtr(void)
   *
   * NOTE: The value *actually* returned is the position of the last full
   * xlog page. It lags behind the real insert position by at most 1 page.
!  * For that, we don't need to acquire WALInsertLock which can be quite
   * heavily contended, and an approximation is enough for the current
   * usage of this function.
   */
--- 8069,8075 ----
   *
   * NOTE: The value *actually* returned is the position of the last full
   * xlog page. It lags behind the real insert position by at most 1 page.
!  * That way, we don't need to acquire insertpos_lck, which can be quite
   * heavily contended, and an approximation is enough for the current
   * usage of this function.
   */
***************
*** 7592,7597 **** LogCheckpointEnd(bool restartpoint)
--- 8331,8338 ----
  void
  CreateCheckPoint(int flags)
  {
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile XLogCtlData *xlogctl = XLogCtl;
  	bool		shutdown;
  	CheckPoint	checkPoint;
  	XLogRecPtr	recptr;
***************
*** 7606,7611 **** CreateCheckPoint(int flags)
--- 8347,8353 ----
  	uint32		insert_logSeg;
  	TransactionId *inCommitXids;
  	int			nInCommit;
+ 	XLogRecPtr	curInsert;
  
  	/*
  	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
***************
*** 7674,7683 **** CreateCheckPoint(int flags)
  		checkPoint.oldestActiveXid = InvalidTransactionId;
  
  	/*
! 	 * We must hold WALInsertLock while examining insert state to determine
  	 * the checkpoint REDO pointer.
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  
  	/*
  	 * If this isn't a shutdown or forced checkpoint, and we have not switched
--- 8416,8426 ----
  		checkPoint.oldestActiveXid = InvalidTransactionId;
  
  	/*
! 	 * We must hold insertpos_lck while examining insert state to determine
  	 * the checkpoint REDO pointer.
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	curInsert = Insert->CurrPos;
  
  	/*
  	 * If this isn't a shutdown or forced checkpoint, and we have not switched
***************
*** 7689,7695 **** CreateCheckPoint(int flags)
  	 * (Perhaps it'd make even more sense to checkpoint only when the previous
  	 * checkpoint record is in a different xlog page?)
  	 *
! 	 * While holding the WALInsertLock we find the current WAL insertion point
  	 * and compare that with the starting point of the last checkpoint, which
  	 * is the redo pointer. We use the redo pointer because the start and end
  	 * points of a checkpoint can be hundreds of files apart on large systems
--- 8432,8438 ----
  	 * (Perhaps it'd make even more sense to checkpoint only when the previous
  	 * checkpoint record is in a different xlog page?)
  	 *
! 	 * While holding insertpos_lck we find the current WAL insertion point
  	 * and compare that with the starting point of the last checkpoint, which
  	 * is the redo pointer. We use the redo pointer because the start and end
  	 * points of a checkpoint can be hundreds of files apart on large systems
***************
*** 7698,7712 **** CreateCheckPoint(int flags)
  	if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
  				  CHECKPOINT_FORCE)) == 0)
  	{
- 		XLogRecPtr	curInsert;
- 
- 		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
  		XLByteToSeg(curInsert, insert_logId, insert_logSeg);
  		XLByteToSeg(ControlFile->checkPointCopy.redo, redo_logId, redo_logSeg);
  		if (insert_logId == redo_logId &&
  			insert_logSeg == redo_logSeg)
  		{
! 			LWLockRelease(WALInsertLock);
  			LWLockRelease(CheckpointLock);
  			END_CRIT_SECTION();
  			return;
--- 8441,8452 ----
  	if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
  				  CHECKPOINT_FORCE)) == 0)
  	{
  		XLByteToSeg(curInsert, insert_logId, insert_logSeg);
  		XLByteToSeg(ControlFile->checkPointCopy.redo, redo_logId, redo_logSeg);
  		if (insert_logId == redo_logId &&
  			insert_logSeg == redo_logSeg)
  		{
! 			SpinLockRelease(&Insert->insertpos_lck);
  			LWLockRelease(CheckpointLock);
  			END_CRIT_SECTION();
  			return;
***************
*** 7733,7750 **** CreateCheckPoint(int flags)
  	 * the buffer flush work.  Those XLOG records are logically after the
  	 * checkpoint, even though physically before it.  Got that?
  	 */
! 	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		(void) AdvanceXLInsertBuffer(false);
! 		/* OK to ignore update return flag, since we will do flush anyway */
! 		freespace = INSERT_FREESPACE(Insert);
  	}
! 	INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
  
  	/*
  	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
! 	 * must be done while holding the insert lock AND the info_lck.
  	 *
  	 * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
  	 * pointing past where it really needs to point.  This is okay; the only
--- 8473,8492 ----
  	 * the buffer flush work.  Those XLOG records are logically after the
  	 * checkpoint, even though physically before it.  Got that?
  	 */
! 	freespace = INSERT_FREESPACE(curInsert);
  	if (freespace < SizeOfXLogRecord)
  	{
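! 		/*
! 		 * The page is full; the next record will go on the next page, so
! 		 * the redo pointer must point past that page's header.
! 		 */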
! 		XLByteAdvance(curInsert, freespace);
! 		if (curInsert.xrecoff % XLogSegSize == 0)
! 			curInsert.xrecoff += SizeOfXLogLongPHD;
! 		else
! 			curInsert.xrecoff += SizeOfXLogShortPHD;
  	}
! 	checkPoint.redo = curInsert;
  
  	/*
  	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
! 	 * must be done while holding insertpos_lck.
  	 *
  	 * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
  	 * pointing past where it really needs to point.  This is okay; the only
***************
*** 7753,7772 **** CreateCheckPoint(int flags)
  	 * XLogInserts that happen while we are dumping buffers must assume that
  	 * their buffer changes are not included in the checkpoint.
  	 */
! 	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile XLogCtlData *xlogctl = XLogCtl;
! 
! 		SpinLockAcquire(&xlogctl->info_lck);
! 		RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
! 		SpinLockRelease(&xlogctl->info_lck);
! 	}
  
  	/*
  	 * Now we can release WAL insert lock, allowing other xacts to proceed
  	 * while we are flushing disk buffers.
  	 */
! 	LWLockRelease(WALInsertLock);
  
  	/*
  	 * If enabled, log checkpoint start.  We postpone this until now so as not
--- 8495,8512 ----
  	 * XLogInserts that happen while we are dumping buffers must assume that
  	 * their buffer changes are not included in the checkpoint.
  	 */
! 	RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
  
  	/*
  	 * Now we can release WAL insert lock, allowing other xacts to proceed
  	 * while we are flushing disk buffers.
  	 */
! 	SpinLockRelease(&Insert->insertpos_lck);
! 
! 	/* Update the info_lck-protected copy of RedoRecPtr as well */
! 	SpinLockAcquire(&xlogctl->info_lck);
! 	xlogctl->RedoRecPtr = checkPoint.redo;
! 	SpinLockRelease(&xlogctl->info_lck);
  
  	/*
  	 * If enabled, log checkpoint start.  We postpone this until now so as not
***************
*** 7786,7792 **** CreateCheckPoint(int flags)
  	 * we wait till he's out of his commit critical section before proceeding.
  	 * See notes in RecordTransactionCommit().
  	 *
! 	 * Because we've already released WALInsertLock, this test is a bit fuzzy:
  	 * it is possible that we will wait for xacts we didn't really need to
  	 * wait for.  But the delay should be short and it seems better to make
  	 * checkpoint take a bit longer than to hold locks longer than necessary.
--- 8526,8532 ----
  	 * we wait till he's out of his commit critical section before proceeding.
  	 * See notes in RecordTransactionCommit().
  	 *
! 	 * Because we've already released insertpos_lck, this test is a bit fuzzy:
  	 * it is possible that we will wait for xacts we didn't really need to
  	 * wait for.  But the delay should be short and it seems better to make
  	 * checkpoint take a bit longer than to hold locks longer than necessary.
***************
*** 8153,8167 **** CreateRestartPoint(int flags)
  	 * the number of segments replayed since last restartpoint, and request a
  	 * restartpoint if it exceeds checkpoint_segments.
  	 *
! 	 * You need to hold WALInsertLock and info_lck to update it, although
! 	 * during recovery acquiring WALInsertLock is just pro forma, because
! 	 * there is no other processes updating Insert.RedoRecPtr.
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
! 	SpinLockAcquire(&xlogctl->info_lck);
  	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
  	SpinLockRelease(&xlogctl->info_lck);
- 	LWLockRelease(WALInsertLock);
  
  	/*
  	 * Prepare to accumulate statistics.
--- 8893,8910 ----
  	 * the number of segments replayed since last restartpoint, and request a
  	 * restartpoint if it exceeds checkpoint_segments.
  	 *
! 	 * As in CreateCheckPoint(), hold insertpos_lck to update it, although
! 	 * during recovery acquiring insertpos_lck is just pro forma, because no
! 	 * WAL insertions are happening.
  	 */
! 	SpinLockAcquire(&xlogctl->Insert.insertpos_lck);
  	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+ 	SpinLockRelease(&xlogctl->Insert.insertpos_lck);
+ 
+ 	/* Also update the info_lck-protected copy */
+ 	SpinLockAcquire(&xlogctl->info_lck);
+ 	xlogctl->RedoRecPtr = lastCheckPoint.redo;
  	SpinLockRelease(&xlogctl->info_lck);
  
  	/*
  	 * Prepare to accumulate statistics.
***************
*** 8448,8454 **** XLogReportParameters(void)
  void
  UpdateFullPageWrites(void)
  {
! 	XLogCtlInsert *Insert = &XLogCtl->Insert;
  
  	/*
  	 * Do nothing if full_page_writes has not been changed.
--- 9191,9197 ----
  void
  UpdateFullPageWrites(void)
  {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  
  	/*
  	 * Do nothing if full_page_writes has not been changed.
***************
*** 8471,8479 **** UpdateFullPageWrites(void)
  	 */
  	if (fullPageWrites)
  	{
! 		LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  		Insert->fullPageWrites = true;
! 		LWLockRelease(WALInsertLock);
  	}
  
  	/*
--- 9214,9222 ----
  	 */
  	if (fullPageWrites)
  	{
! 		SpinLockAcquire(&Insert->insertpos_lck);
  		Insert->fullPageWrites = true;
! 		SpinLockRelease(&Insert->insertpos_lck);
  	}
  
  	/*
***************
*** 8494,8502 **** UpdateFullPageWrites(void)
  
  	if (!fullPageWrites)
  	{
! 		LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  		Insert->fullPageWrites = false;
! 		LWLockRelease(WALInsertLock);
  	}
  	END_CRIT_SECTION();
  }
--- 9237,9245 ----
  
  	if (!fullPageWrites)
  	{
! 		SpinLockAcquire(&Insert->insertpos_lck);
  		Insert->fullPageWrites = false;
! 		SpinLockRelease(&Insert->insertpos_lck);
  	}
  	END_CRIT_SECTION();
  }
***************
*** 9024,9029 **** issue_xlog_fsync(int fd, uint32 log, uint32 seg)
--- 9767,9773 ----
  XLogRecPtr
  do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = (labelfile == NULL);
  	bool		backup_started_in_recovery = false;
  	XLogRecPtr	checkpointloc;
***************
*** 9086,9111 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  	 * Note that forcePageWrites has no effect during an online backup from
  	 * the standby.
  	 *
! 	 * We must hold WALInsertLock to change the value of forcePageWrites, to
  	 * ensure adequate interlocking against XLogInsert().
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
  	{
! 		if (XLogCtl->Insert.exclusiveBackup)
  		{
! 			LWLockRelease(WALInsertLock);
  			ereport(ERROR,
  					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  					 errmsg("a backup is already in progress"),
  					 errhint("Run pg_stop_backup() and try again.")));
  		}
! 		XLogCtl->Insert.exclusiveBackup = true;
  	}
  	else
! 		XLogCtl->Insert.nonExclusiveBackups++;
! 	XLogCtl->Insert.forcePageWrites = true;
! 	LWLockRelease(WALInsertLock);
  
  	/* Ensure we release forcePageWrites if fail below */
  	PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
--- 9830,9855 ----
  	 * Note that forcePageWrites has no effect during an online backup from
  	 * the standby.
  	 *
! 	 * We must hold insertpos_lck to change the value of forcePageWrites, to
  	 * ensure adequate interlocking against XLogInsert().
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
  	{
! 		if (Insert->exclusiveBackup)
  		{
! 			SpinLockRelease(&Insert->insertpos_lck);
  			ereport(ERROR,
  					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  					 errmsg("a backup is already in progress"),
  					 errhint("Run pg_stop_backup() and try again.")));
  		}
! 		Insert->exclusiveBackup = true;
  	}
  	else
! 		Insert->nonExclusiveBackups++;
! 	Insert->forcePageWrites = true;
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	/* Ensure we release forcePageWrites if fail below */
  	PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
***************
*** 9218,9230 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  			 * taking a checkpoint right after another is not that expensive
  			 * either because only few buffers have been dirtied yet.
  			 */
! 			LWLockAcquire(WALInsertLock, LW_SHARED);
! 			if (XLByteLT(XLogCtl->Insert.lastBackupStart, startpoint))
  			{
! 				XLogCtl->Insert.lastBackupStart = startpoint;
  				gotUniqueStartpoint = true;
  			}
! 			LWLockRelease(WALInsertLock);
  		} while (!gotUniqueStartpoint);
  
  		XLByteToSeg(startpoint, _logId, _logSeg);
--- 9962,9974 ----
  			 * taking a checkpoint right after another is not that expensive
  			 * either because only few buffers have been dirtied yet.
  			 */
! 			SpinLockAcquire(&Insert->insertpos_lck);
! 			if (XLByteLT(Insert->lastBackupStart, startpoint))
  			{
! 				Insert->lastBackupStart = startpoint;
  				gotUniqueStartpoint = true;
  			}
! 			SpinLockRelease(&Insert->insertpos_lck);
  		} while (!gotUniqueStartpoint);
  
  		XLByteToSeg(startpoint, _logId, _logSeg);
***************
*** 9308,9334 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  static void
  pg_start_backup_callback(int code, Datum arg)
  {
  	bool		exclusive = DatumGetBool(arg);
  
  	/* Update backup counters and forcePageWrites on failure */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
  	{
! 		Assert(XLogCtl->Insert.exclusiveBackup);
! 		XLogCtl->Insert.exclusiveBackup = false;
  	}
  	else
  	{
! 		Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
! 		XLogCtl->Insert.nonExclusiveBackups--;
  	}
  
! 	if (!XLogCtl->Insert.exclusiveBackup &&
! 		XLogCtl->Insert.nonExclusiveBackups == 0)
  	{
! 		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  }
  
  /*
--- 10052,10079 ----
  static void
  pg_start_backup_callback(int code, Datum arg)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = DatumGetBool(arg);
  
  	/* Update backup counters and forcePageWrites on failure */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
  	{
! 		Assert(Insert->exclusiveBackup);
! 		Insert->exclusiveBackup = false;
  	}
  	else
  	{
! 		Assert(Insert->nonExclusiveBackups > 0);
! 		Insert->nonExclusiveBackups--;
  	}
  
! 	if (!Insert->exclusiveBackup &&
! 		Insert->nonExclusiveBackups == 0)
  	{
! 		Insert->forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  }
  
  /*
***************
*** 9341,9346 **** pg_start_backup_callback(int code, Datum arg)
--- 10086,10092 ----
  XLogRecPtr
  do_pg_stop_backup(char *labelfile, bool waitforarchive)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = (labelfile == NULL);
  	bool		backup_started_in_recovery = false;
  	XLogRecPtr	startpoint;
***************
*** 9394,9402 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  	/*
  	 * OK to update backup counters and forcePageWrites
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
! 		XLogCtl->Insert.exclusiveBackup = false;
  	else
  	{
  		/*
--- 10140,10148 ----
  	/*
  	 * OK to update backup counters and forcePageWrites
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
! 		Insert->exclusiveBackup = false;
  	else
  	{
  		/*
***************
*** 9405,9420 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  		 * backups, it is expected that each do_pg_start_backup() call is
  		 * matched by exactly one do_pg_stop_backup() call.
  		 */
! 		Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
! 		XLogCtl->Insert.nonExclusiveBackups--;
  	}
  
! 	if (!XLogCtl->Insert.exclusiveBackup &&
! 		XLogCtl->Insert.nonExclusiveBackups == 0)
  	{
! 		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  
  	if (exclusive)
  	{
--- 10151,10166 ----
  		 * backups, it is expected that each do_pg_start_backup() call is
  		 * matched by exactly one do_pg_stop_backup() call.
  		 */
! 		Assert(Insert->nonExclusiveBackups > 0);
! 		Insert->nonExclusiveBackups--;
  	}
  
! 	if (!Insert->exclusiveBackup &&
! 		Insert->nonExclusiveBackups == 0)
  	{
! 		Insert->forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	if (exclusive)
  	{
***************
*** 9692,9707 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  void
  do_pg_abort_backup(void)
  {
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
! 	Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
! 	XLogCtl->Insert.nonExclusiveBackups--;
  
! 	if (!XLogCtl->Insert.exclusiveBackup &&
! 		XLogCtl->Insert.nonExclusiveBackups == 0)
  	{
! 		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  }
  
  /*
--- 10438,10455 ----
  void
  do_pg_abort_backup(void)
  {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
! 
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	Assert(Insert->nonExclusiveBackups > 0);
! 	Insert->nonExclusiveBackups--;
  
! 	if (!Insert->exclusiveBackup &&
! 		Insert->nonExclusiveBackups == 0)
  	{
! 		Insert->forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  }
  
  /*
***************
*** 9755,9766 **** GetStandbyFlushRecPtr(void)
  XLogRecPtr
  GetXLogInsertRecPtr(void)
  {
! 	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogRecPtr	current_recptr;
  
! 	LWLockAcquire(WALInsertLock, LW_SHARED);
! 	INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
! 	LWLockRelease(WALInsertLock);
  
  	return current_recptr;
  }
--- 10503,10514 ----
  XLogRecPtr
  GetXLogInsertRecPtr(void)
  {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogRecPtr	current_recptr;
  
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	current_recptr = Insert->CurrPos;
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	return current_recptr;
  }
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 1753,1761 **** GetOldestActiveTransactionId(void)
   * the result is somewhat indeterminate, but we don't really care.  Even in
   * a multiprocessor with delayed writes to shared memory, it should be certain
   * that setting of inCommit will propagate to shared memory when the backend
!  * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
!  * it's already inserted its commit record.  Whether it takes a little while
!  * for clearing of inCommit to propagate is unimportant for correctness.
   */
  int
  GetTransactionsInCommit(TransactionId **xids_p)
--- 1753,1762 ----
   * the result is somewhat indeterminate, but we don't really care.  Even in
   * a multiprocessor with delayed writes to shared memory, it should be certain
   * that setting of inCommit will propagate to shared memory when the backend
!  * takes a lock to write the WAL record, so we cannot fail to see an xact as
!  * inCommit if it's already inserted its commit record.  Whether it takes a
!  * little while for clearing of inCommit to propagate is unimportant for
!  * correctness.
   */
  int
  GetTransactionsInCommit(TransactionId **xids_p)
*** a/src/backend/storage/lmgr/spin.c
--- b/src/backend/storage/lmgr/spin.c
***************
*** 56,61 **** SpinlockSemas(void)
--- 56,64 ----
  	 *
  	 * For now, though, we just need a few spinlocks (10 should be plenty)
  	 * plus one for each LWLock and one for each buffer header.
+ 	 *
+ 	 * XXX: remember to adjust this for the spinlocks added by the xlog.c
+ 	 * changes (insertpos_lck, plus one per insertion slot) before committing!
  	 */
  	return NumLWLocks() + NBuffers + 10;
  }
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 53,59 **** typedef enum LWLockId
  	ProcArrayLock,
  	SInvalReadLock,
  	SInvalWriteLock,
! 	WALInsertLock,
  	WALWriteLock,
  	ControlFileLock,
  	CheckpointLock,
--- 53,59 ----
  	ProcArrayLock,
  	SInvalReadLock,
  	SInvalWriteLock,
! 	WALBufMappingLock,
  	WALWriteLock,
  	ControlFileLock,
  	CheckpointLock,
***************
*** 79,84 **** typedef enum LWLockId
--- 79,85 ----
  	SerializablePredicateLockListLock,
  	OldSerXidLock,
  	SyncRepLock,
+ 	WALInsertTailLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
