From f99fe4b164fcaa58675f832b1497d70b42c22c91 Mon Sep 17 00:00:00 2001
From: Nisha Moond
Date: Thu, 5 Sep 2024 15:31:38 +0530
Subject: [PATCH v1] Implements Clock-skew management between nodes.

This patch attempts to manage clock skew between nodes by introducing
three new GUCs:
a) max_logical_rep_clock_skew
b) max_logical_rep_clock_skew_action
c) max_logical_rep_clock_skew_wait

If the timestamp of the transaction currently being replayed is in the
future relative to the current time on the subscriber, and the difference
is larger than 'max_logical_rep_clock_skew', then the apply worker performs
the action configured in 'max_logical_rep_clock_skew_action'.

If the user configures 'wait' in 'max_logical_rep_clock_skew_action', the
apply worker waits at the 'begin' of the transaction until the clock skew
is within the permissible range of 'max_logical_rep_clock_skew'.

There could be cases where the actual clock skew is large while the
configured 'max_logical_rep_clock_skew' is small. The apply worker may then
have to wait for a long period to manage the clock skew. To control this
maximum wait time, a new GUC, 'max_logical_rep_clock_skew_wait', is
provided. It allows the user to set a cap on how long the apply worker
should wait. If the computed wait time exceeds this value, the apply worker
will error out without waiting.
--- .../replication/logical/applyparallelworker.c | 14 +- src/backend/replication/logical/worker.c | 125 +++++++++++++++++- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 40 ++++++ src/backend/utils/misc/postgresql.conf.sample | 9 +- src/include/replication/logicalworker.h | 18 +++ src/include/replication/worker_internal.h | 2 +- src/include/utils/timestamp.h | 1 + src/tools/pgindent/typedefs.list | 1 + 9 files changed, 205 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index e7f7d4c5e4..eb68437654 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -312,6 +312,13 @@ pa_can_start(void) if (!AllTablesyncsReady()) return false; + /* + * Do not start a new parallel worker if user has configured max clock + * skew, as we need the commit timestamp in the beginning. + */ + if ((max_logical_rep_clock_skew > LR_CLOCK_SKEW_DEFAULT)) + return false; + return true; } @@ -696,9 +703,14 @@ pa_process_spooled_messages_if_required(void) } else if (fileset_state == FS_READY) { + /* + * Currently we do not support starting parallel apply worker when + * clock skew is configured, thus it is okay to pass 0 as + * origin-timestamp here. + */ apply_spooled_messages(&MyParallelShared->fileset, MyParallelShared->xid, - InvalidXLogRecPtr); + InvalidXLogRecPtr, 0); pa_set_fileset_state(MyParallelShared, FS_EMPTY); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 925dff9cc4..454f07deb6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -318,6 +318,20 @@ static uint32 parallel_stream_nchanges = 0; /* Are we initializing an apply worker? 
*/
 bool		InitializingApplyWorker = false;
 
+/*
+ * GUC support
+ */
+const struct config_enum_entry logical_rep_clock_skew_action_options[] = {
+	{"error", LR_CLOCK_SKEW_ACTION_ERROR, false},
+	{"wait", LR_CLOCK_SKEW_ACTION_WAIT, false},
+	{NULL, 0, false}
+};
+
+/* GUCs */
+int			max_logical_rep_clock_skew = LR_CLOCK_SKEW_DEFAULT;
+int			max_logical_rep_clock_skew_action = LR_CLOCK_SKEW_ACTION_ERROR;
+int			max_logical_rep_clock_skew_wait = 300;	/* 5 mins */
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -982,6 +996,110 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 	ExecStoreVirtualTuple(slot);
 }
 
+/*
+ * Manage clock skew between nodes.
+ *
+ * origin_timestamp is the commit (or prepare) time of the remote transaction
+ * that is about to be applied.  If it is ahead of the local clock by at
+ * least max_logical_rep_clock_skew seconds, perform the action configured in
+ * max_logical_rep_clock_skew_action: raise an ERROR, or wait until the skew
+ * is back within the permitted range.
+ *
+ * While waiting, an ERROR is raised if the required wait is longer than
+ * max_logical_rep_clock_skew_wait seconds; a value of 0 disables that cap.
+ * Configuration reloads are honored during the wait, so the thresholds can
+ * be changed, or the check turned off, without restarting the worker.
+ */
+static void
+manage_clock_skew(TimestampTz origin_timestamp)
+{
+	TimestampTz current;
+
+	/* nothing to do if no max clock skew configured */
+	if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT)
+		return;
+
+	current = GetCurrentTimestamp();
+
+	/*
+	 * Nothing to do unless the remote timestamp is in the future and ahead
+	 * of the local clock by max_logical_rep_clock_skew or more.  The
+	 * comparison is done in int64 microseconds; converting the GUC to
+	 * milliseconds in an int would overflow for large settings.
+	 */
+	if (origin_timestamp <= current ||
+		origin_timestamp < TimestampTzPlusSeconds(current,
+												  max_logical_rep_clock_skew))
+		return;
+
+	if (max_logical_rep_clock_skew_action == LR_CLOCK_SKEW_ACTION_ERROR)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("clock skew exceeds max_logical_rep_clock_skew (%d seconds)",
+						max_logical_rep_clock_skew)));
+
+	/* Perform the wait */
+	while (true)
+	{
+		TimestampTz delayUntil;
+		long		msecs;
+		int			rc;
+
+		/* Stop waiting if a configuration reload has turned the check off */
+		if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT)
+			break;
+
+		delayUntil = TimestampTzMinusSeconds(origin_timestamp,
+											 max_logical_rep_clock_skew);
+
+		/* Exit without waiting if it's already past 'delayUntil' time */
+		msecs = TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
+												delayUntil);
+		if (msecs <= 0)
+			break;
+
+		/*
+		 * The wait time should not exceed max_logical_rep_clock_skew_wait;
+		 * 0 means that no limit is imposed.
+		 */
+		if (max_logical_rep_clock_skew_wait > 0 &&
+			msecs > (max_logical_rep_clock_skew_wait * 1000L))
+			ereport(ERROR,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("clock skew wait time exceeds max_logical_rep_clock_skew_wait (%d seconds)",
+							max_logical_rep_clock_skew_wait)));
+
+		elog(DEBUG2, "delaying apply for %ld milliseconds to manage clock skew",
+			 msecs);
+
+		/* Sleep until we are signaled or msecs have elapsed */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   msecs,
+					   WAIT_EVENT_LOGICAL_CLOCK_SKEW);
+
+		/* Exit the loop if msecs have elapsed */
+		if (rc & WL_TIMEOUT)
+			break;
+
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/*
+		 * This might change max_logical_rep_clock_skew and
+		 * max_logical_rep_clock_skew_wait.
+		 */
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+	}
+}
+
 /*
  * Handle BEGIN message.
*/ @@ -1003,6 +1106,9 @@ apply_handle_begin(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Check if there is any clock skew and perform configured action */ + manage_clock_skew(begin_data.committime); } /* @@ -1060,6 +1166,9 @@ apply_handle_begin_prepare(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Check if there is any clock skew and perform configured action */ + manage_clock_skew(begin_data.prepare_time); } /* @@ -1315,7 +1424,8 @@ apply_handle_stream_prepare(StringInfo s) * spooled operations. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, - prepare_data.xid, prepare_data.prepare_lsn); + prepare_data.xid, prepare_data.prepare_lsn, + prepare_data.prepare_time); /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -2020,7 +2130,8 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, */ void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, - XLogRecPtr lsn) + XLogRecPtr lsn, + TimestampTz origin_timestamp) { int nchanges; char path[MAXPGPATH]; @@ -2073,6 +2184,13 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, end_replication_step(); + /* + * If origin_timestamp is provided by caller, then check clock skew with + * respect to the passed time and take configured action. + */ + if (origin_timestamp) + manage_clock_skew(origin_timestamp); + /* * Read the entries one by one and pass them through the same logic as in * apply_dispatch. @@ -2178,7 +2296,8 @@ apply_handle_stream_commit(StringInfo s) * spooled operations. 
*/ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, - commit_data.commit_lsn); + commit_data.commit_lsn, + commit_data.committime); apply_handle_commit_internal(&commit_data); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 8efb4044d6..0ebad6fcab 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -59,6 +59,7 @@ CHECKPOINTER_MAIN "Waiting in main loop of checkpointer process." LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." +LOGICAL_CLOCK_SKEW "Waiting in apply-begin of logical replication apply process to bring clock skew in permissible range." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 686309db58..c768a11963 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -68,6 +68,7 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/syncrep.h" @@ -482,6 +483,7 @@ extern const struct config_enum_entry archive_mode_options[]; extern const struct config_enum_entry recovery_target_action_options[]; extern const struct config_enum_entry wal_sync_method_options[]; extern const struct config_enum_entry dynamic_shared_memory_options[]; +extern const struct config_enum_entry logical_rep_clock_skew_action_options[]; /* * GUC option variables that are exported from this module @@ -3714,6 +3716,33 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets maximum clock skew tolerance between logical " + "replication nodes beyond which action configured " + "in max_logical_rep_clock_skew_action is triggered."), + gettext_noop("-1 turns this check off."), + GUC_UNIT_S + }, + &max_logical_rep_clock_skew, + LR_CLOCK_SKEW_DEFAULT, LR_CLOCK_SKEW_DEFAULT, INT_MAX, + NULL, NULL, NULL + }, + + { + {"max_logical_rep_clock_skew_wait", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets max limit on how long apply worker shall wait to " + "bring clock skew within permissible range of max_logical_rep_clock_skew. 
" + "If the computed wait time is more than this value, " + "apply worker will error out without waiting."), + gettext_noop("0 turns this limit off."), + GUC_UNIT_S + }, + &max_logical_rep_clock_skew_wait, + 300, 0, 3600, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL @@ -4991,6 +5020,17 @@ struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew_action", PGC_POSTMASTER, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets the action to perform if a clock skew higher " + "than max_logical_rep_clock_skew is detected."), + NULL + }, + &max_logical_rep_clock_skew_action, + LR_CLOCK_SKEW_ACTION_ERROR, logical_rep_clock_skew_action_options, + NULL, NULL, NULL + }, + { {"track_functions", PGC_SUSET, STATS_CUMULATIVE, gettext_noop("Collects function-level statistics on database activity."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 667e0dc40a..6424432362 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -383,7 +383,14 @@ # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers - +#max_logical_rep_clock_skew = -1 # maximum clock skew tolerance between logical + # replication nodes beyond which action configured in + # 'max_logical_rep_clock_skew_action' is triggered. +#max_logical_rep_clock_skew_action = error # error or wait + # (change requires restart) +#max_logical_rep_clock_skew_wait = 300 # max limit on how long apply worker + # shall wait to bring clock skew within permissible + # range of max_logical_rep_clock_skew. 
#------------------------------------------------------------------------------
 # QUERY TUNING
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index a18d79d1b2..7cb03062ac 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,7 +14,25 @@
 
 #include <signal.h>
 
+/*
+ * The default for max_logical_rep_clock_skew is -1, which means ignore clock
+ * skew (the check is turned off).
+ */
+#define LR_CLOCK_SKEW_DEFAULT	(-1)
+
+/*
+ * Values accepted by max_logical_rep_clock_skew_action.
+ */
+typedef enum
+{
+	LR_CLOCK_SKEW_ACTION_ERROR, /* raise an error */
+	LR_CLOCK_SKEW_ACTION_WAIT,	/* wait until the skew is within range */
+} LogicalRepClockSkewAction;
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
+extern PGDLLIMPORT int max_logical_rep_clock_skew;
+extern PGDLLIMPORT int max_logical_rep_clock_skew_action;
+extern PGDLLIMPORT int max_logical_rep_clock_skew_wait;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 9646261d7e..95b2a5286d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -268,7 +268,7 @@ extern void stream_stop_internal(TransactionId xid);
 
 /* Common streaming function to apply all the spooled messages */
 extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-								   XLogRecPtr lsn);
+								   XLogRecPtr lsn, TimestampTz origin_timestamp);
 
 extern void apply_dispatch(StringInfo s);
 
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index a6ce03ed46..53b828d89d 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -84,6 +84,7 @@ IntervalPGetDatum(const Interval *X)
 /* Macros for doing timestamp arithmetic without assuming timestamp's units */
 #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000))
 #define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000))
+#define TimestampTzMinusSeconds(tz,s) ((tz) - ((s) * (int64) 1000000))
 
 
 /* Set at postmaster start */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b6135f0347..5a4e86fccd 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1566,6 +1566,7 @@ LogicalOutputPluginWriterPrepareWrite
 LogicalOutputPluginWriterUpdateProgress
 LogicalOutputPluginWriterWrite
 LogicalRepBeginData
+LogicalRepClockSkewAction
 LogicalRepCommitData
 LogicalRepCommitPreparedTxnData
 LogicalRepCtxStruct
-- 
2.34.1