From 54d6baf71e0e73131bb03fe641fd9bdaddf18a93 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Mon, 5 Aug 2024 20:29:51 -0400
Subject: [PATCH v7 1/4] Add LSNTimeStream API for converting LSN <-> time

Add a new structure, LSNTimeStream, consisting of LSNTimes -- each an
LSN, time pair. This structure is intended to reflect the WAL generation
rate. It can be used to determine a time range in which an LSN was
inserted or an LSN range covering a particular time. These could be used
to interpolate a more specific point in the range.

It produces ranges and not specific time <-> LSN conversions because an
LSNTimeStream is lossy. An LSNTimeStream is fixed size, so when a new
LSNTime is inserted to a full LSNTimeStream, an LSNTime is dropped and
the new LSNTime is inserted. We drop the LSNTime whose absence would
cause the least error when interpolating between its adjoining points.

This commit does not add any instances of LSNTimeStream.
---
 src/backend/utils/activity/pgstat_wal.c | 414 ++++++++++++++++++++++++
 src/include/pgstat.h                    |  45 +++
 src/tools/pgindent/typedefs.list        |   2 +
 3 files changed, 461 insertions(+)

diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c
index e2a3f6b865c..95ec65a51ff 100644
--- a/src/backend/utils/activity/pgstat_wal.c
+++ b/src/backend/utils/activity/pgstat_wal.c
@@ -17,8 +17,11 @@
 
 #include "postgres.h"
 
+#include "access/xlog.h"
 #include "executor/instrument.h"
+#include "math.h"
 #include "utils/pgstat_internal.h"
+#include "utils/timestamp.h"
 
 
 PgStat_PendingWalStats PendingWalStats = {0};
@@ -31,6 +34,23 @@ PgStat_PendingWalStats PendingWalStats = {0};
  */
 static WalUsage prevWalUsage;
 
+static double lsn_ts_calculate_error_area(LSNTime *left,
+										  LSNTime *mid,
+										  LSNTime *right);
+static unsigned char lsntime_to_drop(LSNTimeStream *stream);
+static void lsntime_insert(LSNTimeStream *stream, TimestampTz time,
+						   XLogRecPtr lsn);
+
+static void stream_get_bounds_for_lsn(const LSNTimeStream *stream,
+									  XLogRecPtr target_lsn,
+									  LSNTime *lower,
+									  LSNTime *upper);
+
+static void stream_get_bounds_for_time(const LSNTimeStream *stream,
+									   TimestampTz target_time,
+									   LSNTime *lower,
+									   LSNTime *upper);
+
 
 /*
  * Calculate how much WAL usage counters have increased and update
@@ -192,3 +212,397 @@ pgstat_wal_snapshot_cb(void)
 		   sizeof(pgStatLocal.snapshot.wal));
 	LWLockRelease(&stats_shmem->lock);
 }
+
+/*
+ * Given three LSNTimes, calculate the area of the triangle they form were
+ * they plotted with time on the X axis and LSN on the Y axis. A, B, and C
+ * are helper points used to decompose that area. An illustration:
+ *
+ *   LSN
+ *    |
+ *    |                                                         * right
+ *    |
+ *    |
+ *    |
+ *    |                                                * mid    * C
+ *    |
+ *    |
+ *    |
+ *    |  * left                                        * B      * A
+ *    |
+ *    +------------------------------------------------------------------
+ *
+ * The area of the triangle with vertices (left, mid, right) is the error
+ * incurred over the interval [left, right] were we to interpolate with just
+ * [left, right] rather than [left, mid] and [mid, right].
+ */
+static double
+lsn_ts_calculate_error_area(LSNTime *left, LSNTime *mid, LSNTime *right)
+{
+	double		left_time = left->time,
+				left_lsn = left->lsn;
+	double		mid_time = mid->time,
+				mid_lsn = mid->lsn;
+	double		right_time = right->time,
+				right_lsn = right->lsn;
+	double		rectangle_all,
+				triangle1,
+				triangle2,
+				triangle3,
+				rectangle_part,
+				area_to_subtract;
+
+	/* Area of the rectangle with opposing corners left and right */
+	rectangle_all = (right_time - left_time) * (right_lsn - left_lsn);
+
+	/* Area of the right triangle with vertices left, right, and A */
+	triangle1 = rectangle_all / 2;
+
+	/* Area of the right triangle with vertices left, mid, and B */
+	triangle2 = (mid_lsn - left_lsn) * (mid_time - left_time) / 2;
+
+	/* Area of the right triangle with vertices mid, right, and C */
+	triangle3 = (right_lsn - mid_lsn) * (right_time - mid_time) / 2;
+
+	/* Area of the rectangle with vertices mid, B, A, and C */
+	rectangle_part = (right_time - mid_time) * (mid_lsn - left_lsn);
+
+	/* Sum up the area to subtract first to produce a more precise answer */
+	area_to_subtract = triangle2 + triangle3 + rectangle_part;
+
+	/* Triangle left, mid, right; fabs() as mid may lie above [left, right] */
+	return fabs(triangle1 - area_to_subtract);
+}
+
+/*
+ * Pick the member of a full LSNTimeStream that should be removed to make
+ * room for a new one, and return its index in stream->data.
+ *
+ * Every interior LSNTime is scored by the area of the triangle it forms
+ * with its two neighbors (see lsn_ts_calculate_error_area()): the smaller
+ * the area, the less future linear interpolation on the stream changes if
+ * that LSNTime is gone. The LSNTime with the smallest score is chosen; on
+ * a tie, the one with the lowest index wins.
+ *
+ * The first and last LSNTimes are never candidates, so dropping a point
+ * can never force callers to extrapolate.
+ */
+static unsigned char
+lsntime_to_drop(LSNTimeStream *stream)
+{
+	LSNTime    *points = stream->data;
+	size_t		last = stream->length - 1;
+	unsigned char victim = 1;
+	double		smallest;
+
+	StaticAssertStmt(LSNTIMESTREAM_VOLUME >= 3, "LSNTIMESTREAM_VOLUME < 3");
+
+	/* Nothing needs to be dropped while the stream still has free slots */
+	Assert(stream->length == LSNTIMESTREAM_VOLUME);
+
+	/* Seed the search with the first interior point */
+	smallest = lsn_ts_calculate_error_area(&points[0], &points[1], &points[2]);
+
+	/* Score the remaining interior points, keeping the smallest so far */
+	for (size_t cur = 2; cur < last; cur++)
+	{
+		double		score = lsn_ts_calculate_error_area(&points[cur - 1],
+														&points[cur],
+														&points[cur + 1]);
+
+		if (score < smallest)
+		{
+			victim = cur;
+			smallest = score;
+		}
+	}
+
+	return victim;
+}
+
+/*
+ * Append a new LSNTime to the tail of the LSNTimeStream, first dropping the
+ * member chosen by lsntime_to_drop() if the stream is full. An entrant that
+ * would break the ordering of the stream is not inserted.
+ */
+static void
+lsntime_insert(LSNTimeStream *stream, TimestampTz time,
+			   XLogRecPtr lsn)
+{
+	unsigned char drop;
+	LSNTime		entrant = {.lsn = lsn,.time = time};
+
+	/*
+	 * Time must move forward on the stream. If the clock moves backwards,
+	 * for example in an NTP correction, we'll just skip inserting this
+	 * LSNTime. Check this whether or not the stream is full: the lookup
+	 * functions rely on the ordering for every member of the stream.
+	 *
+	 * Translating LSN <-> time is most meaningful if the LSNTimeStream
+	 * entries are the position of a single location in the WAL over time.
+	 * Though time must monotonically increase, it is valid to insert
+	 * multiple LSNTimes with the same LSN. Imagine a period of time in
+	 * which no new WAL records are inserted.
+	 */
+	if (stream->length > 0 &&
+		(time <= stream->data[stream->length - 1].time ||
+		 lsn < stream->data[stream->length - 1].lsn))
+	{
+		ereport(WARNING,
+				errmsg("Won't insert non-monotonic \"%llu, %s\" to LSNTimeStream.",
+					   (unsigned long long) lsn, timestamptz_to_str(time)));
+		return;
+	}
+
+	if (stream->length < LSNTIMESTREAM_VOLUME)
+	{
+		stream->data[stream->length++] = entrant;
+		return;
+	}
+
+	/* Full: close the gap left by the dropped member, then reuse the tail */
+	drop = lsntime_to_drop(stream);
+	memmove(&stream->data[drop],
+			&stream->data[drop + 1],
+			sizeof(LSNTime) * (stream->length - 1 - drop));
+	stream->data[stream->length - 1] = entrant;
+}
+
+
+/*
+ * Bracket target_time with two adjacent LSNTimes from the stream, returned
+ * in *lower and *upper (both are the same LSNTime on an exact match). If
+ * target_time precedes the stream, *lower is the minimum values for the
+ * datatypes. If it follows the stream, *upper is the maximum values.
+ */
+static void
+stream_get_bounds_for_time(const LSNTimeStream *stream,
+						   TimestampTz target_time,
+						   LSNTime *lower,
+						   LSNTime *upper)
+{
+	const LSNTime *oldest = &stream->data[0];
+
+	Assert(lower && upper);
+
+	/*
+	 * Start from the widest possible range: the minimum and maximum values
+	 * for the datatypes. Whichever side the stream cannot narrow -- because
+	 * it is empty, or because target_time lies before or after all of its
+	 * members -- is returned to the caller like this.
+	 */
+	*lower = LSNTIME_INIT(InvalidXLogRecPtr, INT64_MIN);
+	*upper = LSNTIME_INIT(UINT64_MAX, INT64_MAX);
+
+	/* An empty stream cannot narrow the range at all. */
+	if (stream->length == 0)
+	{
+		elog(DEBUG1,
+			 "Attempt to identify LSN bounds for time: \"%s\" using empty LSNTimeStream.",
+			 timestamptz_to_str(target_time));
+		return;
+	}
+
+	/*
+	 * target_time is at or before the head of the stream: the oldest member
+	 * is the upper bound, and also the lower bound if it matches exactly.
+	 */
+	if (target_time == oldest->time)
+	{
+		*upper = *oldest;
+		*lower = *oldest;
+		return;
+	}
+	else if (target_time < oldest->time)
+	{
+		*upper = *oldest;
+		return;
+	}
+
+	/*
+	 * Walk the rest of the stream looking for the first member that is not
+	 * older than target_time. The head was ruled out above, so every member
+	 * visited here has a predecessor.
+	 */
+	for (size_t pos = 1; pos < stream->length; pos++)
+	{
+		const LSNTime *cur = &stream->data[pos];
+
+		if (cur->time < target_time)
+			continue;
+
+		if (cur->time == target_time)
+			*lower = *cur;
+		else
+		{
+			/* Time must increase monotonically on the stream. */
+			Assert(cur[-1].time < cur->time);
+			*lower = cur[-1];
+		}
+
+		*upper = *cur;
+		return;
+	}
+
+	/* Every member is older than target_time: the newest is the lower bound. */
+	*lower = stream->data[stream->length - 1];
+}
+
+/*
+ * Compute a lower and upper bound for the LSN corresponding to target_time.
+ * The LSNTimeStream is consulted first. Only when it leaves one or both
+ * bounds open -- target_time falls outside the stream, or the stream is
+ * empty -- do we try to close them with the current time and insert LSN. A
+ * target_time that falls on the stream never causes the current time to be
+ * consulted, even though that might produce a tighter range.
+ */
+void
+lsn_bounds_for_time(const LSNTimeStream *stream, TimestampTz target_time,
+					LSNTime *lower, LSNTime *upper)
+{
+	LSNTime		now;
+
+	stream_get_bounds_for_time(stream, target_time, lower, upper);
+
+	/*
+	 * A bound still holding its initial LSN was not found on the stream
+	 * (the stream may also be empty). If neither is, there is nothing left
+	 * to fill in.
+	 */
+	if (!(lower->lsn == InvalidXLogRecPtr || upper->lsn == UINT64_MAX))
+		return;
+
+	/* Sample the current time and insert LSN once, for both bounds. */
+	now.time = GetCurrentTimestamp();
+	now.lsn = GetXLogInsertRecPtr();
+
+	/* target_time is now or in the future: "now" bounds it from below. */
+	if (lower->lsn == InvalidXLogRecPtr && now.time <= target_time)
+		*lower = now;
+
+	/* target_time is now or in the past: "now" bounds it from above. */
+	if (upper->lsn == UINT64_MAX && now.time >= target_time)
+		*upper = now;
+
+	/* Whatever was filled in, the bounds must still be ordered. */
+	Assert(lower->lsn <= upper->lsn);
+}
+
+/*
+ * Returns a range of LSNTimes starting at lower and ending at upper and
+ * covering the target_lsn. If target_lsn is before the stream, lower will
+ * contain the minimum values for the datatypes. If target_lsn is newer than
+ * the stream, upper will contain the maximum values for the datatypes.
+ */
+static void
+stream_get_bounds_for_lsn(const LSNTimeStream *stream,
+						  XLogRecPtr target_lsn,
+						  LSNTime *lower,
+						  LSNTime *upper)
+{
+	Assert(lower && upper);
+
+	/*
+	 * If the target_lsn is "off the stream" -- either the stream has no
+	 * members or the target_lsn is older than all values in the stream or
+	 * newer than all values -- the lower and/or upper bounds may be the min
+	 * or max value for the datatypes, respectively.
+	 */
+	*lower = LSNTIME_INIT(InvalidXLogRecPtr, INT64_MIN);
+	*upper = LSNTIME_INIT(UINT64_MAX, INT64_MAX);
+
+	/*
+	 * If the LSNTimeStream has no members, it provides no information about
+	 * the range.
+	 */
+	if (stream->length == 0)
+	{
+		/* Cast to match %llu; unsigned long may be narrower than an LSN. */
+		elog(DEBUG1,
+			 "Attempt to identify time bounds for LSN: \"%llu\" using empty LSNTimeStream.",
+			 (unsigned long long) target_lsn);
+		return;
+	}
+
+	/*
+	 * If the target_lsn is older than the stream, the oldest member in the
+	 * stream is our upper bound. On an exact match it is also our lower
+	 * bound.
+	 */
+	if (target_lsn <= stream->data[0].lsn)
+	{
+		*upper = stream->data[0];
+		if (target_lsn == stream->data[0].lsn)
+			*lower = stream->data[0];
+		return;
+	}
+
+	/*
+	 * Loop through the stream and stop at the first LSNTime newer than or
+	 * equal to our target LSN. Skip the first LSNTime, as we know it is
+	 * older than our target LSN.
+	 *
+	 * An exact match here still reports the preceding LSNTime as the lower
+	 * bound, exactly as a target_lsn strictly between two members does.
+	 * NOTE(review): an exact match on the first member (above) returns it as
+	 * both bounds instead -- confirm this asymmetry is intended.
+	 */
+	for (size_t i = 1; i < stream->length; i++)
+	{
+		if (target_lsn <= stream->data[i].lsn)
+		{
+			/* LSNs must not decrease on the stream. */
+			Assert(stream->data[i - 1].lsn <=
+				   stream->data[i].lsn);
+			*lower = stream->data[i - 1];
+			*upper = stream->data[i];
+			return;
+		}
+	}
+
+	/*
+	 * target_lsn is newer than the stream, so the newest member in the stream
+	 * is our lower bound.
+	 */
+	*lower = stream->data[stream->length - 1];
+}
+
+/*
+ * Compute a lower and upper bound for the time corresponding to target_lsn.
+ * The LSNTimeStream is consulted first. Only when it leaves one or both
+ * bounds open -- target_lsn falls outside the stream, or the stream is
+ * empty -- do we try to close them with the current time and insert LSN. A
+ * target_lsn that falls on the stream never causes the current insert LSN
+ * to be consulted, even though that might produce a tighter range.
+ */
+void
+time_bounds_for_lsn(const LSNTimeStream *stream, XLogRecPtr target_lsn,
+					LSNTime *lower, LSNTime *upper)
+{
+	LSNTime		now;
+
+	stream_get_bounds_for_lsn(stream, target_lsn, lower, upper);
+
+	/*
+	 * A bound still holding its initial time was not found on the stream
+	 * (the stream may also be empty). If neither is, there is nothing left
+	 * to fill in.
+	 */
+	if (!(lower->time == INT64_MIN || upper->time == INT64_MAX))
+		return;
+
+	/* Sample the current time and insert LSN once, for both bounds. */
+	now.time = GetCurrentTimestamp();
+	now.lsn = GetXLogInsertRecPtr();
+
+	/* target_lsn is at or past the insert LSN: "now" bounds it from below. */
+	if (lower->time == INT64_MIN && now.lsn <= target_lsn)
+		*lower = now;
+
+	/* target_lsn is at or before the insert LSN: "now" bounds it from above. */
+	if (upper->time == INT64_MAX && now.lsn >= target_lsn)
+		*upper = now;
+
+	/* Whatever was filled in, the bounds must still be ordered. */
+	Assert(lower->time <= upper->time);
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55ca..13856e2bef3 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -458,6 +458,45 @@ typedef struct PgStat_StatTabEntry
 	PgStat_Counter autoanalyze_count;
 } PgStat_StatTabEntry;
 
+/*
+ * The elements of an LSNTimeStream. For the LSNTimeStream to be meaningful,
+ * the lsn should be a consistent position in the WAL over time (e.g. the
+ * insert LSN at each time in the stream or the flush LSN at each time).
+ */
+typedef struct LSNTime
+{
+	TimestampTz time;			/* when lsn was sampled */
+	XLogRecPtr	lsn;			/* WAL position at that time */
+} LSNTime;
+
+/*
+ * Convenience macro returning an LSNTime with the time and LSN set to the
+ * passed in values. Note the argument order: LSN first, then time.
+ */
+#define LSNTIME_INIT(i_lsn, i_time) \
+	((LSNTime) { .lsn = (i_lsn), .time = (i_time) })
+
+#define LSNTIMESTREAM_VOLUME 64	/* capacity of LSNTimeStream.data */
+
+/*
+ * An LSN time stream is an array consisting of LSNTimes from least to most
+ * recent. The array is filled before any element is dropped. Once the
+ * LSNTimeStream length == volume (the array is full), an LSNTime is dropped,
+ * the subsequent LSNTimes are moved down by 1, and the new LSNTime is
+ * inserted at the tail.
+ *
+ * When dropping an LSNTime, we attempt to pick the member which would
+ * introduce the least error into the stream. See lsntime_to_drop() for more
+ * details.
+ *
+ * Use the stream for LSN <-> time conversions.
+ */
+typedef struct LSNTimeStream
+{
+	unsigned char length;		/* number of members currently in data[] */
+	LSNTime		data[LSNTIMESTREAM_VOLUME]; /* members, oldest first */
+} LSNTimeStream;
+
 typedef struct PgStat_WalStats
 {
 	PgStat_Counter wal_records;
@@ -749,6 +788,12 @@ extern void pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_
 
 extern void pgstat_report_wal(bool force);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
+extern void lsn_bounds_for_time(const LSNTimeStream *stream,
+								TimestampTz target_time,
+								LSNTime *lower, LSNTime *upper);
+extern void time_bounds_for_lsn(const LSNTimeStream *stream,
+								XLogRecPtr target_lsn,
+								LSNTime *lower, LSNTime *upper);
 
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 547d14b3e7c..c8d84122976 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1587,6 +1587,8 @@ LogicalTapeSet
 LsnReadQueue
 LsnReadQueueNextFun
 LsnReadQueueNextStatus
+LSNTime
+LSNTimeStream
 LtreeGistOptions
 LtreeSignature
 MAGIC
-- 
2.34.1

