From 3fa8bd30c15a99919601283f59f579e64b03c121 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Date: Tue, 25 Feb 2025 10:18:05 +0000
Subject: [PATCH v4 1/2] Flush the IO statistics of active walsenders

The walsender does not flush its IO statistics until it exits.
This issue has existed since pg_stat_io was introduced in a9c70b46dbe.
This commit:

1. ensures it does not wait until exit to flush its IO statistics
2. flushes its IO statistics periodically, so as not to overload the WAL sender
3. adds a test for a physical walsender (a logical walsender had the same
issue, but it is fixed in the same code path)
---
 src/backend/replication/walsender.c   | 60 +++++++++++++++++++--------
 src/backend/utils/activity/pgstat.c   |  2 -
 src/include/pgstat.h                  |  2 +
 src/test/recovery/t/001_stream_rep.pl | 16 +++++++
 4 files changed, 60 insertions(+), 20 deletions(-)
  70.5% src/backend/replication/
   3.1% src/backend/utils/activity/
   3.6% src/include/
  22.6% src/test/recovery/t/

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 446d10c1a7d..9b44d4ae600 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -90,6 +90,7 @@
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -2740,6 +2741,8 @@ WalSndCheckTimeOut(void)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	TimestampTz last_flush = 0;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2834,30 +2837,51 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
 		 * its additional actions.  For physical replication, also block if
 		 * caught up; its send_data does not block.
+		 *
+		 * When the WAL sender is caught up or has pending data to send, we
+		 * also report I/O statistics, at most once every PGSTAT_MIN_INTERVAL
+		 * ms, so that the flushes do not overload the WAL sender.
 		 */
-		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
-			 !streamingDoneSending) ||
-			pq_is_send_pending())
+		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
 		{
-			long		sleeptime;
-			int			wakeEvents;
+			TimestampTz now;
 
-			if (!streamingDoneReceiving)
-				wakeEvents = WL_SOCKET_READABLE;
-			else
-				wakeEvents = 0;
+			now = GetCurrentTimestamp();
 
-			/*
-			 * Use fresh timestamp, not last_processing, to reduce the chance
-			 * of reaching wal_sender_timeout before sending a keepalive.
-			 */
-			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+			if (TimestampDifferenceExceeds(last_flush, now, PGSTAT_MIN_INTERVAL))
+			{
+				/*
+				 * Report IO statistics
+				 */
+				pgstat_flush_io(false);
+				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+				last_flush = now;
+			}
 
-			if (pq_is_send_pending())
-				wakeEvents |= WL_SOCKET_WRITEABLE;
+			if (send_data != XLogSendLogical || pq_is_send_pending())
+			{
+				long		sleeptime;
+				int			wakeEvents;
+
+				if (!streamingDoneReceiving)
+					wakeEvents = WL_SOCKET_READABLE;
+				else
+					wakeEvents = 0;
+
+				/*
+				 * Use fresh timestamp, not last_processing, to reduce the
+				 * chance of reaching wal_sender_timeout before sending a
+				 * keepalive.
+				 */
+				sleeptime = WalSndComputeSleeptime(now);
+
+				if (pq_is_send_pending())
+					wakeEvents |= WL_SOCKET_WRITEABLE;
+
+				/* Sleep until something happens or we time out */
+				WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
+			}
 
-			/* Sleep until something happens or we time out */
-			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
 	}
 }
diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c
index 3168b825e25..1bf84cbf64e 100644
--- a/src/backend/utils/activity/pgstat.c
+++ b/src/backend/utils/activity/pgstat.c
@@ -123,8 +123,6 @@
  * ----------
  */
 
-/* minimum interval non-forced stats flushes.*/
-#define PGSTAT_MIN_INTERVAL			1000
 /* how long until to block flushing pending stats updates */
 #define PGSTAT_MAX_INTERVAL			60000
 /* when to call pgstat_report_stat() again, even when idle */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index def6b370ac1..d1b15bf7757 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -22,6 +22,8 @@
 #include "utils/relcache.h"
 #include "utils/wait_event.h"	/* for backward compatibility */	/* IWYU pragma: export */
 
+/* minimum interval for non-forced stats flushes, in ms */
+#define PGSTAT_MIN_INTERVAL			1000
 
 /* ----------
  * Paths for the statistics files (relative to installation's $PGDATA).
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 3945f00ab88..3371895ab1d 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
 	has_streaming => 1);
 $node_standby_2->start;
 
+# Reset the IO stats to check below that an active walsender updates them.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 # Create some content on primary and check its presence in standby nodes
 $node_primary->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -329,6 +332,19 @@ $node_primary->psql(
 
 note "switching to physical replication slot";
 
+# Wait for the walsender to update its IO statistics.
+# This has to be done before the next restart, and far enough from the
+# pg_stat_reset_shared('io') call to minimize the risk of polling for too long.
+$node_primary->poll_query_until(
+	'postgres',
+	qq[SELECT sum(reads) > 0
+       FROM pg_catalog.pg_stat_io
+       WHERE backend_type = 'walsender'
+       AND object = 'wal']
+  )
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # Switch to using a physical replication slot. We can do this without a new
 # backup since physical slots can go backwards if needed. Do so on both
 # standbys. Since we're going to be testing things that affect the slot state,
-- 
2.34.1

