From 3e4ea302e9ae554cae76e7a4dc4f90b4503cf842 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Date: Wed, 28 Aug 2024 12:59:02 +0000
Subject: [PATCH v1 2/2] Add pg_stat_get_backend_io()

Add the pg_stat_get_backend_io() function to retrieve I/O statistics for
a particular backend PID. Note that this function behaves as if
stats_fetch_consistency is set to none.
---
 doc/src/sgml/monitoring.sgml           |  18 ++++
 src/backend/utils/activity/pgstat_io.c |   6 ++
 src/backend/utils/adt/pgstatfuncs.c    | 120 +++++++++++++++++++++++++
 src/include/catalog/pg_proc.dat        |   8 ++
 src/include/pgstat.h                   |   1 +
 src/test/regress/expected/stats.out    |  25 ++++++
 src/test/regress/sql/stats.sql         |  16 +++-
 7 files changed, 193 insertions(+), 1 deletion(-)
  10.8% doc/src/sgml/
  47.6% src/backend/utils/adt/
   9.2% src/include/catalog/
  15.3% src/test/regress/expected/
  14.4% src/test/regress/sql/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 27d2548d61..ad89d9caa7 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4713,6 +4713,24 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_stat_get_backend_io</primary>
+        </indexterm>
+        <function>pg_stat_get_backend_io</function> ( <type>integer</type> )
+        <returnvalue>setof record</returnvalue>
+       </para>
+       <para>
+        Returns I/O statistics about the backend with the specified
+        process ID. The output fields are exactly the same as the ones in the
+        <link linkend="monitoring-pg-stat-io-view"><structname>pg_stat_io</structname></link>
+        view. This function behaves as if <varname>stats_fetch_consistency</varname>
+        is set to <literal>none</literal> (meaning that each execution
+        re-fetches counters from shared memory).
+       </para></entry>
+      </row>
+
       <row>
        <entry role="func_table_entry"><para role="func_signature">
         <indexterm>
diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c
index 43280f4892..8fe8c224a9 100644
--- a/src/backend/utils/activity/pgstat_io.c
+++ b/src/backend/utils/activity/pgstat_io.c
@@ -170,6 +170,24 @@ pgstat_fetch_my_stat_io(void)
 	return &pgStatLocal.snapshot.my_io;
 }
 
+/*
+ * Returns the I/O statistics of the backend identified by procNumber.
+ *
+ * Unlike pgstat_fetch_my_stat_io(), which returns the local snapshot, this
+ * returns a pointer directly into shared memory and does not acquire any
+ * lock, so callers see the live counters.
+ *
+ * NOTE(review): confirm whether concurrent updates by the owning backend can
+ * produce torn or mutually inconsistent reads here.
+ */
+PgStat_IO *
+pgstat_fetch_proc_stat_io(ProcNumber procNumber)
+{
+	Assert(procNumber != INVALID_PROC_NUMBER);
+
+	return &pgStatLocal.shmem->io.stats[procNumber];
+}
+
 /*
  * Flush out locally pending IO statistics
  *
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 58e321f421..502b9662a1 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1474,6 +1474,126 @@ pg_stat_get_io(PG_FUNCTION_ARGS)
 	return (Datum) 0;
 }
 
+/*
+ * Returns I/O statistics of the backend with the given PID, or an empty set
+ * if no such backend exists.  Counters are read from shared memory each time.
+ */
+Datum
+pg_stat_get_backend_io(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo;
+	PgStat_IO  *backends_io_stats;
+	Datum		reset_time;
+	ProcNumber	procNumber;
+	PGPROC	   *proc;
+	int			backend_pid = PG_GETARG_INT32(0);
+
+	rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	InitMaterializedSRF(fcinfo, 0);
+
+	/* An empty result set is returned if no backend has this PID */
+	proc = BackendPidGetProc(backend_pid);
+	if (proc == NULL)
+		return (Datum) 0;
+
+	procNumber = GetNumberFromPGProc(proc);
+	backends_io_stats = pgstat_fetch_proc_stat_io(procNumber);
+	reset_time = TimestampTzGetDatum(backends_io_stats->stat_reset_timestamp);
+
+	for (int bktype = 0; bktype < BACKEND_NUM_TYPES; bktype++)
+	{
+		Datum		bktype_desc = CStringGetTextDatum(GetBackendTypeDesc(bktype));
+		PgStat_BktypeIO *bktype_stats = &backends_io_stats->stats[bktype];
+
+		/*
+		 * In Assert builds, we can afford an extra loop through all of the
+		 * counters checking that only expected stats are non-zero, since it
+		 * keeps the non-Assert code cleaner.
+		 */
+		Assert(pgstat_bktype_io_stats_valid(bktype_stats, bktype));
+
+		/*
+		 * For those BackendTypes without IO Operation stats, skip
+		 * representing them in the view altogether.
+		 */
+		if (!pgstat_tracks_io_bktype(bktype))
+			continue;
+
+		for (int io_obj = 0; io_obj < IOOBJECT_NUM_TYPES; io_obj++)
+		{
+			const char *obj_name = pgstat_get_io_object_name(io_obj);
+
+			for (int io_context = 0; io_context < IOCONTEXT_NUM_TYPES; io_context++)
+			{
+				const char *context_name = pgstat_get_io_context_name(io_context);
+				Datum		values[IO_NUM_COLUMNS] = {0};
+				bool		nulls[IO_NUM_COLUMNS] = {0};
+
+				/*
+				 * Some combinations of BackendType, IOObject, and IOContext
+				 * are not valid for any type of IOOp. In such cases, omit the
+				 * entire row from the view.
+				 */
+				if (!pgstat_tracks_io_object(bktype, io_obj, io_context))
+					continue;
+
+				values[IO_COL_BACKEND_TYPE] = bktype_desc;
+				values[IO_COL_CONTEXT] = CStringGetTextDatum(context_name);
+				values[IO_COL_OBJECT] = CStringGetTextDatum(obj_name);
+				/* reset_time is already a Datum: don't convert it twice */
+				values[IO_COL_RESET_TIME] = reset_time;
+
+				/*
+				 * Hard-code this to the value of BLCKSZ for now. Future values
+				 * could include XLOG_BLCKSZ, once WAL IO is tracked, and
+				 * constant multipliers, once non-block-oriented IO (e.g.
+				 * temporary file IO) is tracked.
+				 */
+				values[IO_COL_CONVERSION] = Int64GetDatum(BLCKSZ);
+
+				for (int io_op = 0; io_op < IOOP_NUM_TYPES; io_op++)
+				{
+					int			op_idx = pgstat_get_io_op_index(io_op);
+					int			time_idx = pgstat_get_io_time_index(io_op);
+
+					/*
+					 * Some combinations of BackendType and IOOp, of IOContext
+					 * and IOOp, and of IOObject and IOOp are not tracked. Set
+					 * these cells in the view NULL.
+					 */
+					if (pgstat_tracks_io_op(bktype, io_obj, io_context, io_op))
+					{
+						PgStat_Counter count =
+							bktype_stats->counts[io_obj][io_context][io_op];
+
+						values[op_idx] = Int64GetDatum(count);
+					}
+					else
+						nulls[op_idx] = true;
+
+					/* not every operation is timed */
+					if (time_idx == IO_COL_INVALID)
+						continue;
+
+					if (!nulls[op_idx])
+					{
+						PgStat_Counter time =
+							bktype_stats->times[io_obj][io_context][io_op];
+
+						values[time_idx] = Float8GetDatum(pg_stat_us_to_ms(time));
+					}
+					else
+						nulls[time_idx] = true;
+				}
+
+				tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+									 values, nulls);
+			}
+		}
+	}
+	return (Datum) 0;
+}
+
 /*
  * Returns statistics of WAL activity
  */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1184341fc8..8c1e2d6a46 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5830,6 +5830,14 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{is_global,backend_type,object,context,reads,read_time,writes,write_time,writebacks,writeback_time,extends,extend_time,op_bytes,hits,evictions,reuses,fsyncs,fsync_time,stats_reset}',
   prosrc => 'pg_stat_get_io' },
+{ oid => '8896', descr => 'statistics: per backend IO statistics',
+  proname => 'pg_stat_get_backend_io', prorows => '30', proretset => 't',
+  provolatile => 'v', proparallel => 'r', prorettype => 'record',
+  proargtypes => 'int4',
+  proallargtypes => '{int4,text,text,text,int8,float8,int8,float8,int8,float8,int8,float8,int8,int8,int8,int8,int8,float8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{backend_pid,backend_type,object,context,reads,read_time,writes,write_time,writebacks,writeback_time,extends,extend_time,op_bytes,hits,evictions,reuses,fsyncs,fsync_time,stats_reset}',
+  prosrc => 'pg_stat_get_backend_io' },
 
 { oid => '1136', descr => 'statistics: information about WAL activity',
   proname => 'pg_stat_get_wal', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 9ec4053a3e..963fa36be9 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -561,6 +561,7 @@ extern void pgstat_count_io_op_time(IOObject io_object, IOContext io_context,
 
 extern PgStat_IO *pgstat_fetch_global_stat_io(void);
 extern PgStat_IO *pgstat_fetch_my_stat_io(void);
+extern PgStat_IO *pgstat_fetch_proc_stat_io(ProcNumber procNumber);
 extern const char *pgstat_get_io_context_name(IOContext io_context);
 extern const char *pgstat_get_io_object_name(IOObject io_object);
 
diff --git a/src/test/regress/expected/stats.out b/src/test/regress/expected/stats.out
index c489e528e0..aced015c2f 100644
--- a/src/test/regress/expected/stats.out
+++ b/src/test/regress/expected/stats.out
@@ -1263,12 +1263,15 @@ SELECT sum(extends) AS io_sum_shared_before_extends
   FROM pg_stat_io WHERE context = 'normal' AND object = 'relation' \gset
 SELECT sum(extends) AS my_io_sum_shared_before_extends
   FROM pg_my_stat_io WHERE context = 'normal' AND object = 'relation' \gset
+SELECT sum(extends) AS backend_io_sum_shared_before_extends
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE context = 'normal' AND object = 'relation' \gset
 SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
   FROM pg_stat_io
   WHERE object = 'relation' \gset io_sum_shared_before_
 SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
   FROM pg_my_stat_io
   WHERE object = 'relation' \gset my_io_sum_shared_before_
 CREATE TABLE test_io_shared(a int);
 INSERT INTO test_io_shared SELECT i FROM generate_series(1,100)i;
 SELECT pg_stat_force_next_flush();
@@ -1293,6 +1296,15 @@ SELECT :my_io_sum_shared_after_extends > :my_io_sum_shared_before_extends;
  t
 (1 row)
 
+SELECT sum(extends) AS backend_io_sum_shared_after_extends
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE context = 'normal' AND object = 'relation' \gset
+SELECT :backend_io_sum_shared_after_extends > :backend_io_sum_shared_before_extends;
+ ?column? 
+----------
+ t
+(1 row)
+
 -- After a checkpoint, there should be some additional IOCONTEXT_NORMAL writes
 -- and fsyncs in the global stats (not for the backend).
 -- See comment above for rationale for two explicit CHECKPOINTs.
@@ -1553,6 +1565,8 @@ SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) +
   FROM pg_stat_io \gset
 SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS my_io_stats_pre_reset
   FROM pg_my_stat_io \gset
+SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS backend_io_stats_pre_reset
+  FROM pg_stat_get_backend_io(pg_backend_pid()) \gset
 SELECT pg_stat_reset_shared('io');
  pg_stat_reset_shared 
 ----------------------
@@ -1575,6 +1589,14 @@ SELECT :my_io_stats_post_reset < :my_io_stats_pre_reset;
  t
 (1 row)
 
+SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS backend_io_stats_post_reset
+  FROM pg_stat_get_backend_io(pg_backend_pid()) \gset
+SELECT :backend_io_stats_post_reset < :backend_io_stats_pre_reset;
+ ?column? 
+----------
+ t
+(1 row)
+
 -- test BRIN index doesn't block HOT update
 CREATE TABLE brin_hot (
   id  integer PRIMARY KEY,
diff --git a/src/test/regress/sql/stats.sql b/src/test/regress/sql/stats.sql
index c95cb71652..d05009e1f5 100644
--- a/src/test/regress/sql/stats.sql
+++ b/src/test/regress/sql/stats.sql
@@ -611,12 +611,15 @@ SELECT sum(extends) AS io_sum_shared_before_extends
   FROM pg_stat_io WHERE context = 'normal' AND object = 'relation' \gset
 SELECT sum(extends) AS my_io_sum_shared_before_extends
   FROM pg_my_stat_io WHERE context = 'normal' AND object = 'relation' \gset
+SELECT sum(extends) AS backend_io_sum_shared_before_extends
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE context = 'normal' AND object = 'relation' \gset
 SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
   FROM pg_stat_io
   WHERE object = 'relation' \gset io_sum_shared_before_
 SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
   FROM pg_my_stat_io
   WHERE object = 'relation' \gset my_io_sum_shared_before_
 CREATE TABLE test_io_shared(a int);
 INSERT INTO test_io_shared SELECT i FROM generate_series(1,100)i;
 SELECT pg_stat_force_next_flush();
@@ -626,6 +629,10 @@ SELECT :io_sum_shared_after_extends > :io_sum_shared_before_extends;
 SELECT sum(extends) AS my_io_sum_shared_after_extends
   FROM pg_my_stat_io WHERE context = 'normal' AND object = 'relation' \gset
 SELECT :my_io_sum_shared_after_extends > :my_io_sum_shared_before_extends;
+SELECT sum(extends) AS backend_io_sum_shared_after_extends
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE context = 'normal' AND object = 'relation' \gset
+SELECT :backend_io_sum_shared_after_extends > :backend_io_sum_shared_before_extends;
 
 -- After a checkpoint, there should be some additional IOCONTEXT_NORMAL writes
 -- and fsyncs in the global stats (not for the backend).
@@ -778,6 +785,8 @@ SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) +
   FROM pg_stat_io \gset
 SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS my_io_stats_pre_reset
   FROM pg_my_stat_io \gset
+SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS backend_io_stats_pre_reset
+  FROM pg_stat_get_backend_io(pg_backend_pid()) \gset
 SELECT pg_stat_reset_shared('io');
 SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS io_stats_post_reset
   FROM pg_stat_io \gset
@@ -785,7 +794,9 @@ SELECT :io_stats_post_reset < :io_stats_pre_reset;
 SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS my_io_stats_post_reset
   FROM pg_my_stat_io \gset
 SELECT :my_io_stats_post_reset < :my_io_stats_pre_reset;
-
+SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS backend_io_stats_post_reset
+  FROM pg_stat_get_backend_io(pg_backend_pid()) \gset
+SELECT :backend_io_stats_post_reset < :backend_io_stats_pre_reset;
 
 -- test BRIN index doesn't block HOT update
 CREATE TABLE brin_hot (
-- 
2.34.1

