From 35e9815dec8e1770d380d6b2446b41010b1007a0 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm@gmail.com>
Date: Mon, 8 Feb 2021 18:13:46 +0100
Subject: [PATCH v5 4/6] Add a io_target column to the copy progress view

This allows filtering on IO target for progress reporting, and allows for
further filtering of COPY commands. Additionally, this allows for
identification of logical replication's initial table synchronization
background workers at the subscriber side through io_target = CALLBACK, as it
is the only current supplier of a callback datasource.
---
 doc/src/sgml/monitoring.sgml         | 13 +++++++++++++
 src/backend/catalog/system_views.sql |  5 +++++
 src/backend/commands/copyfrom.c      | 22 ++++++++++++++++++++++
 src/backend/commands/copyto.c        | 26 +++++++++++++++++++++++++-
 src/include/commands/progress.h      |  7 +++++++
 src/test/regress/expected/rules.out  |  7 +++++++
 6 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index ca84b53896..3c39c82f1a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6554,6 +6554,19 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>io_target</structfield> <type>text</type>
+      </para>
+      <para>
+       The io target that the data is read from or written to: 
+       <literal>FILE</literal>, <literal>PROGRAM</literal>, 
+       <literal>STDIO</literal> (for COPY FROM STDIN and COPY TO STDOUT),
+       or <literal>CALLBACK</literal> (used in the table synchronization
+       background worker).
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>bytes_processed</structfield> <type>bigint</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 1082b7d253..6a3ac47b85 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1132,6 +1132,11 @@ CREATE VIEW pg_stat_progress_copy AS
         CASE S.param5 WHEN 1 THEN 'COPY FROM'
                       WHEN 2 THEN 'COPY TO'
                       END AS command,
+        CASE S.param6 WHEN 1 THEN 'FILE'
+                      WHEN 2 THEN 'PROGRAM'
+                      WHEN 3 THEN 'STDIO'
+                      WHEN 4 THEN 'CALLBACK'
+                      END AS io_target,
         S.param1 AS bytes_processed,
         S.param2 AS bytes_total,
         S.param3 AS tuples_processed,
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ce343dbf80..bf952fa293 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1429,6 +1429,7 @@ BeginCopyFrom(ParseState *pstate,
 	pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
 								  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
 	pgstat_progress_update_param(PROGRESS_COPY_COMMAND, PROGRESS_COPY_COMMAND_FROM);
+
 	cstate->bytes_processed = 0;
 
 	/* We keep those variables in cstate. */
@@ -1506,6 +1507,27 @@ BeginCopyFrom(ParseState *pstate,
 		ReceiveCopyBinaryHeader(cstate);
 	}
 
+	{
+		int64 io_target;
+		switch (cstate->copy_src)
+		{
+			case COPY_FILE:
+				if (is_program)
+					io_target = PROGRESS_COPY_IO_TARGET_PROGRAM;
+				else
+					io_target = PROGRESS_COPY_IO_TARGET_FILE;
+				break;
+			case COPY_OLD_FE:
+			case COPY_NEW_FE:
+				io_target = PROGRESS_COPY_IO_TARGET_STDIO;
+				break;
+			case COPY_CALLBACK:
+				io_target = PROGRESS_COPY_IO_TARGET_CALLBACK;
+				break;
+		}
+		pgstat_progress_update_param(PROGRESS_COPY_IO_TARGET, io_target);
+	}
+
 	/* create workspace for CopyReadAttributes results */
 	if (!cstate->opts.binary)
 	{
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 534c091c75..42c4a828df 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -772,7 +772,31 @@ BeginCopyTo(ParseState *pstate,
 	/* initialize progress */
 	pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
 								  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
-	pgstat_progress_update_param(PROGRESS_COPY_COMMAND, PROGRESS_COPY_COMMAND_TO);
+	{
+		const int progress_index[] = {
+			PROGRESS_COPY_COMMAND,
+			PROGRESS_COPY_IO_TARGET
+		};
+		int64 progress_vals[] = {
+			PROGRESS_COPY_COMMAND_TO,
+			0
+		};
+		switch (cstate->copy_dest)
+		{
+			case COPY_FILE:
+				if (is_program)
+					progress_vals[1] = PROGRESS_COPY_IO_TARGET_PROGRAM;
+				else
+					progress_vals[1] = PROGRESS_COPY_IO_TARGET_FILE;
+				break;
+			case COPY_OLD_FE:
+			case COPY_NEW_FE:
+				progress_vals[1] = PROGRESS_COPY_IO_TARGET_STDIO;
+				break;
+		}
+		pgstat_progress_update_multi_param(2, progress_index, progress_vals);
+	}
+
 	cstate->bytes_processed = 0;
 
 	MemoryContextSwitchTo(oldcontext);
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 1c30d09abb..e003217554 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -139,9 +139,16 @@
 #define PROGRESS_COPY_TUPLES_PROCESSED 2
 #define PROGRESS_COPY_TUPLES_EXCLUDED 3
 #define PROGRESS_COPY_COMMAND 4
+#define PROGRESS_COPY_IO_TARGET 5
 
 /* Commands of PROGRESS_COPY_COMMAND */
 #define PROGRESS_COPY_COMMAND_FROM 1
 #define PROGRESS_COPY_COMMAND_TO 2
 
+/* Types of PROGRESS_COPY_INOUT_TYPE */
+#define PROGRESS_COPY_IO_TARGET_FILE 1
+#define PROGRESS_COPY_IO_TARGET_PROGRAM 2
+#define PROGRESS_COPY_IO_TARGET_STDIO 3
+#define PROGRESS_COPY_IO_TARGET_CALLBACK 4
+
 #endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 63b5e33083..0698c71d23 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1953,6 +1953,13 @@ pg_stat_progress_copy| SELECT s.pid,
             WHEN 2 THEN 'COPY TO'::text
             ELSE NULL::text
         END AS command,
+        CASE s.param6
+            WHEN 1 THEN 'FILE'::text
+            WHEN 2 THEN 'PROGRAM'::text
+            WHEN 3 THEN 'STDIO'::text
+            WHEN 4 THEN 'CALLBACK'::text
+            ELSE NULL::text
+        END AS io_target,
     s.param1 AS bytes_processed,
     s.param2 AS bytes_total,
     s.param3 AS tuples_processed,
-- 
2.20.1

