From e4e7df78c059b9c888544918789654ebdde3e2eb Mon Sep 17 00:00:00 2001
From: Masahiro Ikeda <ikedamsh@oss.nttdata.com>
Date: Tue, 2 Mar 2021 16:36:17 +0900
Subject: [PATCH 2/2] Make the WAL receiver report WAL statistics

This patch makes the WAL receiver report WAL statistics.

- Fundamentally changes how the stats collector interacts
  with the WAL receiver.

- Unifies the logic that collects WAL write stats for the
  WAL receiver and other processes to avoid duplication.

Author: Masahiro Ikeda
Reviewed-By: Japin Li, Hayato Kuroda, Masahiko Sawada, David Johnston,
Fujii Masao
Discussion:
https://postgr.es/m/0509ad67b585a5b86a83d445dfa75392@oss.nttdata.com
---
 doc/src/sgml/monitoring.sgml          |  4 ++
 src/backend/access/transam/xlog.c     | 64 +++++++++++++++++----------
 src/backend/replication/walreceiver.c | 12 ++++-
 src/include/access/xlog.h             |  1 +
 4 files changed, 57 insertions(+), 24 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index a16be45a71..66525e184f 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -3495,6 +3495,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        Number of times WAL buffers were written out to disk via
        <function>XLogWrite</function>, which is invoked during an
        <function>XLogFlush</function> request (see <xref linkend="wal-configuration"/>)
+       , or WAL data was written out to disk by the WAL receiver.
       </para></entry>
      </row>
 
@@ -3506,6 +3507,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        Total amount of time spent writing WAL buffers were written out to disk via
        <function>XLogWrite</function>, which is invoked during an
        <function>XLogFlush</function> request (see <xref linkend="wal-configuration"/>),
+       or WAL data written out to disk by the WAL receiver, 
        excluding sync time unless 
        <xref linkend="guc-wal-sync-method"/> is either <literal>open_datasync</literal> or 
        <literal>open_sync</literal>. Units are in milliseconds with microsecond resolution.
@@ -3521,6 +3523,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        Number of times WAL files were synced to disk via 
        <function>issue_xlog_fsync</function>, which is invoked during an 
        <function>XLogFlush</function> request (see <xref linkend="wal-configuration"/>)
+       by backends and background processes including the WAL receiver
        while <xref linkend="guc-wal-sync-method"/> was set to one of the 
        "sync at commit" options (i.e., <literal>fdatasync</literal>, 
        <literal>fsync</literal>, or <literal>fsync_writethrough</literal>).
@@ -3535,6 +3538,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        Total amount of time spent syncing WAL files to disk via
        <function>issue_xlog_fsync</function>, which is invoked during an 
        <function>XLogFlush</function> request (see <xref linkend="wal-configuration"/>)
+       by backends and background processes including the WAL receiver
        while <xref linkend="guc-wal-sync-method"/> was set to one of the 
        "sync at commit" options (i.e., <literal>fdatasync</literal>, 
        <literal>fsync</literal>, or <literal>fsync_writethrough</literal>).
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 98f558b4c7..c220b6b776 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2534,7 +2534,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			Size		nbytes;
 			Size		nleft;
 			int			written;
-			instr_time	start;
 
 			/* OK to write the page(s) */
 			from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
@@ -2544,28 +2543,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			{
 				errno = 0;
 
-				/* Measure I/O timing to write WAL data */
-				if (track_wal_io_timing)
-					INSTR_TIME_SET_CURRENT(start);
-
-				pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
-				written = pg_pwrite(openLogFile, from, nleft, startoffset);
-				pgstat_report_wait_end();
-
-				/*
-				 * Increment the I/O timing and the number of times WAL data
-				 * were written out to disk.
-				 */
-				if (track_wal_io_timing)
-				{
-					instr_time	duration;
-
-					INSTR_TIME_SET_CURRENT(duration);
-					INSTR_TIME_SUBTRACT(duration, start);
-					WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
-				}
-
-				WalStats.m_wal_write++;
+				written = XLogWriteFile(openLogFile, from, nleft, startoffset);
 
 				if (written <= 0)
 				{
@@ -2705,6 +2683,46 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	}
 }
 
+/*
+ * Write to an XLOG file with pg_pwrite, updating WalStats write counters.
+ *
+ * 'fd' is the file descriptor of the XLOG file to write to.
+ * 'buf' is the starting address of the data to write.
+ * 'nbyte' is the maximum number of bytes to write.
+ * 'offset' is the offset in the XLOG file at which to start writing.
+ */
+int
+XLogWriteFile(int fd, const void *buf, size_t nbyte, off_t offset)
+{
+	int written;
+	instr_time	start;
+
+	/* Record the start time, but only if WAL I/O timing is enabled */
+	if (track_wal_io_timing)
+		INSTR_TIME_SET_CURRENT(start);
+
+	pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
+	written = pg_pwrite(fd, buf, nbyte, offset);
+	pgstat_report_wait_end();
+
+	/*
+	 * Add the elapsed time to the write-time total (if timing is enabled)
+	 * and count this write, whatever value pg_pwrite returned.
+	 */
+	if (track_wal_io_timing)
+	{
+		instr_time	duration;
+
+		INSTR_TIME_SET_CURRENT(duration);
+		INSTR_TIME_SUBTRACT(duration, start);
+		WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
+	}
+
+	WalStats.m_wal_write++;
+
+	return written;
+}
+
 /*
  * Record the LSN for an asynchronous transaction commit/abort
  * and nudge the WALWriter if there is work for it to do.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7810ee916c..f9834b8302 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -770,6 +770,9 @@ WalRcvDie(int code, Datum arg)
 	/* Ensure that all WAL records received are flushed to disk */
 	XLogWalRcvFlush(true);
 
+	/* Send WAL statistics to the stats collector before terminating */
+	pgstat_send_wal();
+
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -907,6 +910,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 					XLogArchiveForceDone(xlogfname);
 				else
 					XLogArchiveNotify(xlogfname);
+
+				/*
+				 * Send WAL statistics each time a WAL segment file is finished;
+				 * reporting more often could overload the stats collector.
+				 */
+				pgstat_send_wal();
 			}
 			recvFile = -1;
 
@@ -928,7 +937,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 		/* OK to write the logs */
 		errno = 0;
 
-		byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
+		byteswritten = XLogWriteFile(recvFile, buf, segbytes, (off_t) startoff);
+
 		if (byteswritten <= 0)
 		{
 			char		xlogfname[MAXFNAMELEN];
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1e53d9d4ca..b345de8a28 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -290,6 +290,7 @@ extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
 extern int	XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
 extern int	XLogFileOpen(XLogSegNo segno);
+extern int	XLogWriteFile(int fd, const void *buf, size_t nbyte, off_t offset);
 
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern XLogSegNo XLogGetLastRemovedSegno(void);
-- 
2.25.1

