commit 83b1e4fcd74b4dd6c6992395f21e4fe606c8e80d
Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date:   Thu Jun 14 23:53:17 2012 +0300

    Rebase code from xloginsert-noslots branch.
    
    This is based on xloginsert-scale18.patch, but instead of slots, use the
    xl_rem_len to indicate that a record has been fully written.

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3f5e0b2..0d0e799 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
 #include "postmaster/startup.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "storage/barrier.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -261,14 +262,26 @@ XLogRecPtr	XactLastRecEnd = {0, 0};
  * (which is almost but not quite the same as a pointer to the most recent
  * CHECKPOINT record).	We update this from the shared-memory copy,
  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold the Insert lock).  See XLogInsert for details.	We are also allowed
- * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
+ * hold the insertpos lock).  See XLogInsert for details.	We are also allowed
+ * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
  * InitXLOGAccess.
  */
 static XLogRecPtr RedoRecPtr;
 
 /*
+ * doPageWrites is this backend's local copy of (Insert->fullPageWrites ||
+ * Insert->forcePageWrites). It is refreshed at every insertion.
+ */
+static bool doPageWrites;
+
+/*
+ * FinalizedUpto is this backend's local copy of XLogCtl->Insert.FinalizedUpto.
+ * Everything before this is CRC'd and ready for writing out.
+ */
+static XLogRecPtr FinalizedUpto = { 0, 0 };
+
+/*
  * RedoStartLSN points to the checkpoint's REDO location which is specified
  * in a backup label file, backup history file or control file. In standby
  * mode, XLOG streaming usually starts from the position where an invalid
@@ -300,10 +313,15 @@ static XLogRecPtr RedoStartLSN = {0, 0};
  * (protected by info_lck), but we don't need to cache any copies of it.
  *
  * info_lck is only held long enough to read/update the protected variables,
- * so it's a plain spinlock.  The other locks are held longer (potentially
- * over I/O operations), so we use LWLocks for them.  These locks are:
+ * so it's a plain spinlock.  insertpos_lck protects the current logical
+ * insert location, ie. the head of reserved WAL space.  The other locks are
+ * held longer (potentially over I/O operations), so we use LWLocks for them.
+ * These locks are:
  *
- * WALInsertLock: must be held to insert a record into the WAL buffers.
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * This is only held while initializing and changing the mapping. If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
  *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
@@ -315,6 +333,93 @@ static XLogRecPtr RedoStartLSN = {0, 0};
  * only one checkpointer at a time; currently, with all checkpoints done by
  * the checkpointer, this is just pro forma).
  *
+ * WALInsertShareLocks: This lock is partitioned into multiple lwlocks. To
+ * hold it in share mode, it's enough to hold any of the lwlocks in share mode,
+ * but to hold it in exclusive mode, you must grab all the lwlocks.  It must
+ * be held in share-mode while inserting a new XLOG record, and in exclusive
+ * mode when changing RedoRecPtr or fullPageWrites. Those fields determine
+ * whether full-page images are included in a record, and they change very
+ * seldom, so we prefer to be fast and non-contended when they need to be
+ * read, and slow when they're changed.
+ *
+ *
+ * Inserting a new WAL record is a three-step process:
+ *
+ * 1. Reserve the right amount of space from the WAL. The current head of
+ *    reserved space is kept in Insert->CurrBytePos, and is protected by
+ *    insertpos_lck. Try to keep this section as short as possible;
+ *    insertpos_lck can be heavily contended on a busy system.
+ *
+ * 2. Copy the record to the reserved WAL space. This involves finding the
+ *    correct WAL buffer containing the reserved space, and copying the
+ *    record in place. This can be done concurrently in multiple processes.
+ *
+ * 3. Finalize the record by filling in xl_prev, and updating the CRC with it.
+ *    This can be done by another process, long after step 2. This only needs
+ *    to be done just before the record is flushed to disk, so it's done in
+ *    bulk at that point.
+ *
+ * To allow as much parallelism as possible, the contended portion of step 1
+ * is performed while only holding a spinlock. The duration the spinlock
+ * needs to be held is minimized by minimizing the calculations that have to
+ * be done while holding the lock. The current tip of reserved WAL is kept
+ * in CurrBytePos, as a byte position that only counts "usable" bytes in WAL,
+ * that is, it excludes all WAL page headers. The mapping between "usable" byte
+ * positions and physical positions (XLogRecPtrs) can be done outside the
+ * locked region, and because the usable byte position doesn't include any
+ * headers, reserving X bytes from WAL is simply "CurrBytePos += X". On
+ * platforms that have an atomic 64-bit fetch-and-add instruction, we don't
+ * even need a spinlock (XXX: not implemented yet - ATM spinlock is always
+ * used).
+ *
+ * Step 2 can usually be done completely in parallel. If the required WAL
+ * page is not initialized yet, you have to grab WALBufMappingLock to
+ * initialize it, but we pre-initialize WAL buffers in the WAL writer to
+ *    avoid that happening in the critical path.
+ *
+ * In step 2, the xl_prev field is left at 0/0, because even though we've
+ * reserved a slice of WAL space for the record, we don't know where the
+ * previous record began. We could keep track of that along with CurrBytePos,
+ * in step 1, but then it would no longer be possible to implement it with
+ * an atomic fetch-and-add instruction. So at step 3, we finalize all the
+ * records by filling in xl_prev, and calculating the final CRC that includes
+ * xl_prev as well. Finalization starts from the end of the last finalized
+ * record, and walks the chain of WAL records until it hits a record with
+ * xl_tot_len == 0. Setting xl_tot_len is a sign that the record is fully
+ * written - a memory barrier ensures that xl_tot_len is not seen by other
+ * processes before the rest of the record. If the record doesn't fit on the
+ * page, setting xl_tot_len indicates that the record is fully written up to
+ * the page boundary, and on the next page, setting XLP_FIRST_IS_CONTRECORD
+ * acts as a signal that the continued part is fully written to the page.
+ *
+ * XXX: There is currently no good mechanism to wait for step 2 of an
+ * insertion to finish. Step 3 busy-loops. In the previous version of this
+ * patch, which used "insertion slots", the slot included a linked list of
+ * PGPROCs waiting for the slot to finish inserting, similar to LWLocks.
+ * We'll probably need to add something like that; busy-waiting is not good.
+ *
+ *
+ * Deadlock analysis
+ * -----------------
+ *
+ * It's important to call WaitXLogInsertionsToFinish() *before* acquiring
+ * WALWriteLock. Otherwise you might get stuck waiting for an insertion to
+ * finish (or at least advance to next uninitialized page), while you're
+ * holding WALWriteLock. That would be bad, because the backend you're waiting
+ * for might need to acquire WALWriteLock, too, to evict an old buffer, so
+ * you'd get deadlock.
+ *
+ * WaitXLogInsertionsToFinish() will not get stuck indefinitely, as long as
+ * it's called with a location that's known to be already allocated in the WAL
+ * buffers. Calling it with the position of a record you've already inserted
+ * satisfies that condition, so the common pattern:
+ *
+ *   recptr = XLogInsert(...)
+ *   XLogFlush(recptr)
+ *
+ * is safe. It can't get stuck, because an insertion to a WAL page that's
+ * already initialized in cache can always proceed without waiting on a lock.
+ *
  *----------
  */
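
To make the "usable byte position" idea above concrete, here is a minimal
standalone sketch (illustrative only, not part of the patch; the page, header
and segment sizes below are assumed values, not the real PostgreSQL constants).
Reserving space is a single addition on the byte position, and converting back
to a physical offset just re-inserts the page headers that the byte position
leaves out, which is the same shape as XLogBytePosToRecPtr later in this patch.

#include <stdint.h>
#include <stdio.h>

#define BLCKSZ        8192		/* assumed WAL page size */
#define SHORT_PHD     24		/* assumed short page header size */
#define LONG_PHD      40		/* assumed long (segment-start) header size */
#define PAGES_PER_SEG 2048		/* assumed pages per segment */

#define USABLE_PER_PAGE (BLCKSZ - SHORT_PHD)
#define USABLE_PER_SEG \
	(PAGES_PER_SEG * USABLE_PER_PAGE - (LONG_PHD - SHORT_PHD))

/* Step 1: reserving 'size' usable bytes is just one addition. */
static uint64_t
reserve(uint64_t *currbytepos, int size)
{
	uint64_t	start = *currbytepos;

	*currbytepos += size;
	return start;
}

/* Map a usable byte position to a physical byte offset from the start of WAL. */
static uint64_t
bytepos_to_physical(uint64_t bytepos)
{
	uint64_t	seg = bytepos / USABLE_PER_SEG;
	uint64_t	left = bytepos % USABLE_PER_SEG;
	uint64_t	seg_offset;

	if (left < BLCKSZ - LONG_PHD)
		seg_offset = left + LONG_PHD;	/* on the first page of the segment */
	else
	{
		left -= BLCKSZ - LONG_PHD;
		seg_offset = BLCKSZ + (left / USABLE_PER_PAGE) * BLCKSZ
			+ left % USABLE_PER_PAGE + SHORT_PHD;
	}
	return seg * (uint64_t) PAGES_PER_SEG * BLCKSZ + seg_offset;
}

int
main(void)
{
	uint64_t	currbytepos = 0;
	uint64_t	r1 = reserve(&currbytepos, 100);
	uint64_t	r2 = reserve(&currbytepos, 8200);	/* spans a page boundary */
	uint64_t	r3 = reserve(&currbytepos, 100);

	/* r1 lands right after the long header; r3 lands past the second page's
	 * short header, even though the byte positions never counted headers. */
	printf("record 1 at physical offset %llu\n",
		   (unsigned long long) bytepos_to_physical(r1));
	printf("record 2 at physical offset %llu\n",
		   (unsigned long long) bytepos_to_physical(r2));
	printf("record 3 at physical offset %llu\n",
		   (unsigned long long) bytepos_to_physical(r3));
	return 0;
}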
 
@@ -335,12 +440,26 @@ typedef struct XLogwrtResult
  */
 typedef struct XLogCtlInsert
 {
-	XLogRecPtr	PrevRecord;		/* start of previously-inserted record */
-	int			curridx;		/* current block index in cache */
-	XLogPageHeader currpage;	/* points to header of block in cache */
-	char	   *currpos;		/* current insertion point in cache */
-	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
-	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
+	slock_t		insertpos_lck;	/* protects CurrBytePos */
+
+	/*
+	 * CurrBytePos is the very tip of the reserved WAL space at the moment.
+	 * The next record will be inserted there.
+	 */
+	uint64		CurrBytePos;
+
+	/*
+	 * These fields track the progress of record finalization. FinalizedUpto
+	 * points to the end of the fully finalized portion - everything before it
+	 * is ready to be written to disk. LastFinalizedRecord points to the
+	 * beginning of the last finalized record; when the next record is
+	 * finalized, this value is written into its xl_prev field. If
+	 * ExpectingContRecord is true, we are stopped at a page boundary, in the
+	 * middle of a WAL record. These fields are protected by WALInsertTailLock.
+	 */
+	XLogRecPtr	FinalizedUpto;
+	XLogRecPtr	LastFinalizedRecord;
+	bool		ExpectingContRecord;
 
 	/*
 	 * fullPageWrites is the master copy used by all backends to determine
@@ -348,7 +467,11 @@ typedef struct XLogCtlInsert
 	 * one. This is required because, when full_page_writes is changed
 	 * by SIGHUP, we must WAL-log it before it actually affects
 	 * WAL-logging by backends. Checkpointer sets at startup or after SIGHUP.
+	 *
+	 * These fields are protected by WALInsertShareLocks.
 	 */
+	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
+	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
 	bool		fullPageWrites;
 
 	/*
@@ -372,16 +495,21 @@ typedef struct XLogCtlWrite
 	pg_time_t	lastSegSwitchTime;		/* time of last xlog segment switch */
 } XLogCtlWrite;
 
+
 /*
  * Total shared-memory state for XLOG.
  */
 typedef struct XLogCtlData
 {
-	/* Protected by WALInsertLock: */
+	/*
+	 * Note: Insert must be the first field in the struct or it won't be
+	 * aligned to a cache-line boundary like we want it to be.
+	 */
 	XLogCtlInsert Insert;
 
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
+	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
 	uint32		ckptXidEpoch;	/* nextXID & epoch of latest checkpoint */
 	TransactionId ckptXid;
 	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
@@ -397,9 +525,18 @@ typedef struct XLogCtlData
 	XLogwrtResult LogwrtResult;
 
 	/*
+	 * To change curridx and the identity of a buffer, you need to hold
+	 * WALBufMappingLock.  To change the identity of a buffer that's still
+	 * dirty, the old page needs to be written out first, and for that you
+	 * need WALWriteLock, and you need to ensure that there's no in-progress
+	 * insertions to the page by calling WaitXLogInsertionsToFinish().
+	 */
+	int			curridx;		/* latest initialized block index in cache */
+
+	/*
 	 * These values do not change after startup, although the pointed-to pages
 	 * and xlblocks values certainly do.  Permission to read/write the pages
-	 * and xlblocks values depends on WALInsertLock and WALWriteLock.
+	 * and xlblocks values depends on WALBufMappingLock and WALWriteLock.
 	 */
 	char	   *pages;			/* buffers for unwritten XLOG pages */
 	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -479,30 +616,37 @@ static XLogCtlData *XLogCtl = NULL;
 static ControlFileData *ControlFile = NULL;
 
 /*
- * Macros for managing XLogInsert state.  In most cases, the calling routine
- * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
- * so these are passed as parameters instead of being fetched via XLogCtl.
+ * Calculate the amount of space left on the page after 'endptr'.
+ * Beware multiple evaluation!
  */
+#define INSERT_FREESPACE(endptr)	\
+	(((endptr).xrecoff % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr).xrecoff % XLOG_BLCKSZ))
 
-/* Free space remaining in the current xlog page buffer */
-#define INSERT_FREESPACE(Insert)  \
-	(XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
+/*
+ * Macro to advance to the next buffer index.
+ */
+#define NextBufIdx(idx)		\
+		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
 
-/* Construct XLogRecPtr value for current insertion point */
-#define INSERT_RECPTR(recptr,Insert,curridx)  \
-	do {																\
-		(recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid;			\
-		(recptr).xrecoff =												\
-			XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert); \
-		if (XLogCtl->xlblocks[curridx].xrecoff == 0)					\
-			(recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid - 1;	\
-	} while(0)
+/*
+ * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
+ * would hold if it was in cache, the page containing 'recptr'.
+ *
+ * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
+ * page is taken to mean the previous page.
+ */
+#define XLogRecPtrToBufIdx(recptr)	\
+	(((((((uint64) (recptr).xlogid) << 32) + (recptr).xrecoff)) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
 
-#define PrevBufIdx(idx)		\
-		(((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
+#define XLogRecEndPtrToBufIdx(recptr)	\
+	(((((((uint64) (recptr).xlogid) << 32) + (recptr).xrecoff - 1)) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
 
-#define NextBufIdx(idx)		\
-		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
+/*
+ * These are the number of bytes usable in a WAL page and segment, excluding
+ * page headers.
+ */
+#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
+#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
 
 /*
  * Private, possibly out-of-date copy of shared LogwrtResult.
@@ -625,9 +769,9 @@ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 
 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
 				XLogRecPtr *lsn, BkpBlock *bkpb);
-static bool AdvanceXLInsertBuffer(bool new_segment);
+static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 					   bool find_free, int *max_advance,
 					   bool use_lock);
@@ -674,6 +818,75 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
 static void rm_redo_error_callback(void *arg);
 static int	get_sync_bit(int method);
 
+static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+				  XLogRecData *rdata,
+				  XLogRecPtr StartPos, XLogRecPtr EndPos);
+static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
+						  XLogRecPtr *EndPos);
+static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static char *GetXLogBuffer(XLogRecPtr ptr, bool failok);
+static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
+static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
+static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+
+/*
+ * Equivalent of LWLockAcquire() for the partitioned WALInsertShareLock.
+ */
+static void
+WALInsertLockAcquire(LWLockMode mode)
+{
+	int lockid;
+
+	if (mode == LW_EXCLUSIVE)
+	{
+		/*
+		 * To acquire the lock in exclusive mode, we need to hold all the
+		 * partition locks.
+		 */
+		for (lockid = FirstWALInsertShareLock; lockid <= LastWALInsertShareLock; lockid++)
+		{
+			LWLockAcquire(lockid, LW_EXCLUSIVE);
+		}
+	}
+	else
+	{
+		/*
+		 * Grab one of the partitioned locks. It doesn't matter which one,
+		 * but to avoid contention, it's good if different processes choose
+		 * different locks.
+		 */
+		lockid = FirstWALInsertShareLock +
+			(MyProc->pgprocno % (LastWALInsertShareLock - FirstWALInsertShareLock + 1));
+		LWLockAcquire(lockid, LW_SHARED);
+	}
+}
+
+/*
+ * Equivalent of LWLockRelease() for the partitioned WALInsertShareLock.
+ */
+static void
+WALInsertLockRelease(LWLockMode mode)
+{
+	int lockid;
+
+	if (mode == LW_EXCLUSIVE)
+	{
+		for (lockid = FirstWALInsertShareLock; lockid <= LastWALInsertShareLock; lockid++)
+		{
+			LWLockRelease(lockid);
+		}
+	}
+	else
+	{
+		/*
+		 * This calculation must match the one used when the lock was
+		 * acquired.
+		 */
+		lockid = FirstWALInsertShareLock + (MyProc->pgprocno % (LastWALInsertShareLock - FirstWALInsertShareLock + 1));
+		LWLockRelease(lockid);
+	}
+}
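
For completeness, a sketch of the rare writer side of the partitioned lock
described at the top of the file. The hunks shown here only contain the
shared-mode use in XLogInsert; the caller name below is an assumed example
for illustration, not code from this patch.

#ifdef NOT_USED
static void
UpdateFullPageWrites_sketch(bool newval)
{
	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;

	/*
	 * Changing fullPageWrites must block all concurrent inserters, so take
	 * every partition of WALInsertShareLocks in exclusive mode.
	 */
	WALInsertLockAcquire(LW_EXCLUSIVE);
	Insert->fullPageWrites = newval;
	WALInsertLockRelease(LW_EXCLUSIVE);
}
#endif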
 
 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -693,11 +906,7 @@ static int	get_sync_bit(int method);
 XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 {
-	XLogCtlInsert *Insert = &XLogCtl->Insert;
-	XLogRecPtr	RecPtr;
-	XLogRecPtr	WriteRqst;
-	uint32		freespace;
-	int			curridx;
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
 	XLogRecData *rdt;
 	XLogRecData *rdt_lastnormal;
 	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
@@ -712,12 +921,14 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 	uint32		len,
 				write_len;
 	unsigned	i;
-	bool		updrqst;
-	bool		doPageWrites;
 	bool		isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
-	uint8		info_orig = info;
 	static XLogRecord *rechdr;
+	XLogRecPtr	StartPos;
+	XLogRecPtr	EndPos;
 
+	/*
+	 * On the first call, allocate a buffer to hold the xlog record header.
+	 */
 	if (rechdr == NULL)
 	{
 		rechdr = malloc(SizeOfXLogRecord);
@@ -742,40 +953,33 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 	 */
 	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
 	{
-		RecPtr.xlogid = 0;
-		RecPtr.xrecoff = SizeOfXLogLongPHD;		/* start of 1st chkpt record */
-		return RecPtr;
+		EndPos.xlogid = 0;
+		EndPos.xrecoff = SizeOfXLogLongPHD;		/* start of 1st chkpt record */
+		return EndPos;
 	}
 
 	/*
 	 * Here we scan the rdata chain, to determine which buffers must be backed
 	 * up.
 	 *
-	 * We may have to loop back to here if a race condition is detected below.
-	 * We could prevent the race by doing all this work while holding the
-	 * insert lock, but it seems better to avoid doing CRC calculations while
-	 * holding the lock.
-	 *
 	 * We add entries for backup blocks to the chain, so that they don't
 	 * need any special treatment in the critical section where the chunks are
-	 * copied into the WAL buffers. Those entries have to be unlinked from the
-	 * chain if we have to loop back here.
+	 * copied into the WAL buffers.
+	 *
+	 * First acquire WALInsertShareLock, to prevent RedoRecPtr and
+	 * force/fullPageWrites flags from changing.
 	 */
-begin:;
+	WALInsertLockAcquire(isLogSwitch ? LW_EXCLUSIVE : LW_SHARED);
+
+	doPageWrites = Insert->forcePageWrites || Insert->fullPageWrites;
+	RedoRecPtr = Insert->RedoRecPtr;
+
 	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
 	{
 		dtbuf[i] = InvalidBuffer;
 		dtbuf_bkp[i] = false;
 	}
 
-	/*
-	 * Decide if we need to do full-page writes in this XLOG record: true if
-	 * full_page_writes is on or we have a PITR request for it.  Since we
-	 * don't yet have the insert lock, fullPageWrites and forcePageWrites
-	 * could change under us, but we'll recheck them once we have the lock.
-	 */
-	doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
-
 	len = 0;
 	for (rdt = rdata;;)
 	{
@@ -831,8 +1035,7 @@ begin:;
 	 * NOTE: We disallow len == 0 because it provides a useful bit of extra
 	 * error checking in ReadRecord.  This means that all callers of
 	 * XLogInsert must supply at least some not-in-a-buffer data.  However, we
-	 * make an exception for XLOG SWITCH records because we don't want them to
-	 * ever cross a segment boundary.
+	 * make an exception for XLOG SWITCH records.
 	 */
 	if (len == 0 && !isLogSwitch)
 		elog(PANIC, "invalid xlog record length %u", len);
@@ -840,9 +1043,7 @@ begin:;
 	/*
 	 * Make additional rdata chain entries for the backup blocks, so that we
 	 * don't need to special-case them in the write loop.  This modifies the
-	 * original rdata chain, but we keep a pointer to the last regular entry,
-	 * rdt_lastnormal, so that we can undo this if we have to loop back to the
-	 * beginning.
+	 * original rdata chain.
 	 *
 	 * At the exit of this loop, write_len includes the backup block data.
 	 *
@@ -912,15 +1113,23 @@ begin:;
 		COMP_CRC32(rdata_crc, rdt->data, rdt->len);
 
 	/*
-	 * Construct record header (prev-link and CRC are filled in later), and
-	 * make that the first chunk in the chain.
+	 * Construct record header (prev-link is filled in later, in record
+	 * finalization), and make that the first chunk in the chain.
 	 */
 	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
 	rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
 	rechdr->xl_len = len;		/* doesn't include backup blocks */
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
+	rechdr->xl_prev = InvalidXLogRecPtr;
+	/*
+	 * The CRC calculated here doesn't include the correct prev-link yet.
+	 * It will be updated in record finalization.
+	 */
+	COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
+	rechdr->xl_crc = rdata_crc;
 
+	/* Make the record header the first chunk in the chain */
 	hdr_rdt.next = rdata;
 	hdr_rdt.data = (char *) rechdr;
 	hdr_rdt.len = SizeOfXLogRecord;
@@ -929,118 +1138,82 @@ begin:;
 
 	START_CRIT_SECTION();
 
-	/* Now wait to get insert lock */
-	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-
 	/*
-	 * Check to see if my RedoRecPtr is out of date.  If so, may have to go
-	 * back and recompute everything.  This can only happen just after a
-	 * checkpoint, so it's better to be slow in this case and fast otherwise.
-	 *
-	 * If we aren't doing full-page writes then RedoRecPtr doesn't actually
-	 * affect the contents of the XLOG record, so we'll update our local copy
-	 * but not force a recomputation.
+	 * Reserve space for the record from the WAL, and copy the record there.
 	 */
-	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
+	if (isLogSwitch)
 	{
-		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
-		RedoRecPtr = Insert->RedoRecPtr;
+		if (ReserveXLogSwitch(&StartPos, &EndPos))
+		{
+			WaitXLogInsertionsToFinish(StartPos);
 
-		if (doPageWrites)
+			CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt,
+								StartPos, EndPos);
+		}
+		else
 		{
-			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-			{
-				if (dtbuf[i] == InvalidBuffer)
-					continue;
-				if (dtbuf_bkp[i] == false &&
-					XLByteLE(dtbuf_lsn[i], RedoRecPtr))
-				{
-					/*
-					 * Oops, this buffer now needs to be backed up, but we
-					 * didn't think so above.  Start over.
-					 */
-					LWLockRelease(WALInsertLock);
-					END_CRIT_SECTION();
-					rdt_lastnormal->next = NULL;
-					info = info_orig;
-					goto begin;
-				}
-			}
+			/*
+			 * The current insert location was already exactly at the beginning
+			 * of a segment, so there's no need to switch.
+			 */
 		}
 	}
-
-	/*
-	 * Also check to see if fullPageWrites or forcePageWrites was just turned on;
-	 * if we weren't already doing full-page writes then go back and recompute.
-	 * (If it was just turned off, we could recompute the record without full pages,
-	 * but we choose not to bother.)
-	 */
-	if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
+	else
 	{
-		/* Oops, must redo it with full-page data. */
-		LWLockRelease(WALInsertLock);
-		END_CRIT_SECTION();
-		rdt_lastnormal->next = NULL;
-		info = info_orig;
-		goto begin;
+		ReserveXLogInsertLocation(write_len, &StartPos, &EndPos);
+
+		/* And copy the record there. */
+		CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
 	}
+	END_CRIT_SECTION();
+
+	WALInsertLockRelease(isLogSwitch ? LW_EXCLUSIVE : LW_SHARED);
 
 	/*
-	 * If the current page is completely full, the record goes to the next
-	 * page, right after the page header.
+	 * Update shared LogwrtRqst.Write, if we crossed a page boundary.
 	 */
-	updrqst = false;
-	freespace = INSERT_FREESPACE(Insert);
-	if (freespace == 0)
+	if (StartPos.xrecoff / XLOG_BLCKSZ != EndPos.xrecoff / XLOG_BLCKSZ)
 	{
-		updrqst = AdvanceXLInsertBuffer(false);
-		freespace = INSERT_FREESPACE(Insert);
-	}
+		/* use volatile pointer to prevent code rearrangement */
+		volatile XLogCtlData *xlogctl = XLogCtl;
 
-	/* Compute record's XLOG location */
-	curridx = Insert->curridx;
-	INSERT_RECPTR(RecPtr, Insert, curridx);
+		SpinLockAcquire(&xlogctl->info_lck);
+		/* advance global request to include new block(s) */
+		if (XLByteLT(xlogctl->LogwrtRqst.Write, EndPos))
+			xlogctl->LogwrtRqst.Write = EndPos;
+		/* update local result copy while I have the chance */
+		LogwrtResult = xlogctl->LogwrtResult;
+		SpinLockRelease(&xlogctl->info_lck);
+	}
 
 	/*
-	 * If the record is an XLOG_SWITCH, and we are exactly at the start of a
-	 * segment, we need not insert it (and don't want to because we'd like
-	 * consecutive switch requests to be no-ops).  Instead, make sure
-	 * everything is written and flushed through the end of the prior segment,
-	 * and return the prior segment's end address.
+	 * If this was an XLOG_SWITCH record, flush the record and the empty
+	 * padding space that fills the rest of the segment, and perform
+	 * end-of-segment actions (eg, notifying archiver).
 	 */
-	if (isLogSwitch &&
-		(RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
+	if (isLogSwitch)
 	{
-		/* We can release insert lock immediately */
-		LWLockRelease(WALInsertLock);
-
-		RecPtr.xrecoff -= SizeOfXLogLongPHD;
-
-		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-		LogwrtResult = XLogCtl->LogwrtResult;
-		if (!XLByteLE(RecPtr, LogwrtResult.Flush))
+		TRACE_POSTGRESQL_XLOG_SWITCH();
+		XLogFlush(EndPos);
+		/*
+		 * Even though we reserved the rest of the segment for us, which is
+		 * reflected in EndPos, we return a pointer to just the end of the
+		 * xlog-switch record.
+		 */
+		if (StartPos.xrecoff % XLOG_SEG_SIZE != 0)
 		{
-			XLogwrtRqst FlushRqst;
-
-			FlushRqst.Write = RecPtr;
-			FlushRqst.Flush = RecPtr;
-			XLogWrite(FlushRqst, false, false);
+			EndPos = StartPos;
+			XLByteAdvance(EndPos, SizeOfXLogRecord);
+			if (StartPos.xrecoff / XLOG_BLCKSZ != EndPos.xrecoff / XLOG_BLCKSZ)
+			{
+				if (EndPos.xrecoff % XLOG_SEG_SIZE == EndPos.xrecoff % XLOG_BLCKSZ)
+					EndPos.xrecoff += SizeOfXLogLongPHD;
+				else
+					EndPos.xrecoff += SizeOfXLogShortPHD;
+			}
 		}
-		LWLockRelease(WALWriteLock);
-
-		END_CRIT_SECTION();
-
-		return RecPtr;
 	}
 
-	/* Finish the record header */
-	rechdr->xl_prev = Insert->PrevRecord;
-
-	/* Now we can finish computing the record's CRC */
-	COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
-	FIN_CRC32(rdata_crc);
-	rechdr->xl_crc = rdata_crc;
-
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG)
 	{
@@ -1048,7 +1221,7 @@ begin:;
 
 		initStringInfo(&buf);
 		appendStringInfo(&buf, "INSERT @ %X/%X: ",
-						 RecPtr.xlogid, RecPtr.xrecoff);
+						 EndPos.xlogid, EndPos.xrecoff);
 		xlog_outrec(&buf, rechdr);
 		if (rdata->data != NULL)
 		{
@@ -1060,165 +1233,741 @@ begin:;
 	}
 #endif
 
-	/* Record begin of record in appropriate places */
-	ProcLastRecPtr = RecPtr;
-	Insert->PrevRecord = RecPtr;
+	/*
+	 * Update our global variables
+	 */
+	ProcLastRecPtr = StartPos;
+	XactLastRecEnd = EndPos;
+
+	return EndPos;
+}
+
+/*
+ * Subroutine of XLogInsert.  Copies a WAL record to an already-reserved
+ * area in the WAL.
+ */
+static void
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+					XLogRecData *rdata,
+					XLogRecPtr StartPos, XLogRecPtr EndPos)
+{
+	char	   *currpos;
+	int			freespace;
+	int			written;
+	XLogRecPtr	CurrPos;
+	XLogRecord *rechdr;
+	uint32	   *tot_len_p;
+	bool		firstpage = true;
+	XLogPageHeader pagehdr = NULL;
+
+	/* The first chunk should be the record header */
+	rechdr = (XLogRecord *) rdata->data;
+	Assert(rdata->len == SizeOfXLogRecord);
 
 	/*
-	 * Append the data, including backup blocks if any
+	 * When we write the record, we initially leave xl_tot_len at zero,
+	 * and set it to the correct value only after copying the rest of the
+	 * record in place. That way when a process sees that xl_tot_len is set,
+	 * it knows that the record is fully copied in place (or the part that
+	 * fits on this page, anyway).
 	 */
-	rdata = &hdr_rdt;
-	while (write_len)
+	Assert(rechdr->xl_tot_len == write_len);
+	rechdr->xl_tot_len = 0;
+
+	/* Get the right WAL page to start inserting to */
+	CurrPos = StartPos;
+	currpos = GetXLogBuffer(CurrPos, false);
+	freespace = INSERT_FREESPACE(CurrPos);
+
+	/*
+	 * there should be enough space for at least the first field (xl_tot_len)
+	 * on this page.
+	 */
+	Assert(freespace >= sizeof(uint32));
+	tot_len_p = (uint32 *) currpos;
+
+	/* Copy record data */
+	written = 0;
+	while (rdata != NULL)
 	{
-		while (rdata->data == NULL)
-			rdata = rdata->next;
+		char	   *rdata_data = rdata->data;
+		int			rdata_len = rdata->len;
 
-		if (freespace > 0)
+		while (rdata_len > freespace)
 		{
-			if (rdata->len > freespace)
+			/*
+			 * Write what fits on this page, and continue on the next page.
+			 */
+			Assert (((uint64) currpos) % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
+			memcpy(currpos, rdata_data, freespace);
+			rdata_data += freespace;
+			rdata_len -= freespace;
+			written += freespace;
+			XLByteAdvance(CurrPos, freespace);
+
+			/*
+			 * Before we step to the next page, let others know that we're done
+			 * copying to this page, by setting xl_tot_len (or
+			 * XLP_FIRST_IS_CONTRECORD, if we're continuing from the previous
+			 * page).
+			 */
+			pg_write_barrier();
+			if (firstpage)
 			{
-				memcpy(Insert->currpos, rdata->data, freespace);
-				rdata->data += freespace;
-				rdata->len -= freespace;
-				write_len -= freespace;
+				*tot_len_p = write_len;
+				firstpage = false;
 			}
 			else
+				pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
+
+			/*
+			 * Get pointer to beginning of next page, and set the xlp_rem_len
+			 * in the page header. We don't set XLP_FIRST_IS_CONTRECORD yet;
+			 * that flag is used to signal that we're done copying, so it's
+			 * set last.
+			 *
+			 * It's safe to set the contrecord flag and xlp_rem_len without a
+			 * lock on the page. All the other flags were already set when the
+			 * page was initialized, in AdvanceXLInsertBuffer, and we're the
+			 * only backend that needs to set the contrecord flag.
+			 */
+			currpos = GetXLogBuffer(CurrPos, false);
+			pagehdr = (XLogPageHeader) currpos;
+			pagehdr->xlp_rem_len = write_len - written;
+
+			/* skip over the page header */
+			if (CurrPos.xrecoff % XLogSegSize == 0)
 			{
-				memcpy(Insert->currpos, rdata->data, rdata->len);
-				freespace -= rdata->len;
-				write_len -= rdata->len;
-				Insert->currpos += rdata->len;
-				rdata = rdata->next;
-				continue;
+				CurrPos.xrecoff += SizeOfXLogLongPHD;
+				currpos += SizeOfXLogLongPHD;
 			}
+			else
+			{
+				CurrPos.xrecoff += SizeOfXLogShortPHD;
+				currpos += SizeOfXLogShortPHD;
+			}
+			freespace = INSERT_FREESPACE(CurrPos);
 		}
 
-		/* Use next buffer */
-		updrqst = AdvanceXLInsertBuffer(false);
-		curridx = Insert->curridx;
-		/* Insert cont-record header */
-		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
-		Insert->currpage->xlp_rem_len = write_len;
-		freespace = INSERT_FREESPACE(Insert);
+		Assert (((uint64) currpos) % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
+		memcpy(currpos, rdata_data, rdata_len);
+		currpos += rdata_len;
+		XLByteAdvance(CurrPos, rdata_len);
+		freespace -= rdata_len;
+		written += rdata_len;
+
+		rdata = rdata->next;
 	}
+	Assert(written == write_len);
 
-	/* Ensure next record will be properly aligned */
-	Insert->currpos = (char *) Insert->currpage +
-		MAXALIGN(Insert->currpos - (char *) Insert->currpage);
-	freespace = INSERT_FREESPACE(Insert);
+	/* Align the end position, so that the next record starts aligned */
+	if (CurrPos.xrecoff % MAXIMUM_ALIGNOF != 0)
+	{
+		CurrPos.xrecoff = MAXALIGN(CurrPos.xrecoff);
+		if (CurrPos.xrecoff == 0)
+		{
+			/* crossed a logid boundary */
+			CurrPos.xlogid += 1;
+		}
+	}
 
 	/*
-	 * The recptr I return is the beginning of the *next* record. This will be
-	 * stored as LSN for changed data pages...
+	 * Done! Let others know that we're finished.
 	 */
-	INSERT_RECPTR(RecPtr, Insert, curridx);
+	pg_write_barrier();
+	if (firstpage)
+		*tot_len_p = write_len;
+	else
+		pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
 
 	/*
-	 * If the record is an XLOG_SWITCH, we must now write and flush all the
-	 * existing data, and then forcibly advance to the start of the next
-	 * segment.  It's not good to do this I/O while holding the insert lock,
-	 * but there seems too much risk of confusion if we try to release the
-	 * lock sooner.  Fortunately xlog switch needn't be a high-performance
-	 * operation anyway...
+	 * If this was an xlog-switch, it's not enough to write the switch record;
+	 * we also have to consume all the remaining space in the WAL segment.
+	 * We have already reserved that space, but we still need to make sure it's
+	 * allocated and zeroed in the WAL buffers so that when the caller (or
+	 * someone else) does XLogWrite(), it can really write out all the zeros.
 	 */
-	if (isLogSwitch)
+	if (isLogSwitch && CurrPos.xrecoff % XLOG_SEG_SIZE != 0)
 	{
-		XLogwrtRqst FlushRqst;
-		XLogRecPtr	OldSegEnd;
+		volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
 
-		TRACE_POSTGRESQL_XLOG_SWITCH();
+		WaitXLogInsertionsToFinish(CurrPos);
 
-		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+		/* An xlog-switch record doesn't contain any data besides the header */
+		Assert(write_len == SizeOfXLogRecord);
 
 		/*
-		 * Flush through the end of the page containing XLOG_SWITCH, and
-		 * perform end-of-segment actions (eg, notifying archiver).
+		 * We do this one page at a time, to make sure we don't deadlock
+		 * against ourselves if wal_buffers < XLOG_SEG_SIZE.
 		 */
-		WriteRqst = XLogCtl->xlblocks[curridx];
-		FlushRqst.Write = WriteRqst;
-		FlushRqst.Flush = WriteRqst;
-		XLogWrite(FlushRqst, false, true);
-
-		/* Set up the next buffer as first page of next segment */
-		/* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
-		(void) AdvanceXLInsertBuffer(true);
+		Assert(EndPos.xrecoff % XLogSegSize == 0);
 
-		/* There should be no unwritten data */
-		curridx = Insert->curridx;
-		Assert(curridx == XLogCtl->Write.curridx);
+		/* Use up all the remaining space on the first page */
+		XLByteAdvance(CurrPos, freespace);
 
-		/* Compute end address of old segment */
-		OldSegEnd = XLogCtl->xlblocks[curridx];
-		if (OldSegEnd.xrecoff == 0)
+		while (XLByteLT(CurrPos, EndPos))
 		{
-			/* crossing a logid boundary */
-			OldSegEnd.xlogid -= 1;
+			/* initialize the next page (if not initialized already) */
+			AdvanceXLInsertBuffer(CurrPos, false);
+			XLByteAdvance(CurrPos, XLOG_BLCKSZ);
+
+			/*
+			 * Update FinalizedUpto immediately. FinalizeRecord() doesn't know
+			 * that an xlog-switch record consumes the rest of the segment,
+			 * so we have to do this ourselves.
+			 */
+			LWLockAcquire(WALInsertTailLock, LW_EXCLUSIVE);
+			FinalizedUpto = Insert->FinalizedUpto = CurrPos;
+			Insert->ExpectingContRecord = false;
+			Assert(XLByteEQ(Insert->LastFinalizedRecord, StartPos));
+			LWLockRelease(WALInsertTailLock);
 		}
-		OldSegEnd.xrecoff -= XLOG_BLCKSZ;
+	}
+	if (!XLByteEQ(CurrPos, EndPos))
+		elog(PANIC, "space reserved for WAL record does not match what was written");
+}
+
+/*
+ * Reserves the right amount of space for a record of given size from the WAL.
+ * *StartPos is set to the beginning of the reserved section, *EndPos to
+ * its end+1.
+ *
+ * This is the performance critical part of XLogInsert that must be serialized
+ * across backends. The rest can happen mostly in parallel.
+ *
+ * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
+ * where we actually copy the record to the reserved space.
+ */
+static void
+ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos)
+{
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+	uint64		startbytepos;
+	uint64		endbytepos;
 
-		/* Make it look like we've written and synced all of old segment */
-		LogwrtResult.Write = OldSegEnd;
-		LogwrtResult.Flush = OldSegEnd;
+	size = MAXALIGN(size);
 
-		/*
-		 * Update shared-memory status --- this code should match XLogWrite
-		 */
-		{
-			/* use volatile pointer to prevent code rearrangement */
-			volatile XLogCtlData *xlogctl = XLogCtl;
+	/* All (non xlog-switch) records should contain data. */
+	Assert(size > SizeOfXLogRecord);
 
-			SpinLockAcquire(&xlogctl->info_lck);
-			xlogctl->LogwrtResult = LogwrtResult;
-			if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
-				xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
-			if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
-				xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
-			SpinLockRelease(&xlogctl->info_lck);
-		}
+	SpinLockAcquire(&Insert->insertpos_lck);
 
-		LWLockRelease(WALWriteLock);
+	startbytepos = Insert->CurrBytePos;
+	endbytepos = startbytepos + size;
+	Insert->CurrBytePos = endbytepos;
+
+	SpinLockRelease(&Insert->insertpos_lck);
+
+	*StartPos = XLogBytePosToRecPtr(startbytepos);
+	Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
+	Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+}
+
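
The XXX comment near the top of the file notes that on platforms with an
atomic 64-bit fetch-and-add this reservation would not need the spinlock at
all. A sketch of that variant, assuming GCC's __sync_fetch_and_add builtin is
available (not implemented by this patch; shown only to illustrate why the
usable-byte-position representation makes the reservation a single addition):

#ifdef NOT_USED
static void
ReserveXLogInsertLocationAtomic(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos)
{
	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
	uint64		startbytepos;
	uint64		endbytepos;

	size = MAXALIGN(size);

	/* Atomically claim [startbytepos, startbytepos + size). */
	startbytepos = __sync_fetch_and_add((uint64 *) &Insert->CurrBytePos,
										(uint64) size);
	endbytepos = startbytepos + size;

	*StartPos = XLogBytePosToRecPtr(startbytepos);
	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
}
#endif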
+/*
+ * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
+ *
+ * An xlog-switch record is handled slightly differently. The rest of the
+ * segment will be reserved for this insertion, as indicated by the returned
+ * *EndPos value. However, if we are already at the beginning of the current
+ * segment, *EndPos is set to the current location without reserving
+ * any space, and the function returns false.
+ */
+static bool
+ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos)
+{
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+	uint64		startbytepos;
+	uint64		endbytepos;
+	uint32		size = SizeOfXLogRecord;
+	XLogRecPtr	ptr;
+	uint32		segleft;
+
+	SpinLockAcquire(&Insert->insertpos_lck);
+
+	startbytepos = Insert->CurrBytePos;
+
+	ptr = XLogBytePosToEndRecPtr(startbytepos);
+	if (ptr.xrecoff % XLOG_SEG_SIZE == 0)
+	{
+		SpinLockRelease(&Insert->insertpos_lck);
+		*EndPos = *StartPos = ptr;
+		return false;
+	}
+
+	*StartPos = XLogBytePosToRecPtr(startbytepos);
+
+	endbytepos = startbytepos + size;
 
-		updrqst = false;		/* done already */
+	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
+	Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+
+	Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+
+	segleft = XLOG_SEG_SIZE - (EndPos->xrecoff % XLOG_SEG_SIZE);
+	if (segleft != XLOG_SEG_SIZE)
+	{
+		/* consume the rest of the segment */
+		EndPos->xrecoff += segleft;
+		endbytepos = XLogRecPtrToBytePos(*EndPos);
+	}
+	Insert->CurrBytePos = endbytepos;
+
+	SpinLockRelease(&Insert->insertpos_lck);
+
+	Assert(EndPos->xrecoff % XLOG_BLCKSZ == 0);
+
+	return true;
+}
+
+/*
+ * Get a pointer to the right location in the WAL buffer containing the
+ * given XLogRecPtr.
+ *
+ * If the page is not initialized yet, it is initialized; that might require
+ * evicting an old dirty buffer from the buffer cache, which means I/O. If
+ * failok == true, the function returns NULL instead of initializing the page.
+ *
+ * The caller must ensure that the page containing the requested location
+ * isn't evicted yet, and won't be evicted. If you have reserved some WAL
+ * space, and not yet marked that you're done inserting it (by not having
+ * set xl_tot_len yet), that is enough. You should not be holding onto
+ * anything < ptr, though, because that might lead to deadlock if we would
+ * need to evict an old buffer to make room for the new one.
+ */
+static char *
+GetXLogBuffer(XLogRecPtr ptr, bool failok)
+{
+	int			idx;
+	XLogRecPtr	endptr;
+	static uint32 cachedXlogid = 0;
+	static uint32 cachedPage = 0;
+	static char *cachedPos = NULL;
+	XLogRecPtr	expectedEndPtr;
+
+	/*
+	 * Fast path for the common case that we need to access the same page
+	 * as last time.
+	 */
+	if (ptr.xlogid == cachedXlogid && ptr.xrecoff / XLOG_BLCKSZ == cachedPage)
+	{
+		Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+		Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr.xlogid == cachedXlogid);
+		Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr.xrecoff == cachedPage * XLOG_BLCKSZ);
+		return cachedPos + ptr.xrecoff % XLOG_BLCKSZ;
+	}
+
+	/*
+	 * The XLog buffer cache is organized so that a page must always be loaded
+	 * to a particular buffer.  That way we can easily calculate the buffer
+	 * a given page must be loaded into, from the XLogRecPtr alone.
+	 */
+	idx = XLogRecPtrToBufIdx(ptr);
+
+	/*
+	 * See what page is loaded in the buffer at the moment. It could be the
+	 * page we're looking for, or something older. It can't be anything newer
+	 * - that would imply the page we're looking for has already been written
+	 * out to disk and evicted, and the caller is responsible for making sure
+	 * that doesn't happen.
+	 *
+	 * However, we don't hold a lock while we read the value. If someone has
+	 * just initialized the page, it's possible that we get a "torn read" of
+	 * the XLogRecPtr, and see a bogus value. That's ok, we'll grab the
+	 * mapping lock (in AdvanceXLInsertBuffer) and retry if we see anything
+	 * else than the page we're looking for. But it means that when we do this
+	 * unlocked read, we might see a value that appears to be ahead of the
+	 * page we're looking for. Don't PANIC on that, until we've verified the
+	 * value while holding the lock.
+	 */
+	expectedEndPtr = ptr;
+	XLByteAdvance(expectedEndPtr, XLOG_BLCKSZ - ptr.xrecoff % XLOG_BLCKSZ);
+
+	endptr = XLogCtl->xlblocks[idx];
+	if (!XLByteEQ(expectedEndPtr, endptr))
+	{
+		if (failok)
+			return NULL;
+
+		AdvanceXLInsertBuffer(ptr, false);
+		endptr = XLogCtl->xlblocks[idx];
+
+		if (!XLByteEQ(expectedEndPtr, endptr))
+			elog(PANIC, "could not find WAL buffer for %X/%X",
+				 ptr.xlogid, ptr.xrecoff);
+	}
+
+	/*
+	 * Found the buffer holding this page. Return a pointer to the right
+	 * offset within the page.
+	 */
+	cachedXlogid = ptr.xlogid;
+	cachedPage = ptr.xrecoff / XLOG_BLCKSZ;
+	cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+
+	Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+	Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr.xlogid == cachedXlogid);
+	Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr.xrecoff == cachedPage * XLOG_BLCKSZ);
+	Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr.xrecoff == ptr.xrecoff - (ptr.xrecoff % XLOG_BLCKSZ));
+
+	return cachedPos + ptr.xrecoff % XLOG_BLCKSZ;
+}
+
+/*
+ * Converts a "usable byte position" to XLogRecPtr. A usable byte position
+ * is the position starting from the beginning of WAL, excluding all WAL
+ * page headers.
+ */
+static XLogRecPtr
+XLogBytePosToRecPtr(uint64 bytepos)
+{
+	uint64		fullsegs;
+	uint64		fullpages;
+	uint64		bytesleft;
+	uint32		seg_offset;
+	XLogRecPtr	result;
+
+	fullsegs = bytepos / UsableBytesInSegment;
+	bytesleft = bytepos % UsableBytesInSegment;
+
+	if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+	{
+		/* fits on first page of segment */
+		seg_offset = bytesleft + SizeOfXLogLongPHD;
+	}
+	else
+	{
+		/* account for the first page on segment with long header */
+		seg_offset = XLOG_BLCKSZ;
+		bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+
+		fullpages = bytesleft / UsableBytesInPage;
+		bytesleft = bytesleft % UsableBytesInPage;
+
+		seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+	}
+
+	XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
+
+	return result;
+}
+
+/*
+ * Like XLogBytePosToRecPtr, but a page boundary is represented by a pointer
+ * to the beginning of the page, not to where the first xlog record goes.
+ */
+static XLogRecPtr
+XLogBytePosToEndRecPtr(uint64 bytepos)
+{
+	uint64		fullsegs;
+	uint64		fullpages;
+	uint64		bytesleft;
+	uint32		seg_offset;
+	XLogRecPtr	result;
+
+	fullsegs = bytepos / UsableBytesInSegment;
+	bytesleft = bytepos % UsableBytesInSegment;
+
+	if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+	{
+		/* fits on first page of segment */
+		if (bytesleft == 0)
+			seg_offset = 0;
+		else
+			seg_offset = bytesleft + SizeOfXLogLongPHD;
 	}
 	else
 	{
-		/* normal case, ie not xlog switch */
+		/* account for the first page on segment with long header */
+		seg_offset = XLOG_BLCKSZ;
+		bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
+
+		fullpages = bytesleft / UsableBytesInPage;
+		bytesleft = bytesleft % UsableBytesInPage;
+
+		if (bytesleft == 0)
+			seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
+		else
+			seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+	}
+
+	XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
+
+	return result;
+}
+
+/*
+ * Convert an XLogRecPtr to a "usable byte position".
+ */
+static uint64
+XLogRecPtrToBytePos(XLogRecPtr ptr)
+{
+	uint64		fullsegs;
+	uint32		fullpages;
+	uint32		offset;
+	uint64		result;
+
+	XLByteToSeg(ptr, fullsegs);
 
-		/* Need to update shared LogwrtRqst if some block was filled up */
-		if (freespace == 0)
+	fullpages = (ptr.xrecoff % XLOG_SEG_SIZE) / XLOG_BLCKSZ;
+	offset = ptr.xrecoff % XLOG_BLCKSZ;
+
+	if (fullpages == 0)
+	{
+		result = fullsegs * UsableBytesInSegment;
+		if (offset > 0)
 		{
-			/* curridx is filled and available for writing out */
-			updrqst = true;
+			Assert(offset >= SizeOfXLogLongPHD);
+			result += offset - SizeOfXLogLongPHD;
 		}
-		else
+	}
+	else
+	{
+		result = fullsegs * UsableBytesInSegment +
+			(XLOG_BLCKSZ - SizeOfXLogLongPHD) +  /* account for first page */
+			(fullpages - 1) * UsableBytesInPage; /* full pages */
+		if (offset > 0)
 		{
-			/* if updrqst already set, write through end of previous buf */
-			curridx = PrevBufIdx(curridx);
+			Assert(offset >= SizeOfXLogShortPHD);
+			result += offset - SizeOfXLogShortPHD;
 		}
-		WriteRqst = XLogCtl->xlblocks[curridx];
 	}
 
-	LWLockRelease(WALInsertLock);
+	return result;
+}
+
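
XLogBytePosToRecPtr and XLogRecPtrToBytePos are meant to be exact inverses of
each other; the Asserts in ReserveXLogInsertLocation rely on that. A small
self-check sketch, illustrative only:

#ifdef NOT_USED
static void
CheckBytePosConversions(void)
{
	uint64		bytepos;

	/* Walk a few segments' worth of positions and verify the round trip. */
	for (bytepos = 0; bytepos < 4 * (uint64) UsableBytesInSegment; bytepos += 37)
	{
		XLogRecPtr	ptr = XLogBytePosToRecPtr(bytepos);

		Assert(XLogRecPtrToBytePos(ptr) == bytepos);
	}
}
#endif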
+/*
+ * Attempt to finalize the next record, if it has been copied in place.
+ */
+static bool
+FinalizeRecord(void)
+{
+	XLogCtlInsert *Insert = &XLogCtl->Insert;
+	XLogRecPtr	ptr;
+	uint32		len;
+	char	   *p;
+	int			freespace;
+	XLogRecPtr StartPos, EndPos;
+
+	ptr = XLogCtl->Insert.FinalizedUpto;
+	p = GetXLogBuffer(ptr, true);
+	if (p == NULL)
+		return false;
+
+	StartPos = ptr;
 
-	if (updrqst)
+	/*
+	 * If ExpectingContRecord is set, the finalization point is at a page
+	 * boundary in the middle of a record, so handle the continuation.
+	 */
+	if (XLogCtl->Insert.ExpectingContRecord)
 	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
+		XLogPageHeader pagehdr = (XLogPageHeader) p;
+		int		pagehdrsize;
 
-		SpinLockAcquire(&xlogctl->info_lck);
-		/* advance global request to include new block(s) */
-		if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
-			xlogctl->LogwrtRqst.Write = WriteRqst;
-		/* update local result copy while I have the chance */
-		LogwrtResult = xlogctl->LogwrtResult;
-		SpinLockRelease(&xlogctl->info_lck);
+		Assert(pagehdr->xlp_magic == XLOG_PAGE_MAGIC);
+		pagehdrsize = (ptr.xrecoff % XLOG_SEG_SIZE == 0) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD;
+		Assert(pagehdrsize == XLogPageHeaderSize(pagehdr));
+
+		if ((pagehdr->xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+			return false;
+
+		pg_memory_barrier();
+
+		/* Cool, the part of this continued record on this page is done */
+		len = MAXALIGN(pagehdr->xlp_rem_len);
+		Assert(len > 0 && len < 1000000);
+		if (len < XLOG_BLCKSZ - pagehdrsize)
+		{
+			ptr.xrecoff += pagehdrsize + len;
+			Insert->ExpectingContRecord = false;
+		}
+		else if (len == XLOG_BLCKSZ - pagehdrsize)
+		{
+			XLByteAdvance(ptr, XLOG_BLCKSZ);
+			Insert->ExpectingContRecord = false;
+		}
+		else
+		{
+			XLByteAdvance(ptr, XLOG_BLCKSZ);
+			Insert->ExpectingContRecord = true;
+		}
+		FinalizedUpto = XLogCtl->Insert.FinalizedUpto = ptr;
+		EndPos = ptr;
 	}
+	else
+	{
+		XLogRecPtr	LastFinalized = Insert->LastFinalizedRecord;
+		pg_crc32	rdata_crc;
+		XLogRecPtr	recstart;
+		char	   *recstartp;
+
+		/* if we're located at page boundary, skip page header */
+		if (ptr.xrecoff % XLOG_BLCKSZ == 0)
+		{
+			if (ptr.xrecoff % XLOG_SEG_SIZE == 0)
+			{
+				ptr.xrecoff += SizeOfXLogLongPHD;
+				p += SizeOfXLogLongPHD;
+			}
+			else
+			{
+				ptr.xrecoff += SizeOfXLogShortPHD;
+				p += SizeOfXLogShortPHD;
+			}
+		}
 
-	XactLastRecEnd = RecPtr;
+		recstart = ptr;
+		recstartp = p;
 
-	END_CRIT_SECTION();
+		/* NB: we might not have the full header on this page! */
+		/* fetch record->xl_tot_len */
+		len = MAXALIGN(*((uint32 *) p));
+		if (len == 0)
+			return false;
 
-	return RecPtr;
+		pg_memory_barrier();
+
+		/* Cool, this record is done. Set xl_prev, and finish CRC calculation. */
+		/* xl_prev might be on next page */
+		freespace = INSERT_FREESPACE(ptr);
+		if (freespace < offsetof(XLogRecord, xl_prev) + sizeof(XLogRecPtr))
+		{
+			XLogPageHeader pagehdr;
+			int		pagehdrsize;
+			int		off = offsetof(XLogRecord, xl_prev) - freespace;
+
+			XLByteAdvance(ptr, freespace);
+			p = GetXLogBuffer(ptr, true);
+			if (p == NULL)
+				return false;
+
+			pagehdr = (XLogPageHeader) p;
+			pagehdrsize = (ptr.xrecoff % XLOG_SEG_SIZE == 0) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD;
+
+			Assert(pagehdr->xlp_magic == XLOG_PAGE_MAGIC);
+			Assert(pagehdrsize == XLogPageHeaderSize(pagehdr));
+
+			/*
+			 * If the rest of the record header has not been copied in place
+			 * yet, bail out.
+			 */
+			if ((pagehdr->xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+				return false;
+			p += pagehdrsize + off;
+		}
+		else
+		{
+			p += offsetof(XLogRecord, xl_prev);
+		}
+		Assert (((uint64) p) % XLOG_BLCKSZ >= SizeOfXLogShortPHD);
+		*((XLogRecPtr *) p) = LastFinalized;
+
+		/* xl_crc might be on next page, if xl_prev was not */
+		ptr = recstart;
+		p = recstartp;
+
+		freespace = INSERT_FREESPACE(ptr);
+		if (freespace < offsetof(XLogRecord, xl_crc) + sizeof(pg_crc32))
+		{
+			XLogPageHeader pagehdr;
+			int		pagehdrsize;
+			int		off = offsetof(XLogRecord, xl_crc) - freespace;
+
+			XLByteAdvance(ptr, freespace);
+			p = GetXLogBuffer(ptr, true);
+			if (p == NULL)
+				return false;
+
+			pagehdr = (XLogPageHeader) p;
+			pagehdrsize = (ptr.xrecoff % XLOG_SEG_SIZE == 0) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD;
+
+			Assert(pagehdr->xlp_magic == XLOG_PAGE_MAGIC);
+			Assert(pagehdrsize == XLogPageHeaderSize(pagehdr));
+
+			if ((pagehdr->xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+				return false;
+			p += pagehdrsize + off;
+		}
+		else
+		{
+			p += offsetof(XLogRecord, xl_crc);
+		}
+		Assert (((uint64) p) % XLOG_BLCKSZ >= SizeOfXLogShortPHD);
+
+		/* Update CRC with xl_prev, finish it with FIN_CRC32, and write back */
+		rdata_crc = *((pg_crc32 *) p);
+		COMP_CRC32(rdata_crc, ((char *) &LastFinalized), sizeof(XLogRecPtr));
+		FIN_CRC32(rdata_crc);
+		*((pg_crc32 *) p) = rdata_crc;
+
+		/*
+		 * Update FinalizedUpto to the end of the record, or to the end of the
+		 * page where this record began, if it didn't fit on that page.
+		 */
+		ptr = recstart;
+		freespace = INSERT_FREESPACE(ptr);
+		if (len <= freespace)
+		{
+			XLByteAdvance(ptr, len);
+			Insert->ExpectingContRecord = false;
+		}
+		else
+		{
+			XLByteAdvance(ptr, freespace);
+			Insert->ExpectingContRecord = true;
+		}
+		FinalizedUpto = XLogCtl->Insert.FinalizedUpto = ptr;
+		EndPos = ptr;
+		/*
+		 * Update LastFinalizedRecord, so that we can set the xl_prev link of
+		 * the next record correctly.
+		 */
+		XLogCtl->Insert.LastFinalizedRecord = recstart;
+	}
+
+#ifdef NOT_USED
+	elog(LOG, "FINALIZE @ %X/%X - %X/%X",
+		 StartPos.xlogid, StartPos.xrecoff, EndPos.xlogid, EndPos.xrecoff);
+#endif
+
+	return true;
+}
+
+/*
+ * Wait for any insertions < upto to finish.
+ *
+ * Returns a value >= upto, which indicates the oldest in-progress insertion
+ * that we saw (or, if none are in progress, the next insert position).
+ */
+static XLogRecPtr
+WaitXLogInsertionsToFinish(XLogRecPtr upto)
+{
+	if (MyProc == NULL)
+		elog(PANIC, "cannot wait without a PGPROC structure");
+
+	if (XLByteLE(upto, FinalizedUpto))
+		return FinalizedUpto;
+
+	/*
+	 * XXX: Busy-loop until we manage to finalize up to the requested
+	 * point.
+	 */
+	for (;;)
+	{
+		/* Only allow one process to finalize at a time */
+		LWLockAcquire(WALInsertTailLock, LW_EXCLUSIVE);
+
+		/* While we're at it, finalize as far as we can. */
+		while (FinalizeRecord());
+		FinalizedUpto = XLogCtl->Insert.FinalizedUpto;
+
+		LWLockRelease(WALInsertTailLock);
+
+		/* Is this enough? */
+		if (XLogRecPtrIsInvalid(upto) || XLByteLE(upto, FinalizedUpto))
+			return FinalizedUpto;
+	}
 }
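
The deadlock analysis at the top of the file boils down to a fixed ordering:
wait for insertions to finish before taking WALWriteLock, never the other way
around. An illustrative-only sketch of that ordering (the real write-out
paths, such as AdvanceXLInsertBuffer below, follow the same pattern):

#ifdef NOT_USED
static void
WriteOutWALUpto_sketch(XLogRecPtr upto)
{
	XLogwrtRqst WriteRqst;

	/* 1. Make sure everything up to 'upto' is fully copied and finalized. */
	WaitXLogInsertionsToFinish(upto);

	/* 2. Only now is it safe to block others by taking WALWriteLock. */
	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
	WriteRqst.Write = upto;
	WriteRqst.Flush = upto;
	XLogWrite(WriteRqst, false);
	LWLockRelease(WALWriteLock);
}
#endif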
 
 /*
@@ -1445,31 +2194,34 @@ XLogArchiveCleanup(const char *xlog)
 }
 
 /*
- * Advance the Insert state to the next buffer page, writing out the next
- * buffer if it still contains unwritten data.
- *
- * If new_segment is TRUE then we set up the next buffer page as the first
- * page of the next xlog segment file, possibly but not usually the next
- * consecutive file page.
- *
- * The global LogwrtRqst.Write pointer needs to be advanced to include the
- * just-filled page.  If we can do this for free (without an extra lock),
- * we do so here.  Otherwise the caller must do it.  We return TRUE if the
- * request update still needs to be done, FALSE if we did it internally.
- *
- * Must be called with WALInsertLock held.
+ * Initialize XLOG buffers, writing out old buffers if they still contain
+ * unwritten data, up to the page containing 'upto'. Or, if 'opportunistic' is
+ * true, initialize as many pages as we can without having to write out
+ * unwritten data. Any new pages are initialized to zeros, with page headers
+ * initialized properly.
  */
-static bool
-AdvanceXLInsertBuffer(bool new_segment)
+static void
+AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 {
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
-	int			nextidx = NextBufIdx(Insert->curridx);
-	bool		update_needed = true;
+	int			nextidx;
 	XLogRecPtr	OldPageRqstPtr;
 	XLogwrtRqst WriteRqst;
-	XLogRecPtr	NewPageEndPtr;
+	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
+	int			npages = 0;
+
+	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+
+	/*
+	 * Now that we have the lock, check if someone initialized the page
+	 * already.
+	 */
+/* XXX: fix indentation before commit */
+while (!XLByteLT(upto, XLogCtl->xlblocks[XLogCtl->curridx]) || opportunistic)
+{
+	nextidx = NextBufIdx(XLogCtl->curridx);
 
 	/*
 	 * Get ending-offset of the buffer page we need to replace (this may be
@@ -1479,10 +2231,12 @@ AdvanceXLInsertBuffer(bool new_segment)
 	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
 	if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
 	{
-		/* nope, got work to do... */
-		XLogRecPtr	FinishedPageRqstPtr;
-
-		FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
+		/*
+		 * Nope, got work to do. If we just want to pre-initialize as much as
+		 * we can without flushing, give up now.
+		 */
+		if (opportunistic)
+			break;
 
 		/* Before waiting, get info_lck and update LogwrtResult */
 		{
@@ -1490,21 +2244,27 @@ AdvanceXLInsertBuffer(bool new_segment)
 			volatile XLogCtlData *xlogctl = XLogCtl;
 
 			SpinLockAcquire(&xlogctl->info_lck);
-			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
-				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
+			if (XLByteLT(xlogctl->LogwrtRqst.Write, OldPageRqstPtr))
+				xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
 			LogwrtResult = xlogctl->LogwrtResult;
 			SpinLockRelease(&xlogctl->info_lck);
 		}
 
-		update_needed = false;	/* Did the shared-request update */
-
 		/*
 		 * Now that we have an up-to-date LogwrtResult value, see if we still
 		 * need to write it or if someone else already did.
 		 */
 		if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
 		{
-			/* Must acquire write lock */
+			/*
+			 * Must acquire write lock. Release WALBufMappingLock first, to
+			 * make sure that all insertions that we need to wait for can
+			 * finish (up to this same position). Otherwise we risk deadlock.
+			 */
+			LWLockRelease(WALBufMappingLock);
+
+			WaitXLogInsertionsToFinish(OldPageRqstPtr);
+
 			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 			LogwrtResult = XLogCtl->LogwrtResult;
 			if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
@@ -1514,18 +2274,18 @@ AdvanceXLInsertBuffer(bool new_segment)
 			}
 			else
 			{
-				/*
-				 * Have to write buffers while holding insert lock. This is
-				 * not good, so only write as much as we absolutely must.
-				 */
+				/* Have to write it ourselves */
 				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
 				WriteRqst.Write = OldPageRqstPtr;
 				WriteRqst.Flush.xlogid = 0;
 				WriteRqst.Flush.xrecoff = 0;
-				XLogWrite(WriteRqst, false, false);
+				XLogWrite(WriteRqst, false);
 				LWLockRelease(WALWriteLock);
 				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
 			}
+			/* Re-acquire WALBufMappingLock and retry */
+			LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+			continue;
 		}
 	}
 
@@ -1533,25 +2293,16 @@ AdvanceXLInsertBuffer(bool new_segment)
 	 * Now the next buffer slot is free and we can set it up to be the next
 	 * output page.
 	 */
-	NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx];
-
-	if (new_segment)
-	{
-		/* force it to a segment start point */
-		if (NewPageBeginPtr.xrecoff % XLogSegSize != 0)
-			XLByteAdvance(NewPageBeginPtr,
-						  XLogSegSize - NewPageBeginPtr.xrecoff % XLogSegSize);
-	}
+	NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx];
 
 	NewPageEndPtr = NewPageBeginPtr;
 	XLByteAdvance(NewPageEndPtr, XLOG_BLCKSZ);
-	XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
-	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
-	Insert->curridx = nextidx;
-	Insert->currpage = NewPage;
+	Assert(NewPageEndPtr.xrecoff % XLOG_BLCKSZ == 0);
+	Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
+	Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
-	Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
+	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
 	/*
 	 * Be sure to re-zero the buffer so that bytes beyond what we've written
@@ -1567,6 +2318,7 @@ AdvanceXLInsertBuffer(bool new_segment)
 	/* NewPage->xlp_info = 0; */	/* done by memset */
 	NewPage   ->xlp_tli = ThisTimeLineID;
 	NewPage   ->xlp_pageaddr = NewPageBeginPtr;
+	/* NewPage	  ->xlp_rem_len = 0; */	/* done by memset */
 
 	/*
 	 * If online backup is not in progress, mark the header to indicate that
@@ -1594,11 +2346,28 @@ AdvanceXLInsertBuffer(bool new_segment)
 		NewLongPage->xlp_seg_size = XLogSegSize;
 		NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
 		NewPage   ->xlp_info |= XLP_LONG_HEADER;
-
-		Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
 	}
 
-	return update_needed;
+	/*
+	 * Make sure the initialization of the page becomes visible to others
+	 * before the xlblocks update. GetXLogBuffer() reads xlblocks without
+	 * holding a lock.
+	 */
+	pg_write_barrier();
+
+	*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
+
+	XLogCtl->curridx = nextidx;
+
+	npages++;
+}
+	LWLockRelease(WALBufMappingLock);
+
+#ifdef WAL_DEBUG
+	if (npages > 0)
+		elog(DEBUG1, "initialized %d pages, up to %X/%X",
+			 npages, NewPageEndPtr.xlogid, NewPageEndPtr.xrecoff);
+#endif
 }
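
The pg_write_barrier() above is the whole publish protocol for a newly initialized buffer page: set up the page contents first, then make xlblocks[nextidx] cover it, so that GetXLogBuffer() can find the page without taking a lock. The following self-contained sketch is not the patch's code; it just shows the same write/read barrier pairing, with __sync_synchronize() standing in for pg_write_barrier()/pg_read_barrier(), plain uint64_t positions for XLogRecPtr, and illustrative buffer-ring constants.

#include <stdint.h>
#include <string.h>

#define BLCKSZ		8192
#define NBUFFERS	8

static char pages[NBUFFERS][BLCKSZ];
static volatile uint64_t xlblocks[NBUFFERS];	/* end position of each mapped page, 0 = unmapped */

/* Writer side: initialize the buffer, then publish its end position. */
static void
publish_page(int idx, uint64_t endptr)
{
	memset(pages[idx], 0, BLCKSZ);	/* page contents and header are set up first */
	__sync_synchronize();			/* write barrier: initialization precedes publish */
	xlblocks[idx] = endptr;			/* lock-free readers may now use the page */
}

/* Reader side: return the buffer holding position 'ptr', or NULL if not mapped. */
static char *
lookup_page(uint64_t ptr)
{
	int			idx = (int) ((ptr / BLCKSZ) % NBUFFERS);
	uint64_t	endptr = xlblocks[idx];

	if (ptr >= endptr || endptr - ptr > BLCKSZ)
		return NULL;				/* not (or no longer) mapped in this slot */
	__sync_synchronize();			/* read barrier before touching page contents */
	return pages[idx];
}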
 
 /*
@@ -1630,16 +2399,12 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
  * This option allows us to avoid uselessly issuing multiple writes when a
  * single one would do.
  *
- * If xlog_switch == TRUE, we are intending an xlog segment switch, so
- * perform end-of-segment actions after writing the last page, even if
- * it's not physically the end of its segment.  (NB: this will work properly
- * only if caller specifies WriteRqst == page-end and flexible == false,
- * and there is some data to write.)
- *
- * Must be called with WALWriteLock held.
+ * Must be called with WALWriteLock held.  The caller must also have called
+ * WaitXLogInsertionsToFinish(WriteRqst) before acquiring the lock, to make
+ * sure the data is ready to write.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
 	XLogCtlWrite *Write = &XLogCtl->Write;
 	bool		ispartialpage;
@@ -1688,14 +2453,14 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 		 * if we're passed a bogus WriteRqst.Write that is past the end of the
 		 * last page that's been initialized by AdvanceXLInsertBuffer.
 		 */
-		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
+		XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+		if (!XLByteLT(LogwrtResult.Write, EndPtr))
 			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
 				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
-				 XLogCtl->xlblocks[curridx].xlogid,
-				 XLogCtl->xlblocks[curridx].xrecoff);
+				 EndPtr.xlogid, EndPtr.xrecoff);
 
 		/* Advance LogwrtResult.Write to end of current buffer page */
-		LogwrtResult.Write = XLogCtl->xlblocks[curridx];
+		LogwrtResult.Write = EndPtr;
 		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
 
 		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
@@ -1778,6 +2543,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 								XLogFileNameP(ThisTimeLineID, openLogSegNo),
 								openLogOff, (unsigned long) nbytes)));
 			}
+#ifdef CLOBBER_FREED_MEMORY
+			if (!ispartialpage)
+				memset(from, 0x7E, nbytes);
+			else if (npages > 1)
+				memset(from, 0x7E, nbytes - XLOG_BLCKSZ);
+#endif
 
 			/* Update state for write */
 			openLogOff += nbytes;
@@ -1791,16 +2562,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 			 * later. Doing it here ensures that one and only one backend will
 			 * perform this fsync.
 			 *
-			 * We also do this if this is the last page written for an xlog
-			 * switch.
-			 *
 			 * This is also the right place to notify the Archiver that the
 			 * segment is ready to copy to archival storage, and to update the
 			 * timer for archive_timeout, and to signal for a checkpoint if
 			 * too many logfile segments have been used since the last
 			 * checkpoint.
 			 */
-			if (finishing_seg || (xlog_switch && last_iteration))
+			if (finishing_seg)
 			{
 				issue_xlog_fsync(openLogFile, openLogSegNo);
 				LogwrtResult.Flush = LogwrtResult.Write;		/* end of page */
@@ -1865,7 +2633,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 				openLogFile = XLogFileOpen(openLogSegNo);
 				openLogOff = 0;
 			}
+			elog(LOG, "flushing seg " UINT64_FORMAT " (explicit)", openLogSegNo);
 			issue_xlog_fsync(openLogFile, openLogSegNo);
+			elog(LOG, "done flushing seg " UINT64_FORMAT " (explicit)", openLogSegNo);
 		}
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
@@ -2066,6 +2836,7 @@ XLogFlush(XLogRecPtr record)
 	{
 		/* use volatile pointer to prevent code rearrangement */
 		volatile XLogCtlData *xlogctl = XLogCtl;
+		XLogRecPtr	insertpos;
 
 		/* read LogwrtResult and update local state */
 		SpinLockAcquire(&xlogctl->info_lck);
@@ -2079,6 +2850,12 @@ XLogFlush(XLogRecPtr record)
 			break;
 
 		/*
+		 * Before actually performing the write, wait for all in-flight
+		 * insertions to the pages we're about to write to finish.
+		 */
+		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+
+		/*
 		 * Try to get the write lock. If we can't get it immediately, wait
 		 * until it's released, and recheck if we still need to do the flush
 		 * or if the backend that held the lock did it for us already. This
@@ -2098,31 +2875,10 @@ XLogFlush(XLogRecPtr record)
 		LogwrtResult = XLogCtl->LogwrtResult;
 		if (!XLByteLE(record, LogwrtResult.Flush))
 		{
-			/* try to write/flush later additions to XLOG as well */
-			if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
-			{
-				XLogCtlInsert *Insert = &XLogCtl->Insert;
-				uint32		freespace = INSERT_FREESPACE(Insert);
+			WriteRqst.Write = insertpos;
+			WriteRqst.Flush = insertpos;
 
-				if (freespace == 0)		/* buffer is full */
-					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-				else
-				{
-					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-					if (WriteRqstPtr.xrecoff == 0)
-						WriteRqstPtr.xlogid--;
-					WriteRqstPtr.xrecoff -= freespace;
-				}
-				LWLockRelease(WALInsertLock);
-				WriteRqst.Write = WriteRqstPtr;
-				WriteRqst.Flush = WriteRqstPtr;
-			}
-			else
-			{
-				WriteRqst.Write = WriteRqstPtr;
-				WriteRqst.Flush = record;
-			}
-			XLogWrite(WriteRqst, false, false);
+			XLogWrite(WriteRqst, false);
 		}
 		LWLockRelease(WALWriteLock);
 		/* done */
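
XLogFlush now follows a fixed order: wait for in-flight insertions up to the target (WaitXLogInsertionsToFinish), then take WALWriteLock and recheck whether another backend already flushed past the target before doing any I/O. Below is a hedged, self-contained sketch of that protocol, not the patch's code: a pthread mutex stands in for WALWriteLock, and wait_insertions()/write_and_fsync() are placeholders for WaitXLogInsertionsToFinish()/XLogWrite().

#include <pthread.h>
#include <stdint.h>

static pthread_mutex_t wal_write_lock = PTHREAD_MUTEX_INITIALIZER;
static uint64_t flushed_upto;		/* models LogwrtResult.Flush */

static void wait_insertions(uint64_t upto) { (void) upto; }		/* placeholder */
static void write_and_fsync(uint64_t upto) { flushed_upto = upto; }	/* placeholder */

static void
flush_wal(uint64_t target)
{
	/* Cheap pre-check; the patch reads a spinlock-protected copy here. */
	if (flushed_upto >= target)
		return;

	/* After this, all WAL older than 'target' is fully copied to the buffers. */
	wait_insertions(target);

	pthread_mutex_lock(&wal_write_lock);
	if (flushed_upto < target)		/* recheck: someone may have flushed it for us */
		write_and_fsync(target);
	pthread_mutex_unlock(&wal_write_lock);
}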
@@ -2240,7 +2996,8 @@ XLogBackgroundFlush(void)
 
 	START_CRIT_SECTION();
 
-	/* now wait for the write lock */
+	/* now wait for any in-progress insertions to finish and get write lock */
+	WaitXLogInsertionsToFinish(WriteRqstPtr);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 	LogwrtResult = XLogCtl->LogwrtResult;
 	if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
@@ -2249,13 +3006,19 @@ XLogBackgroundFlush(void)
 
 		WriteRqst.Write = WriteRqstPtr;
 		WriteRqst.Flush = WriteRqstPtr;
-		XLogWrite(WriteRqst, flexible, false);
+		XLogWrite(WriteRqst, flexible);
 		wrote_something = true;
 	}
 	LWLockRelease(WALWriteLock);
 
 	END_CRIT_SECTION();
 
+	/*
+	 * Great, done. To take some work off the critical path, try to initialize
+	 * as many of the no-longer-needed WAL buffers for future use as we can.
+	 */
+	AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+
 	return wrote_something;
 }
 
@@ -5066,6 +5829,7 @@ XLOGShmemSize(void)
 
 	/* XLogCtl */
 	size = sizeof(XLogCtlData);
+
 	/* xlblocks array */
 	size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
 	/* extra alignment padding for XLOG I/O buffers */
@@ -5091,8 +5855,7 @@ XLOGShmemInit(void)
 
 	ControlFile = (ControlFileData *)
 		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
-	XLogCtl = (XLogCtlData *)
-		ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
+	allocptr = ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
 
 	if (foundCFile || foundXLog)
 	{
@@ -5100,7 +5863,7 @@ XLOGShmemInit(void)
 		Assert(foundCFile && foundXLog);
 		return;
 	}
-
+	XLogCtl = (XLogCtlData *) allocptr;
 	memset(XLogCtl, 0, sizeof(XLogCtlData));
 
 	/*
@@ -5108,7 +5871,7 @@ XLOGShmemInit(void)
 	 * multiple of the alignment for same, so no extra alignment padding is
 	 * needed here.
 	 */
-	allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
+	allocptr += sizeof(XLogCtlData);
 	XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
 	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
 	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
@@ -5128,7 +5891,12 @@ XLOGShmemInit(void)
 	XLogCtl->SharedRecoveryInProgress = true;
 	XLogCtl->SharedHotStandbyActive = false;
 	XLogCtl->WalWriterSleeping = false;
-	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+
+	XLogCtl->Insert.LastFinalizedRecord = InvalidXLogRecPtr;
+	XLogCtl->Insert.FinalizedUpto = InvalidXLogRecPtr;
+	XLogCtl->Insert.ExpectingContRecord = false;
+
+	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
 	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
 
@@ -6006,6 +6774,7 @@ StartupXLOG(void)
 	bool		backupEndRequired = false;
 	bool		backupFromStandby = false;
 	DBState		dbstate_at_startup;
+	int			firstIdx;
 
 	/*
 	 * Read control file and check XLOG status looks valid.
@@ -6258,7 +7027,7 @@ StartupXLOG(void)
 
 	lastFullPageWrites = checkPoint.fullPageWrites;
 
-	RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+	RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
 
 	if (XLByteLT(RecPtr, checkPoint.redo))
 		ereport(PANIC,
@@ -6814,9 +7583,13 @@ StartupXLOG(void)
 	openLogFile = XLogFileOpen(openLogSegNo);
 	openLogOff = 0;
 	Insert = &XLogCtl->Insert;
-	Insert->PrevRecord = LastRec;
-	XLogCtl->xlblocks[0].xlogid = (openLogSegNo * XLOG_SEG_SIZE) >> 32;
-	XLogCtl->xlblocks[0].xrecoff =
+	Insert->LastFinalizedRecord = LastRec;
+
+	firstIdx = XLogRecEndPtrToBufIdx(EndOfLog);
+	XLogCtl->curridx = firstIdx;
+
+	XLogCtl->xlblocks[firstIdx].xlogid = (openLogSegNo * XLOG_SEG_SIZE) >> 32;
+	XLogCtl->xlblocks[firstIdx].xrecoff =
 		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
 
 	/*
@@ -6824,10 +7597,11 @@ StartupXLOG(void)
 	 * record spans, not the one it starts in.	The last block is indeed the
 	 * one we want to use.
 	 */
-	Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
-	memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
-	Insert->currpos = (char *) Insert->currpage +
-		(EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
+	Assert(readOff == (XLogCtl->xlblocks[firstIdx].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
+	memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], readBuf, XLOG_BLCKSZ);
+	Insert->FinalizedUpto = EndOfLog;
+	Insert->ExpectingContRecord = false;
+	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
@@ -6836,12 +7610,12 @@ StartupXLOG(void)
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
-	freespace = INSERT_FREESPACE(Insert);
+	freespace = INSERT_FREESPACE(EndOfLog);
 	if (freespace > 0)
 	{
 		/* Make sure rest of page is zero */
-		MemSet(Insert->currpos, 0, freespace);
-		XLogCtl->Write.curridx = 0;
+		MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog.xrecoff % XLOG_BLCKSZ, 0, freespace);
+		XLogCtl->Write.curridx = firstIdx;
 	}
 	else
 	{
@@ -6853,7 +7627,7 @@ StartupXLOG(void)
 		 * this is sufficient.	The first actual attempt to insert a log
 		 * record will advance the insert state.
 		 */
-		XLogCtl->Write.curridx = NextBufIdx(0);
+		XLogCtl->Write.curridx = NextBufIdx(firstIdx);
 	}
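
firstIdx above is derived from EndOfLog with XLogRecEndPtrToBufIdx() rather than XLogRecPtrToBufIdx(), because an end pointer that lands exactly on a page boundary still refers to the page that just ended; the Asserts in AdvanceXLInsertBuffer rely on the same convention. A small sketch of the two mappings (buffer count and block size are illustrative, not the real XLogCtl->XLogCacheBlck-based values):

#include <stdint.h>

#define BLCKSZ		8192
#define NBUFFERS	8

/* Buffer index for a position pointing *into* a page (start pointers). */
static int
recptr_to_bufidx(uint64_t ptr)
{
	return (int) ((ptr / BLCKSZ) % NBUFFERS);
}

/*
 * Buffer index for an *end* position: a page-aligned end pointer still
 * belongs to the page that just ended, hence the "- 1".
 */
static int
recendptr_to_bufidx(uint64_t endptr)
{
	return (int) (((endptr - 1) / BLCKSZ) % NBUFFERS);
}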
 
 	/* Pre-scan prepared transactions to find out the range of XIDs present */
@@ -6864,7 +7638,7 @@ StartupXLOG(void)
 	 * XLOG_FPW_CHANGE record before resource manager writes cleanup
 	 * WAL records or checkpoint record is written.
 	 */
-	Insert->fullPageWrites = lastFullPageWrites;
+	Insert->fullPageWrites = doPageWrites = lastFullPageWrites;
 	LocalSetXLogInsertAllowed();
 	UpdateFullPageWrites();
 	LocalXLogInsertAllowed = -1;
@@ -7332,21 +8106,29 @@ InitXLOGAccess(void)
 }
 
 /*
- * Once spawned, a backend may update its local RedoRecPtr from
- * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
- * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
+ * Return the current Redo pointer from shared memory.
+ *
+ * As a side-effect, the local RedoRecPtr copy is updated.
  */
 XLogRecPtr
 GetRedoRecPtr(void)
 {
 	/* use volatile pointer to prevent code rearrangement */
 	volatile XLogCtlData *xlogctl = XLogCtl;
+	XLogRecPtr ptr;
 
+	/*
+	 * The possibly-stale copy in XLogCtl is good enough here: even if we
+	 * grabbed WALInsertShareLock to read the authoritative copy, someone
+	 * could update it again just after we released the lock.
+	 */
 	SpinLockAcquire(&xlogctl->info_lck);
-	Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
-	RedoRecPtr = xlogctl->Insert.RedoRecPtr;
+	ptr = xlogctl->RedoRecPtr;
 	SpinLockRelease(&xlogctl->info_lck);
 
+	if (XLByteLT(RedoRecPtr, ptr))
+		RedoRecPtr = ptr;
+
 	return RedoRecPtr;
 }
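
The pattern here is: read the info_lck-protected copy in shared memory, then advance (never move back) the backend-local cache. A minimal sketch of that pattern, assuming a pthread mutex as a stand-in for the spinlock and uint64_t for XLogRecPtr; names are illustrative, not the patch's:

#include <pthread.h>
#include <stdint.h>

static pthread_mutex_t info_lck = PTHREAD_MUTEX_INITIALIZER;
static uint64_t shared_redo_recptr;		/* models XLogCtl->RedoRecPtr */
static uint64_t local_redo_recptr;		/* models the backend-local RedoRecPtr */

static uint64_t
get_redo_recptr(void)
{
	uint64_t	ptr;

	pthread_mutex_lock(&info_lck);
	ptr = shared_redo_recptr;
	pthread_mutex_unlock(&info_lck);

	if (local_redo_recptr < ptr)		/* local copy only ever moves forward */
		local_redo_recptr = ptr;

	return local_redo_recptr;
}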
 
@@ -7355,7 +8137,7 @@ GetRedoRecPtr(void)
  *
  * NOTE: The value *actually* returned is the position of the last full
  * xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to acquire WALInsertLock which can be quite
+ * For that, we don't need to acquire WALInsertShareLock, which is
  * heavily contended, and an approximation is enough for the current
  * usage of this function.
  */
@@ -7630,6 +8412,8 @@ LogCheckpointEnd(bool restartpoint)
 void
 CreateCheckPoint(int flags)
 {
+	/* use volatile pointer to prevent code rearrangement */
+	volatile XLogCtlData *xlogctl = XLogCtl;
 	bool		shutdown;
 	CheckPoint	checkPoint;
 	XLogRecPtr	recptr;
@@ -7641,6 +8425,7 @@ CreateCheckPoint(int flags)
 	XLogSegNo	insert_logSegNo;
 	TransactionId *inCommitXids;
 	int			nInCommit;
+	XLogRecPtr	curInsert;
 
 	/*
 	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -7709,10 +8494,11 @@ CreateCheckPoint(int flags)
 		checkPoint.oldestActiveXid = InvalidTransactionId;
 
 	/*
-	 * We must hold WALInsertLock while examining insert state to determine
+	 * We must block concurrent insertions while examining insert state to determine
 	 * the checkpoint REDO pointer.
 	 */
-	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+	WALInsertLockAcquire(LW_EXCLUSIVE);
+	curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
 
 	/*
 	 * If this isn't a shutdown or forced checkpoint, and we have not switched
@@ -7724,7 +8510,7 @@ CreateCheckPoint(int flags)
 	 * (Perhaps it'd make even more sense to checkpoint only when the previous
 	 * checkpoint record is in a different xlog page?)
 	 *
-	 * While holding the WALInsertLock we find the current WAL insertion point
+	 * While holding the insert lock we find the current WAL insertion point
 	 * and compare that with the starting point of the last checkpoint, which
 	 * is the redo pointer. We use the redo pointer because the start and end
 	 * points of a checkpoint can be hundreds of files apart on large systems
@@ -7733,14 +8519,11 @@ CreateCheckPoint(int flags)
 	if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
 				  CHECKPOINT_FORCE)) == 0)
 	{
-		XLogRecPtr	curInsert;
-
-		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
 		XLByteToSeg(curInsert, insert_logSegNo);
 		XLByteToSeg(ControlFile->checkPointCopy.redo, redo_logSegNo);
 		if (insert_logSegNo == redo_logSegNo)
 		{
-			LWLockRelease(WALInsertLock);
+			WALInsertLockRelease(LW_EXCLUSIVE);
 			LWLockRelease(CheckpointLock);
 			END_CRIT_SECTION();
 			return;
@@ -7767,18 +8550,19 @@ CreateCheckPoint(int flags)
 	 * the buffer flush work.  Those XLOG records are logically after the
 	 * checkpoint, even though physically before it.  Got that?
 	 */
-	freespace = INSERT_FREESPACE(Insert);
+	freespace = INSERT_FREESPACE(curInsert);
 	if (freespace == 0)
 	{
-		(void) AdvanceXLInsertBuffer(false);
-		/* OK to ignore update return flag, since we will do flush anyway */
-		freespace = INSERT_FREESPACE(Insert);
+		if (curInsert.xrecoff % XLogSegSize == 0)
+			curInsert.xrecoff += SizeOfXLogLongPHD;
+		else
+			curInsert.xrecoff += SizeOfXLogShortPHD;
 	}
-	INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
+	checkPoint.redo = curInsert;
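
When the reserved insert position falls exactly on a page boundary (freespace == 0), the next record will actually begin after the page header that AdvanceXLInsertBuffer will put there, so the redo pointer is nudged past a long header at a segment boundary and past a short header otherwise. A minimal sketch of that adjustment; the header sizes here are illustrative, not the real SizeOfXLogShortPHD/SizeOfXLogLongPHD values:

#include <stdint.h>

#define XLOG_BLCKSZ			8192
#define XLOG_SEG_SIZE		(16 * 1024 * 1024)
#define SIZEOF_SHORT_PHD	24		/* illustrative only */
#define SIZEOF_LONG_PHD		40		/* illustrative only */

static uint64_t
skip_page_header(uint64_t pos)
{
	if (pos % XLOG_BLCKSZ != 0)
		return pos;						/* not at a page boundary: nothing to skip */
	if (pos % XLOG_SEG_SIZE == 0)
		return pos + SIZEOF_LONG_PHD;	/* segment boundary: long page header */
	return pos + SIZEOF_SHORT_PHD;		/* ordinary page boundary: short header */
}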
 
 	/*
 	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
-	 * must be done while holding the insert lock AND the info_lck.
+	 * must be done while holding the insert lock.
 	 *
 	 * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
 	 * pointing past where it really needs to point.  This is okay; the only
@@ -7787,20 +8571,18 @@ CreateCheckPoint(int flags)
 	 * XLogInserts that happen while we are dumping buffers must assume that
 	 * their buffer changes are not included in the checkpoint.
 	 */
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile XLogCtlData *xlogctl = XLogCtl;
-
-		SpinLockAcquire(&xlogctl->info_lck);
-		RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
-		SpinLockRelease(&xlogctl->info_lck);
-	}
+	RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
 
 	/*
 	 * Now we can release WAL insert lock, allowing other xacts to proceed
 	 * while we are flushing disk buffers.
 	 */
-	LWLockRelease(WALInsertLock);
+	WALInsertLockRelease(LW_EXCLUSIVE);
+
+	/* Update the info_lck-protected copy of RedoRecPtr as well */
+	SpinLockAcquire(&xlogctl->info_lck);
+	xlogctl->RedoRecPtr = checkPoint.redo;
+	SpinLockRelease(&xlogctl->info_lck);
 
 	/*
 	 * If enabled, log checkpoint start.  We postpone this until now so as not
@@ -7932,7 +8714,9 @@ CreateCheckPoint(int flags)
 	 */
 	if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
 		ereport(PANIC,
-				(errmsg("concurrent transaction log activity while database system is shutting down")));
+				(errmsg("concurrent transaction log activity while database system is shutting down (%X/%X vs %X/%X)",
+						checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
+						ProcLastRecPtr.xlogid, ProcLastRecPtr.xrecoff)));
 
 	/*
 	 * Select point at which we can truncate the log, which we base on the
@@ -8185,15 +8969,18 @@ CreateRestartPoint(int flags)
 	 * the number of segments replayed since last restartpoint, and request a
 	 * restartpoint if it exceeds checkpoint_segments.
 	 *
-	 * You need to hold WALInsertLock and info_lck to update it, although
-	 * during recovery acquiring WALInsertLock is just pro forma, because
-	 * there is no other processes updating Insert.RedoRecPtr.
+	 * As in CreateCheckPoint(), hold the WAL insert lock while updating it,
+	 * although during recovery this is just pro forma, because no WAL
+	 * insertions are happening.
 	 */
-	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-	SpinLockAcquire(&xlogctl->info_lck);
+	WALInsertLockAcquire(LW_EXCLUSIVE);
 	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+	WALInsertLockRelease(LW_EXCLUSIVE);
+
+	/* Also update the info_lck-protected copy */
+	SpinLockAcquire(&xlogctl->info_lck);
+	xlogctl->RedoRecPtr = lastCheckPoint.redo;
 	SpinLockRelease(&xlogctl->info_lck);
-	LWLockRelease(WALInsertLock);
 
 	/*
 	 * Prepare to accumulate statistics.
@@ -8461,7 +9248,7 @@ XLogReportParameters(void)
 void
 UpdateFullPageWrites(void)
 {
-	XLogCtlInsert *Insert = &XLogCtl->Insert;
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
 
 	/*
 	 * Do nothing if full_page_writes has not been changed.
@@ -8484,9 +9271,9 @@ UpdateFullPageWrites(void)
 	 */
 	if (fullPageWrites)
 	{
-		LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+		WALInsertLockAcquire(LW_EXCLUSIVE);
 		Insert->fullPageWrites = true;
-		LWLockRelease(WALInsertLock);
+		WALInsertLockRelease(LW_EXCLUSIVE);
 	}
 
 	/*
@@ -8507,9 +9294,9 @@ UpdateFullPageWrites(void)
 
 	if (!fullPageWrites)
 	{
-		LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+		WALInsertLockAcquire(LW_EXCLUSIVE);
 		Insert->fullPageWrites = false;
-		LWLockRelease(WALInsertLock);
+		WALInsertLockRelease(LW_EXCLUSIVE);
 	}
 	END_CRIT_SECTION();
 }
@@ -9070,6 +9857,7 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno)
 XLogRecPtr
 do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
 {
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
 	bool		exclusive = (labelfile == NULL);
 	bool		backup_started_in_recovery = false;
 	XLogRecPtr	checkpointloc;
@@ -9131,26 +9919,26 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
 	 * Note that forcePageWrites has no effect during an online backup from
 	 * the standby.
 	 *
-	 * We must hold WALInsertLock to change the value of forcePageWrites, to
-	 * ensure adequate interlocking against XLogInsert().
+ * We must hold the WAL insert lock to change the value of forcePageWrites,
+ * to ensure adequate interlocking against XLogInsert().
 	 */
-	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+	WALInsertLockAcquire(LW_EXCLUSIVE);
 	if (exclusive)
 	{
-		if (XLogCtl->Insert.exclusiveBackup)
+		if (Insert->exclusiveBackup)
 		{
-			LWLockRelease(WALInsertLock);
+			WALInsertLockRelease(LW_EXCLUSIVE);
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("a backup is already in progress"),
 					 errhint("Run pg_stop_backup() and try again.")));
 		}
-		XLogCtl->Insert.exclusiveBackup = true;
+		Insert->exclusiveBackup = true;
 	}
 	else
-		XLogCtl->Insert.nonExclusiveBackups++;
-	XLogCtl->Insert.forcePageWrites = true;
-	LWLockRelease(WALInsertLock);
+		Insert->nonExclusiveBackups++;
+	Insert->forcePageWrites = true;
+	WALInsertLockRelease(LW_EXCLUSIVE);
 
 	/* Ensure we release forcePageWrites if fail below */
 	PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
@@ -9263,13 +10051,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
 			 * taking a checkpoint right after another is not that expensive
 			 * either because only few buffers have been dirtied yet.
 			 */
-			LWLockAcquire(WALInsertLock, LW_SHARED);
-			if (XLByteLT(XLogCtl->Insert.lastBackupStart, startpoint))
+			WALInsertLockAcquire(LW_EXCLUSIVE);
+			if (XLByteLT(Insert->lastBackupStart, startpoint))
 			{
-				XLogCtl->Insert.lastBackupStart = startpoint;
+				Insert->lastBackupStart = startpoint;
 				gotUniqueStartpoint = true;
 			}
-			LWLockRelease(WALInsertLock);
+			WALInsertLockRelease(LW_EXCLUSIVE);
 		} while (!gotUniqueStartpoint);
 
 		XLByteToSeg(startpoint, _logSegNo);
@@ -9353,27 +10141,28 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
 static void
 pg_start_backup_callback(int code, Datum arg)
 {
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
 	bool		exclusive = DatumGetBool(arg);
 
 	/* Update backup counters and forcePageWrites on failure */
-	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+	WALInsertLockAcquire(LW_EXCLUSIVE);
 	if (exclusive)
 	{
-		Assert(XLogCtl->Insert.exclusiveBackup);
-		XLogCtl->Insert.exclusiveBackup = false;
+		Assert(Insert->exclusiveBackup);
+		Insert->exclusiveBackup = false;
 	}
 	else
 	{
-		Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
-		XLogCtl->Insert.nonExclusiveBackups--;
+		Assert(Insert->nonExclusiveBackups > 0);
+		Insert->nonExclusiveBackups--;
 	}
 
-	if (!XLogCtl->Insert.exclusiveBackup &&
-		XLogCtl->Insert.nonExclusiveBackups == 0)
+	if (!Insert->exclusiveBackup &&
+		Insert->nonExclusiveBackups == 0)
 	{
-		XLogCtl->Insert.forcePageWrites = false;
+		Insert->forcePageWrites = false;
 	}
-	LWLockRelease(WALInsertLock);
+	WALInsertLockRelease(LW_EXCLUSIVE);
 }
 
 /*
@@ -9386,6 +10175,7 @@ pg_start_backup_callback(int code, Datum arg)
 XLogRecPtr
 do_pg_stop_backup(char *labelfile, bool waitforarchive)
 {
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
 	bool		exclusive = (labelfile == NULL);
 	bool		backup_started_in_recovery = false;
 	XLogRecPtr	startpoint;
@@ -9438,9 +10228,9 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
 	/*
 	 * OK to update backup counters and forcePageWrites
 	 */
-	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+	WALInsertLockAcquire(LW_EXCLUSIVE);
 	if (exclusive)
-		XLogCtl->Insert.exclusiveBackup = false;
+		Insert->exclusiveBackup = false;
 	else
 	{
 		/*
@@ -9449,16 +10239,16 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
 		 * backups, it is expected that each do_pg_start_backup() call is
 		 * matched by exactly one do_pg_stop_backup() call.
 		 */
-		Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
-		XLogCtl->Insert.nonExclusiveBackups--;
+		Assert(Insert->nonExclusiveBackups > 0);
+		Insert->nonExclusiveBackups--;
 	}
 
-	if (!XLogCtl->Insert.exclusiveBackup &&
-		XLogCtl->Insert.nonExclusiveBackups == 0)
+	if (!Insert->exclusiveBackup &&
+		Insert->nonExclusiveBackups == 0)
 	{
-		XLogCtl->Insert.forcePageWrites = false;
+		Insert->forcePageWrites = false;
 	}
-	LWLockRelease(WALInsertLock);
+	WALInsertLockRelease(LW_EXCLUSIVE);
 
 	if (exclusive)
 	{
@@ -9736,16 +10526,18 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
 void
 do_pg_abort_backup(void)
 {
-	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-	Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
-	XLogCtl->Insert.nonExclusiveBackups--;
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+
+	WALInsertLockAcquire(LW_EXCLUSIVE);
+	Assert(Insert->nonExclusiveBackups > 0);
+	Insert->nonExclusiveBackups--;
 
-	if (!XLogCtl->Insert.exclusiveBackup &&
-		XLogCtl->Insert.nonExclusiveBackups == 0)
+	if (!Insert->exclusiveBackup &&
+		Insert->nonExclusiveBackups == 0)
 	{
-		XLogCtl->Insert.forcePageWrites = false;
+		Insert->forcePageWrites = false;
 	}
-	LWLockRelease(WALInsertLock);
+	WALInsertLockRelease(LW_EXCLUSIVE);
 }
 
 /*
@@ -9799,14 +10591,14 @@ GetStandbyFlushRecPtr(void)
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-	XLogCtlInsert *Insert = &XLogCtl->Insert;
-	XLogRecPtr	current_recptr;
+	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+	uint64		current_bytepos;
 
-	LWLockAcquire(WALInsertLock, LW_SHARED);
-	INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
-	LWLockRelease(WALInsertLock);
+	SpinLockAcquire(&Insert->insertpos_lck);
+	current_bytepos = Insert->CurrBytePos;
+	SpinLockRelease(&Insert->insertpos_lck);
 
-	return current_recptr;
+	return XLogBytePosToRecPtr(current_bytepos);
 }
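
CurrBytePos, read above under insertpos_lck, counts only usable WAL bytes, which is what makes reserving insert space a simple addition under a spinlock; XLogBytePosToRecPtr then maps it back to an ordinary WAL position by re-inserting the page headers. A simplified sketch of that mapping, assuming short page headers only (the real conversion also accounts for the long header at segment boundaries) and an illustrative header size; the spinlock is elided here:

#include <stdint.h>

#define XLOG_BLCKSZ				8192
#define SIZEOF_SHORT_PHD		24		/* illustrative only */
#define USABLE_BYTES_PER_PAGE	(XLOG_BLCKSZ - SIZEOF_SHORT_PHD)

static uint64_t CurrBytePos;			/* protected by insertpos_lck in the patch */

/* Reserve 'size' usable bytes; returns the start of the reserved range. */
static uint64_t
reserve_wal_bytes(uint64_t size)
{
	uint64_t	startbytepos = CurrBytePos;		/* SpinLockAcquire() would go here */

	CurrBytePos += size;						/* ... and SpinLockRelease() here */
	return startbytepos;
}

/* Map a usable-byte position back to a byte offset in the on-disk WAL stream. */
static uint64_t
bytepos_to_recptr(uint64_t bytepos)
{
	uint64_t	fullpages = bytepos / USABLE_BYTES_PER_PAGE;
	uint64_t	offset = bytepos % USABLE_BYTES_PER_PAGE;

	return fullpages * XLOG_BLCKSZ + SIZEOF_SHORT_PHD + offset;
}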
 
 /*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 26469c4..8d6567f 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1753,9 +1753,10 @@ GetOldestActiveTransactionId(void)
  * the result is somewhat indeterminate, but we don't really care.  Even in
  * a multiprocessor with delayed writes to shared memory, it should be certain
  * that setting of inCommit will propagate to shared memory when the backend
- * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
- * it's already inserted its commit record.  Whether it takes a little while
- * for clearing of inCommit to propagate is unimportant for correctness.
+ * takes a lock to write the WAL record, so we cannot fail to see an xact as
+ * inCommit if it's already inserted its commit record.  Whether it takes a
+ * little while for clearing of inCommit to propagate is unimportant for
+ * correctness.
  */
 int
 GetTransactionsInCommit(TransactionId **xids_p)
diff --git a/src/backend/storage/lmgr/spin.c b/src/backend/storage/lmgr/spin.c
index d262efa..479ef9a 100644
--- a/src/backend/storage/lmgr/spin.c
+++ b/src/backend/storage/lmgr/spin.c
@@ -56,6 +56,9 @@ SpinlockSemas(void)
 	 *
 	 * For now, though, we just need a few spinlocks (10 should be plenty)
 	 * plus one for each LWLock and one for each buffer header.
+	 *
+	 * XXX: remember to adjust this for the number of spinlocks needed by the
+	 * xlog.c changes before committing!
 	 */
 	return NumLWLocks() + NBuffers + 10;
 }
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index a958856..03f854e 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -163,8 +163,7 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 
 /* Check if an xrecoff value is in a plausible range */
 #define XRecOffIsValid(xrecoff) \
-		((xrecoff) % XLOG_BLCKSZ >= SizeOfXLogShortPHD && \
-		(XLOG_BLCKSZ - (xrecoff) % XLOG_BLCKSZ) >= SizeOfXLogRecord)
+		((xrecoff) % XLOG_BLCKSZ >= SizeOfXLogShortPHD)
 
 /*
  * The XLog directory and control file (relative to $PGDATA)
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index ac45ee6..2883549 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -247,7 +247,7 @@
  * Enable debugging print statements for WAL-related operations; see
  * also the wal_debug GUC var.
  */
-/* #define WAL_DEBUG */
+#define WAL_DEBUG
 
 /*
  * Enable tracing of resource consumption during sort operations;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 6b59efc..3d9d5d9 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -53,7 +53,7 @@ typedef enum LWLockId
 	ProcArrayLock,
 	SInvalReadLock,
 	SInvalWriteLock,
-	WALInsertLock,
+	WALBufMappingLock,
 	WALWriteLock,
 	ControlFileLock,
 	CheckpointLock,
@@ -79,6 +79,15 @@ typedef enum LWLockId
 	SerializablePredicateLockListLock,
 	OldSerXidLock,
 	SyncRepLock,
+	WALInsertTailLock,
+	FirstWALInsertShareLock,
+	WALInsertShareLock2,
+	WALInsertShareLock3,
+	WALInsertShareLock4,
+	WALInsertShareLock5,
+	WALInsertShareLock6,
+	WALInsertShareLock7,
+	LastWALInsertShareLock,
 	/* Individual lock IDs end here */
 	FirstBufMappingLock,
 	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
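
The WALInsertShareLock IDs above back the WALInsertLockAcquire()/WALInsertLockRelease() calls used throughout this patch, but those helpers themselves are not part of this diff. Below is a hedged, self-contained sketch of one plausible partitioned acquire over locks like these; pthread rwlocks stand in for LWLocks, and all names and the partition count are illustrative. In this sketch, shared mode takes a single caller-chosen partition, while exclusive mode takes every partition in a fixed order so that two exclusive acquirers cannot deadlock.

#include <pthread.h>

#define NUM_WAL_INSERT_PARTITIONS 8		/* illustrative */

static pthread_rwlock_t wal_insert_locks[NUM_WAL_INSERT_PARTITIONS] = {
	PTHREAD_RWLOCK_INITIALIZER, PTHREAD_RWLOCK_INITIALIZER,
	PTHREAD_RWLOCK_INITIALIZER, PTHREAD_RWLOCK_INITIALIZER,
	PTHREAD_RWLOCK_INITIALIZER, PTHREAD_RWLOCK_INITIALIZER,
	PTHREAD_RWLOCK_INITIALIZER, PTHREAD_RWLOCK_INITIALIZER
};

static void
wal_insert_lock_acquire(int my_partition, int exclusive)
{
	if (exclusive)
	{
		/* Take all partitions, always in the same order. */
		for (int i = 0; i < NUM_WAL_INSERT_PARTITIONS; i++)
			pthread_rwlock_wrlock(&wal_insert_locks[i]);
	}
	else
		pthread_rwlock_rdlock(&wal_insert_locks[my_partition]);
}

static void
wal_insert_lock_release(int my_partition, int exclusive)
{
	if (exclusive)
	{
		for (int i = 0; i < NUM_WAL_INSERT_PARTITIONS; i++)
			pthread_rwlock_unlock(&wal_insert_locks[i]);
	}
	else
		pthread_rwlock_unlock(&wal_insert_locks[my_partition]);
}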
