Add 'worker_type' to pg_stat_subscription

Started by Peter Smith over 2 years ago (34 messages)
#1 Peter Smith
smithpb2250@gmail.com
1 attachment(s)

Hi hackers,

Earlier this year I proposed a small change for the pg_stat_subscription view:

------
...it would be very useful to have an additional "kind" attribute for
this view. This will save the user from needing to do mental
gymnastics every time just to recognise what kind of process they are
looking at.
------

At that time Amit replied [1] that this could be posted as a separate
enhancement thread.

Now that the LogicalRepWorkerType has been recently pushed [2]
(something with changes in the same area of the code) it seemed the
right time to resurrect my pg_stat_subscription proposal.

PSA patch v1.

Thoughts?

------
[1]: /messages/by-id/CAA4eK1JO54=3s0KM9iZGSrQmmfzk9PEOKkW8TXjo2OKaKrSGCA@mail.gmail.com
[2]: https://github.com/postgres/postgres/commit/2a8b40e3681921943a2989fd4ec6cdbf8766566c

Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v1-0001-Add-worker_type-to-pg_stat_subscription.patch (application/octet-stream)
From b63ba0a637cd10ec341d807d9eaf4279142ced12 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 16 Aug 2023 19:02:36 +1000
Subject: [PATCH v1] Add worker_type to pg_stat_subscription

---
 doc/src/sgml/monitoring.sgml               | 26 ++++++++++++++
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 55 +++++++++++++++++++++---------
 src/include/catalog/pg_proc.dat            |  6 ++--
 src/test/regress/expected/rules.out        |  3 +-
 5 files changed, 70 insertions(+), 21 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index f4fc5d8..ad25b5c 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,32 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process. Possible values are:
+       <itemizedlist>
+        <listitem>
+        <para>
+          <literal>a</literal>: apply worker
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>p</literal>: parallel apply worker
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>t</literal>: tablesync worker
+         </para>
+        </listitem>
+       </itemizedlist>
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>leader_pid</structfield> <type>integer</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index af65af6..166ad5f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -950,6 +950,7 @@ CREATE VIEW pg_stat_subscription AS
             su.oid AS subid,
             su.subname,
             st.pid,
+			st.worker_type,
             st.leader_pid,
             st.relid,
             st.received_lsn,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7cc0a16..95f87d5 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1272,7 +1272,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1298,40 +1298,61 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		if (OidIsValid(subid) && worker.subid != subid)
 			continue;
 
+		values[0] = ObjectIdGetDatum(worker.subid);
+
 		worker_pid = worker.proc->pid;
+		values[1] = Int32GetDatum(worker_pid);
+
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[2] = CStringGetTextDatum("a");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[2] = CStringGetTextDatum("p");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[2] = CStringGetTextDatum("t");
+				break;
+			case WORKERTYPE_UNKNOWN: /* should not be possible */
+				nulls[2] = true;
+		}
 
-		values[0] = ObjectIdGetDatum(worker.subid);
-		if (isTablesyncWorker(&worker))
-			values[1] = ObjectIdGetDatum(worker.relid);
-		else
-			nulls[1] = true;
-		values[2] = Int32GetDatum(worker_pid);
 
 		if (isParallelApplyWorker(&worker))
 			values[3] = Int32GetDatum(worker.leader_pid);
 		else
 			nulls[3] = true;
 
-		if (XLogRecPtrIsInvalid(worker.last_lsn))
+		if (isTablesyncWorker(&worker))
+			values[4] = ObjectIdGetDatum(worker.relid);
+		else
 			nulls[4] = true;
+
+		if (XLogRecPtrIsInvalid(worker.last_lsn))
+			nulls[5] = true;
 		else
-			values[4] = LSNGetDatum(worker.last_lsn);
+			values[5] = LSNGetDatum(worker.last_lsn);
+
 		if (worker.last_send_time == 0)
-			nulls[5] = true;
+			nulls[6] = true;
 		else
-			values[5] = TimestampTzGetDatum(worker.last_send_time);
+			values[6] = TimestampTzGetDatum(worker.last_send_time);
+
 		if (worker.last_recv_time == 0)
-			nulls[6] = true;
+			nulls[7] = true;
 		else
-			values[6] = TimestampTzGetDatum(worker.last_recv_time);
+			values[7] = TimestampTzGetDatum(worker.last_recv_time);
+
 		if (XLogRecPtrIsInvalid(worker.reply_lsn))
-			nulls[7] = true;
+			nulls[8] = true;
 		else
-			values[7] = LSNGetDatum(worker.reply_lsn);
+			values[8] = LSNGetDatum(worker.reply_lsn);
+
 		if (worker.reply_time == 0)
-			nulls[8] = true;
+			nulls[9] = true;
 		else
-			values[8] = TimestampTzGetDatum(worker.reply_time);
+			values[9] = TimestampTzGetDatum(worker.reply_time);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6996073..a3455f2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5466,9 +5466,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,int4,text,int4,oid,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,pid,worker_type,leader_pid,relid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e07afcd..82935e8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2119,6 +2119,7 @@ pg_stat_ssl| SELECT pid,
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
     st.pid,
+    st.worker_type,
     st.leader_pid,
     st.relid,
     st.received_lsn,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, pid, worker_type, leader_pid, relid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
-- 
1.8.3.1

#2 Nathan Bossart
nathandbossart@gmail.com
In reply to: Peter Smith (#1)
Re: Add 'worker_type' to pg_stat_subscription

On Wed, Aug 16, 2023 at 07:14:18PM +1000, Peter Smith wrote:

> Earlier this year I proposed a small change for the pg_stat_subscription view:
>
> ------
> ...it would be very useful to have an additional "kind" attribute for
> this view. This will save the user from needing to do mental
> gymnastics every time just to recognise what kind of process they are
> looking at.
> ------
>
> At that time Amit replied [1] that this could be posted as a separate
> enhancement thread.
>
> Now that the LogicalRepWorkerType has been recently pushed [2]
> (something with changes in the same area of the code) it seemed the
> right time to resurrect my pg_stat_subscription proposal.

This sounds generally reasonable to me.

>       <row>
>        <entry role="catalog_table_entry"><para role="column_definition">
> +       <structfield>worker_type</structfield> <type>text</type>
> +      </para>
> +      <para>
> +       Type of the subscription worker process. Possible values are:
> +       <itemizedlist>
> +        <listitem>
> +        <para>
> +          <literal>a</literal>: apply worker
> +         </para>
> +        </listitem>
> +        <listitem>
> +         <para>
> +          <literal>p</literal>: parallel apply worker
> +         </para>
> +        </listitem>
> +        <listitem>
> +         <para>
> +          <literal>t</literal>: tablesync worker
> +         </para>
> +        </listitem>
> +       </itemizedlist>
> +      </para></entry>
> +     </row>

Is there any reason not to spell out the names? I think that would match
the other system views better (e.g., backend_type in pg_stat_activity).
Also, instead of "tablesync worker", I'd suggest using "synchronization
worker" to match the name used elsewhere in this table.

I see that the table refers to "leader apply workers". Would those show up
as parallel apply workers in the view? Can we add another worker type for
those?

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#3 Peter Smith
smithpb2250@gmail.com
In reply to: Nathan Bossart (#2)
1 attachment(s)
Re: Add 'worker_type' to pg_stat_subscription

On Sat, Sep 2, 2023 at 7:41 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

Thanks for your interest in this patch.

> Is there any reason not to spell out the names? I think that would match
> the other system views better (e.g., backend_type in pg_stat_activity).

I had thought it might be simpler in case someone wanted to query by
type. But your suggestion for consistency is probably better, so I
changed to do it that way. The help is also simplified to match the
other 'backend_type' you cited.

> Also, instead of "tablesync worker", I'd suggest using "synchronization
> worker" to match the name used elsewhere in this table.

Changed to "table synchronization worker".

> I see that the table refers to "leader apply workers". Would those show up
> as parallel apply workers in the view? Can we add another worker type for
> those?

Internally there are only 3 worker types: A "leader" apply worker is
basically the same as a regular apply worker, except it has other
parallel apply workers associated with it.
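
To make the "leader" distinction concrete, here is a minimal standalone C sketch. It is not PostgreSQL code: `DemoWorker` and `demo_is_leader` are hypothetical names, the enum is only a stand-in for the internal worker types mentioned in this thread, and the rule "an apply worker is a leader if some parallel apply worker points at it via leader_pid" is an assumption drawn from the description above.

```c
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for the internal worker types named in this thread. */
typedef enum
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} WorkerType;

/* Hypothetical, simplified worker record (not the real LogicalRepWorker). */
typedef struct
{
	WorkerType	type;
	int			pid;
	int			leader_pid;		/* meaningful only for parallel apply workers */
} DemoWorker;

/*
 * Treat an apply worker as a "leader" if at least one parallel apply worker
 * in the array names it as its leader.  There is no separate worker type
 * for this; it is derived from the other rows.
 */
static bool
demo_is_leader(const DemoWorker *workers, size_t n, int pid)
{
	bool		is_apply = false;
	bool		has_follower = false;

	for (size_t i = 0; i < n; i++)
	{
		if (workers[i].pid == pid && workers[i].type == WORKERTYPE_APPLY)
			is_apply = true;
		if (workers[i].type == WORKERTYPE_PARALLEL_APPLY &&
			workers[i].leader_pid == pid)
			has_follower = true;
	}
	return is_apply && has_follower;
}
```

In this sketch an apply worker with no parallel apply workers attached is simply not a leader, which matches the point that "leader" is a role rather than a fourth type.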

I felt that pretending there are 4 types in the view would be
confusing. Instead, I just removed the word "leader". Now there are:
"apply worker"
"parallel apply worker"
"table synchronization worker"

PSA patch v2.
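
For readers skimming past the diff, the v2 naming can be summarised as a small standalone C sketch. The enum below is a stand-in for LogicalRepWorkerType, `demo_worker_type_label` is a hypothetical helper, and returning NULL is only meant to mirror the patch setting the column to SQL NULL for the unknown type.

```c
#include <stddef.h>

/* Stand-in for the internal worker types named in this thread. */
typedef enum
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} WorkerType;

/*
 * Map a worker type to the display string used in v2.  NULL stands for
 * "no value" (the view column would be NULL).
 */
static const char *
demo_worker_type_label(WorkerType type)
{
	switch (type)
	{
		case WORKERTYPE_APPLY:
			return "apply worker";
		case WORKERTYPE_PARALLEL_APPLY:
			return "parallel apply worker";
		case WORKERTYPE_TABLESYNC:
			return "table synchronization worker";
		case WORKERTYPE_UNKNOWN:
			break;
	}
	return NULL;
}
```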

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v2-0001-Add-worker_type-to-pg_stat_subscription.patch (application/x-patch)
From bac581d9f6843b3df0dd5fc45e318594a7921ee6 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 4 Sep 2023 13:58:16 +1000
Subject: [PATCH v2] Add worker_type to pg_stat_subscription

---
 doc/src/sgml/monitoring.sgml               | 12 +++++++
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 55 +++++++++++++++++++++---------
 src/include/catalog/pg_proc.dat            |  6 ++--
 src/test/regress/expected/rules.out        |  3 +-
 5 files changed, 56 insertions(+), 21 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d..45b9ccf 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2004,6 +2004,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process. Possible types are:
+       <literal>apply worker</literal>,
+       <literal>parallel apply worker</literal>,
+       <literal>table synchronization worker</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>leader_pid</structfield> <type>integer</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2..3b7f5c2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -950,6 +950,7 @@ CREATE VIEW pg_stat_subscription AS
             su.oid AS subid,
             su.subname,
             st.pid,
+			st.worker_type,
             st.leader_pid,
             st.relid,
             st.received_lsn,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc9..0e2fbaf 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1304,40 +1304,61 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		if (OidIsValid(subid) && worker.subid != subid)
 			continue;
 
+		values[0] = ObjectIdGetDatum(worker.subid);
+
 		worker_pid = worker.proc->pid;
+		values[1] = Int32GetDatum(worker_pid);
+
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[2] = CStringGetTextDatum("apply worker");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[2] = CStringGetTextDatum("parallel apply worker");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[2] = CStringGetTextDatum("table synchronization worker");
+				break;
+			case WORKERTYPE_UNKNOWN: /* should not be possible */
+				nulls[2] = true;
+		}
 
-		values[0] = ObjectIdGetDatum(worker.subid);
-		if (isTablesyncWorker(&worker))
-			values[1] = ObjectIdGetDatum(worker.relid);
-		else
-			nulls[1] = true;
-		values[2] = Int32GetDatum(worker_pid);
 
 		if (isParallelApplyWorker(&worker))
 			values[3] = Int32GetDatum(worker.leader_pid);
 		else
 			nulls[3] = true;
 
-		if (XLogRecPtrIsInvalid(worker.last_lsn))
+		if (isTablesyncWorker(&worker))
+			values[4] = ObjectIdGetDatum(worker.relid);
+		else
 			nulls[4] = true;
+
+		if (XLogRecPtrIsInvalid(worker.last_lsn))
+			nulls[5] = true;
 		else
-			values[4] = LSNGetDatum(worker.last_lsn);
+			values[5] = LSNGetDatum(worker.last_lsn);
+
 		if (worker.last_send_time == 0)
-			nulls[5] = true;
+			nulls[6] = true;
 		else
-			values[5] = TimestampTzGetDatum(worker.last_send_time);
+			values[6] = TimestampTzGetDatum(worker.last_send_time);
+
 		if (worker.last_recv_time == 0)
-			nulls[6] = true;
+			nulls[7] = true;
 		else
-			values[6] = TimestampTzGetDatum(worker.last_recv_time);
+			values[7] = TimestampTzGetDatum(worker.last_recv_time);
+
 		if (XLogRecPtrIsInvalid(worker.reply_lsn))
-			nulls[7] = true;
+			nulls[8] = true;
 		else
-			values[7] = LSNGetDatum(worker.reply_lsn);
+			values[8] = LSNGetDatum(worker.reply_lsn);
+
 		if (worker.reply_time == 0)
-			nulls[8] = true;
+			nulls[9] = true;
 		else
-			values[8] = TimestampTzGetDatum(worker.reply_time);
+			values[9] = TimestampTzGetDatum(worker.reply_time);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6..e65edf9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,int4,text,int4,oid,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,pid,worker_type,leader_pid,relid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5..f1f7f13 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2119,6 +2119,7 @@ pg_stat_ssl| SELECT pid,
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
     st.pid,
+    st.worker_type,
     st.leader_pid,
     st.relid,
     st.received_lsn,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, pid, worker_type, leader_pid, relid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
-- 
1.8.3.1

#4 Nathan Bossart
nathandbossart@gmail.com
In reply to: Peter Smith (#3)
Re: Add 'worker_type' to pg_stat_subscription

On Wed, Sep 06, 2023 at 09:02:21AM +1200, Peter Smith wrote:

> On Sat, Sep 2, 2023 at 7:41 AM Nathan Bossart <nathandbossart@gmail.com> wrote:
>
>> I see that the table refers to "leader apply workers". Would those show up
>> as parallel apply workers in the view? Can we add another worker type for
>> those?
>
> Internally there are only 3 worker types: A "leader" apply worker is
> basically the same as a regular apply worker, except it has other
> parallel apply workers associated with it.
>
> I felt that pretending there are 4 types in the view would be
> confusing. Instead, I just removed the word "leader". Now there are:
> "apply worker"
> "parallel apply worker"
> "table synchronization worker"

Okay. Should we omit "worker" for each of the types? Since these are the
values for the "worker_type" column, it seems a bit redundant. For
example, we don't add "backend" to the end of each value for backend_type
in pg_stat_activity.

I wonder if we could add the new field to the end of
pg_stat_get_subscription() so that we could simplify this patch a bit. At
the moment, a big chunk of it is dedicated to reordering the values.
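
To illustrate why appending keeps the patch small, here is a rough standalone C sketch that counts how many existing output columns change index under each ordering. The column lists are copied from the proargnames entries in the patches (output arguments only); `demo_count_moved` and the array names are hypothetical helpers, not anything in PostgreSQL.

```c
#include <stddef.h>
#include <string.h>

/* Output columns of pg_stat_get_subscription() before the patch. */
static const char *const demo_old_cols[] = {
	"subid", "relid", "pid", "leader_pid", "received_lsn",
	"last_msg_send_time", "last_msg_receipt_time",
	"latest_end_lsn", "latest_end_time"
};

/* Ordering used by v1/v2: worker_type inserted, relid moved. */
static const char *const demo_v2_cols[] = {
	"subid", "pid", "worker_type", "leader_pid", "relid", "received_lsn",
	"last_msg_send_time", "last_msg_receipt_time",
	"latest_end_lsn", "latest_end_time"
};

/* Ordering with worker_type appended at the end. */
static const char *const demo_appended_cols[] = {
	"subid", "relid", "pid", "leader_pid", "received_lsn",
	"last_msg_send_time", "last_msg_receipt_time",
	"latest_end_lsn", "latest_end_time", "worker_type"
};

/* Count old columns that no longer sit at the same index in the new list. */
static size_t
demo_count_moved(const char *const *old_cols, size_t n_old,
				 const char *const *new_cols, size_t n_new)
{
	size_t		moved = 0;

	for (size_t i = 0; i < n_old; i++)
	{
		if (i >= n_new || strcmp(old_cols[i], new_cols[i]) != 0)
			moved++;
	}
	return moved;
}
```

Every moved column corresponds to a values[]/nulls[] index that has to be renumbered in pg_stat_get_subscription(), which is where most of the v2 diff comes from.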

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#5 Peter Smith
smithpb2250@gmail.com
In reply to: Nathan Bossart (#4)
1 attachment(s)
Re: Add 'worker_type' to pg_stat_subscription

On Wed, Sep 6, 2023 at 9:49 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

> On Wed, Sep 06, 2023 at 09:02:21AM +1200, Peter Smith wrote:
>
>> On Sat, Sep 2, 2023 at 7:41 AM Nathan Bossart <nathandbossart@gmail.com> wrote:
>>
>>> I see that the table refers to "leader apply workers". Would those show up
>>> as parallel apply workers in the view? Can we add another worker type for
>>> those?
>>
>> Internally there are only 3 worker types: A "leader" apply worker is
>> basically the same as a regular apply worker, except it has other
>> parallel apply workers associated with it.
>>
>> I felt that pretending there are 4 types in the view would be
>> confusing. Instead, I just removed the word "leader". Now there are:
>> "apply worker"
>> "parallel apply worker"
>> "table synchronization worker"
>
> Okay. Should we omit "worker" for each of the types? Since these are the
> values for the "worker_type" column, it seems a bit redundant. For
> example, we don't add "backend" to the end of each value for backend_type
> in pg_stat_activity.
>
> I wonder if we could add the new field to the end of
> pg_stat_get_subscription() so that we could simplify this patch a bit. At
> the moment, a big chunk of it is dedicated to reordering the values.

Modified as suggested. PSA v3.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v3-0001-Add-worker_type-to-pg_stat_subscription.patch (application/octet-stream)
From c0079e0671cdb80eea4cc1ef1678b989ac42243e Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 7 Sep 2023 10:25:52 +1000
Subject: [PATCH v3] Add worker_type to pg_stat_subscription

---
 doc/src/sgml/monitoring.sgml               | 12 ++++++++++++
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 27 ++++++++++++++++++++++++---
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 5 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d..eaf51f5 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2004,6 +2004,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process. Possible types are:
+       <literal>apply</literal>,
+       <literal>parallel</literal>,
+       <literal>table synchronization</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>leader_pid</structfield> <type>integer</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2..3b7f5c2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -950,6 +950,7 @@ CREATE VIEW pg_stat_subscription AS
             su.oid AS subid,
             su.subname,
             st.pid,
+			st.worker_type,
             st.leader_pid,
             st.relid,
             st.received_lsn,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc9..a645a5a 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1304,13 +1304,14 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		if (OidIsValid(subid) && worker.subid != subid)
 			continue;
 
-		worker_pid = worker.proc->pid;
-
 		values[0] = ObjectIdGetDatum(worker.subid);
+
 		if (isTablesyncWorker(&worker))
 			values[1] = ObjectIdGetDatum(worker.relid);
 		else
 			nulls[1] = true;
+
+		worker_pid = worker.proc->pid;
 		values[2] = Int32GetDatum(worker_pid);
 
 		if (isParallelApplyWorker(&worker))
@@ -1322,23 +1323,43 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			nulls[4] = true;
 		else
 			values[4] = LSNGetDatum(worker.last_lsn);
+
 		if (worker.last_send_time == 0)
 			nulls[5] = true;
 		else
 			values[5] = TimestampTzGetDatum(worker.last_send_time);
+
 		if (worker.last_recv_time == 0)
 			nulls[6] = true;
 		else
 			values[6] = TimestampTzGetDatum(worker.last_recv_time);
+
 		if (XLogRecPtrIsInvalid(worker.reply_lsn))
 			nulls[7] = true;
 		else
 			values[7] = LSNGetDatum(worker.reply_lsn);
+
 		if (worker.reply_time == 0)
 			nulls[8] = true;
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("table synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN: /* should not be possible */
+				nulls[9] = true;
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6..f0b7b9c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5..ea173a2 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2119,6 +2119,7 @@ pg_stat_ssl| SELECT pid,
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
     st.pid,
+    st.worker_type,
     st.leader_pid,
     st.relid,
     st.received_lsn,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
-- 
1.8.3.1

#6 Nathan Bossart
nathandbossart@gmail.com
In reply to: Peter Smith (#5)
1 attachment(s)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 07, 2023 at 12:36:29PM +1200, Peter Smith wrote:

> Modified as suggested. PSA v3.

Thanks. I've attached v4 with a couple of small changes. Notably, I've
moved the worker_type column to before the pid column, as it felt more
natural to me to keep the PID columns together. I've also added an
elog(ERROR, ...) for WORKERTYPE_UNKNOWN, as that seems to be the standard
practice elsewhere. That being said, are we absolutely confident that this
really cannot happen? I haven't looked too closely, but if there is a
small race or something that could cause us to see a worker with this type,
perhaps it would be better to actually list it as "unknown". Thoughts?
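
For comparison, the "list it as unknown" alternative could look roughly like the standalone C sketch below. This is not what v4 does (v4 raises an error); the enum is a stand-in for LogicalRepWorkerType and `demo_worker_type_or_unknown` is a hypothetical helper.

```c
/* Stand-in for the internal worker types named in this thread. */
typedef enum
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} WorkerType;

/*
 * Return the display string for a worker type, reporting "unknown"
 * instead of failing when the type is not one of the three real ones.
 */
static const char *
demo_worker_type_or_unknown(WorkerType type)
{
	switch (type)
	{
		case WORKERTYPE_APPLY:
			return "apply";
		case WORKERTYPE_PARALLEL_APPLY:
			return "parallel apply";
		case WORKERTYPE_TABLESYNC:
			return "table synchronization";
		case WORKERTYPE_UNKNOWN:
			break;
	}

	/* Reached for WORKERTYPE_UNKNOWN and for any out-of-range value. */
	return "unknown";
}
```

Leaving out a default label is deliberate in this sketch: with GCC or Clang and -Wswitch (part of -Wall), adding a new enum member without a matching case produces a warning, while the trailing return still gives a defined result at run time.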

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v4-0001-add-worker-type-to-pg_stat_subscription.patch (text/x-diff; charset=us-ascii)
From 85b826752b1b10176809918975ce7b495dadd3db Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Thu, 7 Sep 2023 15:24:23 -0700
Subject: [PATCH v4 1/1] add worker type to pg_stat_subscription

---
 doc/src/sgml/monitoring.sgml               | 11 +++++++++++
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 5 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d6a0..17f9323f23 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process.  Possible types are
+       <literal>apply</literal>, <literal>parallel</literal>, and
+       <literal>table synchronization</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>pid</structfield> <type>integer</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2a7a..fcb14976c0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
             su.subname,
+            st.worker_type,
             st.pid,
             st.leader_pid,
             st.relid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc91ce..501910b445 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("table synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN:
+				/* Should never happen. */
+				elog(ERROR, "unknown worker type");
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f0b7b9cbd8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5411..2c60400ade 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
+    st.worker_type,
     st.pid,
     st.leader_pid,
     st.relid,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
-- 
2.25.1

#7Peter Smith
smithpb2250@gmail.com
In reply to: Nathan Bossart (#6)
Re: Add 'worker_type' to pg_stat_subscription

On Fri, Sep 8, 2023 at 8:28 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Thu, Sep 07, 2023 at 12:36:29PM +1200, Peter Smith wrote:

Modified as suggested. PSA v3.

Thanks. I've attached v4 with a couple of small changes. Notably, I've
moved the worker_type column to before the pid column, as it felt more
natural to me to keep the PID columns together. I've also added an
elog(ERROR, ...) for WORKERTYPE_UNKNOWN, as that seems to be the standard
practice elsewhere.
That being said, are we absolutely confident that this
really cannot happen? I haven't looked too closely, but if there is a
small race or something that could cause us to see a worker with this type,
perhaps it would be better to actually list it as "unknown". Thoughts?

The type is only assigned during worker process launch, and during
process cleanup [1]. It's only possible to be UNKNOWN after
logicalrep_worker_cleanup().

AFAIK the stats can never see a worker with an UNKNOWN type. It was only
out of excessive caution against something unforeseen that my original
code did the following instead of the elog:

+ case WORKERTYPE_UNKNOWN: /* should not be possible */
+ nulls[9] = true;

Adding "unknown" for something that is supposed to be impossible might
be slight overkill, but so long as there is no obligation to write
about "unknown" in the PG DOCS then I agree it is probably better to
do that.

------
[1]: https://github.com/search?q=repo%3Apostgres%2Fpostgres%20%20worker-%3Etype&type=code

Kind Regards,
Peter Smith.
Fujitsu Australia

#8Michael Paquier
michael@paquier.xyz
In reply to: Peter Smith (#7)
Re: Add 'worker_type' to pg_stat_subscription

On Tue, Sep 12, 2023 at 01:07:51PM +1000, Peter Smith wrote:

The type is only assigned during worker process launch, and during
process cleanup [1]. It's only possible to be UNKNOWN after
logicalrep_worker_cleanup().

AFAIK the stats can never see a worker with an UNKNOWN type, although
it was due to excessive caution against something unforeseen that my
original code did below instead of the elog.

+ case WORKERTYPE_UNKNOWN: /* should not be possible */
+ nulls[9] = true;

Adding "unknown" for something that is supposed to be impossible might
be slight overkill, but so long as there is no obligation to write
about "unknown" in the PG DOCS then I agree it is probably better to
do that,

Using an elog() is OK IMO. pg_stat_get_subscription() holds
LogicalRepWorkerLock in shared mode, and the only code path setting a
worker to WORKERTYPE_UNKNOWN requires that this same LWLock is held in
exclusive mode while resetting all the shmem fields of the
subscription entry cleaned up, which is what
pg_stat_get_subscription() uses to check if a subscription should be
included in its SRF.

Shouldn't this patch add or tweak some SQL queries in
src/test/subscription/ to show some worker types, at least?
--
Michael

#9Peter Smith
smithpb2250@gmail.com
In reply to: Michael Paquier (#8)
1 attachment(s)
Re: Add 'worker_type' to pg_stat_subscription

On Tue, Sep 12, 2023 at 1:44 PM Michael Paquier <michael@paquier.xyz> wrote:

On Tue, Sep 12, 2023 at 01:07:51PM +1000, Peter Smith wrote:

The type is only assigned during worker process launch, and during
process cleanup [1]. It's only possible to be UNKNOWN after
logicalrep_worker_cleanup().

AFAIK the stats can never see a worker with an UNKNOWN type, although
it was due to excessive caution against something unforeseen that my
original code did below instead of the elog.

+ case WORKERTYPE_UNKNOWN: /* should not be possible */
+ nulls[9] = true;

Adding "unknown" for something that is supposed to be impossible might
be slight overkill, but so long as there is no obligation to write
about "unknown" in the PG DOCS then I agree it is probably better to
do that,

Using an elog() is OK IMO. pg_stat_get_subscription() holds
LogicalRepWorkerLock in shared mode, and the only code path setting a
worker to WORKERTYPE_UNKNOWN requires that this same LWLock is held in
exclusive mode while resetting all the shmem fields of the
subscription entry cleaned up, which is what
pg_stat_get_subscription() uses to check if a subscription should be
included in its SRF.

Shouldn't this patch add or tweak some SQL queries in
src/test/subscription/ to show some worker types, at least?

Right. I found just a single test currently using the pg_stat_subscription
view. I added a worker_type check for that.

PSA v5

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v5-0001-add-worker-type-to-pg_stat_subscription.patch (application/octet-stream)
From a411f12b3f731702b6681bb9dde4f46ba324e587 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 12 Sep 2023 18:37:57 +1000
Subject: [PATCH v5] add worker type to pg_stat_subscription

---
 doc/src/sgml/monitoring.sgml               | 11 +++++++++++
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 src/test/subscription/t/004_sync.pl        |  4 ++++
 6 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d..17f9323 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1995,6 +1995,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process.  Possible types are
+       <literal>apply</literal>, <literal>parallel</literal>, and
+       <literal>table synchronization</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>pid</structfield> <type>integer</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2..fcb1497 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
             su.subname,
+            st.worker_type,
             st.pid,
             st.leader_pid,
             st.relid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc9..501910b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("table synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN:
+				/* Should never happen. */
+				elog(ERROR, "unknown worker type");
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6..f0b7b9c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5..2c60400 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
+    st.worker_type,
     st.pid,
     st.leader_pid,
     st.relid,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index aa7714c..fdba119 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -82,6 +82,10 @@ $node_subscriber->safe_psql('postgres',
 $node_subscriber->poll_query_until('postgres',
 	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
 ) or die "Timed out while waiting for subscriber to start";
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT worker_type FROM pg_stat_subscription WHERE pid IS NOT NULL AND subname = 'tap_sub2' AND relid IS NULL"
+);
+is($result, qq(apply), 'apply worker is running');
 
 # and drop both subscriptions
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
-- 
1.8.3.1

#10Michael Paquier
michael@paquier.xyz
In reply to: Peter Smith (#9)
Re: Add 'worker_type' to pg_stat_subscription

On Tue, Sep 12, 2023 at 07:00:14PM +1000, Peter Smith wrote:

Right. I found just a single test currently using pg_stat_subscription
catalog. I added a worker_type check for that.

This looks enough to me, thanks!
--
Michael

#11Maxim Orlov
orlovmg@gmail.com
In reply to: Michael Paquier (#10)
Re: Add 'worker_type' to pg_stat_subscription

Hi!

I took a look at the patch and I like the idea. The overall code is in good
condition and implements the described feature.

Side note: this is not a problem with this particular patch, but in
pg_stat_get_subscription and many other places there is a switch over the
worker types. Can we use a default section there to get an explicit error,
instead of compiler warnings, if we ever decide to add another worker type?

So, should we mark this thread as RfC?

--
Best regards,
Maxim Orlov.

#12Nathan Bossart
nathandbossart@gmail.com
In reply to: Maxim Orlov (#11)
Re: Add 'worker_type' to pg_stat_subscription

On Wed, Sep 13, 2023 at 05:06:28PM +0300, Maxim Orlov wrote:

I took a look at the patch and I like the idea. The overall code is in good
condition and implements the described feature.

Thanks for reviewing.

Side note: this is not a problem with this particular patch, but in
pg_stat_get_subscription and many other places there is a switch over the
worker types. Can we use a default section there to get an explicit error,
instead of compiler warnings, if we ever decide to add another worker type?

-1. We want such compiler warnings to remind us to adjust the code
accordingly. If we just rely on an ERROR in the default section, we might
miss it if there isn't a relevant test.
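
As a rough illustration of that point, here is a minimal standalone sketch.
DemoWorkerType and demo_worker_type_name() are hypothetical stand-ins, not
the real LogicalRepWorkerType or any PostgreSQL function. With no default
label, a compiler that checks enum switches (for example GCC's -Wswitch,
which -Wall enables) warns as soon as a new enumerator is added without a
matching case.

```c
#include <stddef.h>

/* Hypothetical stand-in for the worker type enum; not the PostgreSQL header. */
typedef enum DemoWorkerType
{
	DEMO_WORKERTYPE_UNKNOWN = 0,
	DEMO_WORKERTYPE_TABLESYNC,
	DEMO_WORKERTYPE_APPLY,
	DEMO_WORKERTYPE_PARALLEL_APPLY
} DemoWorkerType;

/*
 * Every enumerator is listed and there is deliberately no default label,
 * so an unhandled new enumerator shows up as a compile-time warning
 * instead of silently falling into a runtime error path.
 */
static const char *
demo_worker_type_name(DemoWorkerType type)
{
	switch (type)
	{
		case DEMO_WORKERTYPE_APPLY:
			return "apply";
		case DEMO_WORKERTYPE_PARALLEL_APPLY:
			return "parallel apply";
		case DEMO_WORKERTYPE_TABLESYNC:
			return "table synchronization";
		case DEMO_WORKERTYPE_UNKNOWN:
			break;
	}

	/* Reached only for UNKNOWN or an out-of-range value. */
	return NULL;
}
```

A default label would compile cleanly after a new enumerator is added, and
the gap would surface only if some test happened to exercise that value.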

So, should we mark this thread as RfC?

I've done so. Barring additional feedback, I intend to commit this in the
next few days.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#13Nathan Bossart
nathandbossart@gmail.com
In reply to: Nathan Bossart (#12)
Re: Add 'worker_type' to pg_stat_subscription

On Wed, Sep 13, 2023 at 09:59:04AM -0700, Nathan Bossart wrote:

On Wed, Sep 13, 2023 at 05:06:28PM +0300, Maxim Orlov wrote:

So, should we mark this thread as RfC?

I've done so. Barring additional feedback, I intend to commit this in the
next few days.

Note to self: this needs a catversion bump.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#14Nathan Bossart
nathandbossart@gmail.com
In reply to: Nathan Bossart (#12)
1 attachment(s)
Re: Add 'worker_type' to pg_stat_subscription

On Wed, Sep 13, 2023 at 09:59:04AM -0700, Nathan Bossart wrote:

On Wed, Sep 13, 2023 at 05:06:28PM +0300, Maxim Orlov wrote:

So, should we mark this thread as RfC?

I've done so. Barring additional feedback, I intend to commit this in the
next few days.

I did some staging work for the patch (attached). The one code change I
made was for the new test. Instead of adding a new test, I figured we
could modify the preceding test to check for the expected worker type
instead of whether relid is NULL. ISTM this relid check is intended to
filter for the apply worker, anyway.
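
For context, the deduction that the relid check relies on can be sketched as
follows. This is an illustration only: DemoSubscriptionRow and
demo_deduce_worker_type() are hypothetical names, the labels are the ones
the v6 patch emits, and the sketch assumes a row that belongs to a running
worker, with leader_pid set only for parallel apply workers and relid set
only for table synchronization workers.

```c
#include <stdbool.h>

/* Hypothetical row holding only the two columns needed; not a PostgreSQL struct. */
typedef struct DemoSubscriptionRow
{
	bool		relid_is_null;
	bool		leader_pid_is_null;
} DemoSubscriptionRow;

/*
 * Deduce the worker type from column nullness alone:
 *   leader_pid set  -> parallel apply worker
 *   relid set       -> table synchronization worker
 *   both NULL       -> (leader) apply worker
 */
static const char *
demo_deduce_worker_type(const DemoSubscriptionRow *row)
{
	if (!row->leader_pid_is_null)
		return "parallel apply";
	if (!row->relid_is_null)
		return "table synchronization";
	return "apply";
}
```

Filtering on worker_type directly expresses the same intent without the
reader having to remember these rules.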

The only reason I didn't apply this already is because IMHO we should
adjust the worker types and the documentation for the view to be
consistent. For example, the docs say "leader apply worker" but the view
just calls them "apply" workers. The docs say "synchronization worker" but
the view calls them "table synchronization" workers. My first instinct is
to call apply workers "leader apply" workers in the view, and to call table
synchronization workers "table synchronization workers" in the docs.

Thoughts?

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v6-0001-Add-worker-type-to-pg_stat_subscription.patch (text/x-diff; charset=us-ascii)
From a49c8d92c4ddaf99d067d03e6adabea068497e93 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Thu, 14 Sep 2023 14:31:44 -0700
Subject: [PATCH v6 1/1] Add worker type to pg_stat_subscription.

Thanks to 2a8b40e368, the logical replication worker type is easily
determined, and this information is a nice addition to the
pg_stat_subscription view.  The worker type could already be
deduced via other columns such as leader_pid and relid, but that is
unnecessary complexity for users.

Bumps catversion.

Author: Peter Smith
Reviewed-by: Michael Paquier, Maxim Orlov
Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com
---
 doc/src/sgml/monitoring.sgml               | 11 +++++++++++
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/catversion.h           |  2 +-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 src/test/subscription/t/004_sync.pl        |  2 +-
 7 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d6a0..17f9323f23 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process.  Possible types are
+       <literal>apply</literal>, <literal>parallel</literal>, and
+       <literal>table synchronization</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>pid</structfield> <type>integer</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2a7a..fcb14976c0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
             su.subname,
+            st.worker_type,
             st.pid,
             st.leader_pid,
             st.relid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc91ce..501910b445 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("table synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN:
+				/* Should never happen. */
+				elog(ERROR, "unknown worker type");
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 4eaef54d0c..a0c1dbca95 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202309061
+#define CATALOG_VERSION_NO	202309141
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f0b7b9cbd8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5411..2c60400ade 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
+    st.worker_type,
     st.pid,
     st.leader_pid,
     st.relid,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index bf4d59efba..ee07d28b37 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -80,7 +80,7 @@ $node_subscriber->safe_psql('postgres',
 
 # wait for it to start
 $node_subscriber->poll_query_until('postgres',
-	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
+	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'apply'"
 ) or die "Timed out while waiting for subscriber to start";
 
 # and drop both subscriptions
-- 
2.25.1

#15Nathan Bossart
nathandbossart@gmail.com
In reply to: Nathan Bossart (#14)
1 attachment(s)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 14, 2023 at 03:04:19PM -0700, Nathan Bossart wrote:

The only reason I didn't apply this already is because IMHO we should
adjust the worker types and the documentation for the view to be
consistent. For example, the docs say "leader apply worker" but the view
just calls them "apply" workers. The docs say "synchronization worker" but
the view calls them "table synchronization" workers. My first instinct is
to call apply workers "leader apply" workers in the view, and to call table
synchronization workers "table synchronization workers" in the docs.

Concretely, like this.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v7-0001-Add-worker-type-to-pg_stat_subscription.patch (text/x-diff; charset=us-ascii)
From 965dc5f05eee9e1c6cac1374d5800fff1ea5cba2 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Thu, 14 Sep 2023 14:31:44 -0700
Subject: [PATCH v7 1/1] Add worker type to pg_stat_subscription.

Thanks to 2a8b40e368, the logical replication worker type is easily
determined, and this information is a nice addition to the
pg_stat_subscription view.  The worker type could already be
deduced via other columns such as leader_pid and relid, but that is
unnecessary complexity for users.

Bumps catversion.

Author: Peter Smith
Reviewed-by: Michael Paquier, Maxim Orlov
Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com
---
 doc/src/sgml/monitoring.sgml               | 13 ++++++++++++-
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/catversion.h           |  2 +-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 src/test/subscription/t/004_sync.pl        |  2 +-
 7 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d6a0..d2328eb85d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process.  Possible types are
+       <literal>leader apply</literal>, <literal>parallel apply</literal>, and
+       <literal>table synchronization</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>pid</structfield> <type>integer</type>
@@ -2008,7 +2019,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para>
       <para>
        Process ID of the leader apply worker if this process is a parallel
-       apply worker; NULL if this process is a leader apply worker or a
+       apply worker; NULL if this process is a leader apply worker or a table
        synchronization worker
       </para></entry>
      </row>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2a7a..fcb14976c0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
             su.subname,
+            st.worker_type,
             st.pid,
             st.leader_pid,
             st.relid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc91ce..54ab8a37f4 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("leader apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("table synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN:
+				/* Should never happen. */
+				elog(ERROR, "unknown worker type");
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 4eaef54d0c..f1f6c5855b 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202309061
+#define CATALOG_VERSION_NO	202309151
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f0b7b9cbd8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5411..2c60400ade 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
+    st.worker_type,
     st.pid,
     st.leader_pid,
     st.relid,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index bf4d59efba..f25c9f6408 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -80,7 +80,7 @@ $node_subscriber->safe_psql('postgres',
 
 # wait for it to start
 $node_subscriber->poll_query_until('postgres',
-	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
+	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'leader apply'"
 ) or die "Timed out while waiting for subscriber to start";
 
 # and drop both subscriptions
-- 
2.25.1

#16Michael Paquier
michael@paquier.xyz
In reply to: Nathan Bossart (#15)
Re: Add 'worker_type' to pg_stat_subscription

On Fri, Sep 15, 2023 at 09:35:38AM -0700, Nathan Bossart wrote:

Concretely, like this.

There are two references to "synchronization worker" in tablesync.c
(exit routine and busy loop), and a few more uses of "sync worker".
Anyway, these don't matter much, but there are two errmsgs where the
term "tablesync worker" is used. Even if they are internal, these
could be made more consistent at least?

In config.sgml, max_sync_workers_per_subscription's description uses
"synchronization workers". In the second case, adding "table" makes
little sense, but could it for the two other sentences?
--
Michael

#17Amit Kapila
amit.kapila16@gmail.com
In reply to: Nathan Bossart (#15)
Re: Add 'worker_type' to pg_stat_subscription

On Sat, Sep 16, 2023 at 1:06 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Thu, Sep 14, 2023 at 03:04:19PM -0700, Nathan Bossart wrote:

The only reason I didn't apply this already is because IMHO we should
adjust the worker types and the documentation for the view to be
consistent. For example, the docs say "leader apply worker" but the view
just calls them "apply" workers. The docs say "synchronization worker" but
the view calls them "table synchronization" workers. My first instinct is
to call apply workers "leader apply" workers in the view, and to call table
synchronization workers "table synchronization workers" in the docs.

Concretely, like this.

I think there is merit in keeping the worker type as 'sync' or
'synchronization' because these would be used in future for syncing
other objects like sequences. One more thing that slightly looks odd
is the 'leader apply' type of worker, won't this be confusing when
there is no parallel apply worker in the system? In this regard,
probably existing documentation could also be improved.

--
With Regards,
Amit Kapila.

#18Nathan Bossart
nathandbossart@gmail.com
In reply to: Amit Kapila (#17)
1 attachment(s)
Re: Add 'worker_type' to pg_stat_subscription

On Sat, Sep 16, 2023 at 06:09:48PM +0530, Amit Kapila wrote:

I think there is merit in keeping the worker type as 'sync' or
'synchronization' because these would be used in future for syncing
other objects like sequences. One more thing that slightly looks odd
is the 'leader apply' type of worker, won't this be confusing when
there is no parallel apply worker in the system? In this regard,
probably existing documentation could also be improved.

These are good points. I went ahead and adjusted the patch back to using
"apply" for [leader] apply workers and to using "synchronization" for
synchronization workers. I also adjusted a couple of the error messages
that Michael pointed out to say "synchronization worker" instead of "table
synchronization worker" or "tablesync worker".

This still leaves the possibility for confusion with the documentation's
use of "leader apply worker", but I haven't touched that for now.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v8-0001-Add-worker-type-to-pg_stat_subscription.patch (text/x-diff; charset=us-ascii)
From 42c7b1910af29a8543ed65b2150e5eedae34a594 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Sat, 16 Sep 2023 13:21:54 -0700
Subject: [PATCH v8 1/1] Add worker type to pg_stat_subscription.

Thanks to 2a8b40e368, the logical replication worker type is easily
determined, and this information is a nice addition to the
pg_stat_subscription view.  The worker type could already be
deduced via other columns such as leader_pid and relid, but that is
unnecessary complexity for users.

Bumps catversion.

Author: Peter Smith
Reviewed-by: Michael Paquier, Maxim Orlov, Amit Kapila
Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com
---
 doc/src/sgml/monitoring.sgml                | 11 +++++++++++
 src/backend/catalog/system_views.sql        |  1 +
 src/backend/replication/logical/launcher.c  | 18 +++++++++++++++++-
 src/backend/replication/logical/tablesync.c |  2 +-
 src/backend/replication/logical/worker.c    |  6 +++---
 src/include/catalog/catversion.h            |  2 +-
 src/include/catalog/pg_proc.dat             |  6 +++---
 src/test/regress/expected/rules.out         |  3 ++-
 src/test/subscription/t/004_sync.pl         |  2 +-
 9 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d6a0..3546e8b3d9 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process.  Possible types are
+       <literal>apply</literal>, <literal>parallel apply</literal>, and
+       <literal>synchronization</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>pid</structfield> <type>integer</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2a7a..fcb14976c0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
             su.subname,
+            st.worker_type,
             st.pid,
             st.leader_pid,
             st.relid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc91ce..49e1c934a7 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN:
+				/* Should never happen. */
+				elog(ERROR, "unknown worker type");
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e2cee92cf2..cd71807088 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -151,7 +151,7 @@ finish_sync_worker(void)
 
 	StartTransactionCommand();
 	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+			(errmsg("logical replication synchronization worker for subscription \"%s\", table \"%s\" has finished",
 					MySubscription->name,
 					get_rel_name(MyLogicalRepWorker->relid))));
 	CommitTransactionCommand();
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 597947410f..3894d741ac 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1065,7 +1065,7 @@ apply_handle_begin_prepare(StringInfo s)
 	if (am_tablesync_worker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
+				 errmsg_internal("synchronization worker received a BEGIN PREPARE message")));
 
 	/* There must not be an active streaming transaction. */
 	Assert(!TransactionIdIsValid(stream_xid));
@@ -1304,7 +1304,7 @@ apply_handle_stream_prepare(StringInfo s)
 	if (am_tablesync_worker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
+				 errmsg_internal("synchronization worker received a STREAM PREPARE message")));
 
 	logicalrep_read_stream_prepare(s, &prepare_data);
 	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
@@ -4628,7 +4628,7 @@ InitializeLogRepWorker(void)
 
 	if (am_tablesync_worker())
 		ereport(LOG,
-				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+				(errmsg("logical replication synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
 						get_rel_name(MyLogicalRepWorker->relid))));
 	else
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 4eaef54d0c..791d8ef313 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202309061
+#define CATALOG_VERSION_NO	202309161
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f0b7b9cbd8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5411..2c60400ade 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
+    st.worker_type,
     st.pid,
     st.leader_pid,
     st.relid,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index bf4d59efba..ee07d28b37 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -80,7 +80,7 @@ $node_subscriber->safe_psql('postgres',
 
 # wait for it to start
 $node_subscriber->poll_query_until('postgres',
-	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
+	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'apply'"
 ) or die "Timed out while waiting for subscriber to start";
 
 # and drop both subscriptions
-- 
2.25.1

#19Peter Smith
smithpb2250@gmail.com
In reply to: Nathan Bossart (#18)
Re: Add 'worker_type' to pg_stat_subscription

IIUC some future feature for syncing sequences is likely to share a lot
of the tablesync worker code (maybe it is only differentiated by the
relid being for a RELKIND_SEQUENCE?).

The original intent of this stats worker-type patch was to be able to
easily know the type of the process without having to dig through
other attributes (like relid etc.) to infer it. If you feel
differentiating kinds of syncing processes won't be of interest to
users then just generically calling it "synchronization" is fine by
me. OTOH, if users might care what 'kind' of syncing it is, perhaps
leaving the stats attribute as "table synchronization" (and some
future patch would add "sequence synchronization") is better.

TBH, I am not sure which option is best, so I am happy to go with the consensus.
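
To make the "dig through other attributes" point concrete, here is a minimal sketch in standalone C (not PostgreSQL source) of the inference a user had to do before worker_type existed. The SubStatRow struct and infer_worker_type() are hypothetical names invented for illustration. The rules follow the documented meaning of the columns: relid is non-NULL only for a table synchronization worker, and leader_pid is non-NULL only for a parallel apply worker. The returned labels are the names being debated in this thread, not a settled API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified stand-in for one pg_stat_subscription row. */
typedef struct SubStatRow
{
    bool relid_is_null;      /* true when relid IS NULL */
    bool leader_pid_is_null; /* true when leader_pid IS NULL */
} SubStatRow;

/*
 * Infer the worker kind from the pre-existing columns.
 * Assumption: the labels are illustrative; the thread has not yet
 * settled on the final strings at this point.
 */
static const char *
infer_worker_type(const SubStatRow *row)
{
    assert(row != NULL);

    /* Only a table synchronization worker reports a relid. */
    if (!row->relid_is_null)
        return "table synchronization";

    /* Only a parallel apply worker reports a leader_pid. */
    if (!row->leader_pid_is_null)
        return "parallel apply";

    /* Neither column is set: the (leader) apply worker. */
    return "apply";
}
```

With a dedicated worker_type column, this two-column check collapses to reading a single value.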

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#20Amit Kapila
amit.kapila16@gmail.com
In reply to: Nathan Bossart (#18)
Re: Add 'worker_type' to pg_stat_subscription

On Sun, Sep 17, 2023 at 2:10 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Sat, Sep 16, 2023 at 06:09:48PM +0530, Amit Kapila wrote:

This still leaves the possibility for confusion with the documentation's
use of "leader apply worker", but I haven't touched that for now.

We may want to fix that separately but as you have raised here, I
found the following two places in docs which could be a bit confusing.

"Specifies maximum number of logical replication workers. This
includes leader apply workers, parallel apply workers, and table
synchronization"

"OID of the relation that the worker is synchronizing; NULL for the
leader apply worker and parallel apply workers"

One simple idea to reduce confusion could be to use (leader) in the
above two places. Do you see any other place which could be confusing
and what do you suggest to fix it?

--
With Regards,
Amit Kapila.

#21Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#19)
Re: Add 'worker_type' to pg_stat_subscription

On Mon, Sep 18, 2023 at 6:10 AM Peter Smith <smithpb2250@gmail.com> wrote:

IIUC some future feature for syncing sequences is likely to share a lot
of the tablesync worker code (maybe it is only differentiated by the
relid being for a RELKIND_SEQUENCE?).

The original intent of this stats worker-type patch was to be able to
easily know the type of the process without having to dig through
other attributes (like relid etc.) to infer it.

That makes sense and I think it will probably be helpful in debugging.
For example, I am not sure the following and similar changes in the
patch are a good idea:
if (am_tablesync_worker())
  ereport(LOG,
- (errmsg("logical replication table synchronization worker for
subscription \"%s\", table \"%s\" has started",
+ (errmsg("logical replication synchronization worker for subscription
\"%s\", table \"%s\" has started",

I think it would be sometimes helpful in debugging to know the type of
sync worker, so keeping the type in the above message would be
helpful.

If you feel
differentiating kinds of syncing processes won't be of interest to
users then just generically calling it "synchronization" is fine by
me. OTOH, if users might care what 'kind' of syncing it is, perhaps
leaving the stats attribute as "table synchronization" (and some
future patch would add "sequence synchronization") is better.

Earlier, I thought it would be better to keep it generic but after
seeing your point and the latest changes in the patch it seems
differentiating between types of sync workers would be a good idea.
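
As a rough illustration of the trade-off (a sketch only, in standalone C rather than PostgreSQL source), specific labels still allow generic matching. The DemoWorkerType enum, demo_worker_type_label() and demo_is_sync_label() are hypothetical names, and the sequence synchronization entry is purely an assumption about a possible future worker kind mentioned in this thread; no such worker type exists in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical enum for illustration; not the real LogicalRepWorkerType. */
typedef enum DemoWorkerType
{
    DEMO_WORKERTYPE_UNKNOWN = 0,
    DEMO_WORKERTYPE_TABLESYNC,
    DEMO_WORKERTYPE_SEQUENCESYNC, /* assumption: possible future kind */
    DEMO_WORKERTYPE_APPLY,
    DEMO_WORKERTYPE_PARALLEL_APPLY
} DemoWorkerType;

/* Map a worker kind to a specific label; NULL for the unknown kind. */
static const char *
demo_worker_type_label(DemoWorkerType type)
{
    switch (type)
    {
        case DEMO_WORKERTYPE_TABLESYNC:
            return "table synchronization";
        case DEMO_WORKERTYPE_SEQUENCESYNC:
            return "sequence synchronization";
        case DEMO_WORKERTYPE_APPLY:
            return "apply";
        case DEMO_WORKERTYPE_PARALLEL_APPLY:
            return "parallel apply";
        case DEMO_WORKERTYPE_UNKNOWN:
            break;
    }
    return NULL;
}

/* A caller that wants "any sync worker" can still match generically. */
static int
demo_is_sync_label(const char *label)
{
    assert(label != NULL);
    return strstr(label, "synchronization") != NULL;
}
```

The design point is that a specific label loses nothing: a consumer interested only in "some kind of sync worker" can match on the common word, while a generic label could not be split back into kinds.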

--
With Regards,
Amit Kapila.

#22Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#20)
Re: Add 'worker_type' to pg_stat_subscription

On Mon, Sep 18, 2023 at 1:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Sun, Sep 17, 2023 at 2:10 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Sat, Sep 16, 2023 at 06:09:48PM +0530, Amit Kapila wrote:

This still leaves the possibility for confusion with the documentation's
use of "leader apply worker", but I haven't touched that for now.

We may want to fix that separately but as you have raised here, I
found the following two places in docs which could be a bit confusing.

"Specifies maximum number of logical replication workers. This
includes leader apply workers, parallel apply workers, and table
synchronization"

"OID of the relation that the worker is synchronizing; NULL for the
leader apply worker and parallel apply workers"

One simple idea to reduce confusion could be to use (leader) in the
above two places. Do you see any other place which could be confusing
and what do you suggest to fix it?

IIRC we first encountered this problem when the parallel apply workers
were introduced -- "leader" was added wherever we needed to
distinguish the main apply and the parallel apply worker. Perhaps at
that time, we ought to have changed it *everywhere* instead of
changing only the ambiguous places. Lately, I've been thinking it
would have been easier to have *one* rule and always call the (main)
apply worker the "leader apply" worker -- simply because 2 names
("leader apply" and "parallel apply") are easier to explain than 3
names.

A "leader apply" worker with no "parallel apply" workers is a bit like
the "boss" of a company that has no employees -- IMO it's OK to still
say that they are the "boss".

Regardless, I think changing this in other docs and other code is
outside the scope of this simple pg stats patch -- here we can just
change the relevant config docs and the stats attribute value to
"leader apply" and leave it at that.

Changing every other place to consistently say "leader apply" is a
bigger task for another thread because we will find lots more places
to change. For example, there are messages like: "logical replication
apply worker for subscription \"%s\" has started" that perhaps should
say "logical replication leader apply worker for subscription \"%s\"
has started". Such changes don't belong in this stats patch.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#23Nathan Bossart
nathandbossart@gmail.com
In reply to: Peter Smith (#22)
Re: Add 'worker_type' to pg_stat_subscription

On Mon, Sep 18, 2023 at 04:56:46PM +1000, Peter Smith wrote:

On Mon, Sep 18, 2023 at 1:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

One simple idea to reduce confusion could be to use (leader) in the
above two places. Do you see any other place which could be confusing
and what do you suggest to fix it?

IIRC we first encountered this problem when the parallel apply workers
were introduced -- "leader" was added wherever we needed to
distinguish the main apply and the parallel apply worker. Perhaps at
that time, we ought to have changed it *everywhere* instead of
changing only the ambiguous places. Lately, I've been thinking it
would have been easier to have *one* rule and always call the (main)
apply worker the "leader apply" worker -- simply because 2 names
("leader apply" and "parallel apply") are easier to explain than 3
names.

A "leader apply" worker with no "parallel apply" workers is a bit like
the "boss" of a company that has no employees -- IMO it's OK to still
say that they are the "boss".

From the latest discussion, it sounds like you (Peter and Amit) are leaning
more towards something like the v7 patch [0]. I'm okay with that. Perhaps
it'd be worth starting a new thread after this one to make the terminology
consistent in the docs, error messages, views, etc. Fortunately, we have
some time to straighten this out for v17.

Regardless, I think changing this in other docs and other code is
outside the scope of this simple pg stats patch -- here we can just
change the relevant config docs and the stats attribute value to
"leader apply" and leave it at that.

Changing every other place to consistently say "leader apply" is a
bigger task for another thread because we will find lots more places
to change. For example, there are messages like: "logical replication
apply worker for subscription \"%s\" has started" that perhaps should
say "logical replication leader apply worker for subscription \"%s\"
has started". Such changes don't belong in this stats patch.

+1

[0]: /messages/by-id/attachment/150345/v7-0001-Add-worker-type-to-pg_stat_subscription.patch

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#24Peter Smith
smithpb2250@gmail.com
In reply to: Nathan Bossart (#23)
Re: Add 'worker_type' to pg_stat_subscription

On Tue, Sep 19, 2023 at 1:20 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Mon, Sep 18, 2023 at 04:56:46PM +1000, Peter Smith wrote:

On Mon, Sep 18, 2023 at 1:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

One simple idea to reduce confusion could be to use (leader) in the
above two places. Do you see any other place which could be confusing
and what do you suggest to fix it?

IIRC we first encountered this problem when the parallel apply workers
were introduced -- "leader" was added wherever we needed to
distinguish the main apply and the parallel apply worker. Perhaps at
that time, we ought to have changed it *everywhere* instead of
changing only the ambiguous places. Lately, I've been thinking it
would have been easier to have *one* rule and always call the (main)
apply worker the "leader apply" worker -- simply because 2 names
("leader apply" and "parallel apply") are easier to explain than 3
names.

A "leader apply" worker with no "parallel apply" workers is a bit like
the "boss" of a company that has no employees -- IMO it's OK to still
say that they are the "boss".

From the latest discussion, it sounds like you (Peter and Amit) are leaning
more towards something like the v7 patch [0]. I'm okay with that. Perhaps
it'd be worth starting a new thread after this one to make the terminology
consistent in the docs, error messages, views, etc. Fortunately, we have
some time to straighten this out for v17.

Yes, the v7 patch looked good to me.

Regardless, I think changing this in other docs and other code is
outside the scope of this simple pg stats patch -- here we can just
change the relevant config docs and the stats attribute value to
"leader apply" and leave it at that.

Changing every other place to consistently say "leader apply" is a
bigger task for another thread because we will find lots more places
to change. For example, there are messages like: "logical replication
apply worker for subscription \"%s\" has started" that perhaps should
say "logical replication leader apply worker for subscription \"%s\"
has started". Such changes don't belong in this stats patch.

+1

[0] /messages/by-id/attachment/150345/v7-0001-Add-worker-type-to-pg_stat_subscription.patch

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#25Amit Kapila
amit.kapila16@gmail.com
In reply to: Nathan Bossart (#23)
Re: Add 'worker_type' to pg_stat_subscription

On Mon, Sep 18, 2023 at 8:50 PM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Mon, Sep 18, 2023 at 04:56:46PM +1000, Peter Smith wrote:

On Mon, Sep 18, 2023 at 1:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

One simple idea to reduce confusion could be to use (leader) in the
above two places. Do you see any other place which could be confusing
and what do you suggest to fix it?

IIRC we first encountered this problem when the parallel apply workers
were introduced -- "leader" was added wherever we needed to
distinguish the main apply and the parallel apply worker. Perhaps at
that time, we ought to have changed it *everywhere* instead of
changing only the ambiguous places. Lately, I've been thinking it
would have been easier to have *one* rule and always call the (main)
apply worker the "leader apply" worker -- simply because 2 names
("leader apply" and "parallel apply") are easier to explain than 3
names.

A "leader apply" worker with no "parallel apply" workers is a bit like
the "boss" of a company that has no employees -- IMO it's OK to still
say that they are the "boss".

From the latest discussion, it sounds like you (Peter and Amit) are leaning
more towards something like the v7 patch [0].

I am of the opinion that worker_type should be 'apply' instead of
'leader apply' because even when it is a leader for parallel apply
worker, it could itself apply individual transactions. For reference,
I checked pg_stat_activity.backend_type, there is nothing called main
or leader backend even when the backend is involved in parallel query.

--
With Regards,
Amit Kapila.

#26Nathan Bossart
nathandbossart@gmail.com
In reply to: Amit Kapila (#25)
1 attachment(s)
Re: Add 'worker_type' to pg_stat_subscription

On Tue, Sep 19, 2023 at 08:36:35AM +0530, Amit Kapila wrote:

I am of the opinion that worker_type should be 'apply' instead of
'leader apply' because even when it is a leader for parallel apply
worker, it could itself apply individual transactions. For reference,
I checked pg_stat_activity.backend_type, there is nothing called main
or leader backend even when the backend is involved in parallel query.

Okay. Here is v9 of the patch with this change.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

Attachments:

v9-0001-Add-worker-type-to-pg_stat_subscription.patch (text/x-diff; charset=us-ascii)
From b8268a4c95ff217742bc2e127f74f67c9a417233 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Wed, 20 Sep 2023 12:22:21 -0700
Subject: [PATCH v9 1/1] Add worker type to pg_stat_subscription.

Thanks to 2a8b40e368, the logical replication worker type is easily
determined, and this information is a nice addition to the
pg_stat_subscription view.  The worker type could already be
deduced via other columns such as leader_pid and relid, but that is
unnecessary complexity for users.

Bumps catversion.

Author: Peter Smith
Reviewed-by: Michael Paquier, Maxim Orlov, Amit Kapila
Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com
---
 doc/src/sgml/monitoring.sgml               | 13 ++++++++++++-
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 src/test/subscription/t/004_sync.pl        |  2 +-
 6 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d6a0..9c4930e9ae 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process.  Possible types are
+       <literal>apply</literal>, <literal>parallel apply</literal>, and
+       <literal>table synchronization</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>pid</structfield> <type>integer</type>
@@ -2008,7 +2019,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para>
       <para>
        Process ID of the leader apply worker if this process is a parallel
-       apply worker; NULL if this process is a leader apply worker or a
+       apply worker; NULL if this process is a leader apply worker or a table
        synchronization worker
       </para></entry>
      </row>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2a7a..fcb14976c0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
             su.subname,
+            st.worker_type,
             st.pid,
             st.leader_pid,
             st.relid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc91ce..501910b445 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("table synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN:
+				/* Should never happen. */
+				elog(ERROR, "unknown worker type");
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f0b7b9cbd8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5411..2c60400ade 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
+    st.worker_type,
     st.pid,
     st.leader_pid,
     st.relid,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index bf4d59efba..ee07d28b37 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -80,7 +80,7 @@ $node_subscriber->safe_psql('postgres',
 
 # wait for it to start
 $node_subscriber->poll_query_until('postgres',
-	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
+	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'apply'"
 ) or die "Timed out while waiting for subscriber to start";
 
 # and drop both subscriptions
-- 
2.25.1

#27Peter Smith
smithpb2250@gmail.com
In reply to: Nathan Bossart (#26)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 21, 2023 at 5:30 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Tue, Sep 19, 2023 at 08:36:35AM +0530, Amit Kapila wrote:

I am of the opinion that worker_type should be 'apply' instead of
'leader apply' because even when it is the leader for a parallel apply
worker, it can still apply individual transactions itself. For reference,
I checked pg_stat_activity.backend_type: there is nothing called a main
or leader backend, even when the backend is involved in a parallel query.

Okay. Here is v9 of the patch with this change.

One question -- the patch comment still says "Bumps catversion.", but
catversion.h change is missing from the v9 patch?

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#28Michael Paquier
michael@paquier.xyz
In reply to: Peter Smith (#27)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 21, 2023 at 09:01:01AM +1000, Peter Smith wrote:

One question -- the patch comment still says "Bumps catversion.", but
catversion.h change is missing from the v9 patch?

Yeah, previous patches did that, but it is no big deal. My take is
that it is good practice to never do a catversion bump in posted
patches, and that it is equally good practice for Nathan to leave
himself a reminder about it by adding a note to the commit message
of the posted patch.
--
Michael

#29Nathan Bossart
nathandbossart@gmail.com
In reply to: Michael Paquier (#28)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 21, 2023 at 08:14:23AM +0900, Michael Paquier wrote:

On Thu, Sep 21, 2023 at 09:01:01AM +1000, Peter Smith wrote:

One question -- the patch comment still says "Bumps catversion.", but
catversion.h change is missing from the v9 patch?

Yeah, previous patches did that, but it is no big deal. My take is
that it is good practice to never do a catversion bump in posted
patches, and that it is equally good practice for Nathan to leave
himself a reminder about it by adding a note to the commit message
of the posted patch.

Right, I'll take care of it before committing. I'm trying to make sure I
don't forget. :)

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#30Peter Smith
smithpb2250@gmail.com
In reply to: Nathan Bossart (#29)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 21, 2023 at 9:34 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Thu, Sep 21, 2023 at 08:14:23AM +0900, Michael Paquier wrote:

On Thu, Sep 21, 2023 at 09:01:01AM +1000, Peter Smith wrote:

One question -- the patch comment still says "Bumps catversion.", but
catversion.h change is missing from the v9 patch?

Yeah, previous patches did that, but it is no big deal. My take is
that it is good practice to never do a catversion bump in posted
patches, and that it is equally good practice for Nathan to leave
himself a reminder about it by adding a note to the commit message
of the posted patch.

Right, I'll take care of it before committing. I'm trying to make sure I
don't forget. :)

OK, all good.

~~~

This is a bit of a late entry, but looking at the PG DOCS, I felt it
might be simpler if we don't always refer to every other worker type
when explaining NULLs. The descriptions are already bigger than they
need to be, and if more types ever get added they will keep growing.

~

BEFORE
leader_pid integer
Process ID of the leader apply worker if this process is a parallel
apply worker; NULL if this process is a leader apply worker or a table
synchronization worker

SUGGESTION
leader_pid integer
Process ID of the leader apply worker; NULL if this process is not a
parallel apply worker

~

BEFORE
relid oid
OID of the relation that the worker is synchronizing; NULL for the
leader apply worker and parallel apply workers

SUGGESTION
relid oid
OID of the relation being synchronized; NULL if this process is not a
table synchronization worker
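
To make the NULL rules in the suggested wording concrete, here is a minimal standalone sketch in C. It is not code from the patch: the enum and function names are hypothetical stand-ins (the real type is LogicalRepWorkerType). Only the 'apply' label is confirmed in this part of the thread; the 'parallel apply' and 'table synchronization' labels are assumptions made for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for LogicalRepWorkerType; not the real definition. */
typedef enum
{
    SKETCH_WORKER_APPLY,
    SKETCH_WORKER_PARALLEL_APPLY,
    SKETCH_WORKER_TABLESYNC
} SketchWorkerType;

/*
 * Text shown in the worker_type column. Only "apply" is confirmed by the
 * thread; the other two labels are assumptions for this sketch.
 */
const char *
sketch_worker_type_name(SketchWorkerType type)
{
    switch (type)
    {
        case SKETCH_WORKER_APPLY:
            return "apply";
        case SKETCH_WORKER_PARALLEL_APPLY:
            return "parallel apply";
        case SKETCH_WORKER_TABLESYNC:
            return "table synchronization";
    }
    return NULL;                /* unknown type; not expected to happen */
}

/* leader_pid is NULL if this process is not a parallel apply worker. */
bool
sketch_leader_pid_is_null(SketchWorkerType type)
{
    return type != SKETCH_WORKER_PARALLEL_APPLY;
}

/* relid is NULL if this process is not a table synchronization worker. */
bool
sketch_relid_is_null(SketchWorkerType type)
{
    return type != SKETCH_WORKER_TABLESYNC;
}
```

Under this model, relid is NULL for both the apply worker and parallel apply workers, so a filter such as "relid IS NULL" matches either kind, whereas "worker_type = 'apply'" (as used in the 004_sync.pl change) matches only the apply worker.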

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#31Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#30)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 21, 2023 at 5:36 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Thu, Sep 21, 2023 at 9:34 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Thu, Sep 21, 2023 at 08:14:23AM +0900, Michael Paquier wrote:

On Thu, Sep 21, 2023 at 09:01:01AM +1000, Peter Smith wrote:

One question -- the patch comment still says "Bumps catversion.", but
catversion.h change is missing from the v9 patch?

Yeah, previous patches did that, but it is no big deal. My take is
that it is good practice to never do a catversion bump in posted
patches, and that it is equally good practice for Nathan to leave
himself a reminder about it by adding a note to the commit message
of the posted patch.

Right, I'll take care of it before committing. I'm trying to make sure I
don't forget. :)

OK, all good.

~~~

This is a bit of a late entry, but looking at the PG DOCS, I felt it
might be simpler if we don't always refer to every other worker type
when explaining NULLs. The descriptions are already bigger than they
need to be, and if more types ever get added they will keep growing.

~

BEFORE
leader_pid integer
Process ID of the leader apply worker if this process is a parallel
apply worker; NULL if this process is a leader apply worker or a table
synchronization worker

SUGGESTION
leader_pid integer
Process ID of the leader apply worker; NULL if this process is not a
parallel apply worker

~

BEFORE
relid oid
OID of the relation that the worker is synchronizing; NULL for the
leader apply worker and parallel apply workers

SUGGESTION
relid oid
OID of the relation being synchronized; NULL if this process is not a
table synchronization worker

I find the current descriptions better than the proposed ones. But I am
not opposed to your proposal if others are okay with it. Personally, I
feel that even if we want to change these descriptions, we can do it as
a separate patch.

--
With Regards,
Amit Kapila.

#32Amit Kapila
amit.kapila16@gmail.com
In reply to: Nathan Bossart (#26)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 21, 2023 at 1:00 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Tue, Sep 19, 2023 at 08:36:35AM +0530, Amit Kapila wrote:

I am of the opinion that worker_type should be 'apply' instead of
'leader apply' because even when it is the leader for a parallel apply
worker, it can still apply individual transactions itself. For reference,
I checked pg_stat_activity.backend_type: there is nothing called a main
or leader backend, even when the backend is involved in a parallel query.

Okay. Here is v9 of the patch with this change.

The changes look good to me, though I haven't tested it. But feel
free to commit if you are fine with this patch.

--
With Regards,
Amit Kapila.

#33Nathan Bossart
nathandbossart@gmail.com
In reply to: Amit Kapila (#32)
Re: Add 'worker_type' to pg_stat_subscription

On Thu, Sep 21, 2023 at 04:01:20PM +0530, Amit Kapila wrote:

The changes look good to me, though I haven't tested it. But feel
free to commit if you are fine with this patch.

Committed.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

#34Peter Smith
smithpb2250@gmail.com
In reply to: Nathan Bossart (#33)
Re: Add 'worker_type' to pg_stat_subscription

On Tue, Sep 26, 2023 at 7:16 AM Nathan Bossart <nathandbossart@gmail.com> wrote:

On Thu, Sep 21, 2023 at 04:01:20PM +0530, Amit Kapila wrote:

The changes look good to me, though I haven't tested it. But feel
free to commit if you are fine with this patch.

Committed.

Thanks for pushing.

------
Kind Regards,
Peter Smith.
Fujitsu Australia