Adding a LogicalRepWorker type field

Started by Peter Smith over 2 years ago, 30 messages
#1 Peter Smith
smithpb2250@gmail.com
2 attachment(s)

Hi hackers,

BACKGROUND:

Logical replication has different worker "types" (e.g. apply
leader, apply parallel, tablesync).

They all use a common structure called LogicalRepWorker, but at times
it is necessary to know what "type" of worker a given LogicalRepWorker
represents.

Once, there were just apply workers and tablesync workers - these were
easily distinguished because only tablesync workers had a valid
'relid' field. Next, parallel-apply workers were introduced - these
are distinguishable from the apply leaders by the value of
'leader_pid' field.
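To make the pre-patch scheme concrete, here is a minimal, self-contained sketch of how the type is inferred from `relid` and `leader_pid`. It is not PostgreSQL source: `SketchWorker` and the `sketch_*` functions are hypothetical, and `Oid`, `InvalidOid`, `InvalidPid` and `OidIsValid` are simplified stand-ins for the real definitions.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-ins for PostgreSQL definitions (assumed for this sketch). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define InvalidPid (-1)
#define OidIsValid(objectId) ((objectId) != InvalidOid)

/* Cut-down worker slot: only the two fields the old checks inspect. */
typedef struct SketchWorker
{
    Oid relid;      /* valid only for a tablesync worker */
    int leader_pid; /* pid_t in PostgreSQL; set only for a parallel apply worker */
} SketchWorker;

/* Tablesync workers are recognised by having a valid relid. */
static bool
sketch_is_tablesync(const SketchWorker *w)
{
    return OidIsValid(w->relid);
}

/* Parallel apply workers are recognised by having a leader_pid. */
static bool
sketch_is_parallel_apply(const SketchWorker *w)
{
    return w->leader_pid != InvalidPid;
}

/* The leader apply worker is "whatever is left over". */
static bool
sketch_is_leader_apply(const SketchWorker *w)
{
    /* Nothing in the struct itself forbids both fields being set. */
    assert(!(sketch_is_tablesync(w) && sketch_is_parallel_apply(w)));
    return !sketch_is_tablesync(w) && !sketch_is_parallel_apply(w);
}
```

Each caller has to know which field encodes which type, and the leader case is only defined by elimination.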

PROBLEM:

IMO, deducing the worker's type by examining multiple different field
values seems a dubious way to do it. This maybe was reasonable enough
when there were only 2 types, but as more get added it becomes
increasingly complicated.

SOLUTION:

AFAIK none of the complications is necessary anyway; the worker type
is already known at launch time, and it never changes during the life
of the process.

The attached Patch 0001 introduces a new enum 'type' field, which is
assigned during the launch.
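A minimal sketch of the proposal, assuming (as in the launcher hunk of the patch) that `relid` and an `is_parallel_apply_worker` flag are known when the slot is prepared. The enum uses the v1 names; `SketchWorker` and `sketch_prepare_slot()` are hypothetical, the `Oid` helpers are stand-ins, and the assertion that a parallel apply worker is never launched for a specific table is an assumption of this sketch.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-ins for PostgreSQL definitions (assumed for this sketch). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((objectId) != InvalidOid)

/* Mirrors the enum proposed in v1-0001. */
typedef enum LogicalRepWorkerType
{
    LR_WORKER_TABLESYNC,
    LR_WORKER_APPLY,
    LR_WORKER_APPLY_PARALLEL
} LogicalRepWorkerType;

typedef struct SketchWorker
{
    LogicalRepWorkerType type; /* decided once at launch, never changed */
    Oid relid;
} SketchWorker;

/* Hypothetical helper: derive the type once, from the launch arguments. */
static void
sketch_prepare_slot(SketchWorker *worker, Oid relid,
                    bool is_parallel_apply_worker)
{
    /* Assumed here: a parallel apply worker is never launched for one table. */
    assert(!(OidIsValid(relid) && is_parallel_apply_worker));

    if (OidIsValid(relid))
        worker->type = LR_WORKER_TABLESYNC;
    else if (is_parallel_apply_worker)
        worker->type = LR_WORKER_APPLY_PARALLEL;
    else
        worker->type = LR_WORKER_APPLY;
    worker->relid = relid;
}

/* Every type check is now a single comparison on one field. */
#define isLeaderApplyWorker(worker) ((worker)->type == LR_WORKER_APPLY)
#define isParallelApplyWorker(worker) ((worker)->type == LR_WORKER_APPLY_PARALLEL)
#define isTablesyncWorker(worker) ((worker)->type == LR_WORKER_TABLESYNC)
```

Because the type is decided in one place, callers no longer need to know which other fields used to encode it.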

~

This change not only simplifies the code, but it also permits other
code optimizations, because now we can switch on the worker enum type,
instead of using cascading if/else. (see Patch 0002).
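The difference between cascading checks and a `switch` can be sketched with the logic of `process_syncing_tables()`. This is a simplified illustration, not the patch itself: `SketchSyncAction` and both `sketch_dispatch_*` functions are hypothetical, and the returned actions stand in for calls to `process_syncing_tables_for_sync()` and `process_syncing_tables_for_apply()`.

```c
#include <assert.h>

/* Mirrors the enum proposed in v1-0001. */
typedef enum LogicalRepWorkerType
{
    LR_WORKER_TABLESYNC,
    LR_WORKER_APPLY,
    LR_WORKER_APPLY_PARALLEL
} LogicalRepWorkerType;

/* Hypothetical: which process_syncing_tables_* path would run. */
typedef enum SketchSyncAction
{
    SYNC_SKIP,      /* parallel apply: only READY tables, nothing to do */
    SYNC_FOR_SYNC,  /* stands in for process_syncing_tables_for_sync() */
    SYNC_FOR_APPLY  /* stands in for process_syncing_tables_for_apply() */
} SketchSyncAction;

/* Cascading style, as in HEAD (but testing the type instead of other fields). */
static SketchSyncAction
sketch_dispatch_if(LogicalRepWorkerType type)
{
    if (type == LR_WORKER_APPLY_PARALLEL)
        return SYNC_SKIP;
    if (type == LR_WORKER_TABLESYNC)
        return SYNC_FOR_SYNC;
    return SYNC_FOR_APPLY;
}

/* Patch 0002 style: one switch with one case per worker type. */
static SketchSyncAction
sketch_dispatch_switch(LogicalRepWorkerType type)
{
    switch (type)
    {
        case LR_WORKER_TABLESYNC:
            return SYNC_FOR_SYNC;
        case LR_WORKER_APPLY:
            return SYNC_FOR_APPLY;
        case LR_WORKER_APPLY_PARALLEL:
            return SYNC_SKIP;
    }
    assert(!"unexpected worker type");
    return SYNC_SKIP;
}
```

Both versions give the same answer for every worker type. Whether a compiler lowers such a small `switch` to a jump table or to a couple of comparisons depends on the compiler and optimization level, so the main gain is probably clarity rather than speed.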

Thoughts?

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v1-0001-Add-LogicalRepWorkerType-enum.patch (application/octet-stream)
From 8afba828a3c1cf3e23a85f9c9f034bbc90977237 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 28 Jul 2023 14:45:00 +1000
Subject: [PATCH v1] Add LogicalRepWorkerType enum

---
 src/backend/replication/logical/launcher.c |  4 ++++
 src/include/replication/worker_internal.h  | 20 ++++++++++++++++----
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d..702732f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -427,6 +427,10 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	worker->type =
+		OidIsValid(relid) ? LR_WORKER_TABLESYNC :
+		is_parallel_apply_worker ? LR_WORKER_APPLY_PARALLEL :
+		LR_WORKER_APPLY;
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781..97447dc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,8 +27,19 @@
 #include "storage/spin.h"
 
 
+/* Different kinds of workers */
+typedef enum LogicalRepWorkerType
+{
+	LR_WORKER_TABLESYNC,
+	LR_WORKER_APPLY,
+	LR_WORKER_APPLY_PARALLEL
+} LogicalRepWorkerType;
+
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -305,19 +316,20 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isLeaderApplyWorker(worker) ((worker)->type == LR_WORKER_APPLY)
+#define isParallelApplyWorker(worker) ((worker)->type == LR_WORKER_APPLY_PARALLEL)
+#define isTablesyncWorker(worker) ((worker)->type == LR_WORKER_TABLESYNC)
 
 static inline bool
 am_tablesync_worker(void)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return isTablesyncWorker(MyLogicalRepWorker);
 }
 
 static inline bool
 am_leader_apply_worker(void)
 {
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
+	return isLeaderApplyWorker(MyLogicalRepWorker);
 }
 
 static inline bool
-- 
1.8.3.1

v1-0002-Switch-on-worker-type.patch (application/octet-stream)
From a5e6a3130e8e135ed2f88e0e47cb74c953dc0383 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 28 Jul 2023 19:25:29 +1000
Subject: [PATCH v1] Switch on worker type

---
 src/backend/replication/logical/tablesync.c | 28 +++++++++++++--------
 src/backend/replication/logical/worker.c    | 39 ++++++++++++++++-------------
 2 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d46165..867105a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -647,18 +647,24 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (am_parallel_apply_worker())
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case LR_WORKER_TABLESYNC:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
 
-	if (am_tablesync_worker())
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case LR_WORKER_APPLY:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case LR_WORKER_APPLY_PARALLEL:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd353fd..e3dc861 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,25 +486,30 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
-
-		return rel->state == SUBREL_STATE_READY;
+		case LR_WORKER_TABLESYNC:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case LR_WORKER_APPLY_PARALLEL:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+								MySubscription->name),
+						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+			return rel->state == SUBREL_STATE_READY;
+
+		case LR_WORKER_APPLY:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
 	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1

#2 Bharath Rupireddy
bharath.rupireddyforpostgres@gmail.com
In reply to: Peter Smith (#1)
Re: Adding a LogicalRepWorker type field

On Mon, Jul 31, 2023 at 7:17 AM Peter Smith <smithpb2250@gmail.com> wrote:

> PROBLEM:
>
> IMO, deducing the worker's type by examining multiple different field
> values seems a dubious way to do it. This maybe was reasonable enough
> when there were only 2 types, but as more get added it becomes
> increasingly complicated.

+1 for being more explicit in worker types. I also think that we must
move away from am_{parallel_apply, tablesync,
leader_apply}_worker(void) to Is{ParallelApply, TableSync,
LeaderApply}Worker(MyLogicalRepWorker), just to be a little more
consistent and less confusion around different logical replication
worker type related functions.

> AFAIK none of the complications is necessary anyway; the worker type
> is already known at launch time, and it never changes during the life
> of the process.
>
> The attached Patch 0001 introduces a new enum 'type' field, which is
> assigned during the launch.
>
> This change not only simplifies the code, but it also permits other
> code optimizations, because now we can switch on the worker enum type,
> instead of using cascading if/else. (see Patch 0002).

Some comments:
1.
+/* Different kinds of workers */
+typedef enum LogicalRepWorkerType
+{
+    LR_WORKER_TABLESYNC,
+    LR_WORKER_APPLY,
+    LR_WORKER_APPLY_PARALLEL
+} LogicalRepWorkerType;

Can these names be readable? How about something like
LR_TABLESYNC_WORKER, LR_APPLY_WORKER, LR_PARALLEL_APPLY_WORKER?

2.
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isLeaderApplyWorker(worker) ((worker)->type == LR_WORKER_APPLY)
+#define isParallelApplyWorker(worker) ((worker)->type == LR_WORKER_APPLY_PARALLEL)
+#define isTablesyncWorker(worker) ((worker)->type == LR_WORKER_TABLESYNC)

Can the above start with "Is" instead of "is" similar to
IsLogicalWorker and IsLogicalParallelApplyWorker?

3.
+    worker->type =
+        OidIsValid(relid) ? LR_WORKER_TABLESYNC :
+        is_parallel_apply_worker ? LR_WORKER_APPLY_PARALLEL :
+        LR_WORKER_APPLY;

Perhaps, an if-else is better for readability?

if (OidIsValid(relid))
    worker->type = LR_WORKER_TABLESYNC;
else if (is_parallel_apply_worker)
    worker->type = LR_WORKER_APPLY_PARALLEL;
else
    worker->type = LR_WORKER_APPLY;

4.
+/* Different kinds of workers */
+typedef enum LogicalRepWorkerType
+{
+    LR_WORKER_TABLESYNC,
+    LR_WORKER_APPLY,
+    LR_WORKER_APPLY_PARALLEL
+} LogicalRepWorkerType;

Have a LR_WORKER_UNKNOWN = 0 and set it in logicalrep_worker_cleanup()?
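The sentinel suggested here can be sketched as follows, assuming worker slots are zero-filled when they are created, so that `LR_WORKER_UNKNOWN = 0` is also the initial state of every slot. The enum follows the naming in this comment; `SketchWorker`, `SKETCH_NUM_SLOTS`, `sketch_init_slots()` and `sketch_worker_cleanup()` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

typedef enum LogicalRepWorkerType
{
    LR_WORKER_UNKNOWN = 0, /* slot not (or no longer) used by a worker */
    LR_WORKER_TABLESYNC,
    LR_WORKER_APPLY,
    LR_WORKER_APPLY_PARALLEL
} LogicalRepWorkerType;

typedef struct SketchWorker
{
    LogicalRepWorkerType type;
    bool in_use;
} SketchWorker;

#define SKETCH_NUM_SLOTS 4

/* Zero-fill all slots; because UNKNOWN is 0, every slot starts as UNKNOWN. */
static void
sketch_init_slots(SketchWorker *slots, size_t nslots)
{
    assert(slots != NULL);
    memset(slots, 0, nslots * sizeof(SketchWorker));
}

/* Cleanup forgets the old type, so a stale value cannot be read later. */
static void
sketch_worker_cleanup(SketchWorker *worker)
{
    worker->type = LR_WORKER_UNKNOWN;
    worker->in_use = false;
}
```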

5.
+        case LR_WORKER_APPLY:
+            return (rel->state == SUBREL_STATE_READY ||
+                    (rel->state == SUBREL_STATE_SYNCDONE &&
+                     rel->statelsn <= remote_final_lsn));

Not necessary, but a good idea to have a default: clause and error out
saying wrong logical replication worker type?

6.
+        case LR_WORKER_APPLY_PARALLEL:
+            /*
+             * Skip for parallel apply workers because they only operate on tables
+             * that are in a READY state. See pa_can_start() and
+             * should_apply_changes_for_rel().
+             */
+            break;

I'd better keep this if-else as-is instead of a switch case with
nothing for parallel apply worker.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com

#3 Amit Kapila
amit.kapila16@gmail.com
In reply to: Bharath Rupireddy (#2)
Re: Adding a LogicalRepWorker type field

On Mon, Jul 31, 2023 at 3:25 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:

> On Mon, Jul 31, 2023 at 7:17 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
>> PROBLEM:
>>
>> IMO, deducing the worker's type by examining multiple different field
>> values seems a dubious way to do it. This maybe was reasonable enough
>> when there were only 2 types, but as more get added it becomes
>> increasingly complicated.
>
> +1 for being more explicit in worker types.

+1. BTW, do we need the below functions (am_tablesync_worker(),
am_leader_apply_worker()) after this work?
static inline bool
 am_tablesync_worker(void)
 {
- return OidIsValid(MyLogicalRepWorker->relid);
+ return isTablesyncWorker(MyLogicalRepWorker);
 }
 static inline bool
 am_leader_apply_worker(void)
 {
- return (!am_tablesync_worker() &&
- !isParallelApplyWorker(MyLogicalRepWorker));
+ return isLeaderApplyWorker(MyLogicalRepWorker);
 }

--
With Regards,
Amit Kapila.

#4 Peter Smith
smithpb2250@gmail.com
In reply to: Bharath Rupireddy (#2)
2 attachment(s)
Re: Adding a LogicalRepWorker type field

Thanks for your detailed code review.

Most comments are addressed in the attached v2 patches. Details inline below:

On Mon, Jul 31, 2023 at 7:55 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:

> On Mon, Jul 31, 2023 at 7:17 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
>> PROBLEM:
>>
>> IMO, deducing the worker's type by examining multiple different field
>> values seems a dubious way to do it. This maybe was reasonable enough
>> when there were only 2 types, but as more get added it becomes
>> increasingly complicated.

> +1 for being more explicit in worker types. I also think that we must
> move away from am_{parallel_apply, tablesync,
> leader_apply}_worker(void) to Is{ParallelApply, TableSync,
> LeaderApply}Worker(MyLogicalRepWorker), just to be a little more
> consistent and less confusion around different logical replication
> worker type related functions.

Done. I converged everything to the macro-naming style as suggested,
but I chose not to pass MyLogicalRepWorker as a parameter in the
normal (am_xxx) case, because I felt it would make the calling code
unnecessarily verbose.
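The two forms can be sketched as follows: an explicit form that takes a worker pointer (v2 spells it `IsParallelApplyWorker1()`) for code that scans all slots, and an implicit form for checking the current process. The names `IsParallelApplyWorkerOf`, `SketchWorker` and `sketch_count_parallel_apply()` are hypothetical, the file-level `MyLogicalRepWorker` is a stand-in for PostgreSQL's per-process pointer, and the implicit form is written here as a `static inline` function rather than a macro.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum LogicalRepWorkerType
{
    TYPE_UNKNOWN = 0,
    TYPE_TABLESYNC_WORKER,
    TYPE_APPLY_WORKER,
    TYPE_PARALLEL_APPLY_WORKER
} LogicalRepWorkerType;

typedef struct SketchWorker
{
    LogicalRepWorkerType type;
} SketchWorker;

/* Stand-in for the per-process pointer to this process's own worker slot. */
static SketchWorker *MyLogicalRepWorker = NULL;

/* Explicit form (hypothetical name): test any slot. */
#define IsParallelApplyWorkerOf(worker) \
    ((worker)->type == TYPE_PARALLEL_APPLY_WORKER)

/* Implicit form: test the current process without repeating the argument. */
static inline bool
IsParallelApplyWorker(void)
{
    assert(MyLogicalRepWorker != NULL);
    return IsParallelApplyWorkerOf(MyLogicalRepWorker);
}

/* Caller of the explicit form, loosely modelled on logicalrep_pa_worker_count(). */
static int
sketch_count_parallel_apply(const SketchWorker *slots, int nslots)
{
    int count = 0;

    for (int i = 0; i < nslots; i++)
    {
        if (IsParallelApplyWorkerOf(&slots[i]))
            count++;
    }
    return count;
}
```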

> Some comments:
> 1.
> +/* Different kinds of workers */
> +typedef enum LogicalRepWorkerType
> +{
> +    LR_WORKER_TABLESYNC,
> +    LR_WORKER_APPLY,
> +    LR_WORKER_APPLY_PARALLEL
> +} LogicalRepWorkerType;
>
> Can these names be readable? How about something like
> LR_TABLESYNC_WORKER, LR_APPLY_WORKER, LR_PARALLEL_APPLY_WORKER?

Done. Renamed similar to your suggestion.

> 2.
> -#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
> +#define isLeaderApplyWorker(worker) ((worker)->type == LR_WORKER_APPLY)
> +#define isParallelApplyWorker(worker) ((worker)->type == LR_WORKER_APPLY_PARALLEL)
> +#define isTablesyncWorker(worker) ((worker)->type == LR_WORKER_TABLESYNC)
>
> Can the above start with "Is" instead of "is" similar to
> IsLogicalWorker and IsLogicalParallelApplyWorker?

Done as suggested.

> 3.
> +    worker->type =
> +        OidIsValid(relid) ? LR_WORKER_TABLESYNC :
> +        is_parallel_apply_worker ? LR_WORKER_APPLY_PARALLEL :
> +        LR_WORKER_APPLY;
>
> Perhaps, an if-else is better for readability?
>
> if (OidIsValid(relid))
>     worker->type = LR_WORKER_TABLESYNC;
> else if (is_parallel_apply_worker)
>     worker->type = LR_WORKER_APPLY_PARALLEL;
> else
>     worker->type = LR_WORKER_APPLY;

Done as suggested.

> 4.
> +/* Different kinds of workers */
> +typedef enum LogicalRepWorkerType
> +{
> +    LR_WORKER_TABLESYNC,
> +    LR_WORKER_APPLY,
> +    LR_WORKER_APPLY_PARALLEL
> +} LogicalRepWorkerType;
>
> Have a LR_WORKER_UNKNOWN = 0 and set it in logicalrep_worker_cleanup()?

Done as suggested.

> 5.
> +        case LR_WORKER_APPLY:
> +            return (rel->state == SUBREL_STATE_READY ||
> +                    (rel->state == SUBREL_STATE_SYNCDONE &&
> +                     rel->statelsn <= remote_final_lsn));
>
> Not necessary, but a good idea to have a default: clause and error out
> saying wrong logical replication worker type?

Not done. Switching on the enum gives a compiler warning if any enum
value is not handled, and all enum values are handled here.
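The reasoning behind omitting `default:` can be sketched as below. GCC's `-Wswitch` (enabled by `-Wall`; Clang has the same option) warns when a `switch` on an enum has no `case` for one of its enumerators, and a `default:` label suppresses that warning. The enum follows v2; `sketch_should_apply()` is a hypothetical, simplified stand-in for `should_apply_changes_for_rel()`, with booleans replacing the real relation-state tests.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum LogicalRepWorkerType
{
    TYPE_UNKNOWN = 0,
    TYPE_TABLESYNC_WORKER,
    TYPE_APPLY_WORKER,
    TYPE_PARALLEL_APPLY_WORKER
} LogicalRepWorkerType;

/*
 * There is deliberately no "default:" label, so -Wswitch still flags any
 * enumerator added later without a matching case.
 */
static bool
sketch_should_apply(LogicalRepWorkerType type,
                    bool is_my_rel,          /* tablesync: relid matches */
                    bool rel_ready,          /* relation state is READY */
                    bool syncdone_caught_up) /* SYNCDONE, statelsn <= remote_final_lsn */
{
    switch (type)
    {
        case TYPE_TABLESYNC_WORKER:
            return is_my_rel;
        case TYPE_APPLY_WORKER:
            return rel_ready || syncdone_caught_up;
        case TYPE_PARALLEL_APPLY_WORKER:
            /* The real code also errors out for states other than READY/UNKNOWN. */
            return rel_ready;
        case TYPE_UNKNOWN:
            assert(false); /* a running worker should never be UNKNOWN */
            return false;
    }

    return false; /* not reached; keeps compilers quiet */
}
```

The trailing `return` plays the same role as the "dummy for compiler" line in the patch; it does not hide a missing `case` from `-Wswitch`.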

> 6.
> +        case LR_WORKER_APPLY_PARALLEL:
> +            /*
> +             * Skip for parallel apply workers because they only operate on tables
> +             * that are in a READY state. See pa_can_start() and
> +             * should_apply_changes_for_rel().
> +             */
> +            break;
>
> I'd better keep this if-else as-is instead of a switch case with
> nothing for parallel apply worker.

Not done. IIUC, with a switch the compiler can optimize the code to a
single "jump", thereby saving the extra type-check the HEAD code is
doing. Admittedly, it’s only a nanosecond saving, so it is no problem
to revert this change, but why waste any CPU time unless there is a
reason to?

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v2-0002-Switch-on-worker-type.patch (application/octet-stream)
From c277577e33927e4eb62c334d87be3f16804257a6 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 2 Aug 2023 12:18:01 +1000
Subject: [PATCH v2] Switch on worker type

---
 src/backend/replication/logical/tablesync.c | 32 ++++++++++++++--------
 src/backend/replication/logical/worker.c    | 42 +++++++++++++++++------------
 2 files changed, 46 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e359286..98ebbc9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -647,18 +647,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (IsParallelApplyWorker())
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case TYPE_TABLESYNC_WORKER:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
 
-	if (IsTablesyncWorker())
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case TYPE_APPLY_WORKER:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case TYPE_PARALLEL_APPLY_WORKER:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
+
+		case TYPE_UNKNOWN:
+			Assert(false);
+			break;
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d165db6..dabfa48 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,25 +486,33 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (IsTablesyncWorker())
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (IsParallelApplyWorker())
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
-
-		return rel->state == SUBREL_STATE_READY;
+		case TYPE_TABLESYNC_WORKER:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case TYPE_APPLY_WORKER:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
+
+		case TYPE_PARALLEL_APPLY_WORKER:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+									MySubscription->name),
+							 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+			return rel->state == SUBREL_STATE_READY;
+
+		case TYPE_UNKNOWN:
+			Assert(false);
+			return false;
 	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1

v2-0001-Add-LogicalRepWorkerType-enum.patch (application/octet-stream)
From 19d80078daf5d85dda49ff43a6afb51f49cdef26 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 2 Aug 2023 10:15:13 +1000
Subject: [PATCH v2] Add LogicalRepWorkerType enum

---
 .../replication/logical/applyparallelworker.c      | 12 +++---
 src/backend/replication/logical/launcher.c         | 24 ++++++++----
 src/backend/replication/logical/tablesync.c        |  4 +-
 src/backend/replication/logical/worker.c           | 44 +++++++++++-----------
 src/include/replication/worker_internal.h          | 42 ++++++++++-----------
 src/tools/pgindent/typedefs.list                   |  1 +
 6 files changed, 68 insertions(+), 59 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 6fb9614..6d25fde 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -265,7 +265,7 @@ static bool
 pa_can_start(void)
 {
 	/* Only leader apply workers can start parallel apply workers. */
-	if (!am_leader_apply_worker())
+	if (!IsLeaderApplyWorker())
 		return false;
 
 	/*
@@ -555,7 +555,7 @@ pa_find_worker(TransactionId xid)
 static void
 pa_free_worker(ParallelApplyWorkerInfo *winfo)
 {
-	Assert(!am_parallel_apply_worker());
+	Assert(!IsParallelApplyWorker());
 	Assert(winfo->in_use);
 	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
 
@@ -1506,7 +1506,7 @@ pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
 
 	if (fileset_state == FS_SERIALIZE_DONE)
 	{
-		Assert(am_leader_apply_worker());
+		Assert(IsLeaderApplyWorker());
 		Assert(MyLogicalRepWorker->stream_fileset);
 		wshared->fileset = *MyLogicalRepWorker->stream_fileset;
 	}
@@ -1522,7 +1522,7 @@ pa_get_fileset_state(void)
 {
 	PartialFileSetState fileset_state;
 
-	Assert(am_parallel_apply_worker());
+	Assert(IsParallelApplyWorker());
 
 	SpinLockAcquire(&MyParallelShared->mutex);
 	fileset_state = MyParallelShared->fileset_state;
@@ -1593,7 +1593,7 @@ pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
 void
 pa_decr_and_wait_stream_block(void)
 {
-	Assert(am_parallel_apply_worker());
+	Assert(IsParallelApplyWorker());
 
 	/*
 	 * It is only possible to not have any pending stream chunks when we are
@@ -1620,7 +1620,7 @@ pa_decr_and_wait_stream_block(void)
 void
 pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 {
-	Assert(am_leader_apply_worker());
+	Assert(IsLeaderApplyWorker());
 
 	/*
 	 * Unlock the shared object lock so that parallel apply worker can
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d..a4afdc6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -259,7 +259,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
 		/* Skip parallel apply workers. */
-		if (isParallelApplyWorker(w))
+		if (IsParallelApplyWorker1(w))
 			continue;
 
 		if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -427,6 +427,13 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	if (OidIsValid(relid))
+		worker->type = TYPE_TABLESYNC_WORKER;
+	else if (is_parallel_apply_worker)
+		worker->type = TYPE_PARALLEL_APPLY_WORKER;
+	else
+		worker->type = TYPE_APPLY_WORKER;
+
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -601,7 +608,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 	if (worker)
 	{
-		Assert(!isParallelApplyWorker(worker));
+		Assert(!IsParallelApplyWorker1(worker));
 		logicalrep_worker_stop_internal(worker, SIGTERM);
 	}
 
@@ -643,7 +650,7 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 	worker = &LogicalRepCtx->workers[slot_no];
-	Assert(isParallelApplyWorker(worker));
+	Assert(IsParallelApplyWorker1(worker));
 
 	/*
 	 * Only stop the worker if the generation matches and the worker is alive.
@@ -729,7 +736,7 @@ static void
 logicalrep_worker_detach(void)
 {
 	/* Stop the parallel apply workers. */
-	if (am_leader_apply_worker())
+	if (IsLeaderApplyWorker())
 	{
 		List	   *workers;
 		ListCell   *lc;
@@ -749,7 +756,7 @@ logicalrep_worker_detach(void)
 		{
 			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-			if (isParallelApplyWorker(w))
+			if (IsParallelApplyWorker1(w))
 				logicalrep_worker_stop_internal(w, SIGTERM);
 		}
 
@@ -772,6 +779,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 {
 	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
 
+	worker->type = TYPE_UNKNOWN;
 	worker->in_use = false;
 	worker->proc = NULL;
 	worker->dbid = InvalidOid;
@@ -868,7 +876,7 @@ logicalrep_pa_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isParallelApplyWorker(w))
+		if (w->subid == subid && IsParallelApplyWorker1(w))
 			res++;
 	}
 
@@ -1237,7 +1245,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+		if (IsParallelApplyWorker1(w) && w->proc && pid == w->proc->pid)
 		{
 			leader_pid = w->leader_pid;
 			break;
@@ -1290,7 +1298,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
 
-		if (isParallelApplyWorker(&worker))
+		if (IsParallelApplyWorker1(&worker))
 			values[3] = Int32GetDatum(worker.leader_pid);
 		else
 			nulls[3] = true;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d46165..e359286 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -652,10 +652,10 @@ process_syncing_tables(XLogRecPtr current_lsn)
 	 * that are in a READY state. See pa_can_start() and
 	 * should_apply_changes_for_rel().
 	 */
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 		return;
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		process_syncing_tables_for_sync(current_lsn);
 	else
 		process_syncing_tables_for_apply(current_lsn);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd353fd..d165db6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,9 +486,9 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	else if (IsParallelApplyWorker())
 	{
 		/* We don't synchronize rel's that are in unknown state. */
 		if (rel->state != SUBREL_STATE_READY &&
@@ -1054,7 +1054,7 @@ apply_handle_begin_prepare(StringInfo s)
 	LogicalRepPreparedTxnData begin_data;
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
@@ -1293,7 +1293,7 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
@@ -1423,7 +1423,7 @@ apply_handle_origin(StringInfo s)
 	 */
 	if (!in_streamed_transaction &&
 		(!in_remote_transaction ||
-		 (IsTransactionState() && !am_tablesync_worker())))
+		 (IsTransactionState() && !IsTablesyncWorker())))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("ORIGIN message sent out of order")));
@@ -2020,7 +2020,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 	int			fileno;
 	off_t		offset;
 
-	if (!am_parallel_apply_worker())
+	if (!IsParallelApplyWorker())
 		maybe_start_skipping_changes(lsn);
 
 	/* Make sure we have an open transaction */
@@ -3440,7 +3440,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	 * Skip for parallel apply workers, because the lsn_mapping is maintained
 	 * by the leader apply worker.
 	 */
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 		return;
 
 	/* Need to do this in permanent context */
@@ -3832,7 +3832,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 static void
 apply_worker_exit(void)
 {
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 	{
 		/*
 		 * Don't stop the parallel apply worker as the leader will detect the
@@ -3851,7 +3851,7 @@ apply_worker_exit(void)
 	 * subscription is still active, and so that we won't leak that hash table
 	 * entry if it isn't.
 	 */
-	if (!am_tablesync_worker())
+	if (!IsTablesyncWorker())
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	proc_exit(0);
@@ -3894,7 +3894,7 @@ maybe_reread_subscription(void)
 						MySubscription->name)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
@@ -3932,7 +3932,7 @@ maybe_reread_subscription(void)
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
-		if (am_parallel_apply_worker())
+		if (IsParallelApplyWorker())
 			ereport(LOG,
 					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
 							MySubscription->name)));
@@ -4359,7 +4359,7 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 {
 	char	   *syncslotname = NULL;
 
-	Assert(am_tablesync_worker());
+	Assert(IsTablesyncWorker());
 
 	PG_TRY();
 	{
@@ -4416,7 +4416,7 @@ start_apply(XLogRecPtr origin_startpos)
 			 * idle state.
 			 */
 			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+			pgstat_report_subscription_error(MySubscription->oid, !IsTablesyncWorker());
 
 			PG_RE_THROW();
 		}
@@ -4465,7 +4465,7 @@ InitializeApplyWorker(void)
 						MyLogicalRepWorker->subid)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
@@ -4491,7 +4491,7 @@ InitializeApplyWorker(void)
 								  subscription_change_cb,
 								  (Datum) 0);
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
@@ -4545,7 +4545,7 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 	{
 		start_table_sync(&origin_startpos, &myslotname);
 
@@ -4658,7 +4658,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.twophase = false;
 	options.proto.logical.origin = pstrdup(MySubscription->origin);
 
-	if (!am_tablesync_worker())
+	if (!IsTablesyncWorker())
 	{
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
@@ -4727,7 +4727,7 @@ DisableSubscriptionAndExit(void)
 
 	/* Report the worker failed during either table synchronization or apply */
 	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
+									 !IsTablesyncWorker());
 
 	/* Disable the subscription */
 	StartTransactionCommand();
@@ -4735,7 +4735,7 @@ DisableSubscriptionAndExit(void)
 	CommitTransactionCommand();
 
 	/* Ensure we remove no-longer-useful entry for worker's start time */
-	if (!am_tablesync_worker() && !am_parallel_apply_worker())
+	if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	/* Notify the subscription has been disabled and exit */
@@ -4761,7 +4761,7 @@ IsLogicalWorker(void)
 bool
 IsLogicalParallelApplyWorker(void)
 {
-	return IsLogicalWorker() && am_parallel_apply_worker();
+	return IsLogicalWorker() && IsParallelApplyWorker();
 }
 
 /*
@@ -4826,7 +4826,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
 	XLogRecPtr	myskiplsn = MySubscription->skiplsn;
 	bool		started_tx = false;
 
-	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
+	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || IsParallelApplyWorker())
 		return;
 
 	if (!IsTransactionState())
@@ -5060,7 +5060,7 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 {
 	*winfo = NULL;
 
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 	{
 		return TRANS_PARALLEL_APPLY;
 	}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781..e40c255 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,8 +27,29 @@
 #include "storage/spin.h"
 
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	TYPE_UNKNOWN = 0,
+	TYPE_TABLESYNC_WORKER,
+	TYPE_APPLY_WORKER,
+	TYPE_PARALLEL_APPLY_WORKER
+} LogicalRepWorkerType;
+
+
+#define IsLeaderApplyWorker1(worker) ((worker)->type == TYPE_APPLY_WORKER)
+#define IsParallelApplyWorker1(worker) ((worker)->type == TYPE_PARALLEL_APPLY_WORKER)
+#define IsTablesyncWorker1(worker) ((worker)->type == TYPE_TABLESYNC_WORKER)
+
+#define IsLeaderApplyWorker() IsLeaderApplyWorker1(MyLogicalRepWorker)
+#define IsParallelApplyWorker() IsParallelApplyWorker1(MyLogicalRepWorker)
+#define IsTablesyncWorker() IsTablesyncWorker1(MyLogicalRepWorker)
+
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -305,25 +326,4 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
-
-static inline bool
-am_tablesync_worker(void)
-{
-	return OidIsValid(MyLogicalRepWorker->relid);
-}
-
-static inline bool
-am_leader_apply_worker(void)
-{
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
-}
-
-static inline bool
-am_parallel_apply_worker(void)
-{
-	return isParallelApplyWorker(MyLogicalRepWorker);
-}
-
 #endif							/* WORKER_INTERNAL_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e941fb6..0b39394 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1498,6 +1498,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
1.8.3.1

#5Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#3)
Re: Adding a LogicalRepWorker type field

On Mon, Jul 31, 2023 at 11:11 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

+1. BTW, do we need the below functions (am_tablesync_worker(),
am_leader_apply_worker()) after this work?
static inline bool
am_tablesync_worker(void)
{
- return OidIsValid(MyLogicalRepWorker->relid);
+ return isTablesyncWorker(MyLogicalRepWorker);
}
static inline bool
am_leader_apply_worker(void)
{
- return (!am_tablesync_worker() &&
- !isParallelApplyWorker(MyLogicalRepWorker));
+ return isLeaderApplyWorker(MyLogicalRepWorker);
}

The am_xxx functions are removed now in the v2-0001 patch. See [1].

The replacement set of macros (the ones with no arg) is not strictly
necessary, but I felt the code would be unnecessarily verbose if we
insisted on passing MyLogicalRepWorker everywhere from the callers in
worker.c / tablesync.c / applyparallelworker.c.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#6Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#5)
Re: Adding a LogicalRepWorker type field

On Wed, Aug 2, 2023 at 8:10 AM Peter Smith <smithpb2250@gmail.com> wrote:

The am_xxx functions are removed now in the v2-0001 patch. See [1].

The replacement set of macros (the ones with no arg) is not strictly
necessary, but I felt the code would be unnecessarily verbose if we
insisted on passing MyLogicalRepWorker everywhere from the callers in
worker.c / tablesync.c / applyparallelworker.c.

Agreed, but having a dual set of macros is also not clean. Can we
provide only a single set of inline functions instead?

--
With Regards,
Amit Kapila.

#7Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Peter Smith (#1)
Re: Adding a LogicalRepWorker type field

On Mon, Jul 31, 2023 at 10:47 AM Peter Smith <smithpb2250@gmail.com> wrote:

Hi hackers,

BACKGROUND:

Logical replication has different worker "types" (e.g. apply
leader, apply parallel, tablesync).

They all use a common structure called LogicalRepWorker, but at times
it is necessary to know what "type" of worker a given LogicalRepWorker
represents.

Once, there were just apply workers and tablesync workers - these were
easily distinguished because only tablesync workers had a valid
'relid' field. Next, parallel-apply workers were introduced - these
are distinguishable from the apply leaders by the value of
'leader_pid' field.

PROBLEM:

IMO, deducing the worker's type by examining multiple different field
values seems a dubious way to do it. This maybe was reasonable enough
when there were only 2 types, but as more get added it becomes
increasingly complicated.

SOLUTION:

AFAIK none of the complications is necessary anyway; the worker type
is already known at launch time, and it never changes during the life
of the process.

The attached Patch 0001 introduces a new enum 'type' field, which is
assigned during the launch.

~

This change not only simplifies the code, but it also permits other
code optimizations, because now we can switch on the worker enum type,
instead of using cascading if/else. (see Patch 0002).

Thoughts?

I can see the problem you stated and it's true that the worker's type
never changes during its lifetime. But I'm not sure we really need to
add a new variable to LogicalRepWorker since we already have enough
information there to distinguish the worker's type.

Currently, we deduce the worker's type by checking some fields that
never change, such as relid and leader_pid. That's fine with me as long
as we encapsulate the deduction of the worker's type in macros (or inline
functions). Even with the proposed patch (0001 patch), we still need
similar logic to determine the worker's type.

If we want to change both process_syncing_tables() and
should_apply_changes_for_rel() to use the switch statement instead of
using cascading if/else, I think we can have a function, say
LogicalRepWorkerType(), that returns LogicalRepWorkerType of the given
worker.
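A minimal sketch of such a helper, assuming simplified stand-ins for PostgreSQL's Oid, InvalidOid, InvalidPid and OidIsValid, and a LogicalRepWorker cut down to the two fields HEAD consults. The names logicalrep_worker_get_type() and worker_type_name() are hypothetical: a function literally named LogicalRepWorkerType() could not coexist with the LogicalRepWorkerType typedef, because C puts typedef names and function names in the same identifier namespace. The deduction follows HEAD's rules: a valid relid means tablesync, a valid leader_pid means parallel apply, anything else is the leader apply worker.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>			/* pid_t */

/* Simplified stand-ins for the PostgreSQL definitions. */
typedef unsigned int Oid;
#define InvalidOid		((Oid) 0)
#define InvalidPid		(-1)
#define OidIsValid(objectId)  ((bool) ((objectId) != InvalidOid))

typedef enum LogicalRepWorkerType
{
	TYPE_UNKNOWN = 0,
	TYPE_TABLESYNC_WORKER,
	TYPE_APPLY_WORKER,
	TYPE_PARALLEL_APPLY_WORKER
} LogicalRepWorkerType;

/* Only the fields that HEAD consults to infer the worker's type. */
typedef struct LogicalRepWorker
{
	Oid			relid;			/* valid only for tablesync workers */
	pid_t		leader_pid;		/* valid only for parallel apply workers */
} LogicalRepWorker;

/* Hypothetical helper: deduce the type from fields fixed at launch. */
static inline LogicalRepWorkerType
logicalrep_worker_get_type(const LogicalRepWorker *worker)
{
	if (OidIsValid(worker->relid))
		return TYPE_TABLESYNC_WORKER;
	if (worker->leader_pid != InvalidPid)
		return TYPE_PARALLEL_APPLY_WORKER;
	return TYPE_APPLY_WORKER;
}

/* Hypothetical caller: switch on the deduced type instead of if/else. */
static const char *
worker_type_name(const LogicalRepWorker *worker)
{
	switch (logicalrep_worker_get_type(worker))
	{
		case TYPE_TABLESYNC_WORKER:
			return "tablesync worker";
		case TYPE_APPLY_WORKER:
			return "leader apply worker";
		case TYPE_PARALLEL_APPLY_WORKER:
			return "parallel apply worker";
		case TYPE_UNKNOWN:
			break;
	}
	assert(false);				/* the deduction never yields TYPE_UNKNOWN */
	return "unknown";
}
```

With a helper like this, process_syncing_tables() and should_apply_changes_for_rel() could switch on the returned value; the difference from a stored type field is that the deduction is re-evaluated on every call rather than read once from the slot.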

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#8Peter Smith
smithpb2250@gmail.com
In reply to: Masahiko Sawada (#7)
Re: Adding a LogicalRepWorker type field

On Wed, Aug 2, 2023 at 3:35 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:

I can see the problem you stated and it's true that the worker's type
never changes during its lifetime. But I'm not sure we really need to
add a new variable to LogicalRepWorker since we already have enough
information there to distinguish the worker's type.

Currently, we deduce the worker's type by checking some fields that
never change, such as relid and leader_pid. That's fine with me as long
as we encapsulate the deduction of the worker's type in macros (or inline
functions). Even with the proposed patch (0001 patch), we still need
similar logic to determine the worker's type.

Thanks for the feedback.

I agree that the calling code will not look very different
with/without this patch 0001, because everything is hidden by the
macros/functions. But those HEAD macros/functions are also hiding the
inefficiencies of the type deduction -- e.g. IMO it is quite strange
that we can only recognize the worker is an "apply worker" by
eliminating the other 2 possibilities.

As further background, the idea for this patch came from considering
another thread to "re-use" workers [1]. For example, when thinking
about re-using tablesync workers, the HEAD am_tablesync_worker()
function begins to fall apart -- i.e., it does not let you sensibly
disassociate a tablesync worker from its relid (which you might want
to do if tablesync workers were "pooled") because in the HEAD code any
tablesync worker without a relid is (by definition) NOT recognized as
a tablesync worker anymore!

Those above issues just disappear when a type field is added.
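A small sketch of the pooling scenario, assuming a hypothetical release_table_for_reuse() step that clears relid when a tablesync worker finishes its table (no such function exists in HEAD). The struct is reduced to the fields involved, and Oid, InvalidOid, InvalidPid and OidIsValid are simplified stand-ins. It contrasts HEAD-style deduction with reading a type field assigned once at launch.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>			/* pid_t */

/* Simplified stand-ins for the PostgreSQL definitions. */
typedef unsigned int Oid;
#define InvalidOid		((Oid) 0)
#define InvalidPid		(-1)
#define OidIsValid(objectId)  ((bool) ((objectId) != InvalidOid))

typedef enum LogicalRepWorkerType
{
	TYPE_UNKNOWN = 0,
	TYPE_TABLESYNC_WORKER,
	TYPE_APPLY_WORKER,
	TYPE_PARALLEL_APPLY_WORKER
} LogicalRepWorkerType;

typedef struct LogicalRepWorker
{
	LogicalRepWorkerType type;	/* assigned once at launch, as in patch 0001 */
	Oid			relid;			/* table being synchronized, if any */
	pid_t		leader_pid;		/* InvalidPid unless a parallel apply worker */
} LogicalRepWorker;

/* HEAD-style checks: the type is inferred from relid and leader_pid. */
static bool
deduced_is_tablesync(const LogicalRepWorker *w)
{
	return OidIsValid(w->relid);
}

static bool
deduced_is_leader_apply(const LogicalRepWorker *w)
{
	return !deduced_is_tablesync(w) && w->leader_pid == InvalidPid;
}

/* Patch-style check: read the type recorded at launch. */
static bool
typed_is_tablesync(const LogicalRepWorker *w)
{
	return w->type == TYPE_TABLESYNC_WORKER;
}

/*
 * Hypothetical pooling step: a tablesync worker that finished its table
 * forgets the relid while it waits to be handed another table.
 */
static void
release_table_for_reuse(LogicalRepWorker *w)
{
	assert(typed_is_tablesync(w));
	w->relid = InvalidOid;
}
```

After release_table_for_reuse(), the HEAD-style checks classify the idle worker as a leader apply worker, while the recorded type still says tablesync.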

If we want to change both process_syncing_tables() and
should_apply_changes_for_rel() to use the switch statement instead of
using cascading if/else, I think we can have a function, say
LogicalRepWorkerType(), that returns LogicalRepWorkerType of the given
worker.

The point of that "switch" in patch 0002 was to demonstrate that with
a worker-type enum we can write more *efficient* code in some places
than the current cascading if/else. I think making a "switch" just for
the sake of it, by adding more functions that hide yet more work, is
going in the opposite direction.

------
[1]: reuse workers - /messages/by-id/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia

#9Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#6)
2 attachment(s)
Re: Adding a LogicalRepWorker type field

On Wed, Aug 2, 2023 at 1:00 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Aug 2, 2023 at 8:10 AM Peter Smith <smithpb2250@gmail.com> wrote:

The am_xxx functions are removed now in the v2-0001 patch. See [1].

The replacement set of macros (the ones with no arg) is not strictly
necessary, but I felt the code would be unnecessarily verbose if we
insisted on passing MyLogicalRepWorker everywhere from the callers in
worker.c / tablesync.c / applyparallelworker.c.

Agreed, but having a dual set of macros is also not clean. Can we
provide only a single set of inline functions instead?

We can't use the same names for both the with-parameter and
without-parameter functions because there is no function overloading
in C. In patch v3-0001 I've replaced the "dual set of macros" with a
single inline function of a different name, and one set of space-saving macros.

PSA v3

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v3-0001-Add-LogicalRepWorkerType-enum.patch
From 35bd37aff2be272aa1ce3a0edd70f0e3f1373563 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 2 Aug 2023 16:34:24 +1000
Subject: [PATCH v3] Add LogicalRepWorkerType enum

---
 .../replication/logical/applyparallelworker.c      | 12 +++---
 src/backend/replication/logical/launcher.c         | 24 ++++++++----
 src/backend/replication/logical/tablesync.c        |  4 +-
 src/backend/replication/logical/worker.c           | 44 +++++++++++-----------
 src/include/replication/worker_internal.h          | 33 ++++++++--------
 src/tools/pgindent/typedefs.list                   |  1 +
 6 files changed, 64 insertions(+), 54 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 6fb9614..6d25fde 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -265,7 +265,7 @@ static bool
 pa_can_start(void)
 {
 	/* Only leader apply workers can start parallel apply workers. */
-	if (!am_leader_apply_worker())
+	if (!IsLeaderApplyWorker())
 		return false;
 
 	/*
@@ -555,7 +555,7 @@ pa_find_worker(TransactionId xid)
 static void
 pa_free_worker(ParallelApplyWorkerInfo *winfo)
 {
-	Assert(!am_parallel_apply_worker());
+	Assert(!IsParallelApplyWorker());
 	Assert(winfo->in_use);
 	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
 
@@ -1506,7 +1506,7 @@ pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
 
 	if (fileset_state == FS_SERIALIZE_DONE)
 	{
-		Assert(am_leader_apply_worker());
+		Assert(IsLeaderApplyWorker());
 		Assert(MyLogicalRepWorker->stream_fileset);
 		wshared->fileset = *MyLogicalRepWorker->stream_fileset;
 	}
@@ -1522,7 +1522,7 @@ pa_get_fileset_state(void)
 {
 	PartialFileSetState fileset_state;
 
-	Assert(am_parallel_apply_worker());
+	Assert(IsParallelApplyWorker());
 
 	SpinLockAcquire(&MyParallelShared->mutex);
 	fileset_state = MyParallelShared->fileset_state;
@@ -1593,7 +1593,7 @@ pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
 void
 pa_decr_and_wait_stream_block(void)
 {
-	Assert(am_parallel_apply_worker());
+	Assert(IsParallelApplyWorker());
 
 	/*
 	 * It is only possible to not have any pending stream chunks when we are
@@ -1620,7 +1620,7 @@ pa_decr_and_wait_stream_block(void)
 void
 pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 {
-	Assert(am_leader_apply_worker());
+	Assert(IsLeaderApplyWorker());
 
 	/*
 	 * Unlock the shared object lock so that parallel apply worker can
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d..9022b64 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -259,7 +259,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
 		/* Skip parallel apply workers. */
-		if (isParallelApplyWorker(w))
+		if (is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER))
 			continue;
 
 		if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -427,6 +427,13 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	if (OidIsValid(relid))
+		worker->type = TYPE_TABLESYNC_WORKER;
+	else if (is_parallel_apply_worker)
+		worker->type = TYPE_PARALLEL_APPLY_WORKER;
+	else
+		worker->type = TYPE_APPLY_WORKER;
+
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -601,7 +608,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 	if (worker)
 	{
-		Assert(!isParallelApplyWorker(worker));
+		Assert(!is_worker_type(worker, TYPE_PARALLEL_APPLY_WORKER));
 		logicalrep_worker_stop_internal(worker, SIGTERM);
 	}
 
@@ -643,7 +650,7 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 	worker = &LogicalRepCtx->workers[slot_no];
-	Assert(isParallelApplyWorker(worker));
+	Assert(is_worker_type(worker, TYPE_PARALLEL_APPLY_WORKER));
 
 	/*
 	 * Only stop the worker if the generation matches and the worker is alive.
@@ -729,7 +736,7 @@ static void
 logicalrep_worker_detach(void)
 {
 	/* Stop the parallel apply workers. */
-	if (am_leader_apply_worker())
+	if (IsLeaderApplyWorker())
 	{
 		List	   *workers;
 		ListCell   *lc;
@@ -749,7 +756,7 @@ logicalrep_worker_detach(void)
 		{
 			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-			if (isParallelApplyWorker(w))
+			if (is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER))
 				logicalrep_worker_stop_internal(w, SIGTERM);
 		}
 
@@ -772,6 +779,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 {
 	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
 
+	worker->type = TYPE_UNKNOWN;
 	worker->in_use = false;
 	worker->proc = NULL;
 	worker->dbid = InvalidOid;
@@ -868,7 +876,7 @@ logicalrep_pa_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isParallelApplyWorker(w))
+		if (w->subid == subid && is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER))
 			res++;
 	}
 
@@ -1237,7 +1245,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+		if (is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER) && w->proc && pid == w->proc->pid)
 		{
 			leader_pid = w->leader_pid;
 			break;
@@ -1290,7 +1298,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
 
-		if (isParallelApplyWorker(&worker))
+		if (is_worker_type(&worker, TYPE_PARALLEL_APPLY_WORKER))
 			values[3] = Int32GetDatum(worker.leader_pid);
 		else
 			nulls[3] = true;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d46165..e359286 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -652,10 +652,10 @@ process_syncing_tables(XLogRecPtr current_lsn)
 	 * that are in a READY state. See pa_can_start() and
 	 * should_apply_changes_for_rel().
 	 */
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 		return;
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		process_syncing_tables_for_sync(current_lsn);
 	else
 		process_syncing_tables_for_apply(current_lsn);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd353fd..d165db6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,9 +486,9 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	else if (IsParallelApplyWorker())
 	{
 		/* We don't synchronize rel's that are in unknown state. */
 		if (rel->state != SUBREL_STATE_READY &&
@@ -1054,7 +1054,7 @@ apply_handle_begin_prepare(StringInfo s)
 	LogicalRepPreparedTxnData begin_data;
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
@@ -1293,7 +1293,7 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
@@ -1423,7 +1423,7 @@ apply_handle_origin(StringInfo s)
 	 */
 	if (!in_streamed_transaction &&
 		(!in_remote_transaction ||
-		 (IsTransactionState() && !am_tablesync_worker())))
+		 (IsTransactionState() && !IsTablesyncWorker())))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("ORIGIN message sent out of order")));
@@ -2020,7 +2020,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 	int			fileno;
 	off_t		offset;
 
-	if (!am_parallel_apply_worker())
+	if (!IsParallelApplyWorker())
 		maybe_start_skipping_changes(lsn);
 
 	/* Make sure we have an open transaction */
@@ -3440,7 +3440,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	 * Skip for parallel apply workers, because the lsn_mapping is maintained
 	 * by the leader apply worker.
 	 */
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 		return;
 
 	/* Need to do this in permanent context */
@@ -3832,7 +3832,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 static void
 apply_worker_exit(void)
 {
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 	{
 		/*
 		 * Don't stop the parallel apply worker as the leader will detect the
@@ -3851,7 +3851,7 @@ apply_worker_exit(void)
 	 * subscription is still active, and so that we won't leak that hash table
 	 * entry if it isn't.
 	 */
-	if (!am_tablesync_worker())
+	if (!IsTablesyncWorker())
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	proc_exit(0);
@@ -3894,7 +3894,7 @@ maybe_reread_subscription(void)
 						MySubscription->name)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
@@ -3932,7 +3932,7 @@ maybe_reread_subscription(void)
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
-		if (am_parallel_apply_worker())
+		if (IsParallelApplyWorker())
 			ereport(LOG,
 					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
 							MySubscription->name)));
@@ -4359,7 +4359,7 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 {
 	char	   *syncslotname = NULL;
 
-	Assert(am_tablesync_worker());
+	Assert(IsTablesyncWorker());
 
 	PG_TRY();
 	{
@@ -4416,7 +4416,7 @@ start_apply(XLogRecPtr origin_startpos)
 			 * idle state.
 			 */
 			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+			pgstat_report_subscription_error(MySubscription->oid, !IsTablesyncWorker());
 
 			PG_RE_THROW();
 		}
@@ -4465,7 +4465,7 @@ InitializeApplyWorker(void)
 						MyLogicalRepWorker->subid)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
@@ -4491,7 +4491,7 @@ InitializeApplyWorker(void)
 								  subscription_change_cb,
 								  (Datum) 0);
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
@@ -4545,7 +4545,7 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 	{
 		start_table_sync(&origin_startpos, &myslotname);
 
@@ -4658,7 +4658,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.twophase = false;
 	options.proto.logical.origin = pstrdup(MySubscription->origin);
 
-	if (!am_tablesync_worker())
+	if (!IsTablesyncWorker())
 	{
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
@@ -4727,7 +4727,7 @@ DisableSubscriptionAndExit(void)
 
 	/* Report the worker failed during either table synchronization or apply */
 	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
+									 !IsTablesyncWorker());
 
 	/* Disable the subscription */
 	StartTransactionCommand();
@@ -4735,7 +4735,7 @@ DisableSubscriptionAndExit(void)
 	CommitTransactionCommand();
 
 	/* Ensure we remove no-longer-useful entry for worker's start time */
-	if (!am_tablesync_worker() && !am_parallel_apply_worker())
+	if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	/* Notify the subscription has been disabled and exit */
@@ -4761,7 +4761,7 @@ IsLogicalWorker(void)
 bool
 IsLogicalParallelApplyWorker(void)
 {
-	return IsLogicalWorker() && am_parallel_apply_worker();
+	return IsLogicalWorker() && IsParallelApplyWorker();
 }
 
 /*
@@ -4826,7 +4826,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
 	XLogRecPtr	myskiplsn = MySubscription->skiplsn;
 	bool		started_tx = false;
 
-	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
+	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || IsParallelApplyWorker())
 		return;
 
 	if (!IsTransactionState())
@@ -5060,7 +5060,7 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 {
 	*winfo = NULL;
 
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 	{
 		return TRANS_PARALLEL_APPLY;
 	}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781..2c9a069 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,8 +27,20 @@
 #include "storage/spin.h"
 
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	TYPE_UNKNOWN = 0,
+	TYPE_TABLESYNC_WORKER,
+	TYPE_APPLY_WORKER,
+	TYPE_PARALLEL_APPLY_WORKER
+} LogicalRepWorkerType;
+
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -305,25 +317,14 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
-
 static inline bool
-am_tablesync_worker(void)
+is_worker_type(LogicalRepWorker *worker, LogicalRepWorkerType type)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return worker->type == type;
 }
 
-static inline bool
-am_leader_apply_worker(void)
-{
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
-}
-
-static inline bool
-am_parallel_apply_worker(void)
-{
-	return isParallelApplyWorker(MyLogicalRepWorker);
-}
+#define IsLeaderApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_APPLY_WORKER)
+#define IsParallelApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_PARALLEL_APPLY_WORKER)
+#define IsTablesyncWorker() is_worker_type(MyLogicalRepWorker, TYPE_TABLESYNC_WORKER)
 
 #endif							/* WORKER_INTERNAL_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e941fb6..0b39394 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1498,6 +1498,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
1.8.3.1

v3-0002-Switch-on-worker-type.patch
From 99c7b33014f009473db33a21e133c946d22fb511 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 2 Aug 2023 16:37:49 +1000
Subject: [PATCH v3] Switch on worker type

---
 src/backend/replication/logical/tablesync.c | 32 ++++++++++++++--------
 src/backend/replication/logical/worker.c    | 42 +++++++++++++++++------------
 2 files changed, 46 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e359286..98ebbc9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -647,18 +647,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (IsParallelApplyWorker())
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case TYPE_TABLESYNC_WORKER:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
 
-	if (IsTablesyncWorker())
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case TYPE_APPLY_WORKER:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case TYPE_PARALLEL_APPLY_WORKER:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
+
+		case TYPE_UNKNOWN:
+			Assert(false);
+			break;
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d165db6..dabfa48 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,25 +486,33 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (IsTablesyncWorker())
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (IsParallelApplyWorker())
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
-
-		return rel->state == SUBREL_STATE_READY;
+		case TYPE_TABLESYNC_WORKER:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case TYPE_APPLY_WORKER:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
+
+		case TYPE_PARALLEL_APPLY_WORKER:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+									MySubscription->name),
+							 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+			return rel->state == SUBREL_STATE_READY;
+
+		case TYPE_UNKNOWN:
+			Assert(false);
+			return false;
 	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1
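Patch 0002's point is that a stored enum lets code dispatch with one `switch` instead of a cascade of `if`/`else` type tests. Below is a minimal standalone sketch of that shape. The types, state names, and `sketch_should_apply()` are simplified, hypothetical stand-ins for the real `should_apply_changes_for_rel()` logic in worker.c. The parallel-apply branch also leaves out the real code's ERROR for relations that are not yet synchronized. Because there is no `default:` label, GCC and Clang can warn (`-Wswitch`, enabled by `-Wall`) if a new enum value is added but not handled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the PostgreSQL types, for illustration only. */
typedef uint32_t Oid;
typedef uint64_t XLogRecPtr;

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

typedef enum SketchRelState
{
	SKETCH_STATE_UNKNOWN,
	SKETCH_STATE_SYNCDONE,
	SKETCH_STATE_READY
} SketchRelState;

typedef struct SketchRel
{
	Oid			localreloid;
	SketchRelState state;
	XLogRecPtr	statelsn;
} SketchRel;

/*
 * Should a worker of type 'type' (synchronizing 'relid' if it is a tablesync
 * worker) apply a change to 'rel'?  No default: label, so -Wswitch can flag
 * enum values that are not handled.
 */
static bool
sketch_should_apply(LogicalRepWorkerType type, Oid relid,
					const SketchRel *rel, XLogRecPtr remote_final_lsn)
{
	switch (type)
	{
		case WORKERTYPE_TABLESYNC:
			/* A tablesync worker only cares about its own table. */
			return relid == rel->localreloid;

		case WORKERTYPE_APPLY:
			/* Ready tables, or tables whose sync finished at or before this LSN. */
			return rel->state == SKETCH_STATE_READY ||
				(rel->state == SKETCH_STATE_SYNCDONE &&
				 rel->statelsn <= remote_final_lsn);

		case WORKERTYPE_PARALLEL_APPLY:
			/* Simplified: the real code raises an ERROR for other sync states. */
			return rel->state == SKETCH_STATE_READY;

		case WORKERTYPE_UNKNOWN:
			assert(!"worker type was never assigned");
			return false;
	}

	return false;				/* not reached; keeps compilers quiet */
}
```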

#10Bharath Rupireddy
bharath.rupireddyforpostgres@gmail.com
In reply to: Peter Smith (#9)
Re: Adding a LogicalRepWorker type field

On Wed, Aug 2, 2023 at 12:14 PM Peter Smith <smithpb2250@gmail.com> wrote:

We can't use the same names for both with/without-parameter functions
because there is no function overloading in C. In patch v3-0001 I've
replaced the "dual set of macros", with a single inline function of a
different name, and one set of space-saving macros.

PSA v3

Quick thoughts:

1.
+typedef enum LogicalRepWorkerType
+{
+    TYPE_UNKNOWN = 0,
+    TYPE_TABLESYNC_WORKER,
+    TYPE_APPLY_WORKER,
+    TYPE_PARALLEL_APPLY_WORKER
+} LogicalRepWorkerType;

-1 for these names starting with prefix TYPE_, in fact LR_ looked better.

2.
+is_worker_type(LogicalRepWorker *worker, LogicalRepWorkerType type)

This function name looks too generic; it would be better to include
some element of logical replication in the name.

3.
+#define IsLeaderApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_APPLY_WORKER)
+#define IsParallelApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_PARALLEL_APPLY_WORKER)
+#define IsTablesyncWorker() is_worker_type(MyLogicalRepWorker, TYPE_TABLESYNC_WORKER)

My thoughts were around removing the am_XXX_worker() and
IsXXXWorker(&worker) functions and just having IsXXXWorker(&worker)
alone, using IsXXXWorker(MyLogicalRepWorker) in places where
am_XXX_worker() is used. If implementing this leads to something like
the above with is_worker_type, -1 from me.

Done. I converged everything to the macro-naming style as suggested,
but I chose not to pass MyLogicalRepWorker as a parameter for the
normal (am_xxx case) otherwise I felt it would make the calling code
unnecessarily verbose.

With is_worker_type, the calling code now looks even more verbose. I
don't think IsLeaderApplyWorker(MyLogicalRepWorker) is a bad idea.
This unifies multiple worker type functions into one and makes the code
simpler.
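A minimal sketch of what Bharath describes: one predicate per worker type, always taking the worker as a parameter, so the same name serves both the current process (`MyLogicalRepWorker`) and the launcher's loops over worker slots. The reduced struct, the `IsXXXWorker()` spellings, and `count_parallel_apply_workers()` are hypothetical simplifications, not code from any posted patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/* Reduced stand-in for LogicalRepWorker (hypothetical). */
typedef struct LogicalRepWorker
{
	LogicalRepWorkerType type;	/* assigned at launch, never changes */
	bool		in_use;
} LogicalRepWorker;

/* Stand-in for the per-process pointer to this worker's own slot. */
static LogicalRepWorker *MyLogicalRepWorker = NULL;

static inline bool
IsTablesyncWorker(const LogicalRepWorker *w)
{
	return w->type == WORKERTYPE_TABLESYNC;
}

static inline bool
IsLeaderApplyWorker(const LogicalRepWorker *w)
{
	return w->type == WORKERTYPE_APPLY;
}

static inline bool
IsParallelApplyWorker(const LogicalRepWorker *w)
{
	return w->type == WORKERTYPE_PARALLEL_APPLY;
}

/* Launcher-style use: the same predicate applied to every slot. */
static int
count_parallel_apply_workers(const LogicalRepWorker *slots, size_t n)
{
	int			res = 0;

	for (size_t i = 0; i < n; i++)
		if (slots[i].in_use && IsParallelApplyWorker(&slots[i]))
			res++;
	return res;
}
```

Inside a worker process, the same check would then read `IsLeaderApplyWorker(MyLogicalRepWorker)`, which is the verbosity trade-off discussed in this thread.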

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com

#11Amit Kapila
amit.kapila16@gmail.com
In reply to: Bharath Rupireddy (#10)
Re: Adding a LogicalRepWorker type field

On Wed, Aug 2, 2023 at 2:46 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:

On Wed, Aug 2, 2023 at 12:14 PM Peter Smith <smithpb2250@gmail.com> wrote:

We can't use the same names for both with/without-parameter functions
because there is no function overloading in C. In patch v3-0001 I've
replaced the "dual set of macros", with a single inline function of a
different name, and one set of space-saving macros.

PSA v3

Quick thoughts:

1.
+typedef enum LogicalRepWorkerType
+{
+    TYPE_UNKNOWN = 0,
+    TYPE_TABLESYNC_WORKER,
+    TYPE_APPLY_WORKER,
+    TYPE_PARALLEL_APPLY_WORKER
+} LogicalRepWorkerType;

-1 for these names starting with prefix TYPE_, in fact LR_ looked better.

2.
+is_worker_type(LogicalRepWorker *worker, LogicalRepWorkerType type)

This function name looks too generic; it would be better to include
some element of logical replication in the name.

3.
+#define IsLeaderApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_APPLY_WORKER)
+#define IsParallelApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_PARALLEL_APPLY_WORKER)
+#define IsTablesyncWorker() is_worker_type(MyLogicalRepWorker, TYPE_TABLESYNC_WORKER)

My thoughts were around removing the am_XXX_worker() and
IsXXXWorker(&worker) functions and just having IsXXXWorker(&worker)
alone, using IsXXXWorker(MyLogicalRepWorker) in places where
am_XXX_worker() is used. If implementing this leads to something like
the above with is_worker_type, -1 from me.

What I was suggesting to Peter was to have something like:
static inline bool
am_tablesync_worker(void)
{
	return (MyLogicalRepWorker->type == TYPE_TABLESYNC_WORKER);
}
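Extended to all three worker types, this suggestion keeps the existing parameterless `am_xxx_worker()` names, and each function reads the stored field directly. The sketch below assumes the `WORKERTYPE_xxx` enum names used later in the thread and a reduced, hypothetical stand-in for `LogicalRepWorker`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/* Reduced stand-in for LogicalRepWorker (hypothetical). */
typedef struct LogicalRepWorker
{
	LogicalRepWorkerType type;	/* assigned at launch, never changes */
} LogicalRepWorker;

/* Stand-in for the per-process pointer to this worker's own slot. */
static LogicalRepWorker *MyLogicalRepWorker = NULL;

static inline bool
am_tablesync_worker(void)
{
	return MyLogicalRepWorker->type == WORKERTYPE_TABLESYNC;
}

static inline bool
am_leader_apply_worker(void)
{
	return MyLogicalRepWorker->type == WORKERTYPE_APPLY;
}

static inline bool
am_parallel_apply_worker(void)
{
	return MyLogicalRepWorker->type == WORKERTYPE_PARALLEL_APPLY;
}
```

Call sites stay as short as they are on HEAD. The trade-off, raised elsewhere in the thread, is that launcher code that inspects other workers' slots still needs a separate form that takes the worker as a parameter.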

--
With Regards,
Amit Kapila.

#12Melih Mutlu
m.melihmutlu@gmail.com
In reply to: Peter Smith (#8)
Re: Adding a LogicalRepWorker type field

Hi,

On Wed, 2 Aug 2023 at 09:27, Peter Smith <smithpb2250@gmail.com>
wrote:

On Wed, Aug 2, 2023 at 3:35 PM Masahiko Sawada <sawada.mshk@gmail.com>
wrote:

I can see the problem you stated and it's true that the worker's type
never changes during its lifetime. But I'm not sure we really need to
add a new variable to LogicalRepWorker since we already have enough
information there to distinguish the worker's type.

Currently, we deduce the worker's type by checking some fields that
never change, such as relid and leader_pid. It's fine with me as long as
we encapsulate the deduction of the worker's type in macros (or inline
functions). Even with the proposed patch (0001 patch), we still need a
similar logic to determine the worker's type.

Thanks for the feedback.

I agree that the calling code will not look very different
with/without this patch 0001, because everything is hidden by the
macros/functions. But those HEAD macros/functions are also hiding the
inefficiencies of the type deduction -- e.g. IMO it is quite strange
that we can only recognize the worker is an "apply worker" by
eliminating the other 2 possibilities.
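For contrast, here is a sketch of the deduction style under discussion. The type is inferred from `relid` and `leader_pid`, so a leader apply worker is recognized only by ruling out the other two kinds. `SketchWorker` and the `deduce_is_*()` predicates are hypothetical. `InvalidOid` (0) and `InvalidPid` (-1) mirror PostgreSQL's values.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Oid;

#define InvalidOid ((Oid) 0)
#define InvalidPid (-1)

/* Reduced, hypothetical worker slot: only the fields used for deduction. */
typedef struct SketchWorker
{
	Oid			relid;			/* valid only for tablesync workers */
	int			leader_pid;		/* valid only for parallel apply workers */
} SketchWorker;

static inline bool
deduce_is_tablesync(const SketchWorker *w)
{
	return w->relid != InvalidOid;
}

static inline bool
deduce_is_parallel_apply(const SketchWorker *w)
{
	return w->leader_pid != InvalidPid;
}

/* No field identifies a leader apply worker; it is whatever is left over. */
static inline bool
deduce_is_leader_apply(const SketchWorker *w)
{
	return !deduce_is_tablesync(w) && !deduce_is_parallel_apply(w);
}
```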

Isn't the apply worker type still decided by eliminating the other choices
even with the patch?

  /* Prepare the worker slot. */
+ if (OidIsValid(relid))
+ worker->type = TYPE_TABLESYNC_WORKER;
+ else if (is_parallel_apply_worker)
+ worker->type = TYPE_PARALLEL_APPLY_WORKER;
+ else
+ worker->type = TYPE_APPLY_WORKER;
3.
+#define IsLeaderApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_APPLY_WORKER)
+#define IsParallelApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_PARALLEL_APPLY_WORKER)
+#define IsTablesyncWorker() is_worker_type(MyLogicalRepWorker, TYPE_TABLESYNC_WORKER)

My thoughts were around removing the am_XXX_worker() and
IsXXXWorker(&worker) functions and just having IsXXXWorker(&worker)
alone, using IsXXXWorker(MyLogicalRepWorker) in places where
am_XXX_worker() is used. If implementing this leads to something like
the above with is_worker_type, -1 from me.

I agree that having both is_worker_type and IsXXXWorker makes the code more
confusing, especially since both ways of checking the type are used in the same
file/function, as follows:

logicalrep_worker_detach(void)

{
/* Stop the parallel apply workers. */
- if (am_leader_apply_worker())
+ if (IsLeaderApplyWorker())
{
List *workers;
ListCell *lc;
@@ -749,7 +756,7 @@ logicalrep_worker_detach(void)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

- if (isParallelApplyWorker(w))
+ if (is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER))
logicalrep_worker_stop_internal(w, SIGTERM);
}

Regards,
--
Melih Mutlu
Microsoft

#13Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Peter Smith (#9)
Re: Adding a LogicalRepWorker type field

On Wed, Aug 2, 2023 at 12:14 PM Peter Smith <smithpb2250@gmail.com> wrote:

We can't use the same names for both with/without-parameter functions
because there is no function overloading in C. In patch v3-0001 I've
replaced the "dual set of macros", with a single inline function of a
different name, and one set of space-saving macros.

I think it's a good idea to add worker type field. Trying to deduce it
based on the contents of the structure isn't good. RelOptInfo, for example,
has RelOptKind. But RelOptInfo has become really large with members
required by all RelOptKinds crammed under the same structure. If we can
avoid that here from the beginning, that will be great. Maybe we
should create a union of type-specific members while we are introducing the
type. This will also provide some protection against unrelated members
being (mis)used in the future.
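The union idea could look roughly like the following sketch. Fields shared by every worker stay at the top level, type-specific fields move into a union, and accessors assert that the discriminator matches before touching a member. The field selection and all names (`SketchWorker`, `u.tablesync`, `u.parallel_apply`, the accessor functions) are hypothetical. The real `LogicalRepWorker` has many more members.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Oid;

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

typedef struct SketchWorker
{
	LogicalRepWorkerType type;	/* discriminator: which union member is live */
	Oid			subid;			/* common to all worker types */
	union
	{
		struct
		{
			Oid			relid;		/* table being synchronized */
			char		relstate;	/* sync state of that table */
		}			tablesync;
		struct
		{
			int			leader_pid; /* PID of the leader apply worker */
		}			parallel_apply;
	}			u;
} SketchWorker;

/* Accessors catch misuse of a member that does not belong to this type. */
static inline Oid
worker_relid(const SketchWorker *w)
{
	assert(w->type == WORKERTYPE_TABLESYNC);
	return w->u.tablesync.relid;
}

static inline int
worker_leader_pid(const SketchWorker *w)
{
	assert(w->type == WORKERTYPE_PARALLEL_APPLY);
	return w->u.parallel_apply.leader_pid;
}
```

One consequence is that code which today reads `relid` or `leader_pid` unconditionally, as the HEAD deduction macros do, would first have to check the type. The union makes that dependency explicit.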

--
Best Wishes,
Ashutosh Bapat

#14Peter Smith
smithpb2250@gmail.com
In reply to: Ashutosh Bapat (#13)
Re: Adding a LogicalRepWorker type field

Thank you for the feedback received so far. Sorry, I have become busy
lately with other work.

IIUC there is a general +1 for the idea, but I still have some
juggling to do to make the functions/macros of patch 0001
acceptable to everybody.

The difficulties arise from:
- no function overloading in C
- ideally, we prefer the same names for everything (e.g. instead of
dual-set macros)
- but launcher code calls need to pass param (other code always uses
MyLogicalRepWorker)
- would be nice (although not mandatory) to not have to always pass
MyLogicalRepWorker as a param.

Anyway, I will work towards finding some compromise on this next week.
Currently, I feel the best choice is to go with what Bharath suggested
in the first place -- just always pass the parameter (including
passing MyLogicalRepWorker). I will think more about it...

------

Meanwhile, here are some replies to other feedback received:

Ashutosh -- "Maybe we should create a union of type-specific members" [1].

Yes, I was wondering this myself, but I won't pursue this yet until
getting over the hurdle of finding acceptable functions/macros for
patch 0001. Hopefully, we can come back to this idea.

~~

Melih -- "Isn't the apply worker type still decided by eliminating
the other choices even with the patch?".

AFAIK the only case of that now is the one-time check in the
logicalrep_worker_launch() function. IMO, that is quite a different
proposition to the HEAD macros that have to make that deduction
10,000s of times in executing code. Actually, even the caller knows
exactly the kind of worker it wants to launch so we can pass the
LogicalRepWorkerType directly to logicalrep_worker_launch() and
eliminate even this one-time-check. I can code it that way in the next
patch version.

~~

Bharath -- "-1 for these names starting with prefix TYPE_, in fact LR_
looked better."

Hmmm. I'm not sure what is best. Of the options below I prefer
"WORKER_TYPE_xxx", but I don't mind so long as there is a consensus.

LR_APPLY_WORKER
LR_PARALLEL_APPLY_WORKER
LR_TABLESYNC_WORKER

TYPE_APPLY_WORKER
TYPE_PARALLEL_APPLY_WORKER
TYPE_TABLESYNC_WORKER

WORKER_TYPE_APPLY
WORKER_TYPE_PARALLEL_APPLY
WORKER_TYPE_TABLESYNC

APPLY_WORKER
PARALLEL_APPLY_WORKER
TABLESYNC_WORKER

APPLY
PARALLEL_APPLY
TABLESYNC

------
[1]: Ashutosh - /messages/by-id/CAExHW5uALiimrdpdO0vwuDivD99TW+_9vvfFsErVNzq1ehYV9Q@mail.gmail.com
[2]: Melih - /messages/by-id/CAGPVpCRZ-zEOa2Lkvw+iTU3NhJzfbGwH1dU7Htreup--8e5nxg@mail.gmail.com
[3]: Bharath - /messages/by-id/CALj2ACVro6oCsTg_gpYu+V_LPMSgk4wjmSPDk8d5thArWNRoRQ@mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia

#15Peter Smith
smithpb2250@gmail.com
In reply to: Peter Smith (#14)
2 attachment(s)
Re: Adding a LogicalRepWorker type field

PSA v4 patches.

On Fri, Aug 4, 2023 at 5:45 PM Peter Smith <smithpb2250@gmail.com> wrote:

Thank you for the feedback received so far. Sorry, I have become busy
lately with other work.

IIUC there is a general +1 for the idea, but I still have some
juggling to do to make the functions/macros of patch 0001
acceptable to everybody.

The difficulties arise from:
- no function overloading in C
- ideally, we prefer the same names for everything (e.g. instead of
dual-set macros)
- but launcher code calls need to pass param (other code always uses
MyLogicalRepWorker)
- would be nice (although not mandatory) to not have to always pass
MyLogicalRepWorker as a param.

Anyway, I will work towards finding some compromise on this next week.
Currently, I feel the best choice is to go with what Bharath suggested
in the first place -- just always pass the parameter (including
passing MyLogicalRepWorker). I will think more about it...

v4-0001 uses only 3 simple inline functions. Callers always pass
parameters as Bharath had suggested.

------

Meanwhile, here are some replies to other feedback received:

Ashutosh -- "Maybe we should create a union of type-specific members" [1].

Yes, I was wondering this myself, but I won't pursue this yet until
getting over the hurdle of finding acceptable functions/macros for
patch 0001. Hopefully, we can come back to this idea.

To be explored later.

~~

Melih -- "Isn't the apply worker type still decided by eliminating
the other choices even with the patch?".

AFAIK the only case of that now is the one-time check in the
logicalrep_worker_launch() function. IMO, that is quite a different
proposition to the HEAD macros that have to make that deduction
10,000s of times in executing code. Actually, even the caller knows
exactly the kind of worker it wants to launch so we can pass the
LogicalRepWorkerType directly to logicalrep_worker_launch() and
eliminate even this one-time-check. I can code it that way in the next
patch version.

Now even the one-time check to assign the worker type is removed.
The callers know the LogicalRepWorkerType they want, so v4-0001 just
passes the type into logicalrep_worker_launch().
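Here is a minimal sketch of that approach. The caller states the type, and the launch routine only cross-checks it against the other arguments before storing it. `sketch_worker_launch()` and the reduced struct are hypothetical. The assertions mirror the sanity checks that v4-0001 adds to `logicalrep_worker_launch()`. `DSM_HANDLE_INVALID` is assumed to be 0, as in PostgreSQL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Oid;
typedef uint32_t dsm_handle;

#define InvalidOid ((Oid) 0)
#define DSM_HANDLE_INVALID ((dsm_handle) 0)

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/* Reduced, hypothetical worker slot. */
typedef struct SketchWorker
{
	LogicalRepWorkerType type;
	Oid			relid;
	bool		parallel_apply;
} SketchWorker;

static void
sketch_worker_launch(SketchWorker *slot, LogicalRepWorkerType wtype,
					 Oid relid, dsm_handle subworker_dsm)
{
	bool		is_tablesync = (wtype == WORKERTYPE_TABLESYNC);
	bool		is_parallel_apply = (wtype == WORKERTYPE_PARALLEL_APPLY);

	/* The caller's stated type must agree with the other arguments. */
	assert(wtype != WORKERTYPE_UNKNOWN);
	assert(is_tablesync == (relid != InvalidOid));
	assert(is_parallel_apply == (subworker_dsm != DSM_HANDLE_INVALID));

	/* Store the type once; nothing needs to re-derive it later. */
	slot->type = wtype;
	slot->relid = relid;
	slot->parallel_apply = is_parallel_apply;
}
```

In this sketch the launcher would start a leader apply worker with `WORKERTYPE_APPLY`, `InvalidOid`, and `DSM_HANDLE_INVALID`. A mismatched combination would trip an assertion in an assert-enabled build.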

~~

Bharath -- "-1 for these names starting with prefix TYPE_, in fact LR_
looked better."

Hmmm. I'm not sure what is best. Of the options below I prefer
"WORKER_TYPE_xxx", but I don't mind so long as there is a consensus.

LR_APPLY_WORKER
LR_PARALLEL_APPLY_WORKER
LR_TABLESYNC_WORKER

TYPE_APPLY_WORKER
TYPE_PARALLEL_APPLY_WORKER
TYPE_TABLESYNC_WORKER

WORKER_TYPE_APPLY
WORKER_TYPE_PARALLEL_APPLY
WORKER_TYPE_TABLESYNC

APPLY_WORKER
PARALLEL_APPLY_WORKER
TABLESYNC_WORKER

APPLY
PARALLEL_APPLY
TABLESYNC

For now, in v4-0001 these are called WORKERTYPE_xxx. Please do propose
better names if these are no good.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v4-0001-Add-LogicalRepWorkerType-enum.patchapplication/octet-stream; name=v4-0001-Add-LogicalRepWorkerType-enum.patchDownload
From a8996e809ee8d4f9a381f3589169b1a5a1077b1b Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 8 Aug 2023 17:17:26 +1000
Subject: [PATCH v4] Add LogicalRepWorkerType enum

Current HEAD deduces a LogicalRepWorker's type from different fields ('relid'
and 'leader_pid'). But, the logical replication worker type is already known at
the time of launching the LogicalRepWorker and it never changes for the lifetime
of that process. So, instead of deducing the type, it is simpler to just store
it one time, and access it directly thereafter.

This patch adds a new enum LogicalRepWorkerType. A "type" field is added to
LogicalRepWorker. This type field is assigned when the worker is launched. Inline
functions is_leader_apply_worker(), is_parallel_apply_worker(), is_tablesync_worker()
now just check the stored worker type.
---
 .../replication/logical/applyparallelworker.c      | 15 ++++---
 src/backend/replication/logical/launcher.c         | 51 +++++++++++++---------
 src/backend/replication/logical/tablesync.c        |  9 ++--
 src/backend/replication/logical/worker.c           | 40 ++++++++---------
 src/include/replication/worker_internal.h          | 29 +++++++-----
 src/tools/pgindent/typedefs.list                   |  1 +
 6 files changed, 84 insertions(+), 61 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1d4e83c..779aec5 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -265,7 +265,7 @@ static bool
 pa_can_start(void)
 {
 	/* Only leader apply workers can start parallel apply workers. */
-	if (!am_leader_apply_worker())
+	if (!is_leader_apply_worker(MyLogicalRepWorker))
 		return false;
 
 	/*
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
 		return NULL;
 	}
 
-	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+	launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+										MyLogicalRepWorker->dbid,
 										MySubscription->oid,
 										MySubscription->name,
 										MyLogicalRepWorker->userid,
@@ -555,7 +556,7 @@ pa_find_worker(TransactionId xid)
 static void
 pa_free_worker(ParallelApplyWorkerInfo *winfo)
 {
-	Assert(!am_parallel_apply_worker());
+	Assert(!is_parallel_apply_worker(MyLogicalRepWorker));
 	Assert(winfo->in_use);
 	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
 
@@ -1506,7 +1507,7 @@ pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
 
 	if (fileset_state == FS_SERIALIZE_DONE)
 	{
-		Assert(am_leader_apply_worker());
+		Assert(is_leader_apply_worker(MyLogicalRepWorker));
 		Assert(MyLogicalRepWorker->stream_fileset);
 		wshared->fileset = *MyLogicalRepWorker->stream_fileset;
 	}
@@ -1522,7 +1523,7 @@ pa_get_fileset_state(void)
 {
 	PartialFileSetState fileset_state;
 
-	Assert(am_parallel_apply_worker());
+	Assert(is_parallel_apply_worker(MyLogicalRepWorker));
 
 	SpinLockAcquire(&MyParallelShared->mutex);
 	fileset_state = MyParallelShared->fileset_state;
@@ -1593,7 +1594,7 @@ pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
 void
 pa_decr_and_wait_stream_block(void)
 {
-	Assert(am_parallel_apply_worker());
+	Assert(is_parallel_apply_worker(MyLogicalRepWorker));
 
 	/*
 	 * It is only possible to not have any pending stream chunks when we are
@@ -1620,7 +1621,7 @@ pa_decr_and_wait_stream_block(void)
 void
 pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 {
-	Assert(am_leader_apply_worker());
+	Assert(is_leader_apply_worker(MyLogicalRepWorker));
 
 	/*
 	 * Unlock the shared object lock so that parallel apply worker can
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..7dc078a 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -259,7 +259,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
 		/* Skip parallel apply workers. */
-		if (isParallelApplyWorker(w))
+		if (is_parallel_apply_worker(w))
 			continue;
 
 		if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
  * Returns true on success, false on failure.
  */
 bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+						 Oid dbid, Oid subid, const char *subname, Oid userid,
 						 Oid relid, dsm_handle subworker_dsm)
 {
 	BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			nsyncworkers;
 	int			nparallelapplyworkers;
 	TimestampTz now;
-	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+	bool		is_tablesync_wkr = (wtype == WORKERTYPE_TABLESYNC);
+	bool		is_parallel_apply_wkr = (wtype == WORKERTYPE_PARALLEL_APPLY);
 
-	/* Sanity check - tablesync worker cannot be a subworker */
-	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+	/*
+	 * Sanity checks:
+	 * - must be valid worker type
+	 * - tablesync workers are only ones to have relid
+	 * - parallel apply worker is the only kind of subworker
+	 */
+	Assert(wtype != WORKERTYPE_UNKNOWN);
+	Assert(is_tablesync_wkr == OidIsValid(relid));
+	Assert(is_parallel_apply_wkr == (subworker_dsm != DSM_HANDLE_INVALID));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (is_tablesync_wkr && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -405,7 +414,7 @@ retry:
 	 * Return false if the number of parallel apply workers reached the limit
 	 * per subscription.
 	 */
-	if (is_parallel_apply_worker &&
+	if (is_parallel_apply_wkr &&
 		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
@@ -427,6 +436,7 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	worker->type = wtype;
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -438,8 +448,8 @@ retry:
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
 	worker->stream_fileset = NULL;
-	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
-	worker->parallel_apply = is_parallel_apply_worker;
+	worker->leader_pid = is_parallel_apply_wkr ? MyProcPid : InvalidPid;
+	worker->parallel_apply = is_parallel_apply_wkr;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -458,7 +468,7 @@ retry:
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_worker)
+	if (is_parallel_apply_wkr)
 	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -466,7 +476,7 @@ retry:
 				 subid);
 		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
 	}
-	else if (OidIsValid(relid))
+	else if (is_tablesync_wkr)
 	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -488,7 +498,7 @@ retry:
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
-	if (is_parallel_apply_worker)
+	if (is_parallel_apply_wkr)
 		memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
 
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
@@ -607,7 +617,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 	if (worker)
 	{
-		Assert(!isParallelApplyWorker(worker));
+		Assert(!is_parallel_apply_worker(worker));
 		logicalrep_worker_stop_internal(worker, SIGTERM);
 	}
 
@@ -649,7 +659,7 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 	worker = &LogicalRepCtx->workers[slot_no];
-	Assert(isParallelApplyWorker(worker));
+	Assert(is_parallel_apply_worker(worker));
 
 	/*
 	 * Only stop the worker if the generation matches and the worker is alive.
@@ -735,7 +745,7 @@ static void
 logicalrep_worker_detach(void)
 {
 	/* Stop the parallel apply workers. */
-	if (am_leader_apply_worker())
+	if (is_leader_apply_worker(MyLogicalRepWorker))
 	{
 		List	   *workers;
 		ListCell   *lc;
@@ -755,7 +765,7 @@ logicalrep_worker_detach(void)
 		{
 			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-			if (isParallelApplyWorker(w))
+			if (is_parallel_apply_worker(w))
 				logicalrep_worker_stop_internal(w, SIGTERM);
 		}
 
@@ -874,7 +884,7 @@ logicalrep_pa_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isParallelApplyWorker(w))
+		if (w->subid == subid && is_parallel_apply_worker(w))
 			res++;
 	}
 
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
 				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
 			{
 				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+				logicalrep_worker_launch(WORKERTYPE_APPLY,
+										 sub->dbid, sub->oid, sub->name,
 										 sub->owner, InvalidOid,
 										 DSM_HANDLE_INVALID);
 			}
@@ -1243,7 +1254,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+		if (is_parallel_apply_worker(w) && w->proc && pid == w->proc->pid)
 		{
 			leader_pid = w->leader_pid;
 			break;
@@ -1296,7 +1307,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
 
-		if (isParallelApplyWorker(&worker))
+		if (is_parallel_apply_worker(&worker))
 			values[3] = Int32GetDatum(worker.leader_pid);
 		else
 			nulls[3] = true;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775..25343be 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+						logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+												 MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
@@ -653,10 +654,10 @@ process_syncing_tables(XLogRecPtr current_lsn)
 	 * that are in a READY state. See pa_can_start() and
 	 * should_apply_changes_for_rel().
 	 */
-	if (am_parallel_apply_worker())
+	if (is_parallel_apply_worker(MyLogicalRepWorker))
 		return;
 
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		process_syncing_tables_for_sync(current_lsn);
 	else
 		process_syncing_tables_for_apply(current_lsn);
@@ -1598,7 +1599,7 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
 {
 	char	   *sync_slotname = NULL;
 
-	Assert(am_tablesync_worker());
+	Assert(is_tablesync_worker(MyLogicalRepWorker));
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a20d4c1..82b09ff 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,9 +485,9 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	else if (is_parallel_apply_worker(MyLogicalRepWorker))
 	{
 		/* We don't synchronize rel's that are in unknown state. */
 		if (rel->state != SUBREL_STATE_READY &&
@@ -1053,7 +1053,7 @@ apply_handle_begin_prepare(StringInfo s)
 	LogicalRepPreparedTxnData begin_data;
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
@@ -1292,7 +1292,7 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
@@ -1422,7 +1422,7 @@ apply_handle_origin(StringInfo s)
 	 */
 	if (!in_streamed_transaction &&
 		(!in_remote_transaction ||
-		 (IsTransactionState() && !am_tablesync_worker())))
+		 (IsTransactionState() && !is_tablesync_worker(MyLogicalRepWorker))))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("ORIGIN message sent out of order")));
@@ -2019,7 +2019,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 	int			fileno;
 	off_t		offset;
 
-	if (!am_parallel_apply_worker())
+	if (!is_parallel_apply_worker(MyLogicalRepWorker))
 		maybe_start_skipping_changes(lsn);
 
 	/* Make sure we have an open transaction */
@@ -3452,7 +3452,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	 * Skip for parallel apply workers, because the lsn_mapping is maintained
 	 * by the leader apply worker.
 	 */
-	if (am_parallel_apply_worker())
+	if (is_parallel_apply_worker(MyLogicalRepWorker))
 		return;
 
 	/* Need to do this in permanent context */
@@ -3844,7 +3844,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 static void
 apply_worker_exit(void)
 {
-	if (am_parallel_apply_worker())
+	if (is_parallel_apply_worker(MyLogicalRepWorker))
 	{
 		/*
 		 * Don't stop the parallel apply worker as the leader will detect the
@@ -3863,7 +3863,7 @@ apply_worker_exit(void)
 	 * subscription is still active, and so that we won't leak that hash table
 	 * entry if it isn't.
 	 */
-	if (am_leader_apply_worker())
+	if (is_leader_apply_worker(MyLogicalRepWorker))
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	proc_exit(0);
@@ -3906,7 +3906,7 @@ maybe_reread_subscription(void)
 						MySubscription->name)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (am_leader_apply_worker())
+		if (is_leader_apply_worker(MyLogicalRepWorker))
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 		proc_exit(0);
@@ -3945,7 +3945,7 @@ maybe_reread_subscription(void)
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
-		if (am_parallel_apply_worker())
+		if (is_parallel_apply_worker(MyLogicalRepWorker))
 			ereport(LOG,
 					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
 							MySubscription->name)));
@@ -4436,7 +4436,7 @@ start_apply(XLogRecPtr origin_startpos)
 			 * idle state.
 			 */
 			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+			pgstat_report_subscription_error(MySubscription->oid, !is_tablesync_worker(MyLogicalRepWorker));
 
 			PG_RE_THROW();
 		}
@@ -4590,7 +4590,7 @@ InitializeLogRepWorker(void)
 						MyLogicalRepWorker->subid)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (am_leader_apply_worker())
+		if (is_leader_apply_worker(MyLogicalRepWorker))
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 		proc_exit(0);
@@ -4617,7 +4617,7 @@ InitializeLogRepWorker(void)
 								  subscription_change_cb,
 								  (Datum) 0);
 
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
@@ -4637,7 +4637,7 @@ SetupApplyOrSyncWorker(int worker_slot)
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
 
-	Assert(am_tablesync_worker() || am_leader_apply_worker());
+	Assert(is_tablesync_worker(MyLogicalRepWorker) || is_leader_apply_worker(MyLogicalRepWorker));
 
 	/* Setup signal handling */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -4709,7 +4709,7 @@ DisableSubscriptionAndExit(void)
 
 	/* Report the worker failed during either table synchronization or apply */
 	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
+									 !is_tablesync_worker(MyLogicalRepWorker));
 
 	/* Disable the subscription */
 	StartTransactionCommand();
@@ -4717,7 +4717,7 @@ DisableSubscriptionAndExit(void)
 	CommitTransactionCommand();
 
 	/* Ensure we remove no-longer-useful entry for worker's start time */
-	if (am_leader_apply_worker())
+	if (is_leader_apply_worker(MyLogicalRepWorker))
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	/* Notify the subscription has been disabled and exit */
@@ -4743,7 +4743,7 @@ IsLogicalWorker(void)
 bool
 IsLogicalParallelApplyWorker(void)
 {
-	return IsLogicalWorker() && am_parallel_apply_worker();
+	return IsLogicalWorker() && is_parallel_apply_worker(MyLogicalRepWorker);
 }
 
 /*
@@ -4808,7 +4808,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
 	XLogRecPtr	myskiplsn = MySubscription->skiplsn;
 	bool		started_tx = false;
 
-	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
+	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || is_parallel_apply_worker(MyLogicalRepWorker))
 		return;
 
 	if (!IsTransactionState())
@@ -5042,7 +5042,7 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 {
 	*winfo = NULL;
 
-	if (am_parallel_apply_worker())
+	if (is_parallel_apply_worker(MyLogicalRepWorker))
 	{
 		return TRANS_PARALLEL_APPLY;
 	}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 672a711..0ae0949 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,9 +27,20 @@
 #include "storage/shm_toc.h"
 #include "storage/spin.h"
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	WORKERTYPE_UNKNOWN = 0,
+	WORKERTYPE_TABLESYNC,
+	WORKERTYPE_APPLY,
+	WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+									 Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,25 +327,22 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
-
 static inline bool
-am_tablesync_worker(void)
+is_tablesync_worker(LogicalRepWorker *w)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return w->type == WORKERTYPE_TABLESYNC;
 }
 
 static inline bool
-am_leader_apply_worker(void)
+is_leader_apply_worker(LogicalRepWorker *w)
 {
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
+	return w->type == WORKERTYPE_APPLY;
 }
 
 static inline bool
-am_parallel_apply_worker(void)
+is_parallel_apply_worker(LogicalRepWorker *w)
 {
-	return isParallelApplyWorker(MyLogicalRepWorker);
+	return w->type == WORKERTYPE_PARALLEL_APPLY;
 }
 
 #endif							/* WORKER_INTERNAL_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc..52a8789 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
1.8.3.1

v4-0002-Switch-on-worker-type.patchapplication/octet-stream; name=v4-0002-Switch-on-worker-type.patchDownload
From 1900bb2c1305b59713d66fa6194515cfdbf16c93 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 8 Aug 2023 17:53:42 +1000
Subject: [PATCH v4] Switch on worker type.

Now that LogicalRepWorker has an enum type field, code can switch on that type,
which should be more efficient than cascading if/else.
---
 src/backend/replication/logical/launcher.c  | 51 ++++++++++++++++-------------
 src/backend/replication/logical/tablesync.c | 31 +++++++++++-------
 src/backend/replication/logical/worker.c    | 42 ++++++++++++++----------
 3 files changed, 73 insertions(+), 51 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7dc078a..45a7739 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -468,30 +468,35 @@ retry:
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_wkr)
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
-	}
-	else if (is_tablesync_wkr)
+	switch (wtype)
 	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication tablesync worker for subscription %u sync %u",
-				 subid,
-				 relid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
-	}
-	else
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+		case WORKERTYPE_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+			break;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication parallel apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+			break;
+
+		case WORKERTYPE_TABLESYNC:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication tablesync worker for subscription %u sync %u",
+					 subid,
+					 relid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("logicalrep_worker_launch: unknown worker type"));
 	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 25343be..0b12461 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -649,18 +649,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (is_parallel_apply_worker(MyLogicalRepWorker))
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case WORKERTYPE_TABLESYNC:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
 
-	if (is_tablesync_worker(MyLogicalRepWorker))
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case WORKERTYPE_APPLY:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("process_syncing_tables: Unknown worker type"));
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 82b09ff..93188f5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,25 +485,33 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (is_tablesync_worker(MyLogicalRepWorker))
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (is_parallel_apply_worker(MyLogicalRepWorker))
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
-
-		return rel->state == SUBREL_STATE_READY;
+		case WORKERTYPE_TABLESYNC:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case WORKERTYPE_APPLY:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+								MySubscription->name),
+						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+			return rel->state == SUBREL_STATE_READY;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("should_apply_changes_for_rel: Unknown worker type"));
 	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1

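The v4-0002 commit message argues for switching on the worker type instead of cascading if/else. The following is a minimal standalone sketch of that idea, not PostgreSQL code: the enum is copied from the patch, while MockBgWorker and mock_fill_bgworker() are hypothetical stand-ins for the BackgroundWorker fields that logicalrep_worker_launch() fills in. Apart from any speed difference, a switch with no "default:" label lets the compiler help: GCC's -Wall (via -Wswitch) warns when an enum value is added but not handled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Copy of the enum from the patch, so this sketch compiles on its own. */
typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/* Hypothetical stand-in for the BackgroundWorker fields the launcher fills. */
typedef struct MockBgWorker
{
	char		function_name[64];
	char		name[128];
} MockBgWorker;

/*
 * Hypothetical helper: pick the entry point and process name from the
 * worker type alone.  No "default:" label, so -Wswitch can flag a new
 * enum value that is not handled here.  Returns false for UNKNOWN.
 */
static bool
mock_fill_bgworker(MockBgWorker *bgw, LogicalRepWorkerType wtype,
				   unsigned int subid, unsigned int relid)
{
	/* Same invariant as the patch: only tablesync workers carry a relid. */
	assert((wtype == WORKERTYPE_TABLESYNC) == (relid != 0));

	memset(bgw, 0, sizeof(*bgw));

	switch (wtype)
	{
		case WORKERTYPE_APPLY:
			snprintf(bgw->function_name, sizeof(bgw->function_name), "ApplyWorkerMain");
			snprintf(bgw->name, sizeof(bgw->name),
					 "logical replication apply worker for subscription %u", subid);
			return true;

		case WORKERTYPE_PARALLEL_APPLY:
			snprintf(bgw->function_name, sizeof(bgw->function_name), "ParallelApplyWorkerMain");
			snprintf(bgw->name, sizeof(bgw->name),
					 "logical replication parallel apply worker for subscription %u", subid);
			return true;

		case WORKERTYPE_TABLESYNC:
			snprintf(bgw->function_name, sizeof(bgw->function_name), "TablesyncWorkerMain");
			snprintf(bgw->name, sizeof(bgw->name),
					 "logical replication tablesync worker for subscription %u sync %u",
					 subid, relid);
			return true;

		case WORKERTYPE_UNKNOWN:
			break;
	}
	return false;
}
```

For WORKERTYPE_UNKNOWN the sketch simply returns false, whereas the patch raises an internal error at that point.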
#16Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#15)
Re: Adding a LogicalRepWorker type field

On Tue, Aug 8, 2023 at 1:39 PM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Aug 4, 2023 at 5:45 PM Peter Smith <smithpb2250@gmail.com> wrote:

v4-0001 uses only 3 simple inline functions. Callers always pass
parameters as Bharath had suggested.

*
- Assert(am_leader_apply_worker());
+ Assert(is_leader_apply_worker(MyLogicalRepWorker));
...
- if (am_leader_apply_worker())
+ if (is_leader_apply_worker(MyLogicalRepWorker))

Passing MyLogicalRepWorker everywhere not only increases the code
change footprint but also doesn't appear any better to me. Instead, let
am_parallel_apply_worker() keep calling
isParallelApplyWorker(MyLogicalRepWorker) as it does now. I feel
that even if you or others think it is a better idea, we can debate it
separately after the main patch is done because, as far as I understand,
it is not the core idea of this proposal.

* If you do the above then there won't be a need to change the
variable name is_parallel_apply_worker in logicalrep_worker_launch.
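For illustration, a minimal standalone sketch of the helper layout suggested above: the zero-argument am_*_worker() functions keep reading the process-global MyLogicalRepWorker, and the pointer-taking isParallelApplyWorker() macro remains for checking any worker slot. The struct is trimmed to the new 'type' field and MyLogicalRepWorker is a plain static pointer here (both assumptions), so this is a sketch of the shape, not the patch itself.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Copy of the enum from the patch. */
typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/* Trimmed stand-in for LogicalRepWorker: only the new field is kept. */
typedef struct LogicalRepWorker
{
	LogicalRepWorkerType type;
} LogicalRepWorker;

/* Stand-in for the backend-global pointer to this process's own slot. */
static LogicalRepWorker *MyLogicalRepWorker = NULL;

/* Pointer-taking form: can inspect any slot, e.g. while scanning workers. */
#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)

/* Zero-argument forms: always describe the current process. */
static inline bool
am_tablesync_worker(void)
{
	assert(MyLogicalRepWorker != NULL);
	return MyLogicalRepWorker->type == WORKERTYPE_TABLESYNC;
}

static inline bool
am_leader_apply_worker(void)
{
	assert(MyLogicalRepWorker != NULL);
	return MyLogicalRepWorker->type == WORKERTYPE_APPLY;
}

static inline bool
am_parallel_apply_worker(void)
{
	assert(MyLogicalRepWorker != NULL);
	return isParallelApplyWorker(MyLogicalRepWorker);
}
```

With this layout, existing call sites such as Assert(am_leader_apply_worker()) stay unchanged, which is what keeps the change footprint small.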

--
With Regards,
Amit Kapila.

#17Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#16)
2 attachment(s)
Re: Adding a LogicalRepWorker type field

On Wed, Aug 9, 2023 at 4:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Tue, Aug 8, 2023 at 1:39 PM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Aug 4, 2023 at 5:45 PM Peter Smith <smithpb2250@gmail.com> wrote:

v4-0001 uses only 3 simple inline functions. Callers always pass
parameters as Bharath had suggested.

*
- Assert(am_leader_apply_worker());
+ Assert(is_leader_apply_worker(MyLogicalRepWorker));
...
- if (am_leader_apply_worker())
+ if (is_leader_apply_worker(MyLogicalRepWorker))

Passing MyLogicalRepWorker everywhere not only increases the code
change footprint but also doesn't appear any better to me. Instead, let
am_parallel_apply_worker() keep calling
isParallelApplyWorker(MyLogicalRepWorker) as it does now. I feel
that even if you or others think it is a better idea, we can debate it
separately after the main patch is done because, as far as I understand,
it is not the core idea of this proposal.

Right, those changes were not really core. Reverted as suggested. PSA v5.

* If you do the above then there won't be a need to change the
variable name is_parallel_apply_worker in logicalrep_worker_launch.

Done.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v5-0002-Switch-on-worker-type.patchapplication/octet-stream; name=v5-0002-Switch-on-worker-type.patchDownload
From 9e35a9d521023ed4d54f5738b3c99407a5fe3e02 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 10 Aug 2023 12:11:27 +1000
Subject: [PATCH v5] Switch on worker type.

---
 src/backend/replication/logical/launcher.c  | 51 ++++++++++++++++-------------
 src/backend/replication/logical/tablesync.c | 31 +++++++++++-------
 src/backend/replication/logical/worker.c    | 42 ++++++++++++++----------
 3 files changed, 73 insertions(+), 51 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 2a1602c..3ad315c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -468,30 +468,35 @@ retry:
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_worker)
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
-	}
-	else if (is_tablesync_worker)
+	switch (worker->type)
 	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication tablesync worker for subscription %u sync %u",
-				 subid,
-				 relid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
-	}
-	else
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+		case WORKERTYPE_PARALLEL_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication parallel apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+			break;
+
+		case WORKERTYPE_TABLESYNC:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication tablesync worker for subscription %u sync %u",
+					 subid,
+					 relid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+			break;
+
+		case WORKERTYPE_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("logicalrep_worker_launch: unknown worker type"));
 	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67bdd14..89d9cec 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -649,18 +649,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (am_parallel_apply_worker())
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case WORKERTYPE_PARALLEL_APPLY:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
 
-	if (am_tablesync_worker())
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case WORKERTYPE_TABLESYNC:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
+
+		case WORKERTYPE_APPLY:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("process_syncing_tables: Unknown worker type"));
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a20d4c1..f596366 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,25 +485,33 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+		case WORKERTYPE_TABLESYNC:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+								MySubscription->name),
+						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+			return rel->state == SUBREL_STATE_READY;
+
+		case WORKERTYPE_APPLY:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("should_apply_changes_for_rel: Unknown worker type"));
+	}
 
-		return rel->state == SUBREL_STATE_READY;
-	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1

v5-0001-Add-LogicalRepWorkerType-enum.patchapplication/octet-stream; name=v5-0001-Add-LogicalRepWorkerType-enum.patchDownload
From da405938e1f9040f241478fa03a9f9db4ebc615a Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 10 Aug 2023 11:39:12 +1000
Subject: [PATCH v5] Add LogicalRepWorkerType enum.

Current HEAD code deduces a LogicalRepWorker's type from the values of several
different fields ('relid' and 'leader_pid') whenever logic needs to know it.

In fact, the logical replication worker type is already known at the time of
launching the LogicalRepWorker and it never changes for the lifetime of that
process. Instead of deducing the type, it is simpler to just store it one time,
and access it directly thereafter.

This patch adds a new enum LogicalRepWorkerType. A 'type' field is added to
LogicalRepWorker. This field is assigned when the worker is launched.
---
 .../replication/logical/applyparallelworker.c      |  3 ++-
 src/backend/replication/logical/launcher.c         | 25 ++++++++++++++++------
 src/backend/replication/logical/tablesync.c        |  3 ++-
 src/include/replication/worker_internal.h          | 23 +++++++++++++++-----
 src/tools/pgindent/typedefs.list                   |  1 +
 5 files changed, 41 insertions(+), 14 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1d4e83c..4e8ee29 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
 		return NULL;
 	}
 
-	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+	launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+										MyLogicalRepWorker->dbid,
 										MySubscription->oid,
 										MySubscription->name,
 										MyLogicalRepWorker->userid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..2a1602c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
  * Returns true on success, false on failure.
  */
 bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+						 Oid dbid, Oid subid, const char *subname, Oid userid,
 						 Oid relid, dsm_handle subworker_dsm)
 {
 	BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			nsyncworkers;
 	int			nparallelapplyworkers;
 	TimestampTz now;
-	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
 
-	/* Sanity check - tablesync worker cannot be a subworker */
-	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+	/*
+	 * Sanity checks:
+	 * - must be valid worker type
+	 * - tablesync workers are only ones to have relid
+	 * - parallel apply worker is the only kind of subworker
+	 */
+	Assert(wtype != WORKERTYPE_UNKNOWN);
+	Assert(is_tablesync_worker == OidIsValid(relid));
+	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -427,6 +436,7 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	worker->type = wtype;
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -466,7 +476,7 @@ retry:
 				 subid);
 		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
 	}
-	else if (OidIsValid(relid))
+	else if (is_tablesync_worker)
 	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
 				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
 			{
 				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+				logicalrep_worker_launch(WORKERTYPE_APPLY,
+										 sub->dbid, sub->oid, sub->name,
 										 sub->owner, InvalidOid,
 										 DSM_HANDLE_INVALID);
 			}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775..67bdd14 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+						logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+												 MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 672a711..ce44776 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,9 +27,20 @@
 #include "storage/shm_toc.h"
 #include "storage/spin.h"
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	WORKERTYPE_UNKNOWN = 0,
+	WORKERTYPE_TABLESYNC,
+	WORKERTYPE_APPLY,
+	WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+									 Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,19 +327,20 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
+#define isLeaderApplyWorker(worker) ((worker)->type == WORKERTYPE_APPLY)
+#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
 
 static inline bool
 am_tablesync_worker(void)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return isTablesyncWorker(MyLogicalRepWorker);
 }
 
 static inline bool
 am_leader_apply_worker(void)
 {
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
+	return isLeaderApplyWorker(MyLogicalRepWorker);
 }
 
 static inline bool
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc..52a8789 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
1.8.3.1

#18Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#17)
1 attachment(s)
Re: Adding a LogicalRepWorker type field

On Thu, Aug 10, 2023 at 7:50 AM Peter Smith <smithpb2250@gmail.com> wrote:

* If you do the above then there won't be a need to change the
variable name is_parallel_apply_worker in logicalrep_worker_launch.

Done.

I don't think the addition of the two new macros isTablesyncWorker() and
isLeaderApplyWorker() adds much value, so I removed them and ran
pgindent. I am planning to commit this patch early next week unless
you or others have any comments.

--
With Regards,
Amit Kapila.

Attachments:

v6-0001-Simplify-determining-logical-replication-worker-t.patchapplication/octet-stream; name=v6-0001-Simplify-determining-logical-replication-worker-t.patchDownload
From 969055559fdfc93dddc51a9010f9741cd63b47c2 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 10 Aug 2023 11:39:12 +1000
Subject: [PATCH v7] Simplify determining logical replication worker types.

We deduce a LogicalRepWorker's type from the values of several different
fields ('relid' and 'leader_pid') whenever logic needs to know it.

In fact, the logical replication worker type is already known at the time
of launching the LogicalRepWorker and it never changes for the lifetime of
that process. Instead of deducing the type, it is simpler to just store it
one time, and access it directly thereafter.

Author: Peter Smith
Reviewed-by: Amit Kapila, Bharath Rupireddy
Discussion: http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com
---
 .../replication/logical/applyparallelworker.c |  3 ++-
 src/backend/replication/logical/launcher.c    | 27 +++++++++++++------
 src/backend/replication/logical/tablesync.c   |  3 ++-
 src/include/replication/worker_internal.h     | 21 +++++++++++----
 src/tools/pgindent/typedefs.list              |  1 +
 5 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1d4e83c4c1..4e8ee2973e 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
 		return NULL;
 	}
 
-	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+	launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+										MyLogicalRepWorker->dbid,
 										MySubscription->oid,
 										MySubscription->name,
 										MyLogicalRepWorker->userid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7f95..7454c8ca27 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
  * Returns true on success, false on failure.
  */
 bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+						 Oid dbid, Oid subid, const char *subname, Oid userid,
 						 Oid relid, dsm_handle subworker_dsm)
 {
 	BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			nsyncworkers;
 	int			nparallelapplyworkers;
 	TimestampTz now;
-	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
-
-	/* Sanity check - tablesync worker cannot be a subworker */
-	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+
+	/*----------
+	 * Sanity checks:
+	 * - must be valid worker type
+	 * - tablesync workers are only ones to have relid
+	 * - parallel apply worker is the only kind of subworker
+	 */
+	Assert(wtype != WORKERTYPE_UNKNOWN);
+	Assert(is_tablesync_worker == OidIsValid(relid));
+	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -427,6 +436,7 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	worker->type = wtype;
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -466,7 +476,7 @@ retry:
 				 subid);
 		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
 	}
-	else if (OidIsValid(relid))
+	else if (is_tablesync_worker)
 	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
 				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
 			{
 				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+				logicalrep_worker_launch(WORKERTYPE_APPLY,
+										 sub->dbid, sub->oid, sub->name,
 										 sub->owner, InvalidOid,
 										 DSM_HANDLE_INVALID);
 			}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775065..67bdd14095 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+						logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+												 MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 672a7117c0..d5c9de484e 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,9 +27,20 @@
 #include "storage/shm_toc.h"
 #include "storage/spin.h"
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	WORKERTYPE_UNKNOWN = 0,
+	WORKERTYPE_TABLESYNC,
+	WORKERTYPE_APPLY,
+	WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+									 Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,19 +327,18 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
 
 static inline bool
 am_tablesync_worker(void)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return (MyLogicalRepWorker->type == WORKERTYPE_TABLESYNC);
 }
 
 static inline bool
 am_leader_apply_worker(void)
 {
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
+	return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
 }
 
 static inline bool
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc2a7..52a8789cc4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
2.39.1
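The three Assert()s added to logicalrep_worker_launch() express a single invariant: the worker type alone decides which optional launch arguments are set. Below is a minimal standalone sketch of that invariant written as a predicate. It is not the patch's code. Oid and dsm_handle are simplified stand-ins, with 0 as the invalid value for both, and the helper names launch_args_consistent() and check_launch_args() are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-ins for PostgreSQL's types; 0 is "invalid" for both. */
typedef unsigned int Oid;
typedef unsigned int dsm_handle;
#define InvalidOid			((Oid) 0)
#define DSM_HANDLE_INVALID	((dsm_handle) 0)

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/*
 * Hypothetical helper: true if the launch arguments match the worker type.
 * Only tablesync workers get a relid, and only parallel apply workers get
 * a DSM handle. An UNKNOWN type is never valid.
 */
static bool
launch_args_consistent(LogicalRepWorkerType wtype, Oid relid,
					   dsm_handle subworker_dsm)
{
	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);

	return wtype != WORKERTYPE_UNKNOWN &&
		is_tablesync_worker == (relid != InvalidOid) &&
		is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID);
}

/* Plays the role of the patch's Assert()s: abort on inconsistent input. */
static void
check_launch_args(LogicalRepWorkerType wtype, Oid relid,
				  dsm_handle subworker_dsm)
{
	assert(launch_args_consistent(wtype, relid, subworker_dsm));
}
```

For example, launch_args_consistent(WORKERTYPE_TABLESYNC, InvalidOid, DSM_HANDLE_INVALID) is false, because a tablesync worker must be given a relid.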

#19Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#18)
2 attachment(s)
Re: Adding a LogicalRepWorker type field

On Fri, Aug 11, 2023 at 7:33 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Aug 10, 2023 at 7:50 AM Peter Smith <smithpb2250@gmail.com> wrote:

* If you do the above then there won't be a need to change the
variable name is_parallel_apply_worker in logicalrep_worker_launch.

Done.

I don't think the addition of two new macros isTablesyncWorker() and
isLeaderApplyWorker() adds much value, so removed those and ran
pgindent. I am planning to commit this patch early next week unless
you or others have any comments.

Thanks for considering this patch fit for pushing.

Actually, I recently found 2 more overlooked places in the launcher.c
code which can benefit from using the isTablesyncWorker(w) macro that
was removed in patch v6-0001.

I have posted v7 (v7-0001 is identical to v6-0001). The
v7-0002 patch has the isTablesyncWorker changes. I think wherever
possible it is better to check the worker type via a macro instead of
deducing it from fields like 'relid'; patch v7-0002 also makes the code
more consistent with the other nearby isParallelApplyWorker checks in
launcher.c.
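To illustrate checking the type via a macro rather than inferring it from 'relid', here is a hypothetical, simplified version of the counting loop in logicalrep_sync_worker_count(). The struct keeps only the fields used, the plain array stands in for LogicalRepCtx->workers, and locking is omitted. Only the enum and the isTablesyncWorker() macro come from the patches; the other names are illustrative.

```c
#include <assert.h>

typedef unsigned int Oid;		/* simplified stand-in */

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/* Hypothetical, cut-down LogicalRepWorker with only the fields used here. */
typedef struct LogicalRepWorker
{
	LogicalRepWorkerType type;
	Oid			subid;
	Oid			relid;
} LogicalRepWorker;

#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)

/* Count the tablesync workers that belong to the given subscription. */
static int
sync_worker_count(const LogicalRepWorker *workers, int nworkers, Oid subid)
{
	int			res = 0;

	assert(nworkers >= 0);
	for (int i = 0; i < nworkers; i++)
	{
		const LogicalRepWorker *w = &workers[i];

		/* Ask the stored type directly instead of testing w->relid. */
		if (w->subid == subid && isTablesyncWorker(w))
			res++;
	}
	return res;
}
```

One benefit of this style is that the count stays correct even if some future worker type also carries a relid.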

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v7-0002-Add-isTablesyncWorker.patch (application/octet-stream)
From 372da0f445a2bc33d9c7ee5f53a56ab1d771858e Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 11 Aug 2023 19:59:21 +1000
Subject: [PATCH v7] Add isTablesyncWorker

---
 src/backend/replication/logical/launcher.c | 4 ++--
 src/include/replication/worker_internal.h  | 3 ++-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7454c8c..7cc0a16 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -857,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && OidIsValid(w->relid))
+		if (w->subid == subid && isTablesyncWorker(w))
 			res++;
 	}
 
@@ -1301,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		worker_pid = worker.proc->pid;
 
 		values[0] = ObjectIdGetDatum(worker.subid);
-		if (OidIsValid(worker.relid))
+		if (isTablesyncWorker(&worker))
 			values[1] = ObjectIdGetDatum(worker.relid);
 		else
 			nulls[1] = true;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index d5c9de4..a428663 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -328,11 +328,12 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
 #define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
 
 static inline bool
 am_tablesync_worker(void)
 {
-	return (MyLogicalRepWorker->type == WORKERTYPE_TABLESYNC);
+	return isTablesyncWorker(MyLogicalRepWorker);
 }
 
 static inline bool
-- 
1.8.3.1

v7-0001-Simplify-determining-logical-replication-worker-t.patch (application/octet-stream)
From e6cabe717637cf4095cfae85806e199a122935d3 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 11 Aug 2023 19:41:39 +1000
Subject: [PATCH v7] Simplify determining logical replication worker types.

We deduce a LogicalRepWorker's type from the values of several different
fields ('relid' and 'leader_pid') whenever logic needs to know it.

In fact, the logical replication worker type is already known at the time
of launching the LogicalRepWorker and it never changes for the lifetime of
that process. Instead of deducing the type, it is simpler to just store it
one time, and access it directly thereafter.

Author: Peter Smith
Reviewed-by: Amit Kapila, Bharath Rupireddy
Discussion: http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com
---
 .../replication/logical/applyparallelworker.c      |  3 ++-
 src/backend/replication/logical/launcher.c         | 27 +++++++++++++++-------
 src/backend/replication/logical/tablesync.c        |  3 ++-
 src/include/replication/worker_internal.h          | 21 +++++++++++++----
 src/tools/pgindent/typedefs.list                   |  1 +
 5 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1d4e83c..4e8ee29 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
 		return NULL;
 	}
 
-	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+	launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+										MyLogicalRepWorker->dbid,
 										MySubscription->oid,
 										MySubscription->name,
 										MyLogicalRepWorker->userid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..7454c8c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
  * Returns true on success, false on failure.
  */
 bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+						 Oid dbid, Oid subid, const char *subname, Oid userid,
 						 Oid relid, dsm_handle subworker_dsm)
 {
 	BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			nsyncworkers;
 	int			nparallelapplyworkers;
 	TimestampTz now;
-	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
-
-	/* Sanity check - tablesync worker cannot be a subworker */
-	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+
+	/*----------
+	 * Sanity checks:
+	 * - must be valid worker type
+	 * - tablesync workers are only ones to have relid
+	 * - parallel apply worker is the only kind of subworker
+	 */
+	Assert(wtype != WORKERTYPE_UNKNOWN);
+	Assert(is_tablesync_worker == OidIsValid(relid));
+	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -427,6 +436,7 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	worker->type = wtype;
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -466,7 +476,7 @@ retry:
 				 subid);
 		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
 	}
-	else if (OidIsValid(relid))
+	else if (is_tablesync_worker)
 	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
 				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
 			{
 				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+				logicalrep_worker_launch(WORKERTYPE_APPLY,
+										 sub->dbid, sub->oid, sub->name,
 										 sub->owner, InvalidOid,
 										 DSM_HANDLE_INVALID);
 			}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775..67bdd14 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+						logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+												 MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 672a711..d5c9de4 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,9 +27,20 @@
 #include "storage/shm_toc.h"
 #include "storage/spin.h"
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	WORKERTYPE_UNKNOWN = 0,
+	WORKERTYPE_TABLESYNC,
+	WORKERTYPE_APPLY,
+	WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+									 Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,19 +327,18 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
 
 static inline bool
 am_tablesync_worker(void)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return (MyLogicalRepWorker->type == WORKERTYPE_TABLESYNC);
 }
 
 static inline bool
 am_leader_apply_worker(void)
 {
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
+	return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
 }
 
 static inline bool
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc..52a8789 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
1.8.3.1

#20Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#19)
Re: Adding a LogicalRepWorker type field

On Fri, Aug 11, 2023 at 3:41 PM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Aug 11, 2023 at 7:33 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Aug 10, 2023 at 7:50 AM Peter Smith <smithpb2250@gmail.com> wrote:

* If you do the above then there won't be a need to change the
variable name is_parallel_apply_worker in logicalrep_worker_launch.

Done.

I don't think the addition of two new macros isTablesyncWorker() and
isLeaderApplyWorker() adds much value, so removed those and ran
pgindent. I am planning to commit this patch early next week unless
you or others have any comments.

Thanks for considering this patch fit for pushing.

Actually, I recently found 2 more overlooked places in the launcher.c
code which can benefit from using the isTablesyncWorker(w) macro that
was removed in patch v6-0001.

@@ -1301,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker.proc->pid;

  values[0] = ObjectIdGetDatum(worker.subid);
- if (OidIsValid(worker.relid))
+ if (isTablesyncWorker(&worker))
  values[1] = ObjectIdGetDatum(worker.relid);

I don't see this as a good fit for using isTablesyncWorker(). If we
were returning worker_type then using it would be okay.

--
With Regards,
Amit Kapila.

#21Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#20)
Re: Adding a LogicalRepWorker type field

On Fri, Aug 11, 2023 at 9:13 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Aug 11, 2023 at 3:41 PM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Aug 11, 2023 at 7:33 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Aug 10, 2023 at 7:50 AM Peter Smith <smithpb2250@gmail.com> wrote:

* If you do the above then there won't be a need to change the
variable name is_parallel_apply_worker in logicalrep_worker_launch.

Done.

I don't think the addition of two new macros isTablesyncWorker() and
isLeaderApplyWorker() adds much value, so removed those and ran
pgindent. I am planning to commit this patch early next week unless
you or others have any comments.

Thanks for considering this patch fit for pushing.

Actually, I recently found 2 more overlooked places in the launcher.c
code which can benefit from using the isTablesyncWorker(w) macro that
was removed in patch v6-0001.

@@ -1301,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker.proc->pid;

values[0] = ObjectIdGetDatum(worker.subid);
- if (OidIsValid(worker.relid))
+ if (isTablesyncWorker(&worker))
values[1] = ObjectIdGetDatum(worker.relid);

I don't see this as a good fit for using isTablesyncWorker(). If we
were returning worker_type then using it would be okay.

Yeah, I also wasn't very sure about that one, except it seems
analogous to the existing code immediately below it, where you could
say the same thing:
if (isParallelApplyWorker(&worker))
	values[3] = Int32GetDatum(worker.leader_pid);

Whatever you think is best for that one is fine by me.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#22Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#21)
Re: Adding a LogicalRepWorker type field

On Sat, Aug 12, 2023 at 5:01 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Aug 11, 2023 at 9:13 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Aug 11, 2023 at 3:41 PM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Aug 11, 2023 at 7:33 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Aug 10, 2023 at 7:50 AM Peter Smith <smithpb2250@gmail.com> wrote:

* If you do the above then there won't be a need to change the
variable name is_parallel_apply_worker in logicalrep_worker_launch.

Done.

I don't think the addition of two new macros isTablesyncWorker() and
isLeaderApplyWorker() adds much value, so removed those and ran
pgindent. I am planning to commit this patch early next week unless
you or others have any comments.

Thanks for considering this patch fit for pushing.

Actually, I recently found 2 more overlooked places in the launcher.c
code which can benefit from using the isTablesyncWorker(w) macro that
was removed in patch v6-0001.

@@ -1301,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker.proc->pid;

values[0] = ObjectIdGetDatum(worker.subid);
- if (OidIsValid(worker.relid))
+ if (isTablesyncWorker(&worker))
values[1] = ObjectIdGetDatum(worker.relid);

I don't see this as a good fit for using isTablesyncWorker(). If we
were returning worker_type then using it would be okay.

Yeah, I also wasn't very sure about that one, except it seems
analogous to the existing code immediately below it, where you could
say the same thing:
if (isParallelApplyWorker(&worker))
	values[3] = Int32GetDatum(worker.leader_pid);

Fair point. I think it is better to keep the code consistent. So, I'll
merge your changes and push the patch early next week.

--
With Regards,
Amit Kapila.

#23Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#22)
1 attachment(s)
Re: Adding a LogicalRepWorker type field

The main patch for adding the worker type enum has been pushed [1].

Here is the remaining (rebased) patch for changing some previous
cascading if/else to switch on the LogicalRepWorkerType enum instead.
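A small standalone sketch of the idea, using the bgw_type strings from logicalrep_worker_launch(). The function name worker_bgw_type() is hypothetical, and assert() stands in for the ERROR the patch raises for the unknown type. The switch deliberately has no "default:" label, so a compiler that warns about unhandled enum values (for example GCC's -Wswitch, enabled by -Wall) will flag it if a new worker type is added later.

```c
#include <assert.h>
#include <stddef.h>

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/* Map each worker type to its background worker type string. */
static const char *
worker_bgw_type(LogicalRepWorkerType wtype)
{
	switch (wtype)
	{
		case WORKERTYPE_APPLY:
			return "logical replication apply worker";
		case WORKERTYPE_PARALLEL_APPLY:
			return "logical replication parallel worker";
		case WORKERTYPE_TABLESYNC:
			return "logical replication tablesync worker";
		case WORKERTYPE_UNKNOWN:
			/* Should never happen; the patch raises an ERROR here. */
			assert(!"unknown worker type");
			break;
	}
	return NULL;				/* keep compiler quiet */
}
```

Compared with the if/else chain, each branch names the worker type it handles, and the unknown case is explicit rather than silently falling into the last "else".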

PSA v8.

------
[1]: https://github.com/postgres/postgres/commit/2a8b40e3681921943a2989fd4ec6cdbf8766566c

Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v8-0001-Switch-on-worker-type.patch (application/octet-stream)
From b0e09eec056e6ecaf9d7bea7b22af278a62ead61 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 14 Aug 2023 16:21:45 +1000
Subject: [PATCH v8] Switch on worker type.

---
 src/backend/replication/logical/launcher.c  | 56 +++++++++++++++--------------
 src/backend/replication/logical/tablesync.c | 31 ++++++++++------
 src/backend/replication/logical/worker.c    | 42 +++++++++++++---------
 3 files changed, 75 insertions(+), 54 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7cc0a16..2119bf6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -468,39 +468,43 @@ retry:
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_worker)
+	switch (worker->type)
 	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
-	}
-	else if (is_tablesync_worker)
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication tablesync worker for subscription %u sync %u",
-				 subid,
-				 relid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
-	}
-	else
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+		case WORKERTYPE_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+			break;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication parallel apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+
+			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+			break;
+
+		case WORKERTYPE_TABLESYNC:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication tablesync worker for subscription %u sync %u",
+					 subid,
+					 relid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("logicalrep_worker_launch: unknown worker type"));
 	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
-	if (is_parallel_apply_worker)
-		memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
-
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
 	{
 		/* Failed to start worker, so clean up the worker slot. */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67bdd14..89d9cec 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -649,18 +649,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (am_parallel_apply_worker())
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case WORKERTYPE_PARALLEL_APPLY:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
 
-	if (am_tablesync_worker())
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case WORKERTYPE_TABLESYNC:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
+
+		case WORKERTYPE_APPLY:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("process_syncing_tables: Unknown worker type"));
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a20d4c1..f596366 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,25 +485,33 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+		case WORKERTYPE_TABLESYNC:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+								MySubscription->name),
+						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+			return rel->state == SUBREL_STATE_READY;
+
+		case WORKERTYPE_APPLY:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("should_apply_changes_for_rel: Unknown worker type"));
+		}
 
-		return rel->state == SUBREL_STATE_READY;
-	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1

#24Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#23)
Re: Adding a LogicalRepWorker type field

On Mon, Aug 14, 2023 at 12:08 PM Peter Smith <smithpb2250@gmail.com> wrote:

The main patch for adding the worker type enum has been pushed [1].

Here is the remaining (rebased) patch for changing some previous
cascading if/else to switch on the LogicalRepWorkerType enum instead.

I see this as being useful if we plan to add more worker types. Does
anyone else see this remaining patch as an improvement?

--
With Regards,
Amit Kapila.

#25shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#24)
Re: Adding a LogicalRepWorker type field

On Fri, Aug 18, 2023 at 8:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Aug 14, 2023 at 12:08 PM Peter Smith <smithpb2250@gmail.com> wrote:

The main patch for adding the worker type enum has been pushed [1].

Here is the remaining (rebased) patch for changing some previous
cascading if/else to switch on the LogicalRepWorkerType enum instead.

I see this as being useful if we plan to add more worker types. Does
anyone else see this remaining patch as an improvement?

I feel it gives a little more clarity in cases where there is an
'else' branch with no clear comment or relevant keyword. As an
example, in function 'should_apply_changes_for_rel', we have:
	else
		return (rel->state == SUBREL_STATE_READY ||
				(rel->state == SUBREL_STATE_SYNCDONE &&
				 rel->statelsn <= remote_final_lsn));

It is difficult to figure out which worker this is without knowing
the concept thoroughly; 'case WORKERTYPE_APPLY' makes it easier for
the reader to understand.
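For readers less familiar with the states involved, the switch form of should_apply_changes_for_rel() can be sketched standalone as below. This is a simplified illustration, not the patch: RelEntry and should_apply() are hypothetical stand-ins for LogicalRepRelMapEntry and the real function, the state letters are assumed to match the pg_subscription_rel codes, and the parallel apply branch omits the ERROR the real code raises for states other than READY or UNKNOWN.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef unsigned int Oid;		/* simplified stand-in */
typedef uint64_t XLogRecPtr;	/* simplified stand-in */

/* Assumed to match the pg_subscription_rel state codes. */
#define SUBREL_STATE_SYNCDONE	's'
#define SUBREL_STATE_READY		'r'

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} LogicalRepWorkerType;

/* Hypothetical, cut-down stand-in for LogicalRepRelMapEntry. */
typedef struct RelEntry
{
	Oid			localreloid;
	char		state;
	XLogRecPtr	statelsn;
} RelEntry;

static bool
should_apply(LogicalRepWorkerType wtype, Oid my_relid, const RelEntry *rel,
			 XLogRecPtr remote_final_lsn)
{
	switch (wtype)
	{
		case WORKERTYPE_TABLESYNC:
			/* A tablesync worker applies changes only to its own table. */
			return my_relid == rel->localreloid;

		case WORKERTYPE_PARALLEL_APPLY:
			/* READY tables only (the real code may also raise an ERROR). */
			return rel->state == SUBREL_STATE_READY;

		case WORKERTYPE_APPLY:
			/* READY, or sync finished at or before this xact's final LSN. */
			return rel->state == SUBREL_STATE_READY ||
				(rel->state == SUBREL_STATE_SYNCDONE &&
				 rel->statelsn <= remote_final_lsn);

		case WORKERTYPE_UNKNOWN:
			assert(!"unknown worker type");	/* should never happen */
			break;
	}
	return false;				/* keep compiler quiet */
}
```

The WORKERTYPE_APPLY label now documents which worker the former bare "else" branch was written for.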

thanks
Shveta

#26Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#24)
RE: Adding a LogicalRepWorker type field

On Friday, August 18, 2023 11:20 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Aug 14, 2023 at 12:08 PM Peter Smith <smithpb2250@gmail.com>
wrote:

The main patch for adding the worker type enum has been pushed [1].

Here is the remaining (rebased) patch for changing some previous
cascading if/else to switch on the LogicalRepWorkerType enum instead.

I see this as being useful if we plan to add more worker types. Does anyone else
see this remaining patch as an improvement?

+1

I have one comment for the new error message.

+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("should_apply_changes_for_rel: Unknown worker type"));

I think reporting an ERROR in this case is fine. However, I would suggest
refraining from mentioning the function name in the error message, as
recommended in the error style document [1]. Also, it appears we could use
elog() here.

[1]: https://www.postgresql.org/docs/devel/error-style-guide.html

Best Regards,
Hou zj

#27Peter Smith
smithpb2250@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#26)
1 attachment(s)
Re: Adding a LogicalRepWorker type field

On Fri, Aug 18, 2023 at 6:16 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

On Friday, August 18, 2023 11:20 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Aug 14, 2023 at 12:08 PM Peter Smith <smithpb2250@gmail.com>
wrote:

The main patch for adding the worker type enum has been pushed [1].

Here is the remaining (rebased) patch for changing some previous
cascading if/else to switch on the LogicalRepWorkerType enum instead.

I see this as being useful if we plan to add more worker types. Does anyone else
see this remaining patch as an improvement?

+1

I have one comment for the new error message.

+               case WORKERTYPE_UNKNOWN:
+                       ereport(ERROR, errmsg_internal("should_apply_changes_for_rel: Unknown worker type"));

I think reporting an ERROR in this case is fine. However, I would suggest
refraining from mentioning the function name in the error message, as
recommended in the error style document [1]. Also, it appears we could use
elog() here.

[1] https://www.postgresql.org/docs/devel/error-style-guide.html

OK. Modified as suggested. Anyway, getting these errors should not
even be possible, so I added a comment to emphasise that.

PSA v9

------
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachments:

v9-0001-Switch-on-worker-type.patch (application/octet-stream)
From 7f0470e576bf4f5fa157f959432a1b38f7c66ae2 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 21 Aug 2023 09:58:32 +1000
Subject: [PATCH v9] Switch on worker type.

---
 src/backend/replication/logical/launcher.c  | 57 ++++++++++++++++-------------
 src/backend/replication/logical/tablesync.c | 32 ++++++++++------
 src/backend/replication/logical/worker.c    | 43 +++++++++++++---------
 3 files changed, 78 insertions(+), 54 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7cc0a16..72e44d5 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -468,39 +468,44 @@ retry:
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_worker)
+	switch (worker->type)
 	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
-	}
-	else if (is_tablesync_worker)
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication tablesync worker for subscription %u sync %u",
-				 subid,
-				 relid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
-	}
-	else
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+		case WORKERTYPE_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+			break;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication parallel apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+
+			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+			break;
+
+		case WORKERTYPE_TABLESYNC:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication tablesync worker for subscription %u sync %u",
+					 subid,
+					 relid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			/* Should never happen. */
+			elog(ERROR, "unknown worker type");
 	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
-	if (is_parallel_apply_worker)
-		memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
-
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
 	{
 		/* Failed to start worker, so clean up the worker slot. */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67bdd14..54cf957 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -649,18 +649,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (am_parallel_apply_worker())
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case WORKERTYPE_PARALLEL_APPLY:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
 
-	if (am_tablesync_worker())
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case WORKERTYPE_TABLESYNC:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
+
+		case WORKERTYPE_APPLY:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			/* Should never happen. */
+			elog(ERROR, "unknown worker type");
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a20d4c1..d6b594d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,25 +485,34 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+		case WORKERTYPE_TABLESYNC:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+								MySubscription->name),
+						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+			return rel->state == SUBREL_STATE_READY;
+
+		case WORKERTYPE_APPLY:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
+
+		case WORKERTYPE_UNKNOWN:
+			/* Should never happen. */
+			elog(ERROR, "unknown worker type");
+	}
 
-		return rel->state == SUBREL_STATE_READY;
-	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1
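The v9 patch replaces cascading `if`/`else` tests with a `switch` on `MyLogicalRepWorker->type`. The sketch below is a simplified, self-contained model of that pattern. It is not PostgreSQL code: `Worker`, `Rel`, `RelState`, `init_worker`, `internal_error` and `should_apply_changes` are hypothetical stand-ins, and the parallel-apply error path of `should_apply_changes_for_rel()` is omitted.

The sketch also illustrates a design consequence of listing every enum member and leaving out a `default:` label. When a new worker type is added to the enum but not handled in the `switch`, GCC and Clang can warn about it (for example via `-Wswitch`, which GCC enables with `-Wall`). That helps if more worker types are added later.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-ins, loosely modelled on LogicalRepWorker. */
typedef enum WorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY
} WorkerType;

typedef enum RelState
{
	REL_STATE_INIT,
	REL_STATE_SYNCDONE,
	REL_STATE_READY
} RelState;

typedef struct Worker
{
	WorkerType	type;			/* set once at launch, never changes */
	unsigned int relid;			/* only meaningful for tablesync workers */
} Worker;

typedef struct Rel
{
	unsigned int localreloid;
	RelState	state;
	unsigned long statelsn;
} Rel;

/* The type is known when the worker is launched, so assign it there. */
static void
init_worker(Worker *w, WorkerType type, unsigned int relid)
{
	assert(type != WORKERTYPE_UNKNOWN);
	w->type = type;
	w->relid = relid;
}

/* Stand-in for elog(ERROR, ...): report and exit. */
static void
internal_error(const char *msg)
{
	fprintf(stderr, "ERROR:  %s\n", msg);
	exit(EXIT_FAILURE);
}

/* No "default:" label, so the compiler can flag unhandled enum members. */
static bool
should_apply_changes(const Worker *w, const Rel *rel, unsigned long final_lsn)
{
	switch (w->type)
	{
		case WORKERTYPE_TABLESYNC:
			/* A tablesync worker only handles its own table. */
			return w->relid == rel->localreloid;

		case WORKERTYPE_PARALLEL_APPLY:
			/* Simplified: the real code also errors out for some states. */
			return rel->state == REL_STATE_READY;

		case WORKERTYPE_APPLY:
			return rel->state == REL_STATE_READY ||
				(rel->state == REL_STATE_SYNCDONE &&
				 rel->statelsn <= final_lsn);

		case WORKERTYPE_UNKNOWN:
			/* Should never happen. */
			internal_error("unknown worker type");
	}

	return false;				/* dummy for compiler */
}
```

For example, a worker initialised with `init_worker(&w, WORKERTYPE_TABLESYNC, 16384)` applies changes only for the relation whose `localreloid` is 16384, whatever that relation's state. The trailing `return false` plays the same role as the "dummy for compiler" return in the patch.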

#28Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#27)
Re: Adding a LogicalRepWorker type field

On Mon, Aug 21, 2023 at 5:34 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Aug 18, 2023 at 6:16 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

On Friday, August 18, 2023 11:20 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Aug 14, 2023 at 12:08 PM Peter Smith <smithpb2250@gmail.com>
wrote:

The main patch for adding the worker type enum has been pushed [1].

Here is the remaining (rebased) patch for changing some previous
cascading if/else to switch on the LogicalRepWorkerType enum instead.

I see this as being useful if we plan to add more worker types. Does anyone else
see this remaining patch as an improvement?

+1

I have one comment for the new error message.

+               case WORKERTYPE_UNKNOWN:
+                       ereport(ERROR, errmsg_internal("should_apply_changes_for_rel: Unknown worker type"));

I think reporting an ERROR in this case is fine. However, I would suggest
refraining from mentioning the function name in the error message, as
recommended in the error style document [1]. Also, it appears we could use
elog() here.

[1] https://www.postgresql.org/docs/devel/error-style-guide.html

OK. Modified as suggested. Anyway, getting these errors should not
even be possible, so I added a comment to emphasise that.

PSA v9

LGTM. I'll push this tomorrow unless there are any more comments.

--
With Regards,
Amit Kapila.

#29Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#28)
Re: Adding a LogicalRepWorker type field

On Mon, Aug 21, 2023 at 3:48 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Aug 21, 2023 at 5:34 AM Peter Smith <smithpb2250@gmail.com> wrote:

PSA v9

LGTM. I'll push this tomorrow unless there are any more comments.

Pushed.

--
With Regards,
Amit Kapila.

#30Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#29)
Re: Adding a LogicalRepWorker type field

On Tue, Aug 22, 2023 at 6:24 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Aug 21, 2023 at 3:48 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Mon, Aug 21, 2023 at 5:34 AM Peter Smith <smithpb2250@gmail.com>
wrote:

PSA v9

LGTM. I'll push this tomorrow unless there are any more comments.

Pushed.

Thanks for pushing this. The CF entry has been updated [1].

------
[1] https://commitfest.postgresql.org/44/4472/

Kind Regards,
Peter Smith.
Fujitsu Australia