suppressing useless wakeups in logical/worker.c

Started by Nathan Bossart about 3 years ago (25 messages)
#1Nathan Bossart
nathandbossart@gmail.com
1 attachment(s)

Hi hackers,

I've attached an attempt at porting a similar change to 05a7be9 [0] to
logical/worker.c. The bulk of the patch is lifted from the walreceiver
patch, but I did need to add a hack for waking up after
wal_retrieve_retry_interval to start sync workers. This hack involves a
new wakeup variable that process_syncing_tables_for_apply() sets.

For best results, this patch should be applied on top of [1], which is an
attempt at fixing all the stuff that only runs within a reasonable
timeframe because logical worker processes currently wake up at least once
a second. With the attached patch applied, those periodic wakeups are
gone, so we need to make sure we wake up the logical workers as needed.

[0]: /messages/by-id/CA+hUKGJGhX4r2LPUE3Oy9BX71Eum6PBcS8L3sJpScR9oKaTVaA@mail.gmail.com
[1]: /messages/by-id/20221122004119.GA132961@nathanxps13

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v1-0001-suppress-unnecessary-wakeups-in-logical-worker.c.patch (text/x-diff; charset=us-ascii)
From d1af5b4b3073984fba1e5e86b9c034a96b9e235a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Thu, 1 Dec 2022 20:50:21 -0800
Subject: [PATCH v1 1/1] suppress unnecessary wakeups in logical/worker.c

---
 src/backend/replication/logical/tablesync.c |  20 +++
 src/backend/replication/logical/worker.c    | 156 +++++++++++++++-----
 src/include/replication/worker_internal.h   |   3 +
 3 files changed, 142 insertions(+), 37 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..168a2da3a1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -418,6 +418,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * If we've made it past our previously-stored special wakeup time, reset
+	 * it so that it can be recalculated as needed.
+	 */
+	if (next_sync_start <= GetCurrentTimestamp())
+		next_sync_start = PG_INT64_MAX;
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	FetchTableStates(&started_tx);
 
@@ -612,6 +619,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 rstate->relid);
 						hentry->last_start_time = now;
 					}
+					else if (found)
+					{
+						TimestampTz retry_time = hentry->last_start_time +
+												 (wal_retrieve_retry_interval *
+												  INT64CONST(1000));
+
+						/*
+						 * Store when we can start the sync worker so that we
+						 * know how long to sleep.
+						 */
+						if (retry_time < next_sync_start)
+							next_sync_start = retry_time;
+					}
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f9efe6c4c6..03dc42ceee 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -196,8 +196,6 @@
 #include "utils/syscache.h"
 #include "utils/timeout.h"
 
-#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
-
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -279,6 +277,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,
+	LRW_WAKEUP_PING,
+	LRW_WAKEUP_STATUS,
+	NUM_LRW_WAKEUPS
+} LogRepWorkerWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+TimestampTz next_sync_start;
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+										  TimestampTz now);
+
 typedef struct SubXactInfo
 {
 	TransactionId xid;			/* XID of the subxact */
@@ -2708,10 +2726,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
-	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	TimestampTz now;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -2740,6 +2757,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	/* Initialize nap wakeup times. */
+	now = GetCurrentTimestamp();
+	for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+		LogRepWorkerComputeNextWakeup(i, now);
+	next_sync_start = PG_INT64_MAX;
+
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
@@ -2756,6 +2779,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
+		now = GetCurrentTimestamp();
 		if (len != 0)
 		{
 			/* Loop to process all available data (without blocking). */
@@ -2779,9 +2803,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					int			c;
 					StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+					/* Adjust the ping and terminate wakeup times. */
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
 
 					/* Ensure we are reading the data into our memory context. */
 					MemoryContextSwitchTo(ApplyMessageContext);
@@ -2835,6 +2859,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				now = GetCurrentTimestamp();
 			}
 		}
 
@@ -2873,14 +2898,43 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (!dlist_is_empty(&lsn_mapping))
 			wait_time = WalWriterDelay;
 		else
-			wait_time = NAPTIME_PER_CYCLE;
+		{
+			TimestampTz nextWakeup = PG_INT64_MAX;
+
+			/* Find soonest wakeup time, to limit our nap. */
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				nextWakeup = Min(wakeup[i], nextWakeup);
+
+			/* Also consider special wakeup time for starting sync workers. */
+			if (next_sync_start < now)
+			{
+				/*
+				 * Instead of spinning while we wait for the sync worker to
+				 * start, wait a bit before retrying (unless there's an earlier
+				 * wakeup time).
+				 */
+				nextWakeup = Min(now + INT64CONST(100000), nextWakeup);
+			}
+			else
+				nextWakeup = Min(next_sync_start, nextWakeup);
+
+			/*
+			 * Calculate the nap time.  WaitLatchOrSocket() doesn't accept
+			 * timeouts longer than INT_MAX milliseconds, so we limit the
+			 * result accordingly.  Also, we round up to the next millisecond
+			 * to avoid waking up too early and spinning until one of the
+			 * wakeup times.
+			 */
+			wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+		}
 
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
 							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 							   fd, wait_time,
 							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
-
+		now = GetCurrentTimestamp();
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
@@ -2891,6 +2945,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * If a wakeup time for starting sync workers was set, just set it
+			 * to right now.  It will be recalculated as needed.
+			 */
+			if (next_sync_start != PG_INT64_MAX)
+				next_sync_start = now;
 		}
 
 		if (rc & WL_TIMEOUT)
@@ -2909,31 +2973,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from primary has reached the
 			 * configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
-			{
-				TimestampTz now = GetCurrentTimestamp();
-				TimestampTz timeout;
+			if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating logical replication worker due to timeout")));
 
-				timeout =
-					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
-
-				if (now >= timeout)
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("terminating logical replication worker due to timeout")));
-
-				/* Check to see if it's time for a ping. */
-				if (!ping_sent)
-				{
-					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
-					if (now >= timeout)
-					{
-						requestReply = true;
-						ping_sent = true;
-					}
-				}
+			/* Check to see if it's time for a ping. */
+			if (now >= wakeup[LRW_WAKEUP_PING])
+			{
+				requestReply = true;
+				wakeup[LRW_WAKEUP_PING] = PG_INT64_MAX;
 			}
 
 			send_feedback(last_received, requestReply, requestReply);
@@ -2968,7 +3017,6 @@ static void
 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 {
 	static StringInfo reply_message = NULL;
-	static TimestampTz send_time = 0;
 
 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3011,10 +3059,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force &&
 		writepos == last_writepos &&
 		flushpos == last_flushpos &&
-		!TimestampDifferenceExceeds(send_time, now,
-									wal_receiver_status_interval * 1000))
+		now < wakeup[LRW_WAKEUP_STATUS])
 		return;
-	send_time = now;
+
+	/* Make sure we wake up when it's time to send another status update. */
+	LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
 
 	if (!reply_message)
 	{
@@ -4092,3 +4141,36 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Compute the next wakeup time for a given wakeup reason.  Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+			break;
+		case LRW_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+			break;
+		case LRW_WAKEUP_STATUS:
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+			break;
+		default:
+			break;
+	}
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 2b7114ff6d..dfb7b06796 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -80,6 +80,9 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+/* Next time to attempt starting sync workers. */
+extern PGDLLIMPORT TimestampTz next_sync_start;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
-- 
2.25.1

#2Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Nathan Bossart (#1)
RE: suppressing useless wakeups in logical/worker.c

Dear Nathan,

Thank you for making the patch! I tested your patch, and it basically worked well.
About the following part:

```
			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * If a wakeup time for starting sync workers was set, just set it
+			 * to right now.  It will be recalculated as needed.
+			 */
+			if (next_sync_start != PG_INT64_MAX)
+				next_sync_start = now;
 		}
```

Do we have to recalculate the wakeup times when the subscriber receives a SIGHUP signal?
I think this may cause an unexpected change in behavior, like the following.

Assume that wal_receiver_timeout is 60s and wal_sender_timeout on the publisher is
0s (or the network between the nodes is disconnected), and that we send a SIGHUP
signal to the subscriber's postmaster every 20s.

Currently, last_recv_timestamp is updated when the worker receives a message,
and that value is used to decide when to send a ping. The worker will exit if the
walsender does not reply.

But with your patch, the apply worker recalculates wakeup[LRW_WAKEUP_PING] and
wakeup[LRW_WAKEUP_TERMINATE] when it gets SIGHUP, so the worker never sends
a ping with requestReply = true, and never exits due to the timeout.

My case may seem contrived, but there may be other issues if this behavior remains.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

#3Nathan Bossart
nathandbossart@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#2)
Re: suppressing useless wakeups in logical/worker.c

On Mon, Dec 05, 2022 at 01:00:19PM +0000, Hayato Kuroda (Fujitsu) wrote:

> But in your patch, the apply worker calculates wakeup[LRW_WAKEUP_PING] and
> wakeup[LRW_WAKEUP_TERMINATE] again when it gets SIGHUP, so the worker never sends
> ping with requestReply = true, and never exits due to the timeout.

This is the case for the walreceiver patch, too. If a SIGHUP arrives just
before we are due to ping the server, the ping wakeup time will be pushed
back. To me, this seems unlikely to cause any issues in practice unless
the server is being constantly SIGHUP'd. If we wanted to fix this, we'd
probably need to recompute the wakeup times using the values currently set.
I haven't looked into this too closely, but it doesn't sound tremendously
difficult. Thoughts?

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#4Nathan Bossart
nathandbossart@gmail.com
In reply to: Nathan Bossart (#3)
1 attachment(s)
Re: suppressing useless wakeups in logical/worker.c

rebased for cfbot

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v2-0001-suppress-unnecessary-wakeups-in-logical-worker.c.patch (text/x-diff; charset=us-ascii)
From 2466001a3ae6f94aac8eff45b488765e619bea1b Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Thu, 1 Dec 2022 20:50:21 -0800
Subject: [PATCH v2 1/1] suppress unnecessary wakeups in logical/worker.c

---
 src/backend/replication/logical/tablesync.c |  20 +++
 src/backend/replication/logical/worker.c    | 156 +++++++++++++++-----
 src/include/replication/worker_internal.h   |   3 +
 3 files changed, 142 insertions(+), 37 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38dfce7129..88218e1fed 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * If we've made it past our previously-stored special wakeup time, reset
+	 * it so that it can be recalculated as needed.
+	 */
+	if (next_sync_start <= GetCurrentTimestamp())
+		next_sync_start = PG_INT64_MAX;
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	FetchTableStates(&started_tx);
 
@@ -592,6 +599,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 DSM_HANDLE_INVALID);
 						hentry->last_start_time = now;
 					}
+					else if (found)
+					{
+						TimestampTz retry_time = hentry->last_start_time +
+												 (wal_retrieve_retry_interval *
+												  INT64CONST(1000));
+
+						/*
+						 * Store when we can start the sync worker so that we
+						 * know how long to sleep.
+						 */
+						if (retry_time < next_sync_start)
+							next_sync_start = retry_time;
+					}
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 79cda39445..284f11428c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -207,8 +207,6 @@
 #include "utils/syscache.h"
 #include "utils/timeout.h"
 
-#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
-
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -348,6 +346,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,
+	LRW_WAKEUP_PING,
+	LRW_WAKEUP_STATUS,
+	NUM_LRW_WAKEUPS
+} LogRepWorkerWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+TimestampTz next_sync_start;
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+										  TimestampTz now);
+
 typedef struct SubXactInfo
 {
 	TransactionId xid;			/* XID of the subxact */
@@ -3446,10 +3464,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
-	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	TimestampTz now;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3479,6 +3496,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = &errcallback;
 	apply_error_context_stack = error_context_stack;
 
+	/* Initialize nap wakeup times. */
+	now = GetCurrentTimestamp();
+	for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+		LogRepWorkerComputeNextWakeup(i, now);
+	next_sync_start = PG_INT64_MAX;
+
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
@@ -3495,6 +3518,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
+		now = GetCurrentTimestamp();
 		if (len != 0)
 		{
 			/* Loop to process all available data (without blocking). */
@@ -3518,9 +3542,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					int			c;
 					StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+					/* Adjust the ping and terminate wakeup times. */
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
 
 					/* Ensure we are reading the data into our memory context. */
 					MemoryContextSwitchTo(ApplyMessageContext);
@@ -3574,6 +3598,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				now = GetCurrentTimestamp();
 			}
 		}
 
@@ -3612,14 +3637,43 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (!dlist_is_empty(&lsn_mapping))
 			wait_time = WalWriterDelay;
 		else
-			wait_time = NAPTIME_PER_CYCLE;
+		{
+			TimestampTz nextWakeup = PG_INT64_MAX;
+
+			/* Find soonest wakeup time, to limit our nap. */
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				nextWakeup = Min(wakeup[i], nextWakeup);
+
+			/* Also consider special wakeup time for starting sync workers. */
+			if (next_sync_start < now)
+			{
+				/*
+				 * Instead of spinning while we wait for the sync worker to
+				 * start, wait a bit before retrying (unless there's an earlier
+				 * wakeup time).
+				 */
+				nextWakeup = Min(now + INT64CONST(100000), nextWakeup);
+			}
+			else
+				nextWakeup = Min(next_sync_start, nextWakeup);
+
+			/*
+			 * Calculate the nap time.  WaitLatchOrSocket() doesn't accept
+			 * timeouts longer than INT_MAX milliseconds, so we limit the
+			 * result accordingly.  Also, we round up to the next millisecond
+			 * to avoid waking up too early and spinning until one of the
+			 * wakeup times.
+			 */
+			wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+		}
 
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
 							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 							   fd, wait_time,
 							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
-
+		now = GetCurrentTimestamp();
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
@@ -3630,6 +3684,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * If a wakeup time for starting sync workers was set, just set it
+			 * to right now.  It will be recalculated as needed.
+			 */
+			if (next_sync_start != PG_INT64_MAX)
+				next_sync_start = now;
 		}
 
 		if (rc & WL_TIMEOUT)
@@ -3648,31 +3712,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from primary has reached the
 			 * configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
-			{
-				TimestampTz now = GetCurrentTimestamp();
-				TimestampTz timeout;
+			if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating logical replication worker due to timeout")));
 
-				timeout =
-					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
-
-				if (now >= timeout)
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("terminating logical replication worker due to timeout")));
-
-				/* Check to see if it's time for a ping. */
-				if (!ping_sent)
-				{
-					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
-					if (now >= timeout)
-					{
-						requestReply = true;
-						ping_sent = true;
-					}
-				}
+			/* Check to see if it's time for a ping. */
+			if (now >= wakeup[LRW_WAKEUP_PING])
+			{
+				requestReply = true;
+				wakeup[LRW_WAKEUP_PING] = PG_INT64_MAX;
 			}
 
 			send_feedback(last_received, requestReply, requestReply);
@@ -3708,7 +3757,6 @@ static void
 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 {
 	static StringInfo reply_message = NULL;
-	static TimestampTz send_time = 0;
 
 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3751,10 +3799,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force &&
 		writepos == last_writepos &&
 		flushpos == last_flushpos &&
-		!TimestampDifferenceExceeds(send_time, now,
-									wal_receiver_status_interval * 1000))
+		now < wakeup[LRW_WAKEUP_STATUS])
 		return;
-	send_time = now;
+
+	/* Make sure we wake up when it's time to send another status update. */
+	LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
 
 	if (!reply_message)
 	{
@@ -5034,3 +5083,36 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_SEND_TO_PARALLEL;
 	}
 }
+
+/*
+ * Compute the next wakeup time for a given wakeup reason.  Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+			break;
+		case LRW_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+			break;
+		case LRW_WAKEUP_STATUS:
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+			break;
+		default:
+			break;
+	}
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index db891eea8a..ec39711ed1 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,9 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+/* Next time to attempt starting sync workers. */
+extern PGDLLIMPORT TimestampTz next_sync_start;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
-- 
2.25.1

#5Tom Lane
tgl@sss.pgh.pa.us
In reply to: Nathan Bossart (#4)
Re: suppressing useless wakeups in logical/worker.c

Nathan Bossart <nathandbossart@gmail.com> writes:

> [ v2-0001-suppress-unnecessary-wakeups-in-logical-worker.c.patch ]

I took a look through this, and have a number of mostly-cosmetic
issues:

* It seems wrong that next_sync_start isn't handled as one of the
wakeup[NUM_LRW_WAKEUPS] entries. I see that it needs to be accessed from
another module; but you could handle that without exposing either enum
LogRepWorkerWakeupReason or the array, by providing getter/setter
functions for process_syncing_tables_for_apply() to call.

* This code is far too intimately familiar with the fact that TimestampTz
is an int64 count of microseconds. (I'm picky about that because I
remember that they were once something else, so I wonder if someday
they will be different again.) You could get rid of the PG_INT64_MAX
usages by replacing those with the timestamp infinity macro DT_NOEND;
and I'd even be on board with adding a less-opaque alternate name for
that to datatype/timestamp.h. The various magic-constant multipliers
could perhaps be made less magic by using TimestampTzPlusMilliseconds().

* I think it might be better to construct the enum like this:

+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,
+	LRW_WAKEUP_PING,
+	LRW_WAKEUP_STATUS
+#define NUM_LRW_WAKEUPS (LRW_WAKEUP_STATUS + 1)
+} LogRepWorkerWakeupReason;

so that you don't have to have a default: case in switches on the
enum value. I'm more worried about somebody adding an enum value
and missing updating a switch statement elsewhere than I am about
somebody adding an enum value and neglecting to update the
immediately-adjacent macro.

* The updates of "now" in LogicalRepApplyLoop seem rather
randomly placed, and I'm not entirely convinced that we'll
always be using a reasonably up-to-date value. Can't we
just update it right before each usage?

* This special handling of next_sync_start seems mighty ugly:

+            /* Also consider special wakeup time for starting sync workers. */
+            if (next_sync_start < now)
+            {
+                /*
+                 * Instead of spinning while we wait for the sync worker to
+                 * start, wait a bit before retrying (unless there's an earlier
+                 * wakeup time).
+                 */
+                nextWakeup = Min(now + INT64CONST(100000), nextWakeup);
+            }
+            else
+                nextWakeup = Min(next_sync_start, nextWakeup);

Do we really need the slop? If so, is there a reason why it
shouldn't apply to all possible sources of nextWakeup? (It's
going to be hard to fold next_sync_start into the wakeup[]
array unless you can make this not a special case.)

* It'd probably be worth enlarging the comment for
LogRepWorkerComputeNextWakeup to explain why its API is like that,
perhaps "We ask the caller to pass in the value of "now" because
this frequently avoids multiple calls of GetCurrentTimestamp().
It had better be a reasonably-up-to-date value, though."

regards, tom lane

#6Nathan Bossart
nathandbossart@gmail.com
In reply to: Tom Lane (#5)
2 attachment(s)
Re: suppressing useless wakeups in logical/worker.c

On Tue, Jan 24, 2023 at 06:45:08PM -0500, Tom Lane wrote:

> I took a look through this, and have a number of mostly-cosmetic
> issues:

Thanks for the detailed review.

> * It seems wrong that next_sync_start isn't handled as one of the
> wakeup[NUM_LRW_WAKEUPS] entries. I see that it needs to be accessed from
> another module; but you could handle that without exposing either enum
> LogRepWorkerWakeupReason or the array, by providing getter/setter
> functions for process_syncing_tables_for_apply() to call.
>
> * This code is far too intimately familiar with the fact that TimestampTz
> is an int64 count of microseconds. (I'm picky about that because I
> remember that they were once something else, so I wonder if someday
> they will be different again.) You could get rid of the PG_INT64_MAX
> usages by replacing those with the timestamp infinity macro DT_NOEND;
> and I'd even be on board with adding a less-opaque alternate name for
> that to datatype/timestamp.h. The various magic-constant multipliers
> could perhaps be made less magic by using TimestampTzPlusMilliseconds().
>
> * I think it might be better to construct the enum like this:
>
> +typedef enum LogRepWorkerWakeupReason
> +{
> +	LRW_WAKEUP_TERMINATE,
> +	LRW_WAKEUP_PING,
> +	LRW_WAKEUP_STATUS
> +#define NUM_LRW_WAKEUPS (LRW_WAKEUP_STATUS + 1)
> +} LogRepWorkerWakeupReason;
>
> so that you don't have to have a default: case in switches on the
> enum value. I'm more worried about somebody adding an enum value
> and missing updating a switch statement elsewhere than I am about
> somebody adding an enum value and neglecting to update the
> immediately-adjacent macro.

I did all of this in v3.

> * The updates of "now" in LogicalRepApplyLoop seem rather
> randomly placed, and I'm not entirely convinced that we'll
> always be using a reasonably up-to-date value. Can't we
> just update it right before each usage?

This came up for walreceiver.c, too. The concern is that
GetCurrentTimestamp() might be rather expensive on systems without
something like the vDSO. I don't know how common that is. If we can rule
that out, then I agree that we should just update it right before each use.

> * This special handling of next_sync_start seems mighty ugly:
>
> +            /* Also consider special wakeup time for starting sync workers. */
> +            if (next_sync_start < now)
> +            {
> +                /*
> +                 * Instead of spinning while we wait for the sync worker to
> +                 * start, wait a bit before retrying (unless there's an earlier
> +                 * wakeup time).
> +                 */
> +                nextWakeup = Min(now + INT64CONST(100000), nextWakeup);
> +            }
> +            else
> +                nextWakeup = Min(next_sync_start, nextWakeup);
>
> Do we really need the slop? If so, is there a reason why it
> shouldn't apply to all possible sources of nextWakeup? (It's
> going to be hard to fold next_sync_start into the wakeup[]
> array unless you can make this not a special case.)

I'm not positive it is absolutely necessary. AFAICT the function that
updates this particular wakeup time is conditionally called, so it at least
seems theoretically possible that we could end up spinning in a tight loop
until we attempt to start a new tablesync worker. But perhaps this is
unlikely enough that we needn't worry about it.

I noticed that this wakeup time wasn't being updated when the number of
active tablesync workers is >= max_sync_workers_per_subscription. In v3, I
tried to handle this by setting the wakeup time to a second later for this
case. I think you could ordinarily depend on the tablesync worker's
notify_pid to wake up the apply worker, but that wouldn't work if the apply
worker has restarted.

Ultimately, this particular wakeup time seems to be a special case, and I
probably need to think about it some more. If you have ideas, I'm all
ears.

> * It'd probably be worth enlarging the comment for
> LogRepWorkerComputeNextWakeup to explain why its API is like that,
> perhaps "We ask the caller to pass in the value of "now" because
> this frequently avoids multiple calls of GetCurrentTimestamp().
> It had better be a reasonably-up-to-date value, though."

I did this in v3. I noticed that many of your comments also applied to the
similar patch that was recently applied to walreceiver.c, so I created
another patch to fix that up.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v3-0001-code-review-for-05a7be9.patch (text/x-diff; charset=us-ascii)
From 3b464bf0ccb22e36ab627a5e19981eaf3734d4dd Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Tue, 24 Jan 2023 20:52:21 -0800
Subject: [PATCH v3 1/2] code review for 05a7be9

---
 src/backend/replication/walreceiver.c | 31 ++++++++++++++-------------
 src/include/utils/timestamp.h         |  1 +
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 3876c0188d..0563bad0f6 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -122,8 +122,8 @@ typedef enum WalRcvWakeupReason
 	WALRCV_WAKEUP_TERMINATE,
 	WALRCV_WAKEUP_PING,
 	WALRCV_WAKEUP_REPLY,
-	WALRCV_WAKEUP_HSFEEDBACK,
-	NUM_WALRCV_WAKEUPS
+	WALRCV_WAKEUP_HSFEEDBACK
+#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
 } WalRcvWakeupReason;
 
 /*
@@ -525,7 +525,7 @@ WalReceiverMain(void)
 					break;
 
 				/* Find the soonest wakeup time, to limit our nap. */
-				nextWakeup = PG_INT64_MAX;
+				nextWakeup = DT_NOEND;
 				for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
 					nextWakeup = Min(wakeup[i], nextWakeup);
 
@@ -604,7 +604,7 @@ WalReceiverMain(void)
 					if (now >= wakeup[WALRCV_WAKEUP_PING])
 					{
 						requestReply = true;
-						wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
+						wakeup[WALRCV_WAKEUP_PING] = DT_NOEND;
 					}
 
 					XLogWalRcvSendReply(requestReply, requestReply);
@@ -1310,7 +1310,10 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 /*
  * Compute the next wakeup time for a given wakeup reason.  Can be called to
  * initialize a wakeup time, to adjust it for the next wakeup, or to
- * reinitialize it when GUCs have changed.
+ * reinitialize it when GUCs have changed.  We ask the caller to pass in the
+ * value of "now" because this frequently avoids multiple calls of
+ * GetCurrentTimestamp().  It had better be a reasonably up-to-date value
+ * though.
  */
 static void
 WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
@@ -1319,29 +1322,27 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
 	{
 		case WALRCV_WAKEUP_TERMINATE:
 			if (wal_receiver_timeout <= 0)
-				wakeup[reason] = PG_INT64_MAX;
+				wakeup[reason] = DT_NOEND;
 			else
-				wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
 			break;
 		case WALRCV_WAKEUP_PING:
 			if (wal_receiver_timeout <= 0)
-				wakeup[reason] = PG_INT64_MAX;
+				wakeup[reason] = DT_NOEND;
 			else
-				wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
 			break;
 		case WALRCV_WAKEUP_HSFEEDBACK:
 			if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
-				wakeup[reason] = PG_INT64_MAX;
+				wakeup[reason] = DT_NOEND;
 			else
-				wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+				wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
 			break;
 		case WALRCV_WAKEUP_REPLY:
 			if (wal_receiver_status_interval <= 0)
-				wakeup[reason] = PG_INT64_MAX;
+				wakeup[reason] = DT_NOEND;
 			else
-				wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
-			break;
-		default:
+				wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
 			break;
 	}
 }
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 42f802bb9d..1a63bc7c2d 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -82,6 +82,7 @@ IntervalPGetDatum(const Interval *X)
 #define INTERVAL_RANGE(t) (((t) >> 16) & INTERVAL_RANGE_MASK)
 
 #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000))
+#define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000))
 
 
 /* Set at postmaster start */
-- 
2.25.1

v3-0002-suppress-useless-wakeups-in-logical-worker.c.patch (text/x-diff; charset=us-ascii)
From 0a4d3a6c62bacd2b5592043ca4ba2408b127f1f5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Tue, 24 Jan 2023 21:12:28 -0800
Subject: [PATCH v3 2/2] suppress useless wakeups in logical/worker.c

---
 src/backend/replication/logical/tablesync.c |  28 +++
 src/backend/replication/logical/worker.c    | 192 ++++++++++++++++----
 src/include/replication/worker_internal.h   |   4 +
 src/tools/pgindent/typedefs.list            |   1 +
 4 files changed, 189 insertions(+), 36 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..573b46b5a2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * If we've made it past our previously-stored special wakeup time, reset
+	 * it so that it can be recalculated as needed.
+	 */
+	if (LogRepWorkerGetSyncStartWakeup() <= GetCurrentTimestamp())
+		LogRepWorkerClearSyncStartWakeup();
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	FetchTableStates(&started_tx);
 
@@ -592,6 +599,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 DSM_HANDLE_INVALID);
 						hentry->last_start_time = now;
 					}
+					else
+					{
+						TimestampTz retry_time;
+
+						/*
+						 * Store when we can start the sync worker so that we
+						 * know how long to sleep.
+						 */
+						retry_time = TimestampTzPlusMilliseconds(hentry->last_start_time,
+																 wal_retrieve_retry_interval);
+						LogRepWorkerUpdateSyncStartWakeup(retry_time);
+					}
+				}
+				else
+				{
+					TimestampTz now = GetCurrentTimestamp();
+					TimestampTz retry_time;
+
+					/* Maybe there will be a free slot in a second... */
+					retry_time = TimestampTzPlusSeconds(now, 1);
+					LogRepWorkerUpdateSyncStartWakeup(retry_time);
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..83fb8c3110 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -208,8 +208,6 @@
 #include "utils/syscache.h"
 #include "utils/timeout.h"
 
-#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
-
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -351,6 +349,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,
+	LRW_WAKEUP_PING,
+	LRW_WAKEUP_STATUS,
+	LRW_WAKEUP_SYNC_START
+#define NUM_LRW_WAKEUPS (LRW_WAKEUP_SYNC_START + 1)
+} LogRepWorkerWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+										  TimestampTz now);
+
 typedef struct SubXactInfo
 {
 	TransactionId xid;			/* XID of the subxact */
@@ -3449,10 +3467,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
-	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	TimestampTz now;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3482,6 +3499,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = &errcallback;
 	apply_error_context_stack = error_context_stack;
 
+	/* Initialize nap wakeup times. */
+	now = GetCurrentTimestamp();
+	for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+		LogRepWorkerComputeNextWakeup(i, now);
+
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
@@ -3498,6 +3520,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
+		now = GetCurrentTimestamp();
 		if (len != 0)
 		{
 			/* Loop to process all available data (without blocking). */
@@ -3521,9 +3544,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					int			c;
 					StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+					/* Adjust the ping and terminate wakeup times. */
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
 
 					/* Ensure we are reading the data into our memory context. */
 					MemoryContextSwitchTo(ApplyMessageContext);
@@ -3577,6 +3600,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				now = GetCurrentTimestamp();
 			}
 		}
 
@@ -3615,7 +3639,33 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (!dlist_is_empty(&lsn_mapping))
 			wait_time = WalWriterDelay;
 		else
-			wait_time = NAPTIME_PER_CYCLE;
+		{
+			TimestampTz nextWakeup = DT_NOEND;
+
+			/*
+			 * Since process_syncing_tables() is called conditionally, the
+			 * tablesync worker start wakeup time might be in the past, and we
+			 * can't know for sure when it will be updated again.  Rather than
+			 * spinning in a tight loop in this case, bump this wakeup time by
+			 * a second.
+			 */
+			now = GetCurrentTimestamp();
+			if (wakeup[LRW_WAKEUP_SYNC_START] < now)
+				wakeup[LRW_WAKEUP_SYNC_START] = TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1);
+
+			/* Find soonest wakeup time, to limit our nap. */
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				nextWakeup = Min(wakeup[i], nextWakeup);
+
+			/*
+			 * Calculate the nap time.  WaitLatchOrSocket() doesn't accept
+			 * timeouts longer than INT_MAX milliseconds, so we limit the
+			 * result accordingly.  Also, we round up to the next millisecond
+			 * to avoid waking up too early and spinning until one of the
+			 * wakeup times.
+			 */
+			wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+		}
 
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
@@ -3623,6 +3673,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 							   fd, wait_time,
 							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
 
+		now = GetCurrentTimestamp();
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
@@ -3633,6 +3684,20 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * LogRepWorkerComputeNextWakeup() will have cleared the tablesync
+			 * worker start wakeup time, so we might not wake up to start a new
+			 * worker at the appropriate time.  To deal with this, we set the
+			 * wakeup time to right now so that
+			 * process_syncing_tables_for_apply() recalculates it as soon as
+			 * possible.
+			 */
+			if (!am_tablesync_worker())
+				LogRepWorkerUpdateSyncStartWakeup(now);
 		}
 
 		if (rc & WL_TIMEOUT)
@@ -3651,31 +3716,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from primary has reached the
 			 * configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
-			{
-				TimestampTz now = GetCurrentTimestamp();
-				TimestampTz timeout;
-
-				timeout =
-					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
+			if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating logical replication worker due to timeout")));
 
-				if (now >= timeout)
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("terminating logical replication worker due to timeout")));
-
-				/* Check to see if it's time for a ping. */
-				if (!ping_sent)
-				{
-					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
-					if (now >= timeout)
-					{
-						requestReply = true;
-						ping_sent = true;
-					}
-				}
+			/* Check to see if it's time for a ping. */
+			if (now >= wakeup[LRW_WAKEUP_PING])
+			{
+				requestReply = true;
+				wakeup[LRW_WAKEUP_PING] = DT_NOEND;
 			}
 
 			send_feedback(last_received, requestReply, requestReply);
@@ -3711,7 +3761,6 @@ static void
 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 {
 	static StringInfo reply_message = NULL;
-	static TimestampTz send_time = 0;
 
 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3754,10 +3803,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force &&
 		writepos == last_writepos &&
 		flushpos == last_flushpos &&
-		!TimestampDifferenceExceeds(send_time, now,
-									wal_receiver_status_interval * 1000))
+		now < wakeup[LRW_WAKEUP_STATUS])
 		return;
-	send_time = now;
+
+	/* Make sure we wake up when it's time to send another status update. */
+	LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
 
 	if (!reply_message)
 	{
@@ -5056,3 +5106,73 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_APPLY;
 	}
 }
+
+/*
+ * Compute the next wakeup time for a given wakeup reason.  Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.  We ask the caller to pass in the
+ * value of "now" because this frequently avoids multiple calls of
+ * GetCurrentTimestamp().  It had better be a reasonably up-to-date value
+ * though.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = DT_NOEND;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
+			break;
+		case LRW_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = DT_NOEND;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
+			break;
+		case LRW_WAKEUP_STATUS:
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = DT_NOEND;
+			else
+				wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
+			break;
+		case LRW_WAKEUP_SYNC_START:
+			/*
+			 * This wakeup time is manually set as needed.  This function can
+			 * only be used to initialize its value.
+			 */
+			wakeup[reason] = DT_NOEND;
+			break;
+	}
+}
+
+/*
+ * Retrieve the current wakeup time for starting tablesync workers.
+ */
+TimestampTz
+LogRepWorkerGetSyncStartWakeup(void)
+{
+	return wakeup[LRW_WAKEUP_SYNC_START];
+}
+
+/*
+ * Update the current wakeup time for starting tablesync workers.  If the
+ * current wakeup time is <= next_sync_start, no action is taken.
+ */
+void
+LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start)
+{
+	if (next_sync_start < wakeup[LRW_WAKEUP_SYNC_START])
+		wakeup[LRW_WAKEUP_SYNC_START] = next_sync_start;
+}
+
+/*
+ * Clear the current wakeup time for starting tablesync workers.
+ */
+void
+LogRepWorkerClearSyncStartWakeup(void)
+{
+	wakeup[LRW_WAKEUP_SYNC_START] = DT_NOEND;
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dc87a4edd1..ae44717588 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+extern TimestampTz LogRepWorkerGetSyncStartWakeup(void);
+extern void LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start);
+extern void LogRepWorkerClearSyncStartWakeup(void);
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 51484ca7e2..0c8b6ebc4b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1435,6 +1435,7 @@ LockViewRecurse_context
 LockWaitPolicy
 LockingClause
 LogOpts
+LogRepWorkerWakeupReason
 LogStmtLevel
 LogicalDecodeBeginCB
 LogicalDecodeBeginPrepareCB
-- 
2.25.1

#7Thomas Munro
thomas.munro@gmail.com
In reply to: Nathan Bossart (#6)
Re: suppressing useless wakeups in logical/worker.c

On Thu, Jan 26, 2023 at 12:50 PM Nathan Bossart
<nathandbossart@gmail.com> wrote:

I did this in v3. I noticed that many of your comments also applied to the
similar patch that was recently applied to walreceiver.c, so I created
another patch to fix that up.

Can we also use TimestampDifferenceMilliseconds()? It knows about
rounding up for WaitLatch().

#8Nathan Bossart
nathandbossart@gmail.com
In reply to: Thomas Munro (#7)
Re: suppressing useless wakeups in logical/worker.c

On Thu, Jan 26, 2023 at 01:23:41PM +1300, Thomas Munro wrote:

Can we also use TimestampDifferenceMilliseconds()? It knows about
rounding up for WaitLatch().

I think we might risk overflowing "long" when all the wakeup times are
DT_NOEND:

* This is typically used to calculate a wait timeout for WaitLatch()
* or a related function. The choice of "long" as the result type
* is to harmonize with that. It is caller's responsibility that the
* input timestamps not be so far apart as to risk overflow of "long"
* (which'd happen at about 25 days on machines with 32-bit "long").

Maybe we can adjust that function or create a new one to deal with this.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#9Tom Lane
tgl@sss.pgh.pa.us
In reply to: Nathan Bossart (#8)
Re: suppressing useless wakeups in logical/worker.c

Nathan Bossart <nathandbossart@gmail.com> writes:

I think we might risk overflowing "long" when all the wakeup times are
DT_NOEND:

* This is typically used to calculate a wait timeout for WaitLatch()
* or a related function. The choice of "long" as the result type
* is to harmonize with that. It is caller's responsibility that the
* input timestamps not be so far apart as to risk overflow of "long"
* (which'd happen at about 25 days on machines with 32-bit "long").

Maybe we can adjust that function or create a new one to deal with this.

It'd probably be reasonable to file down that sharp edge by instead
specifying that TimestampDifferenceMilliseconds will clamp overflowing
differences to LONG_MAX. Maybe there should be a clamp on the underflow
side too ... but should it be to LONG_MIN or to zero?

BTW, as long as we're discussing roundoff gotchas, I noticed while
testing your previous patch that there's some inconsistency between
TimestampDifferenceExceeds and TimestampDifferenceMilliseconds.
What you submitted at [1] did this:

+            if (TimestampDifferenceExceeds(last_start, now,
+                                           wal_retrieve_retry_interval))
+                ...
+            else
+            {
+                long        elapsed;
+
+                elapsed = TimestampDifferenceMilliseconds(last_start, now);
+                wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+            }

and I discovered that that could sometimes busy-wait by repeatedly
falling through to the "else", but then calculating elapsed ==
wal_retrieve_retry_interval and hence setting wait_time to zero.
I fixed it in the committed version [2] by always computing "elapsed"
and then checking if that's strictly less than
wal_retrieve_retry_interval, but I bet there's existing code with the
same issue. I think we need to take a closer look at making
TimestampDifferenceMilliseconds' roundoff behavior match the outcome of
TimestampDifferenceExceeds comparisons.

regards, tom lane

[1]: /messages/by-id/20230110174345.GA1292607@nathanxps13
[2]: https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=5a3a95385

#10Thomas Munro
thomas.munro@gmail.com
In reply to: Tom Lane (#9)
Re: suppressing useless wakeups in logical/worker.c

On Thu, Jan 26, 2023 at 3:28 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Nathan Bossart <nathandbossart@gmail.com> writes:

I think we might risk overflowing "long" when all the wakeup times are
DT_NOEND:

* This is typically used to calculate a wait timeout for WaitLatch()
* or a related function. The choice of "long" as the result type
* is to harmonize with that. It is caller's responsibility that the
* input timestamps not be so far apart as to risk overflow of "long"
* (which'd happen at about 25 days on machines with 32-bit "long").

Maybe we can adjust that function or create a new one to deal with this.

It'd probably be reasonable to file down that sharp edge by instead
specifying that TimestampDifferenceMilliseconds will clamp overflowing
differences to LONG_MAX. Maybe there should be a clamp on the underflow
side too ... but should it be to LONG_MIN or to zero?

That got me curious... Why did WaitLatch() use long in the first
place? I see that it was in Heikki's original sketch[1], but I can't
think of any implementation reason for it. Note that the current
implementation of WaitLatch() et al will reach WaitEventSetWait()'s
assertion that the timeout is <= INT_MAX, so a LONG_MAX clamp isn't
right without further clamping. Then internally,
WaitEventSetWaitBlock() takes an int, so there is an implicit cast to
int. If I had to guess I'd say the reasons for long in the API are
lost, and the WES rewrite used in "int" because that's what poll() and
epoll_wait() wanted.

[1]: /messages/by-id/4C72E85C.3000201@enterprisedb.com

#11Tom Lane
tgl@sss.pgh.pa.us
In reply to: Thomas Munro (#10)
Re: suppressing useless wakeups in logical/worker.c

Thomas Munro <thomas.munro@gmail.com> writes:

On Thu, Jan 26, 2023 at 3:28 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

It'd probably be reasonable to file down that sharp edge by instead
specifying that TimestampDifferenceMilliseconds will clamp overflowing
differences to LONG_MAX. Maybe there should be a clamp on the underflow
side too ... but should it be to LONG_MIN or to zero?

That got me curious... Why did WaitLatch() use long in the first
place?

Good question. It's not a great choice, because of the inherent
platform specificity. OTOH, I'm not sure it's worth the pain
to change now.

regards, tom lane

#12Tom Lane
tgl@sss.pgh.pa.us
In reply to: Tom Lane (#11)
1 attachment(s)
Re: suppressing useless wakeups in logical/worker.c

I wrote:

It'd probably be reasonable to file down that sharp edge by instead
specifying that TimestampDifferenceMilliseconds will clamp overflowing
differences to LONG_MAX. Maybe there should be a clamp on the underflow
side too ... but should it be to LONG_MIN or to zero?

After looking closer, I see that TimestampDifferenceMilliseconds
already explicitly states that its output is intended for WaitLatch
and friends, which makes it perfectly sane for it to clamp the result
to [0, INT_MAX] rather than depending on the caller to not pass
out-of-range values.

I checked existing callers, and found one place in basebackup_copy.c
that had not read the memo about TimestampDifferenceMilliseconds
never returning a negative value, and another in postmaster.c that
had not read the memo about Min() and Max() being macros. There
are none that are unhappy about clamping to INT_MAX, and at least
one that was already assuming it could just cast the result to int.

Hence, I propose the attached. I haven't gone as far as to change
the result type from long to int; that seems like a lot of code
churn (if we are to update WaitLatch and all callers to match)
and it won't really buy anything semantically.

regards, tom lane

Attachments:

fix-TimestampDifferenceMilliseconds.patch (text/x-diff; charset=us-ascii; name=fix-TimestampDifferenceMilliseconds.patch)
diff --git a/src/backend/backup/basebackup_copy.c b/src/backend/backup/basebackup_copy.c
index 05470057f5..2bb6c89f8c 100644
--- a/src/backend/backup/basebackup_copy.c
+++ b/src/backend/backup/basebackup_copy.c
@@ -215,7 +215,8 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
 		 * the system clock was set backward, so that such occurrences don't
 		 * have the effect of suppressing further progress messages.
 		 */
-		if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
+		if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
+			now < mysink->last_progress_report_time)
 		{
 			mysink->last_progress_report_time = now;
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 5b775cf7d0..62fba5fcee 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1670,11 +1670,12 @@ DetermineSleepTime(void)
 
 	if (next_wakeup != 0)
 	{
-		/* Ensure we don't exceed one minute, or go under 0. */
-		return Max(0,
-				   Min(60 * 1000,
-					   TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
-													   next_wakeup)));
+		int			ms;
+
+		/* result of TimestampDifferenceMilliseconds is in [0, INT_MAX] */
+		ms = (int) TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
+												   next_wakeup);
+		return Min(60 * 1000, ms);
 	}
 
 	return 60 * 1000;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e95398db05..b0cfddd548 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -445,7 +445,7 @@ WalReceiverMain(void)
 				pgsocket	wait_fd = PGINVALID_SOCKET;
 				int			rc;
 				TimestampTz nextWakeup;
-				int			nap;
+				long		nap;
 
 				/*
 				 * Exit walreceiver if we're not in recovery. This should not
@@ -528,15 +528,9 @@ WalReceiverMain(void)
 				for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
 					nextWakeup = Min(wakeup[i], nextWakeup);
 
-				/*
-				 * Calculate the nap time.  WaitLatchOrSocket() doesn't accept
-				 * timeouts longer than INT_MAX milliseconds, so we limit the
-				 * result accordingly.  Also, we round up to the next
-				 * millisecond to avoid waking up too early and spinning until
-				 * one of the wakeup times.
-				 */
+				/* Calculate the nap time, clamping as necessary. */
 				now = GetCurrentTimestamp();
-				nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+				nap = TimestampDifferenceMilliseconds(now, nextWakeup);
 
 				/*
 				 * Ideally we would reuse a WaitEventSet object repeatedly
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 928c330897..b1d1963729 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1690,12 +1690,12 @@ TimestampDifference(TimestampTz start_time, TimestampTz stop_time,
  *
  * This is typically used to calculate a wait timeout for WaitLatch()
  * or a related function.  The choice of "long" as the result type
- * is to harmonize with that.  It is caller's responsibility that the
- * input timestamps not be so far apart as to risk overflow of "long"
- * (which'd happen at about 25 days on machines with 32-bit "long").
+ * is to harmonize with that; furthermore, we clamp the result to at most
+ * INT_MAX milliseconds, because that's all that WaitLatch() allows.
  *
- * Both inputs must be ordinary finite timestamps (in current usage,
- * they'll be results from GetCurrentTimestamp()).
+ * At least one input must be an ordinary finite timestamp, else the "diff"
+ * calculation might overflow.  We do support stop_time == TIMESTAMP_INFINITY,
+ * which will result in INT_MAX wait time.
  *
  * We expect start_time <= stop_time.  If not, we return zero,
  * since then we're already past the previously determined stop_time.
@@ -1710,6 +1710,8 @@ TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
 
 	if (diff <= 0)
 		return 0;
+	else if (diff >= (INT_MAX * INT64CONST(1000) - 999))
+		return (long) INT_MAX;
 	else
 		return (long) ((diff + 999) / 1000);
 }
#13Nathan Bossart
nathandbossart@gmail.com
In reply to: Tom Lane (#12)
Re: suppressing useless wakeups in logical/worker.c

On Thu, Jan 26, 2023 at 01:54:08PM -0500, Tom Lane wrote:

After looking closer, I see that TimestampDifferenceMilliseconds
already explicitly states that its output is intended for WaitLatch
and friends, which makes it perfectly sane for it to clamp the result
to [0, INT_MAX] rather than depending on the caller to not pass
out-of-range values.

+1

* This is typically used to calculate a wait timeout for WaitLatch()
* or a related function.  The choice of "long" as the result type
- * is to harmonize with that.  It is caller's responsibility that the
- * input timestamps not be so far apart as to risk overflow of "long"
- * (which'd happen at about 25 days on machines with 32-bit "long").
+ * is to harmonize with that; furthermore, we clamp the result to at most
+ * INT_MAX milliseconds, because that's all that WaitLatch() allows.
*
- * Both inputs must be ordinary finite timestamps (in current usage,
- * they'll be results from GetCurrentTimestamp()).
+ * At least one input must be an ordinary finite timestamp, else the "diff"
+ * calculation might overflow.  We do support stop_time == TIMESTAMP_INFINITY,
+ * which will result in INT_MAX wait time.

I wonder if we should explicitly reject negative timestamps to eliminate
any chance of int64 overflow, too. Alternatively, we could detect that the
operation will overflow and return either 0 or INT_MAX, but I assume
there's minimal use of this function with negative timestamps.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#14Tom Lane
tgl@sss.pgh.pa.us
In reply to: Nathan Bossart (#13)
Re: suppressing useless wakeups in logical/worker.c

Nathan Bossart <nathandbossart@gmail.com> writes:

On Thu, Jan 26, 2023 at 01:54:08PM -0500, Tom Lane wrote:

- * Both inputs must be ordinary finite timestamps (in current usage,
- * they'll be results from GetCurrentTimestamp()).
+ * At least one input must be an ordinary finite timestamp, else the "diff"
+ * calculation might overflow.  We do support stop_time == TIMESTAMP_INFINITY,
+ * which will result in INT_MAX wait time.

I wonder if we should explicitly reject negative timestamps to eliminate
any chance of int64 overflow, too.

Hmm. I'm disinclined to add an assumption that the epoch is in the past,
but I take your point that the subtraction would overflow with
TIMESTAMP_INFINITY and a negative finite timestamp. Maybe we should
make use of pg_sub_s64_overflow()?

BTW, I just noticed that the adjacent function TimestampDifference
has a lot of callers that would be much happier using
TimestampDifferenceMilliseconds.

regards, tom lane

#15Nathan Bossart
nathandbossart@gmail.com
In reply to: Tom Lane (#14)
Re: suppressing useless wakeups in logical/worker.c

On Thu, Jan 26, 2023 at 03:04:30PM -0500, Tom Lane wrote:

Nathan Bossart <nathandbossart@gmail.com> writes:

I wonder if we should explicitly reject negative timestamps to eliminate
any chance of int64 overflow, too.

Hmm. I'm disinclined to add an assumption that the epoch is in the past,
but I take your point that the subtraction would overflow with
TIMESTAMP_INFINITY and a negative finite timestamp. Maybe we should
make use of pg_sub_s64_overflow()?

That would be my vote. I think the 'diff <= 0' check might need to be
replaced with something like 'start_time > stop_time' so that we return 0
for the underflow case.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#16Tom Lane
tgl@sss.pgh.pa.us
In reply to: Nathan Bossart (#15)
1 attachment(s)
Re: suppressing useless wakeups in logical/worker.c

Nathan Bossart <nathandbossart@gmail.com> writes:

On Thu, Jan 26, 2023 at 03:04:30PM -0500, Tom Lane wrote:

Hmm. I'm disinclined to add an assumption that the epoch is in the past,
but I take your point that the subtraction would overflow with
TIMESTAMP_INFINITY and a negative finite timestamp. Maybe we should
make use of pg_sub_s64_overflow()?

That would be my vote. I think the 'diff <= 0' check might need to be
replaced with something like 'start_time > stop_time' so that we return 0
for the underflow case.

Right, so more like this.

regards, tom lane

Attachments:

fix-TimestampDifferenceMilliseconds-2.patch (text/x-diff; charset=us-ascii; name=fix-TimestampDifferenceMilliseconds-2.patch)
diff --git a/src/backend/backup/basebackup_copy.c b/src/backend/backup/basebackup_copy.c
index 05470057f5..2bb6c89f8c 100644
--- a/src/backend/backup/basebackup_copy.c
+++ b/src/backend/backup/basebackup_copy.c
@@ -215,7 +215,8 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
 		 * the system clock was set backward, so that such occurrences don't
 		 * have the effect of suppressing further progress messages.
 		 */
-		if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
+		if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
+			now < mysink->last_progress_report_time)
 		{
 			mysink->last_progress_report_time = now;
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 5b775cf7d0..62fba5fcee 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1670,11 +1670,12 @@ DetermineSleepTime(void)
 
 	if (next_wakeup != 0)
 	{
-		/* Ensure we don't exceed one minute, or go under 0. */
-		return Max(0,
-				   Min(60 * 1000,
-					   TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
-													   next_wakeup)));
+		int			ms;
+
+		/* result of TimestampDifferenceMilliseconds is in [0, INT_MAX] */
+		ms = (int) TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
+												   next_wakeup);
+		return Min(60 * 1000, ms);
 	}
 
 	return 60 * 1000;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e95398db05..b0cfddd548 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -445,7 +445,7 @@ WalReceiverMain(void)
 				pgsocket	wait_fd = PGINVALID_SOCKET;
 				int			rc;
 				TimestampTz nextWakeup;
-				int			nap;
+				long		nap;
 
 				/*
 				 * Exit walreceiver if we're not in recovery. This should not
@@ -528,15 +528,9 @@ WalReceiverMain(void)
 				for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
 					nextWakeup = Min(wakeup[i], nextWakeup);
 
-				/*
-				 * Calculate the nap time.  WaitLatchOrSocket() doesn't accept
-				 * timeouts longer than INT_MAX milliseconds, so we limit the
-				 * result accordingly.  Also, we round up to the next
-				 * millisecond to avoid waking up too early and spinning until
-				 * one of the wakeup times.
-				 */
+				/* Calculate the nap time, clamping as necessary. */
 				now = GetCurrentTimestamp();
-				nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+				nap = TimestampDifferenceMilliseconds(now, nextWakeup);
 
 				/*
 				 * Ideally we would reuse a WaitEventSet object repeatedly
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 928c330897..47e059a409 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1690,26 +1690,31 @@ TimestampDifference(TimestampTz start_time, TimestampTz stop_time,
  *
  * This is typically used to calculate a wait timeout for WaitLatch()
  * or a related function.  The choice of "long" as the result type
- * is to harmonize with that.  It is caller's responsibility that the
- * input timestamps not be so far apart as to risk overflow of "long"
- * (which'd happen at about 25 days on machines with 32-bit "long").
- *
- * Both inputs must be ordinary finite timestamps (in current usage,
- * they'll be results from GetCurrentTimestamp()).
+ * is to harmonize with that; furthermore, we clamp the result to at most
+ * INT_MAX milliseconds, because that's all that WaitLatch() allows.
  *
  * We expect start_time <= stop_time.  If not, we return zero,
  * since then we're already past the previously determined stop_time.
  *
+ * Subtracting finite and infinite timestamps works correctly, returning
+ * zero or INT_MAX as appropriate.
+ *
  * Note we round up any fractional millisecond, since waiting for just
  * less than the intended timeout is undesirable.
  */
 long
 TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
 {
-	TimestampTz diff = stop_time - start_time;
+	TimestampTz diff;
 
-	if (diff <= 0)
+	/* Deal with zero or negative elapsed time quickly. */
+	if (start_time >= stop_time)
 		return 0;
+	/* To not fail with timestamp infinities, we must detect overflow. */
+	if (pg_sub_s64_overflow(stop_time, start_time, &diff))
+		return (long) INT_MAX;
+	if (diff >= (INT_MAX * INT64CONST(1000) - 999))
+		return (long) INT_MAX;
 	else
 		return (long) ((diff + 999) / 1000);
 }
#17Nathan Bossart
nathandbossart@gmail.com
In reply to: Tom Lane (#16)
Re: suppressing useless wakeups in logical/worker.c

On Thu, Jan 26, 2023 at 04:09:51PM -0500, Tom Lane wrote:

Right, so more like this.

LGTM

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#18Tom Lane
tgl@sss.pgh.pa.us
In reply to: Nathan Bossart (#17)
Re: suppressing useless wakeups in logical/worker.c

Nathan Bossart <nathandbossart@gmail.com> writes:

On Thu, Jan 26, 2023 at 04:09:51PM -0500, Tom Lane wrote:

Right, so more like this.

LGTM

Thanks, pushed.

Returning to the prior patch ... I don't much care for this:

+                    /* Maybe there will be a free slot in a second... */
+                    retry_time = TimestampTzPlusSeconds(now, 1);
+                    LogRepWorkerUpdateSyncStartWakeup(retry_time);

We're not moving the goalposts very far on unnecessary wakeups if
we have to do that. Do we need to get a wakeup on sync slot free?
Although having to send that to every worker seems ugly. Maybe this
is being done in the wrong place and we need to find a way to get
the launcher to handle it.

As for the business about process_syncing_tables being only called
conditionally, I was already of the opinion that the way it's
getting called is loony. Why isn't it called from LogicalRepApplyLoop
(and noplace else)? With more certainty about when it runs, we might
not need so many kluges elsewhere.

regards, tom lane

#19Amit Kapila
amit.kapila16@gmail.com
In reply to: Tom Lane (#18)
Re: suppressing useless wakeups in logical/worker.c

On Fri, Jan 27, 2023 at 4:07 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Thanks, pushed.

Returning to the prior patch ... I don't much care for this:

+                    /* Maybe there will be a free slot in a second... */
+                    retry_time = TimestampTzPlusSeconds(now, 1);
+                    LogRepWorkerUpdateSyncStartWakeup(retry_time);

We're not moving the goalposts very far on unnecessary wakeups if
we have to do that. Do we need to get a wakeup on sync slot free?
Although having to send that to every worker seems ugly. Maybe this
is being done in the wrong place and we need to find a way to get
the launcher to handle it.

As for the business about process_syncing_tables being only called
conditionally, I was already of the opinion that the way it's
getting called is loony. Why isn't it called from LogicalRepApplyLoop
(and noplace else)?

Currently, it seems to be called after processing transaction end
commands or when we are not in any transaction. As per my
understanding, that is when we can ensure the sync between tablesync
and apply worker. For example, say when tablesync worker is doing the
initial copy, the apply worker went ahead and processed some
additional xacts (WAL), now the tablesync worker needs to process all
those transactions after initial sync and before it can mark the state
as SYNCDONE. So that can be checked only at transaction boundaries.

However, it is not very clear to me why the patch needs the below code.
@@ -3615,7 +3639,33 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
  if (!dlist_is_empty(&lsn_mapping))
  wait_time = WalWriterDelay;
  else
- wait_time = NAPTIME_PER_CYCLE;
+ {
+ TimestampTz nextWakeup = DT_NOEND;
+
+ /*
+ * Since process_syncing_tables() is called conditionally, the
+ * tablesync worker start wakeup time might be in the past, and we
+ * can't know for sure when it will be updated again.  Rather than
+ * spinning in a tight loop in this case, bump this wakeup time by
+ * a second.
+ */
+ now = GetCurrentTimestamp();
+ if (wakeup[LRW_WAKEUP_SYNC_START] < now)
+ wakeup[LRW_WAKEUP_SYNC_START] =
TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1);

Do we see unnecessary wakeups without this, or delay in sync?

BTW, do we need to do something about wakeups in
wait_for_relation_state_change()?

--
With Regards,
Amit Kapila.

#20Nathan Bossart
nathandbossart@gmail.com
In reply to: Amit Kapila (#19)
Re: suppressing useless wakeups in logical/worker.c

On Sat, Jan 28, 2023 at 10:26:25AM +0530, Amit Kapila wrote:

On Fri, Jan 27, 2023 at 4:07 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Returning to the prior patch ... I don't much care for this:

+                    /* Maybe there will be a free slot in a second... */
+                    retry_time = TimestampTzPlusSeconds(now, 1);
+                    LogRepWorkerUpdateSyncStartWakeup(retry_time);

We're not moving the goalposts very far on unnecessary wakeups if
we have to do that. Do we need to get a wakeup on sync slot free?
Although having to send that to every worker seems ugly. Maybe this
is being done in the wrong place and we need to find a way to get
the launcher to handle it.

It might be feasible to set up a before_shmem_exit() callback that wakes up
the apply worker (like is already done for the launcher). I think the
apply worker is ordinarily notified via the tablesync worker's notify_pid,
but AFAICT there's no guarantee that the apply worker hasn't restarted with
a different PID.

+ /*
+ * Since process_syncing_tables() is called conditionally, the
+ * tablesync worker start wakeup time might be in the past, and we
+ * can't know for sure when it will be updated again.  Rather than
+ * spinning in a tight loop in this case, bump this wakeup time by
+ * a second.
+ */
+ now = GetCurrentTimestamp();
+ if (wakeup[LRW_WAKEUP_SYNC_START] < now)
+ wakeup[LRW_WAKEUP_SYNC_START] =
TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1);

Do we see unnecessary wakeups without this, or delay in sync?

I haven't looked too closely to see whether busy loops are likely in
practice.

BTW, do we need to do something about wakeups in
wait_for_relation_state_change()?

... and wait_for_worker_state_change(), and copy_read_data(). From a quick
glance, it looks like fixing these would be a more invasive change. TBH
I'm beginning to wonder whether all this is really worth it to prevent
waking up once per second.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#21Amit Kapila
amit.kapila16@gmail.com
In reply to: Nathan Bossart (#20)
Re: suppressing useless wakeups in logical/worker.c

On Wed, Feb 1, 2023 at 5:35 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Sat, Jan 28, 2023 at 10:26:25AM +0530, Amit Kapila wrote:

On Fri, Jan 27, 2023 at 4:07 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Returning to the prior patch ... I don't much care for this:

+                    /* Maybe there will be a free slot in a second... */
+                    retry_time = TimestampTzPlusSeconds(now, 1);
+                    LogRepWorkerUpdateSyncStartWakeup(retry_time);

We're not moving the goalposts very far on unnecessary wakeups if
we have to do that. Do we need to get a wakeup on sync slot free?
Although having to send that to every worker seems ugly. Maybe this
is being done in the wrong place and we need to find a way to get
the launcher to handle it.

It might be feasible to set up a before_shmem_exit() callback that wakes up
the apply worker (like is already done for the launcher). I think the
apply worker is ordinarily notified via the tablesync worker's notify_pid,
but AFAICT there's no guarantee that the apply worker hasn't restarted with
a different PID.

+ /*
+ * Since process_syncing_tables() is called conditionally, the
+ * tablesync worker start wakeup time might be in the past, and we
+ * can't know for sure when it will be updated again.  Rather than
+ * spinning in a tight loop in this case, bump this wakeup time by
+ * a second.
+ */
+ now = GetCurrentTimestamp();
+ if (wakeup[LRW_WAKEUP_SYNC_START] < now)
+ wakeup[LRW_WAKEUP_SYNC_START] =
TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1);

Do we see unnecessary wakeups without this, or delay in sync?

I haven't looked too closely to see whether busy loops are likely in
practice.

BTW, do we need to do something about wakeups in
wait_for_relation_state_change()?

... and wait_for_worker_state_change(), and copy_read_data(). From a quick
glance, it looks like fixing these would be a more invasive change.

What kind of logic do you have in mind to avoid waking up once per
second in those cases?

TBH
I'm beginning to wonder whether all this is really worth it to prevent
waking up once per second.

If we can't do it for all cases, do you see any harm in doing it for
cases where we can achieve it without adding much complexity? We can
probably add comments for others so that if someone else has better
ideas in the future we can deal with those as well.

--
With Regards,
Amit Kapila.

#22Nathan Bossart
nathandbossart@gmail.com
In reply to: Amit Kapila (#21)
1 attachment(s)
Re: suppressing useless wakeups in logical/worker.c

I've attached a minimally-updated patch that doesn't yet address the bigger
topics under discussion.

On Thu, Mar 16, 2023 at 03:30:37PM +0530, Amit Kapila wrote:

On Wed, Feb 1, 2023 at 5:35 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Sat, Jan 28, 2023 at 10:26:25AM +0530, Amit Kapila wrote:

BTW, do we need to do something about wakeups in
wait_for_relation_state_change()?

... and wait_for_worker_state_change(), and copy_read_data(). From a quick
glance, it looks like fixing these would be a more invasive change.

What kind of logic do you have in mind to avoid waking up once per
second in those cases?

I haven't looked into this too much yet. I'd probably try out Tom's
suggestions from upthread [0] next and see if those can be applied here,
too.

TBH
I'm beginning to wonder whether all this is really worth it to prevent
waking up once per second.

If we can't do it for all cases, do you see any harm in doing it for
cases where we can achieve it without adding much complexity? We can
probably add comments for others so that if someone else has better
ideas in the future we can deal with those as well.

I don't think there's any harm, but I'm also not sure it does a whole lot
of good. At the very least, I think we should figure out something better
than the process_syncing_tables() hacks before taking this patch seriously.

[0]: /messages/by-id/3220831.1674772625@sss.pgh.pa.us

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v4-0001-suppress-useless-wakeups-in-logical-worker.c.patchtext/x-diff; charset=us-asciiDownload
From d76710d4b79e173e2d1baf1af228fe7dd8927e72 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Tue, 24 Jan 2023 21:12:28 -0800
Subject: [PATCH v4 1/1] suppress useless wakeups in logical/worker.c

---
 src/backend/replication/logical/tablesync.c |  28 +++
 src/backend/replication/logical/worker.c    | 189 ++++++++++++++++----
 src/include/replication/worker_internal.h   |   4 +
 src/tools/pgindent/typedefs.list            |   1 +
 4 files changed, 186 insertions(+), 36 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..573b46b5a2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * If we've made it past our previously-stored special wakeup time, reset
+	 * it so that it can be recalculated as needed.
+	 */
+	if (LogRepWorkerGetSyncStartWakeup() <= GetCurrentTimestamp())
+		LogRepWorkerClearSyncStartWakeup();
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	FetchTableStates(&started_tx);
 
@@ -592,6 +599,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 DSM_HANDLE_INVALID);
 						hentry->last_start_time = now;
 					}
+					else
+					{
+						TimestampTz retry_time;
+
+						/*
+						 * Store when we can start the sync worker so that we
+						 * know how long to sleep.
+						 */
+						retry_time = TimestampTzPlusMilliseconds(hentry->last_start_time,
+																 wal_retrieve_retry_interval);
+						LogRepWorkerUpdateSyncStartWakeup(retry_time);
+					}
+				}
+				else
+				{
+					TimestampTz now = GetCurrentTimestamp();
+					TimestampTz retry_time;
+
+					/* Maybe there will be a free slot in a second... */
+					retry_time = TimestampTzPlusSeconds(now, 1);
+					LogRepWorkerUpdateSyncStartWakeup(retry_time);
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 10f9711972..8e68540b6f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -208,8 +208,6 @@
 #include "utils/syscache.h"
 #include "utils/timeout.h"
 
-#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
-
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -351,6 +349,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,
+	LRW_WAKEUP_PING,
+	LRW_WAKEUP_STATUS,
+	LRW_WAKEUP_SYNC_START
+#define NUM_LRW_WAKEUPS (LRW_WAKEUP_SYNC_START + 1)
+} LogRepWorkerWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+										  TimestampTz now);
+
 typedef struct SubXactInfo
 {
 	TransactionId xid;			/* XID of the subxact */
@@ -3441,10 +3459,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
-	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	TimestampTz now;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3474,6 +3491,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = &errcallback;
 	apply_error_context_stack = error_context_stack;
 
+	/* Initialize nap wakeup times. */
+	now = GetCurrentTimestamp();
+	for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+		LogRepWorkerComputeNextWakeup(i, now);
+
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
@@ -3513,9 +3535,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					int			c;
 					StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+					/* Adjust the ping and terminate wakeup times. */
+					now = GetCurrentTimestamp();
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
 
 					/* Ensure we are reading the data into our memory context. */
 					MemoryContextSwitchTo(ApplyMessageContext);
@@ -3607,7 +3630,29 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (!dlist_is_empty(&lsn_mapping))
 			wait_time = WalWriterDelay;
 		else
-			wait_time = NAPTIME_PER_CYCLE;
+		{
+			TimestampTz nextWakeup = TIMESTAMP_INFINITY;
+
+			/*
+			 * Since process_syncing_tables() is called conditionally, the
+			 * tablesync worker start wakeup time might be in the past, and we
+			 * can't know for sure when it will be updated again.  Rather than
+			 * spinning in a tight loop in this case, bump this wakeup time by
+			 * a second.
+			 */
+			now = GetCurrentTimestamp();
+			if (wakeup[LRW_WAKEUP_SYNC_START] < now)
+				wakeup[LRW_WAKEUP_SYNC_START] = TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1);
+
+			/* Find soonest wakeup time, to limit our nap. */
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				nextWakeup = Min(wakeup[i], nextWakeup);
+
+			/*
+			 * Calculate the nap time, clamping as necessary.
+			 */
+			wait_time = TimestampDifferenceMilliseconds(now, nextWakeup);
+		}
 
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
@@ -3625,6 +3670,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			/* recompute wakeup times */
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * LogRepWorkerComputeNextWakeup() will have cleared the tablesync
+			 * worker start wakeup time, so we might not wake up to start a new
+			 * worker at the appropriate time.  To deal with this, we set the
+			 * wakeup time to right now so that
+			 * process_syncing_tables_for_apply() recalculates it as soon as
+			 * possible.
+			 */
+			if (!am_tablesync_worker())
+				LogRepWorkerUpdateSyncStartWakeup(now);
 		}
 
 		if (rc & WL_TIMEOUT)
@@ -3643,31 +3703,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from primary has reached the
 			 * configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
+			now = GetCurrentTimestamp();
+			if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating logical replication worker due to timeout")));
+
+			/* Check to see if it's time for a ping. */
+			if (now >= wakeup[LRW_WAKEUP_PING])
 			{
-				TimestampTz now = GetCurrentTimestamp();
-				TimestampTz timeout;
-
-				timeout =
-					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
-
-				if (now >= timeout)
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("terminating logical replication worker due to timeout")));
-
-				/* Check to see if it's time for a ping. */
-				if (!ping_sent)
-				{
-					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
-					if (now >= timeout)
-					{
-						requestReply = true;
-						ping_sent = true;
-					}
-				}
+				requestReply = true;
+				wakeup[LRW_WAKEUP_PING] = TIMESTAMP_INFINITY;
 			}
 
 			send_feedback(last_received, requestReply, requestReply);
@@ -3703,7 +3749,6 @@ static void
 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 {
 	static StringInfo reply_message = NULL;
-	static TimestampTz send_time = 0;
 
 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3746,10 +3791,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force &&
 		writepos == last_writepos &&
 		flushpos == last_flushpos &&
-		!TimestampDifferenceExceeds(send_time, now,
-									wal_receiver_status_interval * 1000))
+		now < wakeup[LRW_WAKEUP_STATUS])
 		return;
-	send_time = now;
+
+	/* Make sure we wake up when it's time to send another status update. */
+	LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
 
 	if (!reply_message)
 	{
@@ -5048,3 +5094,74 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_APPLY;
 	}
 }
+
+/*
+ * Compute the next wakeup time for a given wakeup reason.  Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.  We ask the caller to pass in the
+ * value of "now" because this frequently avoids multiple calls of
+ * GetCurrentTimestamp().  It had better be a reasonably up-to-date value
+ * though.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = TIMESTAMP_INFINITY;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
+			break;
+		case LRW_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = TIMESTAMP_INFINITY;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
+			break;
+		case LRW_WAKEUP_STATUS:
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = TIMESTAMP_INFINITY;
+			else
+				wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
+			break;
+		case LRW_WAKEUP_SYNC_START:
+			/*
+			 * This wakeup time is manually set as needed.  This function can
+			 * only be used to initialize its value.
+			 */
+			wakeup[reason] = TIMESTAMP_INFINITY;
+			break;
+			/* there's intentionally no default: here */
+	}
+}
+
+/*
+ * Retrieve the current wakeup time for starting tablesync workers.
+ */
+TimestampTz
+LogRepWorkerGetSyncStartWakeup(void)
+{
+	return wakeup[LRW_WAKEUP_SYNC_START];
+}
+
+/*
+ * Update the current wakeup time for starting tablesync workers.  If the
+ * current wakeup time is <= next_sync_start, no action is taken.
+ */
+void
+LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start)
+{
+	if (next_sync_start < wakeup[LRW_WAKEUP_SYNC_START])
+		wakeup[LRW_WAKEUP_SYNC_START] = next_sync_start;
+}
+
+/*
+ * Clear the current wakeup time for starting tablesync workers.
+ */
+void
+LogRepWorkerClearSyncStartWakeup(void)
+{
+	wakeup[LRW_WAKEUP_SYNC_START] = TIMESTAMP_INFINITY;
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dc87a4edd1..ae44717588 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+extern TimestampTz LogRepWorkerGetSyncStartWakeup(void);
+extern void LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start);
+extern void LogRepWorkerClearSyncStartWakeup(void);
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 097f42e1b3..66b699852a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1443,6 +1443,7 @@ LockViewRecurse_context
 LockWaitPolicy
 LockingClause
 LogOpts
+LogRepWorkerWakeupReason
 LogStmtLevel
 LogicalDecodeBeginCB
 LogicalDecodeBeginPrepareCB
-- 
2.25.1

#23Amit Kapila
amit.kapila16@gmail.com
In reply to: Nathan Bossart (#22)
Re: suppressing useless wakeups in logical/worker.c

On Fri, Mar 17, 2023 at 5:52 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

I've attached a minimally-updated patch that doesn't yet address the bigger
topics under discussion.

On Thu, Mar 16, 2023 at 03:30:37PM +0530, Amit Kapila wrote:

On Wed, Feb 1, 2023 at 5:35 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Sat, Jan 28, 2023 at 10:26:25AM +0530, Amit Kapila wrote:

BTW, do we need to do something about wakeups in
wait_for_relation_state_change()?

... and wait_for_worker_state_change(), and copy_read_data(). From a quick
glance, it looks like fixing these would be a more invasive change.

What kind of logic do you have in mind to avoid waking up once per
second in those cases?

I haven't looked into this too much yet. I'd probably try out Tom's
suggestions from upthread [0] next and see if those can be applied here,
too.

For the clean exit of tablesync worker, we already wake up the apply
worker in finish_sync_worker(). You probably want to deal with error
cases or is there something else on your mind? BTW, for
wait_for_worker_state_change(), one possibility is to wake all the
sync workers when apply worker exits but not sure if that is a very
good idea.

Few minor comments:
=====================
1.
- if (wal_receiver_timeout > 0)
+ now = GetCurrentTimestamp();
+ if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating logical replication worker due to timeout")));
+
+ /* Check to see if it's time for a ping. */
+ if (now >= wakeup[LRW_WAKEUP_PING])
  {
- TimestampTz now = GetCurrentTimestamp();

Previously, we used to call GetCurrentTimestamp() only when
wal_receiver_timeout > 0 but we ignore that part now. It may not
matter much but if possible let's avoid calling GetCurrentTimestamp()
at additional places.

2.
+ for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+ LogRepWorkerComputeNextWakeup(i, now);
+
+ /*
+ * LogRepWorkerComputeNextWakeup() will have cleared the tablesync
+ * worker start wakeup time, so we might not wake up to start a new
+ * worker at the appropriate time.  To deal with this, we set the
+ * wakeup time to right now so that
+ * process_syncing_tables_for_apply() recalculates it as soon as
+ * possible.
+ */
+ if (!am_tablesync_worker())
+ LogRepWorkerUpdateSyncStartWakeup(now);

Can't we avoid clearing syncstart time in the first place?

--
With Regards,
Amit Kapila.

#24Daniel Gustafsson
daniel@yesql.se
In reply to: Amit Kapila (#23)
Re: suppressing useless wakeups in logical/worker.c

On 17 Mar 2023, at 10:16, Amit Kapila <amit.kapila16@gmail.com> wrote:

Few minor comments:

Have you had a chance to address the comments raised by Amit in this thread?

--
Daniel Gustafsson

#25Nathan Bossart
nathandbossart@gmail.com
In reply to: Daniel Gustafsson (#24)
Re: suppressing useless wakeups in logical/worker.c

On Tue, Jul 04, 2023 at 09:48:23AM +0200, Daniel Gustafsson wrote:

On 17 Mar 2023, at 10:16, Amit Kapila <amit.kapila16@gmail.com> wrote:

Few minor comments:

Have you had a chance to address the comments raised by Amit in this thread?

Not yet, sorry.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com