From 32e8bf3e84d58b43c8ab5888eb909e86bdf0ceea Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm@gmail.com>
Date: Fri, 12 Feb 2021 14:06:44 +0100
Subject: [PATCH 1/6] Add progress-reported components for COPY progress
 reporting

The command, io target and excluded tuple count (by COPY ... FROM ... s'
WHERE -clause) are now reported in the pg_stat_progress_copy view.
Additionally, the column lines_processed is renamed to tuples_processed to
disambiguate the meaning of this column in cases of CSV and BINARY copies and
to stay consistent with regards to names in the pg_stat_progress_*-views.

Of special interest is the reporting of io_target, with which we can
distinguish logical replications' initial table synchronization workers'
progress without having to join the pg_stat_activity view.
---
 doc/src/sgml/monitoring.sgml         | 37 ++++++++++++++++++++++++++--
 src/backend/catalog/system_views.sql | 11 ++++++++-
 src/backend/commands/copyfrom.c      | 30 +++++++++++++++++++++-
 src/backend/commands/copyto.c        | 29 ++++++++++++++++++++--
 src/include/commands/progress.h      | 15 ++++++++++-
 src/test/regress/expected/rules.out  | 15 ++++++++++-
 6 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index c602ee4427..3c39c82f1a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6544,6 +6544,29 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>command</structfield> <type>text</type>
+      </para>
+      <para>
+       The command that is running: <literal>COPY FROM</literal>, or
+       <literal>COPY TO</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>io_target</structfield> <type>text</type>
+      </para>
+      <para>
+       The io target that the data is read from or written to: 
+       <literal>FILE</literal>, <literal>PROGRAM</literal>, 
+       <literal>STDIO</literal> (for COPY FROM STDIN and COPY TO STDOUT),
+       or <literal>CALLBACK</literal> (used in the table synchronization
+       background worker).
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>bytes_processed</structfield> <type>bigint</type>
@@ -6565,10 +6588,20 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>lines_processed</structfield> <type>bigint</type>
+       <structfield>tuples_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of tuples already processed by <command>COPY</command> command.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>tuples_excluded</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of lines already processed by <command>COPY</command> command.
+       Number of tuples not processed because they were excluded by the
+       <command>WHERE</command> clause of the <command>COPY</command> command.
       </para></entry>
      </row>
     </tbody>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fa58afd9d7..6a3ac47b85 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1129,9 +1129,18 @@ CREATE VIEW pg_stat_progress_copy AS
     SELECT
         S.pid AS pid, S.datid AS datid, D.datname AS datname,
         S.relid AS relid,
+        CASE S.param5 WHEN 1 THEN 'COPY FROM'
+                      WHEN 2 THEN 'COPY TO'
+                      END AS command,
+        CASE S.param6 WHEN 1 THEN 'FILE'
+                      WHEN 2 THEN 'PROGRAM'
+                      WHEN 3 THEN 'STDIO'
+                      WHEN 4 THEN 'CALLBACK'
+                      END AS io_target,
         S.param1 AS bytes_processed,
         S.param2 AS bytes_total,
-        S.param3 AS lines_processed
+        S.param3 AS tuples_processed,
+        S.param4 AS tuples_excluded
     FROM pg_stat_get_progress_info('COPY') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 796ca7b3f7..c3610eb67e 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -540,6 +540,7 @@ CopyFrom(CopyFromState cstate)
 	CopyInsertMethod insertMethod;
 	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
 	uint64		processed = 0;
+	uint64		excluded = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
@@ -869,7 +870,11 @@ CopyFrom(CopyFromState cstate)
 			econtext->ecxt_scantuple = myslot;
 			/* Skip items that don't match COPY's WHERE clause */
 			if (!ExecQual(cstate->qualexpr, econtext))
+			{
+				/* Report that this tuple was filtered out by the WHERE clause */
+				pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, ++excluded);
 				continue;
+			}
 		}
 
 		/* Determine the partition to insert the tuple into */
@@ -1107,7 +1112,7 @@ CopyFrom(CopyFromState cstate)
 			 * for counting tuples inserted by an INSERT command. Update
 			 * progress of the COPY command as well.
 			 */
-			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++processed);
 		}
 	}
 
@@ -1424,6 +1429,8 @@ BeginCopyFrom(ParseState *pstate,
 	/* initialize progress */
 	pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
 								  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	pgstat_progress_update_param(PROGRESS_COPY_COMMAND, PROGRESS_COPY_COMMAND_FROM);
+
 	cstate->bytes_processed = 0;
 
 	/* We keep those variables in cstate. */
@@ -1501,6 +1508,27 @@ BeginCopyFrom(ParseState *pstate,
 		ReceiveCopyBinaryHeader(cstate);
 	}
 
+	{
+		int64 io_target;
+		switch (cstate->copy_src)
+		{
+			case COPY_FILE:
+				if (is_program)
+					io_target = PROGRESS_COPY_IO_TARGET_PROGRAM;
+				else
+					io_target = PROGRESS_COPY_IO_TARGET_FILE;
+				break;
+			case COPY_OLD_FE:
+			case COPY_NEW_FE:
+				io_target = PROGRESS_COPY_IO_TARGET_STDIO;
+				break;
+			case COPY_CALLBACK:
+				io_target = PROGRESS_COPY_IO_TARGET_CALLBACK;
+				break;
+		}
+		pgstat_progress_update_param(PROGRESS_COPY_IO_TARGET, io_target);
+	}
+
 	/* create workspace for CopyReadAttributes results */
 	if (!cstate->opts.binary)
 	{
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index e04ec1e331..42c4a828df 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -772,6 +772,31 @@ BeginCopyTo(ParseState *pstate,
 	/* initialize progress */
 	pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
 								  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	{
+		const int progress_index[] = {
+			PROGRESS_COPY_COMMAND,
+			PROGRESS_COPY_IO_TARGET
+		};
+		int64 progress_vals[] = {
+			PROGRESS_COPY_COMMAND_TO,
+			0
+		};
+		switch (cstate->copy_dest)
+		{
+			case COPY_FILE:
+				if (is_program)
+					progress_vals[1] = PROGRESS_COPY_IO_TARGET_PROGRAM;
+				else
+					progress_vals[1] = PROGRESS_COPY_IO_TARGET_FILE;
+				break;
+			case COPY_OLD_FE:
+			case COPY_NEW_FE:
+				progress_vals[1] = PROGRESS_COPY_IO_TARGET_STDIO;
+				break;
+		}
+		pgstat_progress_update_multi_param(2, progress_index, progress_vals);
+	}
+
 	cstate->bytes_processed = 0;
 
 	MemoryContextSwitchTo(oldcontext);
@@ -954,7 +979,7 @@ CopyTo(CopyToState cstate)
 			CopyOneRowTo(cstate, slot);
 
 			/* Increment amount of processed tuples and update the progress */
-			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++processed);
 		}
 
 		ExecDropSingleTupleTableSlot(slot);
@@ -1321,7 +1346,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 	CopyOneRowTo(cstate, slot);
 
 	/* Increment amount of processed tuples and update the progress */
-	pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
+	pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++myState->processed);
 
 	return true;
 }
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 95ec5d02e9..e003217554 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -136,6 +136,19 @@
 /* Commands of PROGRESS_COPY */
 #define PROGRESS_COPY_BYTES_PROCESSED 0
 #define PROGRESS_COPY_BYTES_TOTAL 1
-#define PROGRESS_COPY_LINES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_EXCLUDED 3
+#define PROGRESS_COPY_COMMAND 4
+#define PROGRESS_COPY_IO_TARGET 5
+
+/* Commands of PROGRESS_COPY_COMMAND */
+#define PROGRESS_COPY_COMMAND_FROM 1
+#define PROGRESS_COPY_COMMAND_TO 2
+
+/* Types of PROGRESS_COPY_INOUT_TYPE */
+#define PROGRESS_COPY_IO_TARGET_FILE 1
+#define PROGRESS_COPY_IO_TARGET_PROGRAM 2
+#define PROGRESS_COPY_IO_TARGET_STDIO 3
+#define PROGRESS_COPY_IO_TARGET_CALLBACK 4
 
 #endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 10a1f34ebc..3c6776429d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1949,9 +1949,22 @@ pg_stat_progress_copy| SELECT s.pid,
     s.datid,
     d.datname,
     s.relid,
+        CASE s.param5
+            WHEN 1 THEN 'COPY FROM'::text
+            WHEN 2 THEN 'COPY TO'::text
+            ELSE NULL::text
+        END AS command,
+        CASE s.param6
+            WHEN 1 THEN 'FILE'::text
+            WHEN 2 THEN 'PROGRAM'::text
+            WHEN 3 THEN 'STDIO'::text
+            WHEN 4 THEN 'CALLBACK'::text
+            ELSE NULL::text
+        END AS io_target,
     s.param1 AS bytes_processed,
     s.param2 AS bytes_total,
-    s.param3 AS lines_processed
+    s.param3 AS tuples_processed,
+    s.param4 AS tuples_excluded
    FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
-- 
2.26.2

