From f492af31c1b9917aa27ba3ad76560e59f3fd5c9b Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 27 Dec 2023 16:40:27 -0500
Subject: [PATCH v5 2/6] Add LSNTimeStream for converting LSN <-> time

Add a new structure, LSNTimeStream, consisting of LSNTimes -- each an
(LSN, time) pair. The LSNTimeStream is fixed size, so when a new LSNTime
is inserted into a full LSNTimeStream, an existing LSNTime is dropped to
make room for it. We drop the LSNTime whose absence would cause the
least error when interpolating between its adjoining points.

LSN <-> time conversions can be done using linear interpolation with two
LSNTimes on the LSNTimeStream.

This commit does not add a global instance of LSNTimeStream. It adds the
structures and functions needed to maintain and access such a stream.
---
 src/backend/utils/activity/pgstat_wal.c | 233 ++++++++++++++++++++++++
 src/include/pgstat.h                    |  32 ++++
 src/tools/pgindent/typedefs.list        |   2 +
 3 files changed, 267 insertions(+)

diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c
index 0e374f133a9..cef9429994c 100644
--- a/src/backend/utils/activity/pgstat_wal.c
+++ b/src/backend/utils/activity/pgstat_wal.c
@@ -17,8 +17,11 @@
 
 #include "postgres.h"
 
+#include "access/xlog.h"
 #include "executor/instrument.h"
+#include "utils/builtins.h"
 #include "utils/pgstat_internal.h"
+#include "utils/timestamp.h"
 
 
 PgStat_PendingWalStats PendingWalStats = {0};
@@ -32,6 +35,11 @@ PgStat_PendingWalStats PendingWalStats = {0};
 static WalUsage prevWalUsage;
 
 
+static void lsntime_insert(LSNTimeStream *stream, TimestampTz time, XLogRecPtr lsn);
+/* NOTE(review): non-static, yet prototyped only here -- move to a header? */
+XLogRecPtr	estimate_lsn_at_time(const LSNTimeStream *stream, TimestampTz time);
+TimestampTz estimate_time_at_lsn(const LSNTimeStream *stream, XLogRecPtr lsn);
+
 /*
  * Calculate how much WAL usage counters have increased and update
  * shared WAL and IO statistics.
@@ -184,3 +192,228 @@ pgstat_wal_snapshot_cb(void)
 		   sizeof(pgStatLocal.snapshot.wal));
 	LWLockRelease(&stats_shmem->lock);
 }
+
+/*
+ * Given three LSNTimes, return the area of the triangle they form (time on the
+ * X axis, LSN on the Y axis). Deltas are multiplied as doubles to avoid overflow.
+ */
+static double
+lsn_ts_calculate_error_area(LSNTime *left, LSNTime *mid, LSNTime *right)
+{
+	double		mid_time = (double) (mid->time - left->time);
+	double		mid_lsn = (double) (int64) (mid->lsn - left->lsn);
+	double		right_time = (double) (right->time - left->time);
+	double		right_lsn = (double) (int64) (right->lsn - left->lsn);
+	double		area = (mid_time * right_lsn - mid_lsn * right_time) / 2;
+
+	return (area < 0) ? -area : area;
+}
+
+/*
+ * Determine which LSNTime to drop from a full LSNTimeStream. Once the LSNTime
+ * is dropped, points between it and either of its adjacent LSNTimes will be
+ * interpolated between those two LSNTimes instead. To keep the LSNTimeStream
+ * as accurate as possible, drop the LSNTime whose absence would have the least
+ * impact on future interpolations.
+ *
+ * We determine the error that would be introduced by dropping a point on the
+ * stream by calculating the area of the triangle formed by the LSNTime and its
+ * adjacent LSNTimes. We do this for each LSNTime in the stream (except for the
+ * first and last LSNTimes) and choose the LSNTime with the smallest error
+ * (area). We avoid extrapolation by never dropping the first or last points.
+ */
+static int
+lsntime_to_drop(LSNTimeStream *stream)
+{
+	double		min_area = -1;	/* negative means no candidate seen yet */
+	int			target_point = stream->length - 1;
+
+	/* Don't drop points if free space available */
+	Assert(stream->length == LSNTIMESTREAM_VOLUME);
+
+	/* Visit interior points only, so both neighbors of data[i] exist. */
+	for (int i = stream->length - 2; i >= 1; i--)
+	{
+		LSNTime    *mid = &stream->data[i];
+		double		area = lsn_ts_calculate_error_area(mid - 1, mid, mid + 1);
+
+		if (min_area < 0 || area < min_area)
+		{
+			min_area = area;
+			target_point = i;
+		}
+	}
+
+	return target_point;
+}
+
+/*
+ * Insert a new LSNTime into the LSNTimeStream in the first available element,
+ * or, if there are no empty elements, drop an LSNTime from the stream, move
+ * the more recent LSNTimes down and insert the new LSNTime at index 0.
+ */
+static void
+lsntime_insert(LSNTimeStream *stream, TimestampTz time,
+			   XLogRecPtr lsn)
+{
+	int			drop;
+	LSNTime		entrant = {.lsn = lsn,.time = time};
+
+	if (stream->length < LSNTIMESTREAM_VOLUME)
+	{
+		/*
+		 * The new entry should not precede the most recent entry, so that
+		 * time moves forward on the stream.
+		 */
+		Assert(stream->length == 0 ||
+			   (lsn >= stream->data[LSNTIMESTREAM_VOLUME - stream->length].lsn &&
+				time >= stream->data[LSNTIMESTREAM_VOLUME - stream->length].time));
+
+		/*
+		 * The array fills from its last element toward index 0, so the next
+		 * free element is the one just before the most recent entry.
+		 */
+		stream->length++;
+		stream->data[LSNTIMESTREAM_VOLUME - stream->length] = entrant;
+		return;
+	}
+
+	drop = lsntime_to_drop(stream);
+	if (drop < 0 || drop >= stream->length)
+	{
+		elog(WARNING, "Unable to insert LSNTime to LSNTimeStream. Drop failed.");
+		return;
+	}
+
+	/*
+	 * Drop the LSNTime at index drop by shifting elements 0 .. drop - 1 one
+	 * element toward the end, overwriting it and freeing index 0.
+	 */
+	memmove(&stream->data[1], &stream->data[0], sizeof(LSNTime) * drop);
+	stream->data[0] = entrant;
+}
+
+/*
+ * Translate time to an LSN by linear interpolation between two points of the
+ * provided stream. The stream will not be modified.
+ */
+XLogRecPtr
+estimate_lsn_at_time(const LSNTimeStream *stream, TimestampTz time)
+{
+	XLogRecPtr	result;
+	int64		time_elapsed,
+				lsns_elapsed;
+	LSNTime		start = {.time = PgStartTime,.lsn = PgStartLSN};
+	LSNTime		end = {.time = GetCurrentTimestamp(),.lsn = GetXLogInsertRecPtr()};
+
+	/*
+	 * If the provided time is before DB startup, the best we can do is return
+	 * the start LSN.
+	 */
+	if (time < start.time)
+		return start.lsn;
+
+	/*
+	 * If the provided time is after now, the current LSN is our best
+	 * estimate.
+	 */
+	if (time >= end.time)
+		return end.lsn;
+
+	/*
+	 * Loop through the stream. Stop at the first LSNTime earlier than our
+	 * target time. This LSNTime will be our interpolation start point. If
+	 * there's an LSNTime later than that, then that will be our interpolation
+	 * end point.
+	 */
+	for (int i = LSNTIMESTREAM_VOLUME - stream->length; i < LSNTIMESTREAM_VOLUME; i++)
+	{
+		if (stream->data[i].time > time)
+			continue;
+
+		start = stream->data[i];
+		if (i > LSNTIMESTREAM_VOLUME - stream->length)
+			end = stream->data[i - 1];
+		goto stop;
+	}
+
+	/*
+	 * If we exhausted the stream, then use its earliest LSNTime as our
+	 * interpolation end point.
+	 */
+	if (stream->length > 0)
+		end = stream->data[LSNTIMESTREAM_VOLUME - 1];
+
+stop:
+	/* The LSN may not have advanced between two points, e.g. when idle. */
+	Assert(end.time > start.time);
+	Assert(end.lsn >= start.lsn);
+	time_elapsed = end.time - start.time;
+	lsns_elapsed = end.lsn - start.lsn;
+	/* Scale only the delta in floating point so that start.lsn stays exact. */
+	result = start.lsn + (int64) ((double) (time - start.time) / time_elapsed * lsns_elapsed);
+	return result;
+}
+
+/*
+ * Convert an LSN into an estimated time by interpolating linearly between
+ * points of the provided stream, which is only read.
+ */
+TimestampTz
+estimate_time_at_lsn(const LSNTimeStream *stream, XLogRecPtr lsn)
+{
+	const int	newest = LSNTIMESTREAM_VOLUME - stream->length;
+	LSNTime		lower = {.time = PgStartTime,.lsn = PgStartLSN};
+	LSNTime		upper = {.time = GetCurrentTimestamp(),.lsn = GetXLogInsertRecPtr()};
+	int64		time_span,
+				lsn_span;
+	TimestampTz estimate;
+	int			i;
+
+	/*
+	 * An LSN at or before the startup LSN cannot be placed any earlier than
+	 * startup, so report the startup time.
+	 */
+	if (lsn <= lower.lsn)
+		return lower.time;
+
+	/*
+	 * An LSN at or past the current insert LSN cannot be placed any later
+	 * than now, so report the current time.
+	 */
+	if (lsn >= upper.lsn)
+		return upper.time;
+
+	/*
+	 * Walk from the newest LSNTime toward the oldest, looking for the first
+	 * one that is not past the target LSN.
+	 */
+	for (i = newest; i < LSNTIMESTREAM_VOLUME; i++)
+	{
+		if (stream->data[i].lsn <= lsn)
+			break;
+	}
+
+	if (i < LSNTIMESTREAM_VOLUME)
+	{
+		/* Found: interpolate from it to its newer neighbor, if it has one. */
+		lower = stream->data[i];
+		if (i > newest)
+			upper = stream->data[i - 1];
+	}
+	else if (stream->length > 0)
+	{
+		/* Every LSNTime is past the target, so the oldest is the end point. */
+		upper = stream->data[LSNTIMESTREAM_VOLUME - 1];
+	}
+
+	Assert(upper.time > lower.time);
+	Assert(upper.lsn > lower.lsn);
+	time_span = upper.time - lower.time;
+	Assert(time_span != 0);
+	lsn_span = upper.lsn - lower.lsn;
+	Assert(lsn_span != 0);
+	estimate = (double) (lsn - lower.lsn) / lsn_span * time_span + lower.time;
+	return Max(estimate, 0);
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 2136239710e..af348be839c 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -11,6 +11,7 @@
 #ifndef PGSTAT_H
 #define PGSTAT_H
 
+#include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
@@ -428,6 +429,37 @@ typedef struct PgStat_StatTabEntry
 	PgStat_Counter autoanalyze_count;
 } PgStat_StatTabEntry;
 
+/*
+ * The elements of an LSNTimeStream. Each LSNTime represents one or more
+ * (time, LSN) pairs. The LSN is typically the insert LSN recorded at the time.
+ */
+typedef struct LSNTime
+{
+	TimestampTz time;			/* X axis when interpolating */
+	XLogRecPtr	lsn;			/* Y axis when interpolating */
+} LSNTime;
+
+#define LSNTIMESTREAM_VOLUME 64
+
+/*
+ * An LSN time stream is an array consisting of LSNTimes from most to least
+ * recent. The array is filled from end to start before any element is
+ * dropped. Once the LSNTimeStream length == volume (the array is full),
+ * inserting drops one LSNTime, moves the LSNTimes more recent than it one
+ * element toward the end, and adds the new LSNTime at index 0.
+ *
+ * When dropping an LSNTime, we attempt to pick the member which would
+ * introduce the least error into the stream. See lsntime_to_drop() for more
+ * details.
+ *
+ * Use the stream for LSN <-> time conversion using linear interpolation.
+ */
+typedef struct LSNTimeStream
+{
+	int			length;			/* number of elements of data in use */
+	LSNTime		data[LSNTIMESTREAM_VOLUME];
+} LSNTimeStream;
+
 typedef struct PgStat_WalStats
 {
 	PgStat_Counter wal_records;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 82b3b411fb5..a5851d44b16 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1584,6 +1584,8 @@ LogicalTapeSet
 LsnReadQueue
 LsnReadQueueNextFun
 LsnReadQueueNextStatus
+LSNTime
+LSNTimeStream
 LtreeGistOptions
 LtreeSignature
 MAGIC
-- 
2.34.1

