*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 327,333 **** MarkAsPreparing(TransactionId xid, const char *gid,
  	proc->databaseId = databaseid;
  	proc->roleId = owner;
  	proc->lwWaiting = false;
! 	proc->lwExclusive = false;
  	proc->lwWaitLink = NULL;
  	proc->waitLock = NULL;
  	proc->waitProcLock = NULL;
--- 327,333 ----
  	proc->databaseId = databaseid;
  	proc->roleId = owner;
  	proc->lwWaiting = false;
! 	proc->lwWaitMode = 0;
  	proc->lwWaitLink = NULL;
  	proc->waitLock = NULL;
  	proc->waitProcLock = NULL;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 2118,2140 **** XLogFlush(XLogRecPtr record)
  	/* initialize to given target; may increase below */
  	WriteRqstPtr = record;
  
! 	/* read LogwrtResult and update local state */
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
  
  		SpinLockAcquire(&xlogctl->info_lck);
  		if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
  			WriteRqstPtr = xlogctl->LogwrtRqst.Write;
  		LogwrtResult = xlogctl->LogwrtResult;
  		SpinLockRelease(&xlogctl->info_lck);
- 	}
  
! 	/* done already? */
! 	if (!XLByteLE(record, LogwrtResult.Flush))
! 	{
! 		/* now wait for the write lock */
! 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  		LogwrtResult = XLogCtl->Write.LogwrtResult;
  		if (!XLByteLE(record, LogwrtResult.Flush))
  		{
--- 2118,2160 ----
  	/* initialize to given target; may increase below */
  	WriteRqstPtr = record;
  
! 	/*
! 	 * Now wait until we get the write lock, or someone else does the
! 	 * flush for us.
! 	 */
! 	for (;;)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
  
+ 		/* read LogwrtResult and update local state */
  		SpinLockAcquire(&xlogctl->info_lck);
  		if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
  			WriteRqstPtr = xlogctl->LogwrtRqst.Write;
  		LogwrtResult = xlogctl->LogwrtResult;
  		SpinLockRelease(&xlogctl->info_lck);
  
! 		/* done already? */
! 		if (XLByteLE(record, LogwrtResult.Flush))
! 			break;
! 
! 		/*
! 		 * Try to get the write lock. If we can't get it immediately, wait
! 		 * until it's released, and recheck if we still need to do the flush
! 		 * or if the backend that held the lock did it for us already. This
! 		 * helps to maintain a good rate of group committing when the system
! 		 * is bottlenecked by the speed of fsyncing.
! 		 */
! 		if (!LWLockWaitUntilFree(WALWriteLock, LW_EXCLUSIVE))
! 		{
! 			/*
! 			 * The lock is now free, but we didn't acquire it yet. Before we
! 			 * do, loop back to check if someone else flushed the record for
! 			 * us already.
! 			 */
! 			continue;
! 		}
! 		/* Got the lock */
  		LogwrtResult = XLogCtl->Write.LogwrtResult;
  		if (!XLByteLE(record, LogwrtResult.Flush))
  		{
***************
*** 2163,2168 **** XLogFlush(XLogRecPtr record)
--- 2183,2190 ----
  			XLogWrite(WriteRqst, false, false);
  		}
  		LWLockRelease(WALWriteLock);
+ 		/* done */
+ 		break;
  	}
  
  	END_CRIT_SECTION();
*** a/src/backend/storage/lmgr/lwlock.c
--- b/src/backend/storage/lmgr/lwlock.c
***************
*** 430,436 **** LWLockAcquire(LWLockId lockid, LWLockMode mode)
  			elog(PANIC, "cannot wait without a PGPROC structure");
  
  		proc->lwWaiting = true;
! 		proc->lwExclusive = (mode == LW_EXCLUSIVE);
  		proc->lwWaitLink = NULL;
  		if (lock->head == NULL)
  			lock->head = proc;
--- 430,436 ----
  			elog(PANIC, "cannot wait without a PGPROC structure");
  
  		proc->lwWaiting = true;
! 		proc->lwWaitMode = mode;
  		proc->lwWaitLink = NULL;
  		if (lock->head == NULL)
  			lock->head = proc;
***************
*** 565,570 **** LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
--- 565,708 ----
  }
  
  /*
+  * LWLockWaitUntilFree - Wait until a lock is free
+  *
+  * The semantics of this function are a bit funky. If the lock is currently
+  * free, it is acquired in the given mode, and the function returns true. If
+  * the lock isn't immediately free, the function waits until it is released
+  * and returns false, but does not acquire the lock.
+  *
+  * This is currently used for WALWriteLock: when a backend flushes the WAL,
+  * holding WALWriteLock, it can flush the commit records of many other
+  * backends as a side-effect. Those other backends need to wait until the
+  * flush finishes, but don't need to acquire the lock anymore. They can just
+  * wake up, observe that their records have already been flushed, and return.
+  */
+ bool
+ LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode)
+ {
+ 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+ 	PGPROC	   *proc = MyProc;
+ 	bool		mustwait;
+ 	int			extraWaits = 0;
+ 
+ 	PRINT_LWDEBUG("LWLockWaitUntilFree", lockid, lock);
+ 
+ 	/* Ensure we will have room to remember the lock */
+ 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
+ 		elog(ERROR, "too many LWLocks taken");
+ 
+ 	/*
+ 	 * Lock out cancel/die interrupts until we exit the code section protected
+ 	 * by the LWLock.  This ensures that interrupts will not interfere with
+ 	 * manipulations of data structures in shared memory.
+ 	 */
+ 	HOLD_INTERRUPTS();
+ 
+ 	/* Acquire mutex.  Time spent holding mutex should be short! */
+ 	SpinLockAcquire(&lock->mutex);
+ 
+ 	/* If I can get the lock, do so quickly. */
+ 	if (mode == LW_EXCLUSIVE)
+ 	{
+ 		if (lock->exclusive == 0 && lock->shared == 0)
+ 		{
+ 			lock->exclusive++;
+ 			mustwait = false;
+ 		}
+ 		else
+ 			mustwait = true;
+ 	}
+ 	else
+ 	{
+ 		if (lock->exclusive == 0)
+ 		{
+ 			lock->shared++;
+ 			mustwait = false;
+ 		}
+ 		else
+ 			mustwait = true;
+ 	}
+ 
+ 	if (mustwait)
+ 	{
+ 		/*
+ 		 * Add myself to wait queue.
+ 		 *
+ 		 * If we don't have a PGPROC structure, there's no way to wait. This
+ 		 * should never occur, since MyProc should only be null during shared
+ 		 * memory initialization.
+ 		 */
+ 		if (proc == NULL)
+ 			elog(PANIC, "cannot wait without a PGPROC structure");
+ 
+ 		proc->lwWaiting = true;
+ 		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+ 		proc->lwWaitLink = NULL;
+ 		if (lock->head == NULL)
+ 			lock->head = proc;
+ 		else
+ 			lock->tail->lwWaitLink = proc;
+ 		lock->tail = proc;
+ 
+ 		/* Can release the mutex now */
+ 		SpinLockRelease(&lock->mutex);
+ 
+ 		/*
+ 		 * Wait until awakened. Like in LWLockAcquire, be prepared for bogus
+ 		 * wakeups, because we share the semaphore with ProcWaitForSignal.
+ 		 */
+ 		LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "waiting");
+ 
+ #ifdef LWLOCK_STATS
+ 		block_counts[lockid]++;
+ #endif
+ 
+ 		TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+ 
+ 		for (;;)
+ 		{
+ 			/* "false" means cannot accept cancel/die interrupt here. */
+ 			PGSemaphoreLock(&proc->sem, false);
+ 			if (!proc->lwWaiting)
+ 				break;
+ 			extraWaits++;
+ 		}
+ 
+ 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+ 
+ 		LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "awakened");
+ 	}
+ 	else
+ 	{
+ 		/* We are done updating shared state of the lock itself. */
+ 		SpinLockRelease(&lock->mutex);
+ 	}
+ 
+ 	/*
+ 	 * Fix the process wait semaphore's count for any absorbed wakeups.
+ 	 */
+ 	while (extraWaits-- > 0)
+ 		PGSemaphoreUnlock(&proc->sem);
+ 
+ 	if (mustwait)
+ 	{
+ 		/* Waited without acquiring the lock, so release interrupt holdoff */
+ 		RESUME_INTERRUPTS();
+ 		LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "failed");
+ 		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
+ 	}
+ 	else
+ 	{
+ 		/* Add lock to list of locks held by this backend */
+ 		held_lwlocks[num_held_lwlocks++] = lockid;
+ 		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
+ 	}
+ 
+ 	return !mustwait;
+ }
+ 
+ /*
   * LWLockRelease - release a previously acquired lock
   */
  void
***************
*** 618,637 **** LWLockRelease(LWLockId lockid)
  			/*
  			 * Remove the to-be-awakened PGPROCs from the queue.  If the front
  			 * waiter wants exclusive lock, awaken him only. Otherwise awaken
! 			 * as many waiters as want shared access.
  			 */
  			proc = head;
! 			if (!proc->lwExclusive)
  			{
  				while (proc->lwWaitLink != NULL &&
! 					   !proc->lwWaitLink->lwExclusive)
  					proc = proc->lwWaitLink;
  			}
  			/* proc is now the last PGPROC to be released */
  			lock->head = proc->lwWaitLink;
  			proc->lwWaitLink = NULL;
! 			/* prevent additional wakeups until retryer gets to run */
! 			lock->releaseOK = false;
  		}
  		else
  		{
--- 756,791 ----
  			/*
  			 * Remove the to-be-awakened PGPROCs from the queue.  If the front
  			 * waiter wants exclusive lock, awaken him only. Otherwise awaken
! 			 * as many waiters as want shared access (or just want to be
! 			 * woken up when the lock becomes free without acquiring it,
! 			 * ie. LWLockWaitUntilFree).
  			 */
+ 			bool releaseOK = true;
+ 
  			proc = head;
! 			if (proc->lwWaitMode != LW_EXCLUSIVE)
  			{
  				while (proc->lwWaitLink != NULL &&
! 					   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
! 				{
  					proc = proc->lwWaitLink;
+ 					if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+ 						releaseOK = false;
+ 				}
  			}
  			/* proc is now the last PGPROC to be released */
  			lock->head = proc->lwWaitLink;
  			proc->lwWaitLink = NULL;
! 			/*
! 			 * Prevent additional wakeups until retryer gets to run. Backends
! 			 * that are just waiting for the lock to become free don't prevent
! 			 * wakeups, because they might decide that they don't want the
! 			 * lock, after all.
! 			 */
! 			if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
! 				releaseOK = false;
! 
! 			lock->releaseOK = releaseOK;
  		}
  		else
  		{
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 361,367 **** InitProcess(void)
  	if (IsAutoVacuumWorkerProcess())
  		MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
  	MyProc->lwWaiting = false;
! 	MyProc->lwExclusive = false;
  	MyProc->lwWaitLink = NULL;
  	MyProc->waitLock = NULL;
  	MyProc->waitProcLock = NULL;
--- 361,367 ----
  	if (IsAutoVacuumWorkerProcess())
  		MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
  	MyProc->lwWaiting = false;
! 	MyProc->lwWaitMode = 0;
  	MyProc->lwWaitLink = NULL;
  	MyProc->waitLock = NULL;
  	MyProc->waitProcLock = NULL;
***************
*** 516,522 **** InitAuxiliaryProcess(void)
  	MyPgXact->inCommit = false;
  	MyPgXact->vacuumFlags = 0;
  	MyProc->lwWaiting = false;
! 	MyProc->lwExclusive = false;
  	MyProc->lwWaitLink = NULL;
  	MyProc->waitLock = NULL;
  	MyProc->waitProcLock = NULL;
--- 516,522 ----
  	MyPgXact->inCommit = false;
  	MyPgXact->vacuumFlags = 0;
  	MyProc->lwWaiting = false;
! 	MyProc->lwWaitMode = 0;
  	MyProc->lwWaitLink = NULL;
  	MyProc->waitLock = NULL;
  	MyProc->waitProcLock = NULL;
*** a/src/backend/utils/probes.d
--- b/src/backend/utils/probes.d
***************
*** 35,40 **** provider postgresql {
--- 35,42 ----
  	probe lwlock__wait__done(LWLockId, LWLockMode);
  	probe lwlock__condacquire(LWLockId, LWLockMode);
  	probe lwlock__condacquire__fail(LWLockId, LWLockMode);
+ 	probe lwlock__wait__until__free(LWLockId, LWLockMode);
+ 	probe lwlock__wait__until__free__fail(LWLockId, LWLockMode);
  
  	probe lock__wait__start(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
  	probe lock__wait__done(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 94,100 **** typedef enum LWLockId
  typedef enum LWLockMode
  {
  	LW_EXCLUSIVE,
! 	LW_SHARED
  } LWLockMode;
  
  
--- 94,103 ----
  typedef enum LWLockMode
  {
  	LW_EXCLUSIVE,
! 	LW_SHARED,
! 	LW_WAIT_UNTIL_FREE	/* A special mode used in PGPROC->lwWaitMode, when
! 						 * waiting for lock to become free. Not to be used
! 						 * as LWLockAcquire argument */
  } LWLockMode;
  
  
***************
*** 105,110 **** extern bool Trace_lwlocks;
--- 108,114 ----
  extern LWLockId LWLockAssign(void);
  extern void LWLockAcquire(LWLockId lockid, LWLockMode mode);
  extern bool LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode);
+ extern bool LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode);
  extern void LWLockRelease(LWLockId lockid);
  extern void LWLockReleaseAll(void);
  extern bool LWLockHeldByMe(LWLockId lockid);
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 101,107 **** struct PGPROC
  
  	/* Info about LWLock the process is currently waiting for, if any. */
  	bool		lwWaiting;		/* true if waiting for an LW lock */
! 	bool		lwExclusive;	/* true if waiting for exclusive access */
  	struct PGPROC *lwWaitLink;	/* next waiter for same LW lock */
  
  	/* Info about lock the process is currently waiting for, if any. */
--- 101,107 ----
  
  	/* Info about LWLock the process is currently waiting for, if any. */
  	bool		lwWaiting;		/* true if waiting for an LW lock */
! 	uint8		lwWaitMode;		/* lwlock mode being waited for */
  	struct PGPROC *lwWaitLink;	/* next waiter for same LW lock */
  
  	/* Info about lock the process is currently waiting for, if any. */
