Clock-skew management in logical replication

Started by Nisha Moond · 11 messages
#1Nisha Moond
nisha.moond412@gmail.com
1 attachment(s)

Hello Hackers,
(CC people involved in the earlier discussion)

While considering the implementation of timestamp-based conflict
resolution (last_update_wins) in logical replication (see [1]), there
was feedback at [2] and a discussion on whether or not to manage
clock-skew at the database level. We tried to research the history of
clock-skew related discussions in Postgres itself and summarized it
at [3].

We also analyzed how other databases deal with it. Based on our
research, classic RDBMSs like Oracle and IBM Informix that use similar
timestamp-based resolution methods do not address clock-skew at the
database level. Instead, they recommend using external time
synchronization solutions, such as NTP.

- Oracle, while handling conflicts[4], assumes clocks are synchronized
and relies on external tools like NTP for time synchronization between
nodes[5].
- IBM Informix, similarly, recommends using network time-synchronization
commands to ensure clocks stay synchronized across nodes[6].

Other Postgres-based databases like EDB-BDR and YugabyteDB provide
GUC parameters to manage clock-skew within the database:

- EDB-BDR allows configuring parameters like bdr.maximum_clock_skew
and bdr.maximum_clock_skew_action to define the acceptable skew and
the action to take when it is exceeded[7].
- YugabyteDB offers a max_clock_skew_usec setting, which causes a
node to crash if the clock-skew exceeds the specified value[8].

There are, of course, other approaches to managing clock-skew used by
distributed systems, such as NTP daemons, centralized logical clocks,
atomic clocks (as in Google Spanner), and time-sync services like the
AWS Time Sync Service[9].

Implementing any of these time-sync services for CDR would be a
significant deviation and a big project in itself, and we are not sure
it is really needed. At best, as an aid for users, we could provide a
GUC-based implementation to handle clock-skew in logical replication.
The idea is that users should handle clock-skew outside of the
database, but in worst-case scenarios they can rely on these GUCs.

We have attempted to implement a patch which manages clock-skew in
logical replication. It works based on these new GUCs (see [10] for
the detailed discussion):

- max_logical_rep_clock_skew: Defines the tolerable limit for clock-skew.
- max_logical_rep_clock_skew_action: Configures the action when
clock-skew exceeds the limit.
- max_logical_rep_clock_skew_wait: Limits the maximum wait time if the
action is configured as "wait."
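
For illustration, with the patch applied a subscriber's
postgresql.conf might contain the following (the values are arbitrary
examples, not recommendations):

```
max_logical_rep_clock_skew = 5s          # tolerate up to 5 seconds of skew
max_logical_rep_clock_skew_action = wait # wait instead of erroring out
max_logical_rep_clock_skew_wait = 60s    # error out if the wait would exceed 60s
```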

The proposed idea is implemented in the attached patch v1. Thank you,
Shveta, for implementing it, and thanks to Kuroda-san for assisting in
the research.

Thoughts? Looking forward to hearing others' opinions!

[1]: /messages/by-id/CAJpy0uD0-DpYVMtsxK5R=zszXauZBayQMAYET9sWr_w0CNWXxQ@mail.gmail.com
[2]: /messages/by-id/CAFiTN-uTycjZWdp1kEpN9w7b7SQpoGL5zyg_qZzjpY_vr2+Ksg@mail.gmail.com
[3]: /messages/by-id/CAA4eK1Jn4r-y+bkW=JaKCbxEz=jawzQAS1Z4wAd8jT+1B0RL2w@mail.gmail.com
[4]: https://www.oracle.com/cn/a/tech/docs/technical-resources/wp-oracle-goldengate-activeactive-final2-1.pdf
[5]: https://docs.oracle.com/en/operating-systems/oracle-linux/8/network/network-ConfiguringNetworkTime.html
[6]: https://www.ibm.com/docs/en/informix-servers/14.10?topic=environment-time-synchronization
[7]: https://www.enterprisedb.com/docs/pgd/latest/reference/pgd-settings/#bdrmaximum_clock_skew
[8]: https://support.yugabyte.com/hc/en-us/articles/4403707404173-Too-big-clock-skew-leading-to-error-messages-or-tserver-crashes
[9]: https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-time-sync-service-microsecond-accurate-time/
[10]: /messages/by-id/CAJpy0uDCW+vrBoUZWrBWPjsM=9wwpwbpZuZa8Raj3VqeVYs3PQ@mail.gmail.com

--
Thanks,
Nisha

Attachments:

v1-0001-Implements-Clock-skew-management-between-nodes.patch (application/octet-stream)
From f99fe4b164fcaa58675f832b1497d70b42c22c91 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Thu, 5 Sep 2024 15:31:38 +0530
Subject: [PATCH v1] Implements Clock-skew management between nodes.

This patch attempts to manage clock skew between nodes by
introducing three new GUCs:
a) max_logical_rep_clock_skew
b) max_logical_rep_clock_skew_action
c) max_logical_rep_clock_skew_wait

If the timestamp of the currently replayed transaction is in the future
compared to the current time on the subscriber and the difference is
larger than 'max_logical_rep_clock_skew', then the action configured
in 'max_logical_rep_clock_skew_action' is performed by the apply worker.

If user configures 'wait' in 'max_logical_rep_clock_skew_action', then
apply worker will wait during 'begin' of transaction to bring clock-skew
within permissible range of 'max_logical_rep_clock_skew'.

There could be cases where actual clock skew is large while the configured
'max_logical_rep_clock_skew' is small. Then the apply worker may have to
wait for a longer period to manage the clock skew. To control this
maximum wait time, a new GUC, 'max_logical_rep_clock_skew_wait' is
provided.  This allows the user to set a cap on how long the apply
worker should wait. If the computed wait time exceeds this value,
the apply worker will error out without waiting.
---
 .../replication/logical/applyparallelworker.c |  14 +-
 src/backend/replication/logical/worker.c      | 125 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |  40 ++++++
 src/backend/utils/misc/postgresql.conf.sample |   9 +-
 src/include/replication/logicalworker.h       |  18 +++
 src/include/replication/worker_internal.h     |   2 +-
 src/include/utils/timestamp.h                 |   1 +
 src/tools/pgindent/typedefs.list              |   1 +
 9 files changed, 205 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index e7f7d4c5e4..eb68437654 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -312,6 +312,13 @@ pa_can_start(void)
 	if (!AllTablesyncsReady())
 		return false;
 
+	/*
+	 * Do not start a new parallel worker if user has configured max clock
+	 * skew, as we need the commit timestamp in the beginning.
+	 */
+	if ((max_logical_rep_clock_skew > LR_CLOCK_SKEW_DEFAULT))
+		return false;
+
 	return true;
 }
 
@@ -696,9 +703,14 @@ pa_process_spooled_messages_if_required(void)
 	}
 	else if (fileset_state == FS_READY)
 	{
+		/*
+		 * Currently we do not support starting parallel apply worker when
+		 * clock skew is configured, thus it is okay to pass 0 as
+		 * origin-timestamp here.
+		 */
 		apply_spooled_messages(&MyParallelShared->fileset,
 							   MyParallelShared->xid,
-							   InvalidXLogRecPtr);
+							   InvalidXLogRecPtr, 0);
 		pa_set_fileset_state(MyParallelShared, FS_EMPTY);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 925dff9cc4..454f07deb6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -318,6 +318,20 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool		InitializingApplyWorker = false;
 
+/*
+ * GUC support
+ */
+const struct config_enum_entry logical_rep_clock_skew_action_options[] = {
+	{"error", LR_CLOCK_SKEW_ACTION_ERROR, false},
+	{"wait", LR_CLOCK_SKEW_ACTION_WAIT, false},
+	{NULL, 0, false}
+};
+
+/* GUCs */
+int			max_logical_rep_clock_skew = LR_CLOCK_SKEW_DEFAULT;
+int			max_logical_rep_clock_skew_action = LR_CLOCK_SKEW_ACTION_ERROR;
+int			max_logical_rep_clock_skew_wait = 300;	/* 5 mins */
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -982,6 +996,95 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 	ExecStoreVirtualTuple(slot);
 }
 
+/*
+ * Manage clock skew between nodes.
+ *
+ * It checks if the remote timestamp is ahead of the local clock
+ * and if the difference exceeds max_logical_rep_clock_skew, it performs
+ * the action specified by the max_logical_rep_clock_skew_action.
+ */
+static void
+manage_clock_skew(TimestampTz origin_timestamp)
+{
+	TimestampTz current;
+	TimestampTz delayUntil;
+	long		msecs;
+	int			rc;
+
+	/* nothing to do if no max clock skew configured */
+	if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT)
+		return;
+
+	current = GetCurrentTimestamp();
+
+	/*
+	 * If the timestamp of the currently replayed transaction is in the future
+	 * compared to the current time on the subscriber and the difference is
+	 * larger than max_logical_rep_clock_skew, then perform the action
+	 * specified by the max_logical_rep_clock_skew_action setting.
+	 */
+	if (origin_timestamp > current &&
+		TimestampDifferenceExceeds(current, origin_timestamp,
+								   max_logical_rep_clock_skew * 1000))
+	{
+		if (max_logical_rep_clock_skew_action == LR_CLOCK_SKEW_ACTION_ERROR)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg_internal("clock skew exceeds max_logical_rep_clock_skew (%d seconds)",
+									 max_logical_rep_clock_skew)));
+
+		/* Perform the wait */
+		while (true)
+		{
+			delayUntil =
+				TimestampTzMinusSeconds(origin_timestamp,
+										max_logical_rep_clock_skew);
+
+			/* Exit without waiting if it's already past 'delayUntil' time */
+			msecs = TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
+													delayUntil);
+			if (msecs <= 0)
+				break;
+
+			/* The wait time should not exceed max_logical_rep_clock_skew_wait */
+			if (msecs > (max_logical_rep_clock_skew_wait * 1000L))
+				ereport(ERROR,
+						(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+						 errmsg_internal("clock skew wait time exceeds max_logical_rep_clock_skew_wait (%d seconds)",
+										 max_logical_rep_clock_skew_wait)));
+
+			elog(DEBUG2, "delaying apply for %ld milliseconds to manage clock skew",
+				 msecs);
+
+			/* Sleep until we are signaled or msecs have elapsed */
+			rc = WaitLatch(MyLatch,
+						   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						   msecs,
+						   WAIT_EVENT_LOGICAL_CLOCK_SKEW);
+
+			/* Exit the loop if msecs have elapsed */
+			if (rc & WL_TIMEOUT)
+				break;
+
+			if (rc & WL_LATCH_SET)
+			{
+				ResetLatch(MyLatch);
+				CHECK_FOR_INTERRUPTS();
+			}
+
+			/*
+			 * This might change max_logical_rep_clock_skew and
+			 * max_logical_rep_clock_skew_wait.
+			 */
+			if (ConfigReloadPending)
+			{
+				ConfigReloadPending = false;
+				ProcessConfigFile(PGC_SIGHUP);
+			}
+		}
+	}
+}
+
 /*
  * Handle BEGIN message.
  */
@@ -1003,6 +1106,9 @@ apply_handle_begin(StringInfo s)
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
+
+	/* Check if there is any clock skew and perform configured action */
+	manage_clock_skew(begin_data.committime);
 }
 
 /*
@@ -1060,6 +1166,9 @@ apply_handle_begin_prepare(StringInfo s)
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
+
+	/* Check if there is any clock skew and perform configured action */
+	manage_clock_skew(begin_data.prepare_time);
 }
 
 /*
@@ -1315,7 +1424,8 @@ apply_handle_stream_prepare(StringInfo s)
 			 * spooled operations.
 			 */
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
-								   prepare_data.xid, prepare_data.prepare_lsn);
+								   prepare_data.xid, prepare_data.prepare_lsn,
+								   prepare_data.prepare_time);
 
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
@@ -2020,7 +2130,8 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
  */
 void
 apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-					   XLogRecPtr lsn)
+					   XLogRecPtr lsn,
+					   TimestampTz origin_timestamp)
 {
 	int			nchanges;
 	char		path[MAXPGPATH];
@@ -2073,6 +2184,13 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 
 	end_replication_step();
 
+	/*
+	 * If origin_timestamp is provided by caller, then check clock skew with
+	 * respect to the passed time and take configured action.
+	 */
+	if (origin_timestamp)
+		manage_clock_skew(origin_timestamp);
+
 	/*
 	 * Read the entries one by one and pass them through the same logic as in
 	 * apply_dispatch.
@@ -2178,7 +2296,8 @@ apply_handle_stream_commit(StringInfo s)
 			 * spooled operations.
 			 */
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
-								   commit_data.commit_lsn);
+								   commit_data.commit_lsn,
+								   commit_data.committime);
 
 			apply_handle_commit_internal(&commit_data);
 
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 8efb4044d6..0ebad6fcab 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -59,6 +59,7 @@ CHECKPOINTER_MAIN	"Waiting in main loop of checkpointer process."
 LOGICAL_APPLY_MAIN	"Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN	"Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication parallel apply process."
+LOGICAL_CLOCK_SKEW	"Waiting in apply-begin of logical replication apply process to bring clock skew in permissible range."
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
 REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 686309db58..c768a11963 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -68,6 +68,7 @@
 #include "postmaster/walsummarizer.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
@@ -482,6 +483,7 @@ extern const struct config_enum_entry archive_mode_options[];
 extern const struct config_enum_entry recovery_target_action_options[];
 extern const struct config_enum_entry wal_sync_method_options[];
 extern const struct config_enum_entry dynamic_shared_memory_options[];
+extern const struct config_enum_entry logical_rep_clock_skew_action_options[];
 
 /*
  * GUC option variables that are exported from this module
@@ -3714,6 +3716,33 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_logical_rep_clock_skew", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Sets maximum clock skew tolerance between logical "
+						 "replication nodes beyond which action configured "
+						 "in max_logical_rep_clock_skew_action is triggered."),
+			gettext_noop("-1 turns this check off."),
+			GUC_UNIT_S
+		},
+		&max_logical_rep_clock_skew,
+		LR_CLOCK_SKEW_DEFAULT, LR_CLOCK_SKEW_DEFAULT, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"max_logical_rep_clock_skew_wait", PGC_SIGHUP, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Sets max limit on how long apply worker shall wait to "
+						 "bring clock skew within permissible range of max_logical_rep_clock_skew. "
+						 "If the computed wait time is more than this value, "
+						 "apply worker will error out without waiting."),
+			gettext_noop("0 turns this limit off."),
+			GUC_UNIT_S
+		},
+		&max_logical_rep_clock_skew_wait,
+		300, 0, 3600,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
@@ -4991,6 +5020,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_logical_rep_clock_skew_action", PGC_POSTMASTER, REPLICATION_SUBSCRIBERS,
+			gettext_noop("Sets the action to perform if a clock skew higher "
+						 "than max_logical_rep_clock_skew is detected."),
+			NULL
+		},
+		&max_logical_rep_clock_skew_action,
+		LR_CLOCK_SKEW_ACTION_ERROR, logical_rep_clock_skew_action_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"track_functions", PGC_SUSET, STATS_CUMULATIVE,
 			gettext_noop("Collects function-level statistics on database activity."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 667e0dc40a..6424432362 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -383,7 +383,14 @@
 					# (change requires restart)
 #max_sync_workers_per_subscription = 2	# taken from max_logical_replication_workers
 #max_parallel_apply_workers_per_subscription = 2	# taken from max_logical_replication_workers
-
+#max_logical_rep_clock_skew = -1	# maximum clock skew tolerance between logical
+					# replication nodes beyond which action configured in
+					# 'max_logical_rep_clock_skew_action' is triggered.
+#max_logical_rep_clock_skew_action = error # error or wait
+					   # (change requires restart)
+#max_logical_rep_clock_skew_wait = 300 # max limit on how long apply worker
+					# shall wait to bring clock skew within permissible
+					# range of max_logical_rep_clock_skew.
 
 #------------------------------------------------------------------------------
 # QUERY TUNING
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index a18d79d1b2..7cb03062ac 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,7 +14,25 @@
 
 #include <signal.h>
 
+/*
+ * The default for max_logical_rep_clock_skew is -1, which means ignore clock
+ * skew (the check is turned off).
+ */
+#define LR_CLOCK_SKEW_DEFAULT -1
+
+/*
+ * Worker Clock Skew Action.
+ */
+typedef enum
+{
+	LR_CLOCK_SKEW_ACTION_ERROR,
+	LR_CLOCK_SKEW_ACTION_WAIT,
+} LogicalRepClockSkewAction;
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
+extern PGDLLIMPORT int max_logical_rep_clock_skew;
+extern PGDLLIMPORT int max_logical_rep_clock_skew_action;
+extern PGDLLIMPORT int max_logical_rep_clock_skew_wait;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 9646261d7e..95b2a5286d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -268,7 +268,7 @@ extern void stream_stop_internal(TransactionId xid);
 
 /* Common streaming function to apply all the spooled messages */
 extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-								   XLogRecPtr lsn);
+								   XLogRecPtr lsn, TimestampTz origin_timestamp);
 
 extern void apply_dispatch(StringInfo s);
 
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index a6ce03ed46..53b828d89d 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -84,6 +84,7 @@ IntervalPGetDatum(const Interval *X)
 /* Macros for doing timestamp arithmetic without assuming timestamp's units */
 #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000))
 #define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000))
+#define TimestampTzMinusSeconds(tz,s) ((tz) - ((s) * (int64) 1000000))
 
 
 /* Set at postmaster start */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b6135f0347..5a4e86fccd 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1566,6 +1566,7 @@ LogicalOutputPluginWriterPrepareWrite
 LogicalOutputPluginWriterUpdateProgress
 LogicalOutputPluginWriterWrite
 LogicalRepBeginData
+LogicalRepClockSkewAction
 LogicalRepCommitData
 LogicalRepCommitPreparedTxnData
 LogicalRepCtxStruct
-- 
2.34.1

#2Tom Lane
tgl@sss.pgh.pa.us
In reply to: Nisha Moond (#1)
Re: Clock-skew management in logical replication

Nisha Moond <nisha.moond412@gmail.com> writes:

While considering the implementation of timestamp-based conflict
resolution (last_update_wins) in logical replication (see [1]), there
was a feedback at [2] and the discussion on whether or not to manage
clock-skew at database level.

FWIW, I cannot see why we would do anything beyond suggesting that
people run NTP. That's standard anyway on the vast majority of
machines these days. Why would we add complexity that we have
to maintain (and document) in order to cater to somebody not doing
that?

regards, tom lane

#3shihao zhong
zhong950419@gmail.com
In reply to: Nisha Moond (#1)
Re: Clock-skew management in logical replication

Nisha Moond <nisha.moond412@gmail.com> writes:

Thoughts? Looking forward to hearing others' opinions!

Had a productive conversation with Amit Kapila today about time skew
in distributed systems, and wanted to share some thoughts.
Essentially, we're grappling with the classic distributed snapshot
problem. In a multi-active environment, where multiple nodes can
independently process transactions, it becomes crucial to determine
the visibility of these transactions across the system. Time skew,
where different machines have different timestamps, makes this a hard
problem. How can we ensure consistent transaction ordering and
visibility when time itself is unreliable?

As you mentioned, there are several ways to tackle the time skew
problem in distributed systems. These approaches generally fall into
four main categories:

1. Centralized Timestamps (Timestamp Oracle)

Mechanism: A dedicated server acts as a single source of truth for
time, eliminating skew by providing timestamps to all nodes. Google
Percolator and TiDB use this approach.
Consistency level: Serializable
Pros: Simple to implement.
Cons: High latency for cross-geo transactions due to reliance on a
central server. Can become a bottleneck.

2. Atomic Clocks (True Time)

Mechanism: Utilizes highly accurate atomic clocks to provide a
globally consistent view of time, as seen in Google Spanner.
Consistency level: External Serializable
Pros: Very high consistency level (externally consistent).
Cons: Requires specialized and expensive hardware. Adds some latency
to transactions, though less than centralized timestamps.

3. Hybrid Logical Clocks

Mechanism: Combines NTP for rough time synchronization with logical
clocks for finer-grained ordering. YugabyteDB and CockroachDB employ
this strategy.
Consistency level: Serializable
Pros: Avoids the need for specialized hardware.
Cons: Can introduce significant latency to transactions.

4. Local Clocks

Mechanism: Just uses the local logical clock.
Consistency level: Eventual consistency
Pros: Simple implementation.
Cons: The consistency level is very low.

Of the four implementations considered, only local clocks and the HLC
approach offer a 'pure database' solution. Given PostgreSQL's
practical use cases, I recommend starting with a local clock
implementation. However, recognizing the increasing prevalence of
distributed clock services, we should also implement a pluggable time
access method. This allows users to integrate with different time
services as needed.

In the mid-term, implementing the HLC approach would provide highly
consistent snapshot reads. This offers a significant advantage for
many use cases.
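
To make the HLC approach in category 3 concrete, here is a minimal
sketch of the standard HLC update rules (this is not part of any
patch; the names are illustrative, and wall-clock readings are passed
in explicitly so the example stays deterministic):

```c
#include <assert.h>
#include <stdint.h>

/* A hybrid logical clock: physical component (pt) plus logical counter (lc). */
typedef struct
{
	int64_t		pt;			/* max physical time observed so far */
	int64_t		lc;			/* logical counter ordering events within one pt */
} HLC;

static int64_t
max64(int64_t a, int64_t b)
{
	return a > b ? a : b;
}

/* Advance the clock for a local or send event, given a wall-clock reading. */
static void
hlc_local_event(HLC *c, int64_t now)
{
	if (now > c->pt)
	{
		c->pt = now;
		c->lc = 0;
	}
	else
		c->lc++;			/* wall clock hasn't moved past pt; bump counter */
}

/* Merge an incoming message timestamp m into the local clock. */
static void
hlc_recv_event(HLC *c, const HLC *m, int64_t now)
{
	int64_t		pt = max64(now, max64(c->pt, m->pt));

	if (pt == c->pt && pt == m->pt)
		c->lc = max64(c->lc, m->lc) + 1;
	else if (pt == c->pt)
		c->lc = c->lc + 1;
	else if (pt == m->pt)
		c->lc = m->lc + 1;
	else
		c->lc = 0;			/* wall clock alone dominates */
	c->pt = pt;
}
```

The point is that a receiver whose wall clock lags a sender still
assigns the received event a timestamp greater than the sender's, so
causal ordering survives skew.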

Long-term, we should consider integrating with a distributed time
service like AWS Time Sync Service. This ensures high accuracy and
scalability for demanding applications.

Thanks,
Shihao

#4Joe Conway
mail@joeconway.com
In reply to: shihao zhong (#3)
Re: Clock-skew management in logical replication

On 9/21/24 01:31, shihao zhong wrote:

Nisha Moond <nisha.moond412@gmail.com> writes:

Thoughts? Looking forward to hearing others' opinions!

Had a productive conversation with Amit Kapila today about time skew
in distributed systems, and wanted to share some thoughts.
Essentially, we're grappling with the classic distributed snapshot
problem. In a multi-active environment, where multiple nodes can
independently process transactions, it becomes crucial to determine
the visibility of these transactions across the system. Time skew,
where different machines have different timestamps make it a hard
problem. How can we ensure consistent transaction ordering and
visibility when time itself is unreliable?

As you mentioned, there are several ways to tackle the time skew
problem in distributed systems. These approaches generally fall into
four main categories:

1. Centralized Timestamps (Timestamp Oracle)
2. Atomic Clocks (True Time)
3. Hybrid Logical Clocks
4 Local Clocks

I recommend ...<snip>... implement a pluggable time access method. This
allows users to integrate with different time services as needed.

Huge +1

In the mid-term, implementing the HLC approach would provide highly
consistent snapshot reads. This offers a significant advantage for
many use cases.

agreed

Long-term, we should consider integrating with a distributed time
service like AWS Time Sync Service. This ensures high accuracy and
scalability for demanding applications.

I think the pluggable access method should make this possible, no?

--
Joe Conway
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com

#5shihao zhong
zhong950419@gmail.com
In reply to: Joe Conway (#4)
Re: Clock-skew management in logical replication

Long-term, we should consider integrating with a distributed time
service like AWS Time Sync Service. This ensures high accuracy and
scalability for demanding applications.

I think the pluggable access method should make this possible, no?

I am sorry that I did not explain clearly in the previous email. What
I mean is that the pluggable time access method should provide the
mechanism to use a customized time service, but there would be no
out-of-the-box solution for customers who want one. I am suggesting we
provide default implementations for popular time services like the AWS
Time Sync Service. Maybe that should be done outside of the main tree,
but it would provide a better user experience.


#6Amit Kapila
amit.kapila16@gmail.com
In reply to: Joe Conway (#4)
Re: Clock-skew management in logical replication

On Sun, Sep 22, 2024 at 7:24 PM Joe Conway <mail@joeconway.com> wrote:

On 9/21/24 01:31, shihao zhong wrote:

Nisha Moond <nisha.moond412@gmail.com> writes:

Thoughts? Looking forward to hearing others' opinions!

Had a productive conversation with Amit Kaplia today about time skew
in distributed systems, and wanted to share some thoughts.
Essentially, we're grappling with the classic distributed snapshot
problem. In a multi-active environment, where multiple nodes can
independently process transactions, it becomes crucial to determine
the visibility of these transactions across the system. Time skew,
where different machines have different timestamps make it a hard
problem. How can we ensure consistent transaction ordering and
visibility when time itself is unreliable?

As you mentioned, there are several ways to tackle the time skew
problem in distributed systems. These approaches generally fall into
three main categories:

1. Centralized Timestamps (Timestamp Oracle)
2. Atomic Clocks (True Time)
3. Hybrid Logical Clocks
4 Local Clocks

I recommend ...<snip>... implement a pluggable time access method. This
allows users to integrate with different time services as needed.

Huge +1

One idea to give users control over the timestamps used for the
'latest_write_wins' strategy could be to let them specify the values
in a special column of the table that will be used to resolve
conflicts.

CREATE TABLE foo(c1 int, c2 timestamp default conflict_fn, CHECK CONFLICTS(c2));

Now, for column c2 the user can provide their own function to supply a
value for each row that can be used to resolve conflicts. If a
table-level conflict column is provided then that will be used to
resolve conflicts; otherwise, the default commit timestamp provided by
the commit_ts module will be used.

On the apply-side, we will use a condition like:
if ((source_new_column_value > replica_current_column_value) ||
operation.type == "delete")
apply_update();

In the above example, source_new_column_value and
replica_current_column_value would be column c2 on the publisher and
subscriber, respectively. Note that in the above case we allow deletes
to always win, as the delete operation doesn't update the column
values. We could choose a different strategy for applying deletes,
such as comparing the existing column values as well.
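
The apply-side condition above can be sketched like this (a sketch
only: the names and types are hypothetical, and the conflict-column
timestamps are shown as plain integers):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum
{
	OP_INSERT,
	OP_UPDATE,
	OP_DELETE
} ChangeType;

/*
 * Decide whether an incoming remote change should be applied over the
 * existing local row, per the last-update-wins rule sketched above:
 * deletes always win (they carry no conflict-column value); otherwise
 * the newer conflict-column timestamp wins.
 */
static bool
remote_change_wins(int64_t source_new_column_value,
				   int64_t replica_current_column_value,
				   ChangeType op)
{
	if (op == OP_DELETE)
		return true;
	return source_new_column_value > replica_current_column_value;
}
```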

Note that MySQL [1] and Oracle's TimesTen [2] provide a similar
strategy at the table level for conflict resolution to avoid reliance
on system clocks.

Though this provides a way for users to control the values required
for conflict resolution, I prefer a simple approach, at least for the
first version, which is to document that users should ensure time
synchronization via NTP. Even Oracle mentions the same in their docs
[3] (quoting: "... databases are identical to one another and it's
recommended that all database servers are configured to maintain
accurate time through a time server using the network time protocol
(NTP). Even in environments where databases span different time zones,
all database clocks must be set to the same time zone or Coordinated
Universal Time (UTC) must be used to maintain accurate time. Failure
to maintain accurate and synchronized time across the databases in an
active-active replication environment will result in data integrity
issues.")

[1]: https://dev.mysql.com/doc/refman/9.0/en/mysql-cluster-replication-schema.html#ndb-replication-ndb-replication
[2]: https://docs.oracle.com/en/database/other-databases/timesten/22.1/replication/configuring-timestamp-comparison.html#GUID-C8B0580B-B577-435F-8726-4AF341A09806
[3]: https://www.oracle.com/cn/a/tech/docs/technical-resources/wp-oracle-goldengate-activeactive-final2-1.pdf

--
With Regards,
Amit Kapila.

#7Nisha Moond
nisha.moond412@gmail.com
In reply to: Tom Lane (#2)
Re: Clock-skew management in logical replication

On Fri, Sep 20, 2024 at 7:51 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Nisha Moond <nisha.moond412@gmail.com> writes:

While considering the implementation of timestamp-based conflict
resolution (last_update_wins) in logical replication (see [1]), there
was a feedback at [2] and the discussion on whether or not to manage
clock-skew at database level.

FWIW, I cannot see why we would do anything beyond suggesting that
people run NTP. That's standard anyway on the vast majority of
machines these days. Why would we add complexity that we have
to maintain (and document) in order to cater to somebody not doing
that?

regards, tom lane

Thank you for your response.

I agree with suggesting that users run NTP, and we can recommend it in
the docs rather than introduce additional complexity.

In my research on setting up NTP servers on Linux, I found that
Chrony[1] is a lightweight and efficient solution for time
synchronization across nodes. Another reliable option is the classic
NTP daemon (ntpd)[2], which is also easy to configure and maintain.
Both Chrony and ntpd can be used to configure a local machine as an
NTP server for localized time synchronization, or as clients syncing
from public NTP servers such as 'ntp.ubuntu.com' (the default NTP
server pool for Ubuntu systems) or 'time.google.com' (Google Public
NTP). For example, on Ubuntu, Chrony is straightforward to install and
configure[3]. Comprehensive NTP (ntpd) configuration guides are
available for various Linux distributions, such as Ubuntu[4] and
RedHat Linux[5].
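As a minimal sketch (the server names are just the public pools
mentioned above), a client-side /etc/chrony/chrony.conf could contain:

```
# Sync from public NTP servers; 'iburst' speeds up the initial sync
server ntp.ubuntu.com iburst
server time.google.com iburst
# Step the clock at startup if the initial offset is large
makestep 1.0 3
```

Running 'chronyc tracking' afterwards reports the estimated offset
from the selected source, which is a quick way to check residual skew.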

Further, I’m exploring options for implementing NTP on Windows systems.

[1]: https://chrony-project.org/index.html
[2]: https://www.ntp.org/documentation/4.2.8-series/
[3]: https://documentation.ubuntu.com/server/how-to/networking/serve-ntp-with-chrony/
[4]: https://askubuntu.com/questions/14558/how-do-i-setup-a-local-ntp-server
[5]: https://docs.redhat.com/en/documentation/red_hat_enterprise_linux/7/html/system_administrators_guide/ch-configuring_ntp_using_ntpd#s1-Understanding_the_ntpd_Configuration_File

Thanks,
Nisha

#8Nisha Moond
nisha.moond412@gmail.com
In reply to: Nisha Moond (#7)
Re: Clock-skew management in logical replication

On Mon, Sep 23, 2024 at 4:00 PM Nisha Moond <nisha.moond412@gmail.com> wrote:

On Fri, Sep 20, 2024 at 7:51 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Nisha Moond <nisha.moond412@gmail.com> writes:

While considering the implementation of timestamp-based conflict
resolution (last_update_wins) in logical replication (see [1]), there
was a feedback at [2] and the discussion on whether or not to manage
clock-skew at database level.

FWIW, I cannot see why we would do anything beyond suggesting that
people run NTP. That's standard anyway on the vast majority of
machines these days. Why would we add complexity that we have
to maintain (and document) in order to cater to somebody not doing
that?

regards, tom lane

Thank you for your response.

I agree with suggesting users to run NTP and we can recommend it in
the docs rather than introducing additional complexities.

In my research on setting up NTP servers on Linux, I found that
Chrony[1] is a lightweight and efficient solution for time
synchronization across nodes. Another reliable option is the classic
NTP daemon (ntpd)[2], which is also easy to configure and maintain.
Both Chrony and ntpd can be used to configure a local machine as an
NTP server for localized time synchronization, or as clients syncing
from public NTP servers such as 'ntp.ubuntu.com' (the default NTP
server pool for Ubuntu systems) or 'time.google.com' (Google Public
NTP). For example, on Ubuntu, Chrony is straightforward to install and
configure[3]. Comprehensive NTP (ntpd) configuration guides are
available for various Linux distributions, such as Ubuntu[4] and
RedHat Linux[5].

Further, I’m exploring options for implementing NTP on Windows systems.

Windows platforms provide built-in time synchronization services. As a
client, they allow users to sync system time using internet or public
NTP servers. This can be easily configured by selecting a public NTP
server directly in the Date and Time settings. More details can be
found at [1].

Additionally, Windows servers can be configured as NTP servers for
localized time synchronization within a network, allowing other nodes
to sync with them. Further instructions on configuring an NTP server
on Windows can be found at [2].
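As a sketch, the built-in Windows Time service can be pointed at a
public NTP server from an elevated command prompt using the standard
w32tm options (server name here is just an example):

```
w32tm /config /manualpeerlist:"time.google.com" /syncfromflags:manual /update
w32tm /resync
w32tm /query /status
```

The final command reports the current source and the last successful
sync, which helps confirm the node's clock is actually being steered.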

[1]: https://learn.microsoft.com/en-us/windows-server/networking/windows-time-service/how-the-windows-time-service-works
[2]: https://learn.microsoft.com/en-us/troubleshoot/windows-server/active-directory/configure-authoritative-time-server

Thanks,
Nisha

#9Michael Paquier
michael@paquier.xyz
In reply to: Tom Lane (#2)
Re: Clock-skew management in logical replication

On Fri, Sep 20, 2024 at 10:21:34AM -0400, Tom Lane wrote:

FWIW, I cannot see why we would do anything beyond suggesting that
people run NTP. That's standard anyway on the vast majority of
machines these days. Why would we add complexity that we have
to maintain (and document) in order to cater to somebody not doing
that?

Agreed. I am in the same boat as you here. I don't think that
the database should be in charge of making decisions like that based
on a clock that may have gone crazy. Precise clocks are a difficult
problem, for sure, but this patch is just providing a workaround for a
problem that should not be linked to the backend engine by default,
and I agree that we will feel better if we neither maintain this stuff
nor enter this territory.

Making that more pluggable, though, has the merit of letting
out-of-core folks do what they want, even if we may finish with a
community ecosystem that has more solutions than the number of fingers
on one hand. I've seen multiple ways of solving conflicts across
multiple logical nodes in the past years, some being clock-based, some
not, some with a more internal strictly-monotonic counting solution to
resolve any conflicts.
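For what it's worth, a strictly monotonic counting solution of the
kind mentioned above is usually some variant of a Lamport clock; a
minimal Python sketch (not taken from any patch in this thread):

```python
class LamportClock:
    """Strictly monotonic logical counter: independent of wall clocks,
    it imposes a causal order on events across nodes."""

    def __init__(self):
        self.counter = 0

    def tick(self) -> int:
        # Local event: advance the counter and stamp the event with it.
        self.counter += 1
        return self.counter

    def receive(self, remote_counter: int) -> int:
        # On applying a remote change, jump past the remote value so
        # subsequent local events order after it.
        self.counter = max(self.counter, remote_counter) + 1
        return self.counter
```

Such counters resolve conflicts deterministically without trusting any
machine's clock, at the cost of not reflecting real ("wall-clock")
update order.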
--
Michael

#10Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#6)
RE: Clock-skew management in logical replication

Dear hackers,

Though this provides a way for users to control values required for
conflict resolution, I prefer a simple approach at least for the first
version which is to document that users should ensure time
synchronization via NTP. Even Oracle mentions the same in their docs

I researched some cloud services and found that their time-sync
services are exposed via NTP or a direct PTP connection. This means
there are no cloud-specific APIs to synchronize the machine clock.
Based on that, I also agree with the simple approach (just document
it). I feel the synchronization can be regarded as a low-layer task
and can be left to the OS.

The sections below show the status of cloud vendors and Oracle.

## AWS case

AWS provides a "Time Sync Service" [1] that can be used via NTP. The source server
is at 169.254.169.123; users can modify the configuration file to refer to it, as shown below.

```
server 169.254.169.123 prefer iburst
```

Or users can even directly connect to the local and accurate hardware clock.
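For reference, using the local hardware clock from chrony goes through
a refclock directive; a rough sketch (the device path and tuning
values here are illustrative, not AWS-specific recommendations):

```
# /etc/chrony/chrony.conf -- prefer the local PTP hardware clock
refclock PHC /dev/ptp0 poll 0 prefer
```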

## GCP case

GCP compute engines must use an NTP server on the GCP cloud [2], located at
metadata.google.internal, or other public NTP servers. The configuration will
look like this:

```
server metadata.google.internal iburst
```

## Oracle case

Oracle RAC requires that all participants are well synchronized by NTP.
Formerly, it had an automatic synchronization feature called "Cluster Time
Synchronization Service (CTSS)." It is de-supported in Oracle Database 23ai [3].

[1]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configure-ec2-ntp.html
[2]: https://cloud.google.com/compute/docs/instances/configure-ntp
[3]: https://docs.oracle.com/en/database/oracle/oracle-database/23/cwlin/server-configuration-checklist-for-oracle-grid-infrastructure.html

Best regards,
Hayato Kuroda
FUJITSU LIMITED

#11Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#10)
Re: Clock-skew management in logical replication

On Wed, Sep 25, 2024 at 3:09 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Though this provides a way for users to control values required for
conflict resolution, I prefer a simple approach at least for the first
version which is to document that users should ensure time
synchronization via NTP. Even Oracle mentions the same in their docs

I researched some cloud services and found that the time-sync services on the
cloud are integrated with the NTP or PTP direct connection. This means that there
are no specific APIs to synchronize the machine clock. Based on that,
I also agree with the simple approach (just document). I feel the synchronization
can be regarded as the low-layer task and can rely on the OS.

The below part shows the status of cloud vendors and Oracle.

## AWS case

AWS provides a "Time Sync Service" [1] that can be used via NTP. The source server
is at 169.254.169.123; users can modify the configuration file to refer to it, as shown below.

```
server 169.254.169.123 prefer iburst
```

Or users can even directly connect to the local and accurate hardware clock.

## GCP case

GCP compute engines must use an NTP server on the GCP cloud [2], located at
metadata.google.internal, or other public NTP servers. The configuration will
look like this:

```
server metadata.google.internal iburst
```

If NTP already provides a way to use these time-sync services, as you
have shown, then I don't think we need to do more at this stage except
to document it with the conflict resolution patch. In the future, we
may want to provide an additional column in the table with a special
meaning that can help in conflict resolution.

--
With Regards,
Amit Kapila.