From 4226b471e1f88f0e5c55715c46671baf13d8c42d Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 17 Apr 2017 10:44:27 +0900 Subject: [PATCH 1/2] Add a GUC parameter apply_worker_timeout. Terminate replication connections that are inactive longer than the specified number of milliseconds. --- doc/src/sgml/config.sgml | 31 +++++++++++++++++++++++---- src/backend/replication/logical/worker.c | 11 ++++++---- src/backend/utils/misc/guc.c | 12 +++++++++++ src/backend/utils/misc/postgresql.conf.sample | 2 ++ src/include/replication/logicalworker.h | 2 ++ 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 744c5e8..f584431 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3421,9 +3421,8 @@ ANY num_sync ( num_sync ( + apply_worker_timeout (integer) + + apply_worker_timeout configuration parameter + + + + + Terminate replication connections that are inactive longer + than the specified number of milliseconds. This is useful for + the receiving subscriber to detect a publisher node crash or network + outage. + + + A value of zero disables the timeout mechanism. This parameter + can only be set in + the postgresql.conf file or on the server command line. + + + The default value is 60 seconds. + + + + + diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 656d399..8d65837 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -86,6 +86,9 @@ #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */ +/* GUC variable */ +int apply_worker_timeout; + typedef struct FlushPosition { dlist_node node; @@ -1150,7 +1153,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * We didn't receive anything new. If we haven't heard * anything from the server for more than - * wal_receiver_timeout / 2, ping the server. Also, if + * apply_worker_timeout / 2, ping the server. 
Also, if * it's been longer than wal_receiver_status_interval * since the last update we sent, send a status update to * the master anyway, to report any progress in applying @@ -1162,14 +1165,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Check if time since last receive from standby has * reached the configured limit. */ - if (wal_receiver_timeout > 0) + if (apply_worker_timeout > 0) { TimestampTz now = GetCurrentTimestamp(); TimestampTz timeout; timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); + apply_worker_timeout); if (now >= timeout) ereport(ERROR, @@ -1182,7 +1185,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (!ping_sent) { timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); + (apply_worker_timeout / 2)); if (now >= timeout) { requestReply = true; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9ad8361..378bad2 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -63,6 +63,7 @@ #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walreceiver.h" @@ -1828,6 +1829,17 @@ static struct config_int ConfigureNamesInt[] = }, { + {"apply_worker_timeout", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets the maximum wait time to receive data from the publisher."), + NULL, + GUC_UNIT_MS + }, + &apply_worker_timeout, + 60 * 1000, 0, INT_MAX, + NULL, NULL, NULL + }, + + { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 1435d92..20b5bb7 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ 
-278,6 +278,8 @@ #max_logical_replication_workers = 4 # taken from max_worker_processes #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers +#apply_worker_timeout = 60s # time that apply worker waits for + # communication from publisher #------------------------------------------------------------------------------ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 3e0affa..6395850 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -12,6 +12,8 @@ #ifndef LOGICALWORKER_H #define LOGICALWORKER_H +extern int apply_worker_timeout; + extern void ApplyWorkerMain(Datum main_arg); #endif /* LOGICALWORKER_H */ -- 2.8.1