From d6dc1128f75d883332945ab27f98a8c70b83b607 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 27 Dec 2023 16:40:27 -0500
Subject: [PATCH v6 2/6] Add LSNTimeStream for converting LSN <-> time

Add a new structure, LSNTimeStream, consisting of LSNTimes -- each an
(LSN, time) pair. The LSNTimeStream is fixed size, so when a new LSNTime
is inserted into a full LSNTimeStream, an existing LSNTime is dropped to
make room for it. We drop the LSNTime whose absence would cause the
least error when interpolating between its adjoining points.

LSN <-> time conversions are done by linear interpolation between the
two LSNTimes surrounding the target value on the LSNTimeStream.

This commit does not add a global instance of LSNTimeStream. It adds the
structures and functions needed to maintain and access such a stream.
---
 src/backend/utils/activity/pgstat_wal.c | 323 ++++++++++++++++++++++++
 src/include/pgstat.h                    |  32 +++
 src/tools/pgindent/typedefs.list        |   2 +
 3 files changed, 357 insertions(+)

diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c
index e2a3f6b865c..affab8437c8 100644
--- a/src/backend/utils/activity/pgstat_wal.c
+++ b/src/backend/utils/activity/pgstat_wal.c
@@ -17,8 +17,12 @@
 
 #include "postgres.h"
 
+#include "access/xlog.h"
 #include "executor/instrument.h"
+#include "math.h"
+#include "utils/builtins.h"
 #include "utils/pgstat_internal.h"
+#include "utils/timestamp.h"
 
 
 PgStat_PendingWalStats PendingWalStats = {0};
@@ -32,6 +36,11 @@ PgStat_PendingWalStats PendingWalStats = {0};
 static WalUsage prevWalUsage;
 
 
+static void lsntime_insert(LSNTimeStream *stream, TimestampTz time, XLogRecPtr lsn);
+/* Non-static LSN <-> time estimators. NOTE(review): no header declares them */
+XLogRecPtr	estimate_lsn_at_time(const LSNTimeStream *stream, TimestampTz time);
+TimestampTz estimate_time_at_lsn(const LSNTimeStream *stream, XLogRecPtr lsn);
+
 /*
  * Calculate how much WAL usage counters have increased and update
  * shared WAL and IO statistics.
@@ -192,3 +201,317 @@ pgstat_wal_snapshot_cb(void)
 		   sizeof(pgStatLocal.snapshot.wal));
 	LWLockRelease(&stats_shmem->lock);
 }
+
+/*
+ * Given three LSNTimes, calculate the area of the triangle they form were they
+ * plotted with time on the X axis and LSN on the Y axis. An illustration:
+ *
+ *   LSN
+ *    |
+ *    |                                                         * right
+ *    |
+ *    |
+ *    |
+ *    |                                                * mid    * C
+ *    |
+ *    |
+ *    |
+ *    |  * left                                        * B      * A
+ *    |
+ *    +------------------------------------------------------------------
+ *
+ * The area of the triangle with vertices (left, mid, right) is the error
+ * incurred over the interval [left, right] were we to interpolate with just
+ * [left, right] rather than [left, mid) and [mid, right).
+ */
+static double
+lsn_ts_calculate_error_area(const LSNTime *left, const LSNTime *mid,
+							const LSNTime *right)
+{
+	/*
+	 * Take differences in integer arithmetic first and only then convert to
+	 * double. A present-day TimestampTz (microseconds since 2000) is close
+	 * to 10^15, which a float resolves only to about a minute, and large
+	 * LSNs exceed the range a double represents exactly. The unsigned LSN
+	 * differences assume the points are in non-decreasing LSN order, as
+	 * lsntime_insert() asserts for each new entry.
+	 */
+	double		dt_left_mid = (double) (mid->time - left->time);
+	double		dt_mid_right = (double) (right->time - mid->time);
+	double		dlsn_left_mid = (double) (mid->lsn - left->lsn);
+	double		dlsn_mid_right = (double) (right->lsn - mid->lsn);
+
+	/*
+	 * With a = dt_left_mid, b = dt_mid_right, c = dlsn_left_mid and
+	 * d = dlsn_mid_right: triangle (left, right, A) has area (a+b)(c+d)/2;
+	 * subtracting triangle (left, mid, B) = ac/2, triangle (mid, right, C) =
+	 * bd/2 and rectangle (mid, C, A, B) = bc leaves (ad - bc)/2, the area
+	 * of triangle (left, mid, right). The result is positive when mid lies
+	 * below the line from left to right, as illustrated, and negative when
+	 * it lies above; callers should compare magnitudes, e.g. with fabs().
+	 */
+	return (dt_left_mid * dlsn_mid_right - dt_mid_right * dlsn_left_mid) / 2;
+}
+
+/*
+ * Determine which LSNTime to drop from a full LSNTimeStream. Once the LSNTime
+ * is dropped, points between it and either of its adjacent LSNTimes will be
+ * interpolated between those two LSNTimes instead. To keep the LSNTimeStream
+ * as accurate as possible, drop the LSNTime whose absence would have the least
+ * impact on future interpolations.
+ *
+ * That impact is estimated as the area of the triangle formed by an LSNTime
+ * and its two neighbors. Every LSNTime except the first and last is scored
+ * this way, and the index of the one with the smallest area is returned (the
+ * earliest such index on ties). We avoid extrapolation by never dropping the
+ * first or last points.
+ */
+static unsigned int
+lsntime_to_drop(LSNTimeStream *stream)
+{
+	double		min_area;
+	unsigned int target_point;
+
+	/* Don't drop points if free space available */
+	Assert(stream->length == LSNTIMESTREAM_VOLUME);
+
+	/* Seed the search with the first droppable point, index 1 */
+	min_area = fabs(lsn_ts_calculate_error_area(&stream->data[0],
+												&stream->data[1],
+												&stream->data[2]));
+	target_point = 1;
+
+	/* Score the remaining interior points, keeping everything in double */
+	for (int i = 2; i < stream->length - 1; i++)
+	{
+		double		area = fabs(lsn_ts_calculate_error_area(&stream->data[i - 1],
+															&stream->data[i],
+															&stream->data[i + 1]));
+
+		if (area < min_area)
+		{
+			min_area = area;
+			target_point = i;
+		}
+	}
+
+	return target_point;
+}
+
+/*
+ * Append a new LSNTime to the LSNTimeStream. If the stream is already full,
+ * first drop the LSNTime chosen by lsntime_to_drop(), shift all subsequent
+ * LSNTimes down by one, and then store the new LSNTime in the last element.
+ */
+static void
+lsntime_insert(LSNTimeStream *stream, TimestampTz time,
+			   XLogRecPtr lsn)
+{
+	unsigned int drop;
+	LSNTime		entrant = {.lsn = lsn,.time = time};
+
+	/*
+	 * Time and LSN must never move backward on the stream. Check this before
+	 * choosing where the entry goes so that full streams are covered too.
+	 */
+	Assert(stream->length == 0 ||
+		   (lsn >= stream->data[stream->length - 1].lsn &&
+			time >= stream->data[stream->length - 1].time));
+
+	if (stream->length < LSNTIMESTREAM_VOLUME)
+	{
+		/*
+		 * If there are unfilled elements in the stream, insert the passed-in
+		 * LSNTime into the current tail of the array.
+		 */
+		stream->data[stream->length++] = entrant;
+		return;
+	}
+
+	drop = lsntime_to_drop(stream);
+
+	/*
+	 * Remove the LSNTime at index drop by shifting the elements after it down
+	 * one slot; this frees the last element for the new entry.
+	 */
+	memmove(&stream->data[drop],
+			&stream->data[drop + 1],
+			sizeof(LSNTime) * (stream->length - 1 - drop));
+
+	stream->data[stream->length - 1] = entrant;
+}
+
+/*
+ * Translate time to a LSN using the provided stream. The stream will not
+ * be modified.
+ */
+XLogRecPtr
+estimate_lsn_at_time(const LSNTimeStream *stream, TimestampTz time)
+{
+	int64		time_elapsed;
+	uint64		lsns_elapsed;
+	double		fraction;
+	LSNTime		start = {.time = PgStartTime,.lsn = PgStartLSN};
+	LSNTime		end = {.time = GetCurrentTimestamp(),.lsn = GetXLogInsertRecPtr()};
+
+	/*
+	 * After a restart, PgStartTime may be later than the stream's oldest
+	 * entry. If so, use that oldest entry as the start point instead.
+	 */
+	if (stream->length > 0 && start.time > stream->data[0].time)
+		start = stream->data[0];
+
+	/*
+	 * If the time is before our oldest known time, the best we can do is
+	 * return our oldest known LSN.
+	 */
+	if (time < start.time)
+		return start.lsn;
+
+	/*
+	 * If the provided time is after now, the current LSN is our best
+	 * estimate.
+	 */
+	if (time >= end.time)
+		return end.lsn;
+
+	/*
+	 * Loop through the stream. Stop at the first LSNTime at or after our
+	 * target time. This LSNTime will be our interpolation end point. If
+	 * there's an LSNTime earlier than that, that will be our interpolation
+	 * start point.
+	 */
+	for (int i = 0; i < stream->length; i++)
+	{
+		if (stream->data[i].time < time)
+			continue;
+
+		end = stream->data[i];
+		if (i > 0)
+			start = stream->data[i - 1];
+		goto stop;
+	}
+
+	/*
+	 * If we exhausted the stream, then use its latest LSNTime as our
+	 * interpolation start point.
+	 */
+	if (stream->length > 0)
+		start = stream->data[stream->length - 1];
+
+stop:
+
+	/*
+	 * In rare cases, the start and end LSN could be the same: for example,
+	 * if no new records have been inserted since the last one recorded in
+	 * the LSNTimeStream and we are looking for the LSN corresponding to the
+	 * current time.
+	 */
+	if (end.lsn == start.lsn)
+		return end.lsn;
+
+	Assert(end.lsn > start.lsn);
+
+	/*
+	 * It should be extremely rare (if not impossible) for the start time and
+	 * end time to be the same. In this case, just return an LSN halfway
+	 * between the two.
+	 */
+	if (end.time == start.time)
+		return start.lsn + ((end.lsn - start.lsn) / 2);
+
+	Assert(end.time > start.time);
+
+	time_elapsed = end.time - start.time;
+	lsns_elapsed = end.lsn - start.lsn;
+
+	/* Only the offset passes through double, so large LSNs stay exact */
+	fraction = (double) (time - start.time) / time_elapsed;
+	return start.lsn + (XLogRecPtr) (fraction * lsns_elapsed);
+}
+
+/*
+ * Translate lsn to a time using the provided stream. The stream will not
+ * be modified.
+ */
+TimestampTz
+estimate_time_at_lsn(const LSNTimeStream *stream, XLogRecPtr lsn)
+{
+	int64		time_elapsed;
+	uint64		lsns_elapsed;
+	double		fraction;
+	LSNTime		start = {.time = PgStartTime,.lsn = PgStartLSN};
+	LSNTime		end = {.time = GetCurrentTimestamp(),.lsn = GetXLogInsertRecPtr()};
+
+	/*
+	 * After a restart, PgStartTime may be later than the stream's oldest
+	 * entry. If so, use that oldest entry as the start point instead.
+	 */
+	if (stream->length > 0 && start.time > stream->data[0].time)
+		start = stream->data[0];
+
+	/*
+	 * If the LSN is before our oldest known LSN, the best we can do is return
+	 * our oldest known time.
+	 */
+	if (lsn < start.lsn)
+		return start.time;
+
+	/*
+	 * If the target LSN is after the current insert LSN, the current time is
+	 * our best estimate.
+	 */
+	if (lsn >= end.lsn)
+		return end.time;
+
+	/*
+	 * Loop through the stream. Stop at the first LSNTime at or after our
+	 * target LSN. This LSNTime will be our interpolation end point. If
+	 * there's an LSNTime earlier than that, that will be our interpolation
+	 * start point.
+	 */
+	for (int i = 0; i < stream->length; i++)
+	{
+		if (stream->data[i].lsn < lsn)
+			continue;
+
+		end = stream->data[i];
+		if (i > 0)
+			start = stream->data[i - 1];
+		goto stop;
+	}
+
+	/*
+	 * If we exhausted the stream, then use its latest LSNTime as our
+	 * interpolation start point.
+	 */
+	if (stream->length > 0)
+		start = stream->data[stream->length - 1];
+
+stop:
+
+	/* It should be nearly impossible to have the same start and end time. */
+	if (end.time == start.time)
+		return end.time;
+
+	Assert(end.time > start.time);
+
+	/*
+	 * In rare cases, the start and end LSN could be the same, for example if
+	 * no WAL was inserted between the two LSNTimes surrounding the target.
+	 * Every time in [start.time, end.time] then maps to that same LSN, so
+	 * there is no unique answer; just return a time halfway between start
+	 * and end.
+	 */
+	if (end.lsn == start.lsn)
+		return start.time + ((end.time - start.time) / 2);
+
+	Assert(end.lsn > start.lsn);
+
+	time_elapsed = end.time - start.time;
+	lsns_elapsed = end.lsn - start.lsn;
+
+	/* fraction is in [0, 1], so the result stays in [start.time, end.time] */
+	fraction = (double) (lsn - start.lsn) / lsns_elapsed;
+	return start.time + (TimestampTz) (fraction * time_elapsed);
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6b99bb8aadf..825cdc8f73a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -11,6 +11,7 @@
 #ifndef PGSTAT_H
 #define PGSTAT_H
 
+#include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
@@ -428,6 +429,37 @@ typedef struct PgStat_StatTabEntry
 	PgStat_Counter autoanalyze_count;
 } PgStat_StatTabEntry;
 
+/*
+ * One element of an LSNTimeStream: a (time, LSN) pair. The LSN is typically
+ * the WAL insert LSN recorded at that time.
+ */
+typedef struct LSNTime
+{
+	TimestampTz time;
+	XLogRecPtr	lsn;
+} LSNTime;
+
+#define LSNTIMESTREAM_VOLUME 64	/* capacity of LSNTimeStream.data */
+
+/*
+ * An LSNTimeStream is a fixed-capacity array of LSNTimes ordered from least to
+ * most recent. New LSNTimes are appended until the array is full (length ==
+ * LSNTIMESTREAM_VOLUME). After that, each insertion first drops one existing
+ * LSNTime, moves the subsequent LSNTimes down by one, and stores the new
+ * LSNTime at the tail.
+ *
+ * The LSNTime dropped is the one whose removal should introduce the least
+ * interpolation error; see lsntime_to_drop() for details. The oldest LSNTime
+ * is never dropped.
+ *
+ * Use the stream for LSN <-> time conversion using linear interpolation.
+ */
+typedef struct LSNTimeStream
+{
+	int			length;			/* number of valid elements in data */
+	LSNTime		data[LSNTIMESTREAM_VOLUME];
+} LSNTimeStream;
+
 typedef struct PgStat_WalStats
 {
 	PgStat_Counter wal_records;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8de9978ad8d..d924855069c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1587,6 +1587,8 @@ LogicalTapeSet
 LsnReadQueue
 LsnReadQueueNextFun
 LsnReadQueueNextStatus
+LSNTime
+LSNTimeStream
 LtreeGistOptions
 LtreeSignature
 MAGIC
-- 
2.34.1

