From c4d6badac0ec07a3d4b188eab2078cffdcf57716 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 29 May 2012 20:00:16 +0200
Subject: [PATCH] Fix walsender wakeup handling

The previous coding could miss xlog writeouts in several places, e.g. when WAL
was written out by the background writer, or even after a commit with
synchronous_commit=off. This could delay sending data to the standby by up to
7 seconds.

To fix this, move the responsibility for notification to the layer where the
necessary information is actually present. We take some care not to do the
notification while holding contended locks such as WALInsertLock or
WALWriteLock.

Document the preexisting fact that we rely on SetLatch being safe to call from
within signal handlers and critical sections.
---
 src/backend/access/transam/twophase.c |   21 -----------------
 src/backend/access/transam/xact.c     |    7 ------
 src/backend/access/transam/xlog.c     |   18 ++++++++++++++
 src/backend/port/unix_latch.c         |    3 +++
 src/backend/port/win32_latch.c        |    4 ++++
 src/backend/replication/walsender.c   |   42 ++++++++++++++++++++++++++++++++-
 src/include/replication/walsender.h   |    2 ++
 7 files changed, 68 insertions(+), 29 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 6db46c0..edbbdf1 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1041,13 +1041,6 @@ EndPrepare(GlobalTransaction gxact)
 
 	/* If we crash now, we have prepared: WAL replay will fix things */
 
-	/*
-	 * Wake up all walsenders to send WAL up to the PREPARE record immediately
-	 * if replication is enabled
-	 */
-	if (max_wal_senders > 0)
-		WalSndWakeup();
-
 	/* write correct CRC and close file */
 	if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
 	{
@@ -2048,13 +2041,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	/* Flush XLOG to disk */
 	XLogFlush(recptr);
 
-	/*
-	 * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
-	 * immediately if replication is enabled
-	 */
-	if (max_wal_senders > 0)
-		WalSndWakeup();
-
 	/* Mark the transaction committed in pg_clog */
 	TransactionIdCommitTree(xid, nchildren, children);
 
@@ -2136,13 +2122,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	XLogFlush(recptr);
 
 	/*
-	 * Wake up all walsenders to send WAL up to the ABORT PREPARED record
-	 * immediately if replication is enabled
-	 */
-	if (max_wal_senders > 0)
-		WalSndWakeup();
-
-	/*
 	 * Mark the transaction aborted in clog.  This is not absolutely necessary
 	 * but we may as well do it while we are here.
 	 */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c71a10e..d697ab8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1137,13 +1137,6 @@ RecordTransactionCommit(void)
 		XLogFlush(XactLastRecEnd);
 
 		/*
-		 * Wake up all walsenders to send WAL up to the COMMIT record
-		 * immediately if replication is enabled
-		 */
-		if (max_wal_senders > 0)
-			WalSndWakeup();
-
-		/*
 		 * Now we may update the CLOG, if we wrote a COMMIT record above
 		 */
 		if (markXidCommitted)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d3650bd..ef64f33 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1014,6 +1014,8 @@ begin:;
 
 		END_CRIT_SECTION();
 
+		/* wake up walsenders now that we have released WALWriteLock */
+		WalSndWakeupProcess();
 		return RecPtr;
 	}
 
@@ -1215,6 +1217,9 @@ begin:;
 
 	END_CRIT_SECTION();
 
+	/* wake up walsenders now that we are outside contended locks */
+	WalSndWakeupProcess();
+
 	return RecPtr;
 }
 
@@ -1819,6 +1824,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 			if (finishing_seg || (xlog_switch && last_iteration))
 			{
 				issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
+
+				/* signal that we need to wake up walsenders later */
+				WalSndWakeupRequest();
+
 				LogwrtResult.Flush = LogwrtResult.Write;		/* end of page */
 
 				if (XLogArchivingActive())
@@ -1883,6 +1892,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 				openLogOff = 0;
 			}
 			issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
+
+			/* signal that we need to wake up walsenders later */
+			WalSndWakeupRequest();
 		}
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
@@ -2146,6 +2158,9 @@ XLogFlush(XLogRecPtr record)
 
 	END_CRIT_SECTION();
 
+	/* wake up walsenders now that we have released WALWriteLock */
+	WalSndWakeupProcess();
+
 	/*
 	 * If we still haven't flushed to the request point then we have a
 	 * problem; most likely, the requested flush point is past end of XLOG.
@@ -2271,6 +2286,9 @@ XLogBackgroundFlush(void)
 
 	END_CRIT_SECTION();
 
+	/* wake up walsenders now that we have released WALWriteLock */
+	WalSndWakeupProcess();
+
 	return wrote_something;
 }
 
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index e64282c..50731ac 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -416,6 +416,9 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
  * NB: when calling this in a signal handler, be sure to save and restore
  * errno around it.  (That's standard practice in most signal handlers, of
  * course, but we used to omit it in handlers that only set a flag.)
+ *
+ * NB: this function is called from critical sections and signal handlers, so
+ * throwing an error is not a good idea.
  */
 void
 SetLatch(volatile Latch *latch)
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index 05b3426..988da53 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -246,6 +246,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	return result;
 }
 
+/*
+ * The comments above the unix implementation (unix_latch.c) of this function
+ * apply here as well.
+ */
 void
 SetLatch(volatile Latch *latch)
 {
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5f93812..459127b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -106,6 +106,12 @@ static StringInfoData reply_message;
  */
 static TimestampTz last_reply_timestamp;
 
+
+/*
+ * Set by WalSndWakeupRequest(), consumed and reset by WalSndWakeupProcess()
+ */
+static bool wroteNewXlogData = false;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -1423,7 +1429,12 @@ WalSndShmemInit(void)
 	}
 }
 
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * advisable.
+ */
 void
 WalSndWakeup(void)
 {
@@ -1433,6 +1444,35 @@ WalSndWakeup(void)
 		SetLatch(&WalSndCtl->walsnds[i].latch);
 }
 
+/*
+ * Remember that we want to wake up walsenders later
+ *
+ * This is separated from doing the actual wakeup because the writeout is done
+ * while holding contended locks.
+ */
+void
+WalSndWakeupRequest(void)
+{
+	wroteNewXlogData = true;
+}
+
+/*
+ * Wake up walsenders if a wakeup was requested via WalSndWakeupRequest()
+ */
+void
+WalSndWakeupProcess(void)
+{
+	if(wroteNewXlogData){
+		wroteNewXlogData = false;
+		/*
+		 * Wake up all walsenders to send WAL up to the point where it has been
+		 * safely flushed to disk.
+		 */
+		if (max_wal_senders > 0)
+			WalSndWakeup();
+	}
+}
+
 /* Set state for current walsender (only called in walsender) */
 void
 WalSndSetState(WalSndState state)
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 128d2db..38191e7 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -31,6 +31,8 @@ extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
 extern void WalSndWakeup(void);
+extern void WalSndWakeupRequest(void);
+extern void WalSndWakeupProcess(void);
 extern void WalSndRqstFileReload(void);
 
 extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
-- 
1.7.10.rc3.3.g19a6c.dirty

