From 3368c16e7a8f30216e7d9579f5d2ca3b923259d5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 18 Nov 2016 13:14:55 +0900
Subject: [PATCH 2/2] Preserve WAL segments required by synchronous standbys.

Since a synchronous standby doesn't sync non-commit records, a large
transaction may unexpectedly break synchronous replication. This patch
makes CreateCheckPoint preserve all WAL segments required by the
currently established synchronous replication.
---
 src/backend/access/transam/xlog.c | 26 ++++++++++++++++++++++++++
 src/backend/replication/syncrep.c | 23 ++++++++++-------------
 src/include/replication/syncrep.h |  4 ++++
 3 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6cec027..195272e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -49,6 +49,7 @@
 #include "replication/slot.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
+#include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/barrier.h"
@@ -8628,12 +8629,37 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 	{
 		XLogSegNo	_logSegNo;
+		bool		in_sync, am_sync;
+		XLogRecPtr	repwriteptr, repflushptr, repapplyptr;
 
 		/* Update the average distance between checkpoints. */
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
 		XLByteToSeg(PriorRedoPtr, _logSegNo);
 		KeepLogSeg(recptr, &_logSegNo);
+
+		/*
+		 * If synchronous replication is currently satisfied, refrain from
+		 * removing segments apparently required by the synchronous standbys.
+		 * Referring to the write pointer is enough.
+		 */
+		in_sync = SyncRepGetOldestSyncRecPtr(&repwriteptr, &repflushptr,
+											 &repapplyptr, &am_sync, true);
+		if (in_sync && repwriteptr != InvalidXLogRecPtr)
+		{
+			XLogSegNo synckeep;
+
+			XLByteToSeg(repwriteptr, synckeep);
+			if (synckeep < _logSegNo)
+			{
+				ereport(WARNING,
+						(errmsg("sync replication too retarded. %lu extra WAL segments are preserved (last segno to preesrve is moved from %lx to %lx)",
+								_logSegNo - synckeep, _logSegNo, synckeep),
+						 errhint("If you see this message too frequently, consider increasing wal_keep_segments or max_wal_size.")));
+				_logSegNo = synckeep;
+			}
+		}
+
 		_logSegNo--;
 		RemoveOldXlogFiles(_logSegNo, PriorRedoPtr, recptr);
 	}
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ac29f56..343217bc 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -86,10 +86,6 @@ static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
 static int	SyncRepWakeQueue(bool all, int mode);
 
-static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
-						   XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr,
-						   bool *am_sync);
 static int	SyncRepGetStandbyPriority(void);
 
 #ifdef USE_ASSERT_CHECKING
@@ -417,7 +413,7 @@ SyncRepReleaseWaiters(void)
 	 * positions among all sync standbys.
 	 */
 	got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
-											&applyPtr, &am_sync);
+											&applyPtr, &am_sync, false);
 
 	/*
 	 * If we are managing a sync standby, though we weren't prior to this,
@@ -473,16 +469,17 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
  *
- * Return false if the number of sync standbys is less than
- * synchronous_standby_names specifies. Otherwise return true and
- * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
+ * Return true if the oldest WAL locations are set. They are set only when
+ * synchronous_standby_names is defined and satisfied, and the caller is
+ * managing a synchronous standby. If force is true, the locations are set
+ * even when the caller is not managing a synchronous standby.
  *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *am_sync is set to true on return if this walsender is connecting to a
+ * synchronous standby.
  */
-static bool
+bool
 SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr, bool *am_sync)
+						   XLogRecPtr *applyPtr, bool *am_sync, bool force)
 {
 	List	   *sync_standbys;
 	ListCell   *cell;
@@ -499,7 +496,7 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 	 * Quick exit if we are not managing a sync standby or there are not
 	 * enough synchronous standbys.
 	 */
-	if (!(*am_sync) ||
+	if ((!force && !(*am_sync)) ||
 		SyncRepConfig == NULL ||
 		list_length(sync_standbys) < SyncRepConfig->num_sync)
 	{
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index e4e0e27..0c42523 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -71,6 +71,10 @@ extern List *SyncRepGetSyncStandbys(bool *am_sync);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
+extern bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+									   XLogRecPtr *flushPtr,
+									   XLogRecPtr *applyPtr,
+									   bool *am_sync, bool force);
 
 /* GUC infrastructure */
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
-- 
2.9.2

