From 6742e3b31ac53ef54458d64e34feeac7d4c710d2 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Thu, 18 Jun 2020 13:36:23 +0900
Subject: [PATCH v2 1/2] Make pg_stat_replication view consistent

min_safe_lsn is not consistent within the result of a query. Make it
consistent. On the way fixing that min_safe_lsn is changed to show the
second byte of a segment instead of first byte in order to users can
get the intended segment file name from pg_walfile_name.
---
 src/backend/access/transam/xlog.c   | 15 +++++++------
 src/backend/replication/slotfuncs.c | 35 ++++++++++++++++++++---------
 src/include/access/xlog.h           |  4 +++-
 3 files changed, 35 insertions(+), 19 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 55cac186dc..940f5fcb18 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9484,6 +9484,9 @@ CreateRestartPoint(int flags)
  * Report availability of WAL for the given target LSN
  *		(typically a slot's restart_lsn)
  *
+ * currWriteLSN and lastRemovedSeg is the results of XLogGetLastRemovedSegno()
+ * and GetXLogWriteRecPtr() respectively at the referring time.
+ *
  * Returns one of the following enum values:
  * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
  *   of max_wal_size.
@@ -9498,9 +9501,9 @@ CreateRestartPoint(int flags)
  * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
  */
 WALAvailability
-GetWALAvailability(XLogRecPtr targetLSN)
+GetWALAvailability(XLogRecPtr targetLSN, XLogRecPtr currWriteLSN,
+				   XLogSegNo lastRemovedSeg)
 {
-	XLogRecPtr	currpos;		/* current write LSN */
 	XLogSegNo	currSeg;		/* segid of currpos */
 	XLogSegNo	targetSeg;		/* segid of targetLSN */
 	XLogSegNo	oldestSeg;		/* actual oldest segid */
@@ -9513,21 +9516,19 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	if (XLogRecPtrIsInvalid(targetLSN))
 		return WALAVAIL_INVALID_LSN;
 
-	currpos = GetXLogWriteRecPtr();
-
 	/* calculate oldest segment currently needed by slots */
 	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
-	KeepLogSeg(currpos, &oldestSlotSeg);
+	KeepLogSeg(currWriteLSN, &oldestSlotSeg);
 
 	/*
 	 * Find the oldest extant segment file. We get 1 until checkpoint removes
 	 * the first WAL segment file since startup, which causes the status being
 	 * wrong under certain abnormal conditions but that doesn't actually harm.
 	 */
-	oldestSeg = XLogGetLastRemovedSegno() + 1;
+	oldestSeg = lastRemovedSeg + 1;
 
 	/* calculate oldest segment by max_wal_size and wal_keep_segments */
-	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	XLByteToSeg(currWriteLSN, currSeg, wal_segment_size);
 	keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
 							  wal_segment_size) + 1;
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 1b929a603e..063c6dd435 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -243,6 +243,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	int			slotno;
+	XLogSegNo	last_removed_seg;
+	XLogRecPtr	curr_write_lsn;
+	XLogRecPtr	min_safe_lsn = 0;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -272,6 +275,24 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 	rsinfo->setResult = tupstore;
 	rsinfo->setDesc = tupdesc;
 
+	/*
+	 * Remember the last removed segment and current WAL write LSN at this
+	 * point for the consistency in this table. Since there's no interlock
+	 * between slot data and checkpointer, the segment can be removed
+	 * in-between, but that doesn't make any practical difference.  Also we
+	 * calculate safe_min_lsn here as the same value is shown for all slots.
+	 */
+	last_removed_seg = XLogGetLastRemovedSegno();
+	curr_write_lsn = GetXLogWriteRecPtr();
+
+	/*
+	 * Show the next byte after segment boundary as min_safe_lsn so that
+	 * pg_walfile_name() returns the correct file name for the value.
+	 */
+	if (max_slot_wal_keep_size_mb >= 0 && last_removed_seg != 0)
+		XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 1,
+								wal_segment_size, min_safe_lsn);
+
 	MemoryContextSwitchTo(oldcontext);
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -282,7 +303,6 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		Datum		values[PG_GET_REPLICATION_SLOTS_COLS];
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
-		XLogSegNo	last_removed_seg;
 		int			i;
 
 		if (!slot->in_use)
@@ -342,7 +362,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
-		walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+		walstate = GetWALAvailability(slot_contents.data.restart_lsn,
+									  curr_write_lsn, last_removed_seg);
 
 		switch (walstate)
 		{
@@ -366,16 +387,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 				elog(ERROR, "invalid walstate: %d", (int) walstate);
 		}
 
-		if (max_slot_wal_keep_size_mb >= 0 &&
-			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
-			((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
-		{
-			XLogRecPtr	min_safe_lsn;
-
-			XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
-									wal_segment_size, min_safe_lsn);
+		if (min_safe_lsn != 0)
 			values[i++] = Int64GetDatum(min_safe_lsn);
-		}
 		else
 			nulls[i++] = true;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e917dfe92d..bbe00e7195 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -326,7 +326,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
-extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN,
+										  XLogRecPtr currWriteLSN,
+										  XLogSegNo last_removed_seg);
 extern XLogRecPtr CalculateMaxmumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
-- 
2.18.4

