From 800b0610f430f965e9216a374afe638bbec7bb6f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 27 Dec 2023 16:40:27 -0500
Subject: [PATCH v3 2/5] Add LSNTimeline for converting LSN <-> time

Add a new structure, LSNTimeline, consisting of LSNTimes -- each an LSN,
time pair. Each LSNTime can represent multiple logical LSN, time pairs,
referred to as members. LSN <-> time conversions can be done using
linear interpolation with two LSNTimes on the LSNTimeline.

This commit does not add a global instance of LSNTimeline. It adds the
structures and functions needed to maintain and access such a timeline.
---
 src/backend/utils/activity/pgstat_wal.c | 224 ++++++++++++++++++++++++
 src/include/pgstat.h                    |  44 +++++
 src/tools/pgindent/typedefs.list        |   2 +
 3 files changed, 270 insertions(+)

diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c
index 1a3c0e1a669..96e84319f6f 100644
--- a/src/backend/utils/activity/pgstat_wal.c
+++ b/src/backend/utils/activity/pgstat_wal.c
@@ -17,8 +17,12 @@
 
 #include "postgres.h"
 
+#include "access/xlog.h"
 #include "utils/pgstat_internal.h"
 #include "executor/instrument.h"
+#include "utils/builtins.h"
+#include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
 
 
 PgStat_PendingWalStats PendingWalStats = {0};
@@ -32,6 +36,12 @@ PgStat_PendingWalStats PendingWalStats = {0};
 static WalUsage prevWalUsage;
 
 
+static void lsntime_merge(LSNTime *target, LSNTime *src);
+static void lsntime_insert(LSNTimeline *timeline, TimestampTz time, XLogRecPtr lsn);
+
+XLogRecPtr	estimate_lsn_at_time(const LSNTimeline *timeline, TimestampTz time);
+TimestampTz estimate_time_at_lsn(const LSNTimeline *timeline, XLogRecPtr lsn);
+
 /*
  * Calculate how much WAL usage counters have increased and update
  * shared WAL and IO statistics.
@@ -184,3 +194,217 @@ pgstat_wal_snapshot_cb(void)
 		   sizeof(pgStatLocal.snapshot.wal));
 	LWLockRelease(&stats_shmem->lock);
 }
+
+/*
+ * Fold *src into *target: *target keeps the earlier LSNTime (by time, then by
+ * LSN; *target wins ties) plus both member counts. *src's count resets to 1.
+ */
+static void
+lsntime_merge(LSNTime *target, LSNTime *src)
+{
+	uint64		combined = target->members + src->members;
+	bool		src_is_earlier;
+
+	/* Order by time; consult the LSN only when the times are equal. */
+	if (src->time != target->time)
+		src_is_earlier = src->time < target->time;
+	else
+		src_is_earlier = src->lsn < target->lsn;
+
+	if (src_is_earlier)
+	{
+		target->lsn = src->lsn;
+		target->time = src->time;
+	}
+	target->members = combined;
+	src->members = 1;
+}
+
+/*
+ * Return the index (>= 1) of the oldest element with fewer than twice the
+ * members of its predecessor; the caller merges the predecessor into it.
+ */
+static int
+lsntime_merge_target(LSNTimeline *timeline)
+{
+	/* Don't merge if free space available */
+	Assert(timeline->length == LSNTIMELINE_VOLUME);
+
+	/* Stop at 1: element 0 has no predecessor to compare against. */
+	for (int i = timeline->length - 1; i > 0; i--)
+		if (timeline->data[i].members < (2 * timeline->data[i - 1].members))
+			return i;
+
+	/* Every element is at capacity; fall back to merging the two oldest. */
+	Assert(false);
+	return timeline->length - 1;
+}
+
+/*
+ * Insert a new LSNTime into the LSNTimeline in the first available element,
+ * or, if there are no empty elements, insert it into the element at index 0,
+ * merge the logical members of two old buckets and move the intervening
+ * elements down by one.
+ */
+static void
+lsntime_insert(LSNTimeline *timeline, TimestampTz time,
+			   XLogRecPtr lsn)
+{
+	int			merge_target;
+	LSNTime		entrant = {.lsn = lsn,.time = time,.members = 1};
+
+	if (timeline->length < LSNTIMELINE_VOLUME)
+	{
+		/*
+		 * The new entry must not precede the most recent entry (equal values
+		 * are allowed) so that time moves forward on the timeline.
+		 */
+		Assert(timeline->length == 0 ||
+			   (lsn >= timeline->data[LSNTIMELINE_VOLUME - timeline->length].lsn &&
+				time >= timeline->data[LSNTIMELINE_VOLUME - timeline->length].time));
+
+		/*
+		 * The array is filled from its last slot towards index 0, so the
+		 * passed-in LSNTime goes directly in front of the most recent entry.
+		 */
+		timeline->length++;
+		timeline->data[LSNTIMELINE_VOLUME - timeline->length] = entrant;
+		return;
+	}
+
+	/*
+	 * The timeline is full. Merge the element preceding the merge target
+	 * into the merge target, which frees slot merge_target - 1. Then shift
+	 * the merge_target - 1 newer elements (0 .. merge_target - 2) down by one
+	 * to fill that slot and insert the passed-in LSNTime at array index 0.
+	 */
+	merge_target = lsntime_merge_target(timeline);
+	Assert(merge_target > 0 && merge_target < timeline->length);
+	lsntime_merge(&timeline->data[merge_target], &timeline->data[merge_target - 1]);
+	memmove(&timeline->data[1], &timeline->data[0], sizeof(LSNTime) * (merge_target - 1));
+	timeline->data[0] = entrant;
+}
+
+/*
+ * Translate time to a LSN using the provided timeline. The timeline will not
+ * be modified.
+ */
+XLogRecPtr
+estimate_lsn_at_time(const LSNTimeline *timeline, TimestampTz time)
+{
+	XLogRecPtr	result;
+	int64		time_elapsed,
+				lsns_elapsed;
+	LSNTime		start = {.time = PgStartTime,.lsn = PgStartLSN};
+	LSNTime		end = {.time = GetCurrentTimestamp(),.lsn = GetXLogInsertRecPtr()};
+
+	/*
+	 * If the provided time is before DB startup, the best we can do is return
+	 * the start LSN.
+	 */
+	if (time < start.time)
+		return start.lsn;
+
+	/*
+	 * If the provided time is after now, the current LSN is our best
+	 * estimate.
+	 */
+	if (time >= end.time)
+		return end.lsn;
+
+	/*
+	 * Loop through the timeline. Stop at the first LSNTime earlier than our
+	 * target time. This LSNTime will be our interpolation start point. If
+	 * there's an LSNTime later than that, then that will be our interpolation
+	 * end point.
+	 */
+	for (int i = LSNTIMELINE_VOLUME - timeline->length; i < LSNTIMELINE_VOLUME; i++)
+	{
+		if (timeline->data[i].time > time)
+			continue;
+
+		start = timeline->data[i];
+		/* data[i - 1] is only valid if it lies within the filled tail. */
+		if (i > LSNTIMELINE_VOLUME - timeline->length)
+			end = timeline->data[i - 1];
+		goto stop;
+	}
+
+	/*
+	 * If we exhausted the timeline, then use its earliest LSNTime, which
+	 * lives in the last array slot, as our interpolation end point.
+	 */
+	if (timeline->length > 0)
+		end = timeline->data[LSNTIMELINE_VOLUME - 1];
+
+stop:
+	/* No WAL may have been written in the interval, so LSNs can be equal. */
+	Assert(end.time > start.time);
+	Assert(end.lsn >= start.lsn);
+	time_elapsed = end.time - start.time;
+	lsns_elapsed = end.lsn - start.lsn;
+	result = (double) (time - start.time) / time_elapsed * lsns_elapsed + start.lsn;
+	return result;
+}
+
+/*
+ * Translate lsn to a time using the provided timeline. The timeline will not
+ * be modified.
+ */
+TimestampTz
+estimate_time_at_lsn(const LSNTimeline *timeline, XLogRecPtr lsn)
+{
+	int64		time_elapsed,
+				lsns_elapsed;
+	TimestampTz result;
+	LSNTime		start = {.time = PgStartTime,.lsn = PgStartLSN};
+	LSNTime		end = {.time = GetCurrentTimestamp(),.lsn = GetXLogInsertRecPtr()};
+
+	/*
+	 * If the LSN is before DB startup, the best we can do is return that
+	 * time.
+	 */
+	if (lsn <= start.lsn)
+		return start.time;
+
+	/*
+	 * If the target LSN is after the current insert LSN, the current time is
+	 * our best estimate.
+	 */
+	if (lsn >= end.lsn)
+		return end.time;
+
+	/*
+	 * Loop through the timeline. Stop at the first LSNTime earlier than our
+	 * target LSN. This LSNTime will be our interpolation start point. If
+	 * there's an LSNTime later than that, then that will be our interpolation
+	 * end point.
+	 */
+	for (int i = LSNTIMELINE_VOLUME - timeline->length; i < LSNTIMELINE_VOLUME; i++)
+	{
+		if (timeline->data[i].lsn > lsn)
+			continue;
+
+		start = timeline->data[i];
+		/* data[i - 1] is only valid if it lies within the filled tail. */
+		if (i > LSNTIMELINE_VOLUME - timeline->length)
+			end = timeline->data[i - 1];
+		goto stop;
+	}
+
+	/*
+	 * If we exhausted the timeline, then use its earliest LSNTime, which
+	 * lives in the last array slot, as our interpolation end point.
+	 */
+	if (timeline->length > 0)
+		end = timeline->data[LSNTIMELINE_VOLUME - 1];
+
+stop:
+	/* The search guarantees distinct LSNs; equal times are tolerated. */
+	Assert(end.lsn > start.lsn);
+	Assert(end.time >= start.time);
+	time_elapsed = end.time - start.time;
+	lsns_elapsed = end.lsn - start.lsn;
+	result = (double) (lsn - start.lsn) / lsns_elapsed * time_elapsed + start.time;
+	return Max(result, 0);
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 2136239710e..1926ddb00ed 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -11,6 +11,7 @@
 #ifndef PGSTAT_H
 #define PGSTAT_H
 
+#include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
@@ -428,6 +429,49 @@ typedef struct PgStat_StatTabEntry
 	PgStat_Counter autoanalyze_count;
 } PgStat_StatTabEntry;
 
+/*
+ * The elements of an LSNTimeline. Each LSNTime represents one or more time,
+ * LSN pairs. The LSN is typically the insert LSN recorded at the time. Members
+ * is the number of logical members -- each a time, LSN pair -- represented in
+ * the LSNTime.
+ */
+typedef struct LSNTime
+{
+	TimestampTz time;			/* time of the earliest member merged in */
+	XLogRecPtr	lsn;			/* LSN paired with that time */
+	uint64		members;		/* number of logical members represented */
+} LSNTime;
+
+#define LSNTIMELINE_VOLUME 64
+/*
+ * A timeline consists of LSNTimes from most to least recent. The array is
+ * filled from end to start before the contents of any elements are merged.
+ * Once the LSNTimeline length == volume (the array is full), old elements are
+ * merged to make space for new elements at index 0. When merging logical
+ * members, each element of the array in the timeline may represent twice as
+ * many logical members as the preceding element.
+ *
+ * This gives more recent times greater precision than less recent ones. An
+ * array of size 64 and an unsigned 64-bit number for the number of members
+ * should provide sufficient capacity without accounting for what to do when
+ * all elements of the array are at capacity.
+ *
+ * After every element has at least one logical member, when a new LSNTime is
+ * inserted, the oldest array element whose logical membership is < 2x its
+ * predecessor is the merge target. Its preceding element is merged into it.
+ * Then all of the intervening elements are moved down by one and the new
+ * LSNTime is inserted at index 0.
+ *
+ * Merging two elements is combining their members and assigning the lesser
+ * LSNTime. Use the timeline for LSN <-> time conversion using linear
+ * interpolation.
+ */
+typedef struct LSNTimeline
+{
+	int			length;			/* number of filled elements in data[] */
+	LSNTime		data[LSNTIMELINE_VOLUME];	/* filled from the last slot down */
+} LSNTimeline;
+
 typedef struct PgStat_WalStats
 {
 	PgStat_Counter wal_records;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d808aad8b05..aef83230836 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1525,6 +1525,8 @@ LogicalTapeSet
 LsnReadQueue
 LsnReadQueueNextFun
 LsnReadQueueNextStatus
+LSNTime
+LSNTimeline
 LtreeGistOptions
 LtreeSignature
 MAGIC
-- 
2.37.2

