From 8590125d66ce366b35251e5aff14db1a858edda9 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 27 Dec 2023 16:40:27 -0500
Subject: [PATCH v2 2/5] Add LSNTimeline for converting LSN <-> time

Add a new structure, LSNTimeline, consisting of LSNTimes -- each an LSN,
time pair. Each LSNTime can represent multiple logical LSN, time pairs,
referred to as members. LSN <-> time conversions can be done using
linear interpolation with two LSNTimes on the LSNTimeline.

This commit does not add a global instance of LSNTimeline. It adds the
structures and functions needed to maintain and access such a timeline.
---
 src/backend/utils/activity/pgstat_wal.c | 199 ++++++++++++++++++++++++
 src/include/pgstat.h                    |  34 ++++
 src/tools/pgindent/typedefs.list        |   2 +
 3 files changed, 235 insertions(+)

diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c
index 1a3c0e1a669..e8d9660f82e 100644
--- a/src/backend/utils/activity/pgstat_wal.c
+++ b/src/backend/utils/activity/pgstat_wal.c
@@ -17,8 +17,11 @@
 
 #include "postgres.h"
 
+#include "access/xlog.h"
 #include "utils/pgstat_internal.h"
 #include "executor/instrument.h"
+#include "utils/builtins.h"
+#include "utils/timestamp.h"
 
 
 PgStat_PendingWalStats PendingWalStats = {0};
@@ -32,6 +35,12 @@ PgStat_PendingWalStats PendingWalStats = {0};
 static WalUsage prevWalUsage;
 
 
+static void lsntime_absorb(LSNTime *a, const LSNTime *b);
+void lsntime_insert(LSNTimeline *timeline, TimestampTz time, XLogRecPtr lsn);
+
+XLogRecPtr estimate_lsn_at_time(const LSNTimeline *timeline, TimestampTz time);
+TimestampTz estimate_time_at_lsn(const LSNTimeline *timeline, XLogRecPtr lsn);
+
 /*
  * Calculate how much WAL usage counters have increased and update
  * shared WAL and IO statistics.
@@ -184,3 +193,193 @@ pgstat_wal_snapshot_cb(void)
 		   sizeof(pgStatLocal.snapshot.wal));
 	LWLockRelease(&stats_shmem->lock);
 }
+
+/*
+ * Merge *b into *a: *a takes the time and LSN of whichever of the two is
+ * earlier (by time, with LSN breaking ties) and the sum of both member
+ * counts. *b is not modified.
+ */
+static void
+lsntime_absorb(LSNTime *a, const LSNTime *b)
+{
+	uint64		total_members = a->members + b->members;
+	bool		b_is_earlier;
+
+	/* Order by time; fall back to LSN only when the times are equal. */
+	if (a->time != b->time)
+		b_is_earlier = b->time < a->time;
+	else
+		b_is_earlier = b->lsn < a->lsn;
+
+	/* When the two are identical, *a is kept as is. */
+	if (b_is_earlier)
+		*a = *b;
+
+	a->members = total_members;
+}
+
+/*
+ * Insert a new LSNTime into the LSNTimeline. Full elements are displaced one
+ * slot down until the carry lands in an element with spare capacity.
+ */
+void
+lsntime_insert(LSNTimeline *timeline, TimestampTz time,
+			   XLogRecPtr lsn)
+{
+	LSNTime		temp;
+	LSNTime		carry = {.lsn = lsn,.time = time,.members = 1};
+
+	for (int i = 0; i < timeline->length; i++)
+	{
+		uint64		capacity;
+		LSNTime    *cur = &timeline->data[i];
+
+		/*
+		 * An element's capacity to represent members is 2 ^ its position in
+		 * the array. Use a 64-bit shift: a plain int overflows at i >= 31.
+		 */
+		capacity = UINT64CONST(1) << i;
+
+		/*
+		 * If the current element is not yet at capacity, then insert the
+		 * carried LSNTime into this element by taking the smaller of it and
+		 * the current LSNTime element. This is required to ensure that time
+		 * moves forward on the timeline.
+		 */
+		if (cur->members < capacity)
+		{
+			Assert(cur->members == carry.members);
+			Assert(cur->members + carry.members <= capacity);
+			lsntime_absorb(cur, &carry);
+			return;
+		}
+
+		/*
+		 * If the current element is full, ensure that the inserting LSNTime
+		 * is larger than the current element. This must be true for time to
+		 * move forward on the timeline.
+		 */
+		Assert(carry.lsn >= cur->lsn || carry.time >= cur->time);
+
+		/*
+		 * If the element is at capacity, swap the element with the carry and
+		 * continue on to find an element with space to represent the new
+		 * member.
+		 */
+		temp = *cur;
+		*cur = carry;
+		carry = temp;
+	}
+
+	/*
+	 * Every element in use (if any) was full: take another array element.
+	 */
+	Assert(timeline->length < (int) lengthof(timeline->data));
+	timeline->data[timeline->length] = carry;
+	timeline->length++;
+}
+
+
+/*
+ * Estimate the LSN that corresponds to the given time. The estimate is made
+ * by linear interpolation between the two known LSN, time points on either
+ * side of the target time. The timeline is only read here and is never
+ * modified.
+ */
+XLogRecPtr
+estimate_lsn_at_time(const LSNTimeline *timeline, TimestampTz time)
+{
+	LSNTime		older = {.time = PgStartTime,.lsn = PgStartLSN};
+	LSNTime		newer = {.time = GetCurrentTimestamp(),.lsn = GetXLogInsertRecPtr()};
+	TimestampTz time_span;
+	XLogRecPtr	lsn_span;
+	double		estimate;
+	int			pos = 0;
+
+	/*
+	 * A target time that is not in the past cannot be interpolated; the
+	 * current insert LSN is the best available answer.
+	 */
+	if (time >= newer.time)
+		return newer.lsn;
+
+	/* Step over the elements that are more recent than the target time. */
+	while (pos < timeline->length && timeline->data[pos].time > time)
+		pos++;
+
+	/*
+	 * If an element at or before the target time was found, it becomes the
+	 * older end of the range, and the element just ahead of it in the array,
+	 * if there is one, becomes the newer end. An end that is not replaced
+	 * here keeps the value it was initialized with above.
+	 */
+	if (pos < timeline->length)
+	{
+		older = timeline->data[pos];
+		if (pos > 0)
+			newer = timeline->data[pos - 1];
+	}
+
+	time_span = newer.time - older.time;
+	Assert(time_span != 0);
+
+	lsn_span = newer.lsn - older.lsn;
+	Assert(lsn_span != 0);
+
+	estimate = (double) (time - older.time) / time_span * lsn_span + older.lsn;
+
+	/* A negative estimate cannot be expressed as an LSN. */
+	return (estimate < 0) ? InvalidXLogRecPtr : (XLogRecPtr) estimate;
+}
+
+/*
+ * Translate lsn to a time using the provided timeline, by linear
+ * interpolation between two LSNTimes. The timeline will not be modified.
+ */
+TimestampTz
+estimate_time_at_lsn(const LSNTimeline *timeline, XLogRecPtr lsn)
+{
+	TimestampTz time_elapsed;
+	XLogRecPtr	lsns_elapsed;
+	TimestampTz result;
+	LSNTime		start = {.time = PgStartTime,.lsn = PgStartLSN};
+	LSNTime		end = {.time = GetCurrentTimestamp(),.lsn = GetXLogInsertRecPtr()};
+
+	/* An LSN at or past the current insert LSN is best estimated as now. */
+	if (lsn >= end.lsn)
+		return end.time;
+
+	for (int i = 0; i < timeline->length; i++)
+	{
+		/* Pass LSNs more recent than our target LSN */
+		if (timeline->data[i].lsn > lsn)
+			continue;
+
+		/* Found the first element before our target LSN */
+		start = timeline->data[i];
+
+		/*
+		 * The end of the range is the element just ahead of it in the array;
+		 * for the first element, keep the current LSN and time instead.
+		 */
+		if (i > 0)
+			end = timeline->data[i - 1];
+		break;
+	}
+
+	time_elapsed = end.time - start.time;
+	Assert(time_elapsed != 0);
+
+	lsns_elapsed = end.lsn - start.lsn;
+	Assert(lsns_elapsed != 0);
+
+	/*
+	 * Interpolate in floating point. Dividing the integer LSN offsets would
+	 * truncate the ratio to zero, and subtracting them as unsigned values
+	 * would wrap around for a target LSN older than the start of the range.
+	 */
+	result = ((double) lsn - (double) start.lsn) / lsns_elapsed * time_elapsed + start.time;
+	if (result < 0)
+		return 0;
+	return result;
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 2136239710e..4f25773d681 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -11,6 +11,7 @@
 #ifndef PGSTAT_H
 #define PGSTAT_H
 
+#include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
@@ -428,6 +429,39 @@ typedef struct PgStat_StatTabEntry
 	PgStat_Counter autoanalyze_count;
 } PgStat_StatTabEntry;
 
+/*
+ * The elements of an LSNTimeline. Each LSNTime represents one or more time,
+ * LSN pairs. The LSN is typically the insert LSN recorded at the time. Members
+ * is the number of logical members -- each a time, LSN pair -- represented in
+ * the LSNTime.
+ */
+typedef struct LSNTime
+{
+	TimestampTz time;			/* time of the earliest member represented */
+	XLogRecPtr	lsn;			/* LSN paired with that time */
+	uint64		members;		/* number of logical members represented */
+} LSNTime;
+
+/*
+ * A timeline consists of LSNTimes from most to least recent. Each element of
+ * the array in the timeline may represent 2^array index logical members --
+ * meaning that each element's capacity is twice that of the preceding element.
+ * This gives more recent times greater precision than less recent ones. An
+ * array of size 64 should provide sufficient capacity without accounting for
+ * what to do when all elements of the array are at capacity.
+ *
+ * When LSNTimes are inserted into the timeline, they are absorbed into the
+ * first array element with spare capacity -- with the new combined element
+ * having the lesser of the two values. The timeline's length is the number
+ * of array elements currently in use, starting from index 0. Use the timeline
+ * for LSN <-> time conversion using linear interpolation.
+ */
+typedef struct LSNTimeline
+{
+	int			length;
+	LSNTime		data[64];
+} LSNTimeline;
+
 typedef struct PgStat_WalStats
 {
 	PgStat_Counter wal_records;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 90b37b919c2..32057181277 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1525,6 +1525,8 @@ LogicalTapeSet
 LsnReadQueue
 LsnReadQueueNextFun
 LsnReadQueueNextStatus
+LSNTime
+LSNTimeline
 LtreeGistOptions
 LtreeSignature
 MAGIC
-- 
2.37.2

