From ac47e250cf88b1279556e27c33e9f29806fdc04d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 7 Sep 2017 19:13:22 +0900
Subject: [PATCH 2/2] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns, "live" and "distance", to pg_replication_slots.
When max_slot_wal_keep_size is set, long-disconnected slots may lose sync.
The two columns show how long a slot can live on, or how many bytes a
slot has lost, if max_slot_wal_keep_size is set.
---
 src/backend/access/transam/xlog.c    | 80 ++++++++++++++++++++++++++++++++++++
 src/backend/catalog/system_views.sql |  4 +-
 src/backend/replication/slotfuncs.c  | 16 +++++++-
 src/include/access/xlog.h            |  1 +
 src/include/catalog/pg_proc.h        |  2 +-
 5 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ae70d7d..c4c8307 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9324,6 +9324,86 @@ CreateRestartPoint(int flags)
 }
 
 /*
+ * Check whether the WAL record at the given LSN will be preserved at the
+ * next checkpoint.
+ *
+ * Returns true if it will be preserved.  If distance is not NULL, *distance
+ * is set to a byte count: the margin left before the slot loses sync when
+ * this function returns true, or the amount of WAL the slot has already lost
+ * when it returns false.
+ *
+ * This function should return a result consistent with KeepLogSeg.
+ */
+bool
+GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance)
+{
+	XLogRecPtr currpos;
+	XLogRecPtr tailpos;
+	uint64 currSeg;
+	uint64 restByteInSeg;
+	uint64 restartSeg;
+	uint64 tailSeg;
+	uint64 keepSegs;
+
+	currpos = GetXLogWriteRecPtr();
+
+	LWLockAcquire(ControlFileLock, LW_SHARED);
+	tailpos = ControlFile->checkPointCopy.redo;
+	LWLockRelease(ControlFileLock);
+
+	/* Convert each position to the number of the segment containing it */
+	XLByteToSeg(currpos, currSeg);
+	XLByteToSeg(restartLSN, restartSeg);
+	XLByteToSeg(tailpos, tailSeg);
+	restByteInSeg = 0;
+
+	Assert(wal_keep_segments >= 0);
+	Assert(max_slot_wal_keep_size_mb >= 0);
+
+	/*
+	 * WAL is removed in units of segments.
+	 */
+	keepSegs = wal_keep_segments + ConvertToXSegs(max_slot_wal_keep_size_mb);
+
+	/*
+	 * If the latest checkpoint's redo point is older than the current head
+	 * minus keep segments, the next checkpoint keeps the redo point's
+	 * segment. Otherwise use current head minus number of segments to keep.
+	 */
+	if (currSeg < tailSeg + keepSegs)
+	{
+		if (currSeg < keepSegs)
+			tailSeg = 0;
+		else
+			tailSeg = currSeg - keepSegs;
+
+		/* In this case, the margin will be the bytes to the next segment */
+		restByteInSeg = XLogSegSize - (currpos % XLogSegSize);
+	}
+
+	/* Required segments will be removed at the next checkpoint */
+	if (restartSeg < tailSeg)
+	{
+		/* Calculate how many bytes the slot has lost */
+		if (distance)
+		{
+			/* Bytes from restartLSN to the end of its own segment */
+			uint64 restbytes = (restartSeg + 1) * XLogSegSize - restartLSN;
+			*distance =
+				(tailSeg - restartSeg - 1) * XLogSegSize
+				+ restbytes;
+		}
+		return false;
+	}
+
+	/* Margin at the next checkpoint before the slot loses sync */
+	if (distance)
+		*distance = (restartSeg - tailSeg) * XLogSegSize + restByteInSeg;
+
+	return true;
+}
+
+/*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
  *
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index dc40cde..c55c88b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -793,7 +793,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+			L.live,
+			L.distance
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d4cbd83..52a9d26 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -182,7 +182,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -304,6 +304,20 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		if (max_slot_wal_keep_size_mb > 0 && restart_lsn != InvalidXLogRecPtr)
+		{
+			uint64 distance;
+
+			values[i++] = BoolGetDatum(GetMarginToSlotSegmentLimit(restart_lsn,
+																   &distance));
+			values[i++] = Int64GetDatum(distance);
+		}
+		else
+		{
+			values[i++] = BoolGetDatum(true);
+			nulls[i++] = true;
+		}
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index f596794..30507eb 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -267,6 +267,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern bool GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 8b33b4e..4bad345 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5293,7 +5293,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220,16,3220}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,live,distance}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
-- 
2.9.2

