From 782147f8f8a22ed7acb51484568c8ac69425ee6b Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Thu, 26 Sep 2024 10:01:36 +0900
Subject: [PATCH] Set query ID in parallel workers for vacuum, brin and btree

All these code paths use their own entry point when starting their
parallel workers, but fail to set their query IDs, even if they set a
text query.  Hence, this data would be missed in pg_stat_activity when
spawning their parallel workers.

Some tests are added to show how the failures can happen, which are able
to trigger the assertion failure in the TAP test 027_stream_regress.pl
where pg_stat_statements is loaded.
---
 src/backend/access/brin/brin.c            |  5 +++++
 src/backend/access/nbtree/nbtsort.c       |  5 +++++
 src/backend/commands/vacuumparallel.c     | 11 ++++++++---
 src/test/regress/expected/brin.out        | 10 ++++++++++
 src/test/regress/expected/btree_index.out | 10 ++++++++++
 src/test/regress/sql/brin.sql             | 11 +++++++++++
 src/test/regress/sql/btree_index.sql      | 11 +++++++++++
 7 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 60853a0f6a..cb2f497da1 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -66,6 +66,7 @@ typedef struct BrinShared
 	bool		isconcurrent;
 	BlockNumber pagesPerRange;
 	int			scantuplesortstates;
+	uint64		queryid;
 
 	/*
 	 * workersdonecv is used to monitor the progress of workers.  All parallel
@@ -2448,6 +2449,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	brinshared->isconcurrent = isconcurrent;
 	brinshared->scantuplesortstates = scantuplesortstates;
 	brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+	brinshared->queryid = pgstat_get_my_query_id();
 	ConditionVariableInit(&brinshared->workersdonecv);
 	SpinLockInit(&brinshared->mutex);
 
@@ -2891,6 +2893,9 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 		indexLockmode = RowExclusiveLock;
 	}
 
+	/* Track query ID */
+	pgstat_report_query_id(brinshared->queryid, false);
+
 	/* Open relations within worker */
 	heapRel = table_open(brinshared->heaprelid, heapLockmode);
 	indexRel = index_open(brinshared->indexrelid, indexLockmode);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index f5d7b3b0c3..4f1a54cffe 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -104,6 +104,7 @@ typedef struct BTShared
 	bool		nulls_not_distinct;
 	bool		isconcurrent;
 	int			scantuplesortstates;
+	uint64		queryid;
 
 	/*
 	 * workersdonecv is used to monitor the progress of workers.  All parallel
@@ -1505,6 +1506,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->nulls_not_distinct = btspool->nulls_not_distinct;
 	btshared->isconcurrent = isconcurrent;
 	btshared->scantuplesortstates = scantuplesortstates;
+	btshared->queryid = pgstat_get_my_query_id();
 	ConditionVariableInit(&btshared->workersdonecv);
 	SpinLockInit(&btshared->mutex);
 	/* Initialize mutable state */
@@ -1787,6 +1789,9 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 		indexLockmode = RowExclusiveLock;
 	}
 
+	/* Track query ID */
+	pgstat_report_query_id(btshared->queryid, false);
+
 	/* Open relations within worker */
 	heapRel = table_open(btshared->heaprelid, heapLockmode);
 	indexRel = index_open(btshared->indexrelid, indexLockmode);
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 22c057fe61..4fd6574e12 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -57,12 +57,13 @@
 typedef struct PVShared
 {
 	/*
-	 * Target table relid and log level (for messages about parallel workers
-	 * launched during VACUUM VERBOSE).  These fields are not modified during
-	 * the parallel vacuum.
+	 * Target table relid, log level (for messages about parallel workers
+	 * launched during VACUUM VERBOSE) and query ID.  These fields are not
+	 * modified during the parallel vacuum.
 	 */
 	Oid			relid;
 	int			elevel;
+	uint64		queryid;
 
 	/*
 	 * Fields for both index vacuum and cleanup.
@@ -369,6 +370,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	MemSet(shared, 0, est_shared_len);
 	shared->relid = RelationGetRelid(rel);
 	shared->elevel = elevel;
+	shared->queryid = pgstat_get_my_query_id();
 	shared->maintenance_work_mem_worker =
 		(nindexes_mwm > 0) ?
 		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
@@ -1014,6 +1016,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	debug_query_string = sharedquery;
 	pgstat_report_activity(STATE_RUNNING, debug_query_string);
 
+	/* Track query ID */
+	pgstat_report_query_id(shared->queryid, false);
+
 	/*
 	 * Open table.  The lock mode is the same as the leader process.  It's
 	 * okay because the lock mode does not conflict among the parallel
diff --git a/src/test/regress/expected/brin.out b/src/test/regress/expected/brin.out
index d6779d8c7d..e1db2280cf 100644
--- a/src/test/regress/expected/brin.out
+++ b/src/test/regress/expected/brin.out
@@ -567,6 +567,16 @@ SELECT * FROM brintest_3 WHERE b < '0';
 
 DROP TABLE brintest_3;
 RESET enable_seqscan;
+-- test parallel build with immutable function.
+CREATE TABLE brintest_expr (n int);
+CREATE FUNCTION brintest_func() RETURNS int LANGUAGE sql IMMUTABLE RETURN 0;
+BEGIN;
+SET LOCAL min_parallel_table_scan_size = 0;
+SET LOCAL max_parallel_maintenance_workers = 4;
+CREATE INDEX brintest_expr_idx ON brintest_expr USING brin (brintest_func());
+COMMIT;
+DROP TABLE brintest_expr;
+DROP FUNCTION brintest_func();
 -- test an unlogged table, mostly to get coverage of brinbuildempty
 CREATE UNLOGGED TABLE brintest_unlogged (n numrange);
 CREATE INDEX brinidx_unlogged ON brintest_unlogged USING brin (n);
diff --git a/src/test/regress/expected/btree_index.out b/src/test/regress/expected/btree_index.out
index b350efe128..d3f4c7e08c 100644
--- a/src/test/regress/expected/btree_index.out
+++ b/src/test/regress/expected/btree_index.out
@@ -476,6 +476,16 @@ INSERT INTO delete_test_table SELECT i, 1, 2, 3 FROM generate_series(1,1000) i;
 -- Test unsupported btree opclass parameters
 create index on btree_tall_tbl (id int4_ops(foo=1));
 ERROR:  operator class int4_ops has no options
+-- test parallel build with immutable function.
+CREATE TABLE btree_test_expr (n int);
+CREATE FUNCTION btree_test_func() RETURNS int LANGUAGE sql IMMUTABLE RETURN 0;
+BEGIN;
+SET LOCAL min_parallel_table_scan_size = 0;
+SET LOCAL max_parallel_maintenance_workers = 4;
+CREATE INDEX btree_test_expr_idx ON btree_test_expr USING btree (btree_test_func());
+COMMIT;
+DROP TABLE btree_test_expr;
+DROP FUNCTION btree_test_func();
 -- Test case of ALTER INDEX with abuse of column names for indexes.
 -- This grammar is not officially supported, but the parser allows it.
 CREATE INDEX btree_tall_idx2 ON btree_tall_tbl (id);
diff --git a/src/test/regress/sql/brin.sql b/src/test/regress/sql/brin.sql
index 695cfad4be..7ea97f47c8 100644
--- a/src/test/regress/sql/brin.sql
+++ b/src/test/regress/sql/brin.sql
@@ -510,6 +510,17 @@ SELECT * FROM brintest_3 WHERE b < '0';
 DROP TABLE brintest_3;
 RESET enable_seqscan;
 
+-- test parallel build with immutable function.
+CREATE TABLE brintest_expr (n int);
+CREATE FUNCTION brintest_func() RETURNS int LANGUAGE sql IMMUTABLE RETURN 0;
+BEGIN;
+SET LOCAL min_parallel_table_scan_size = 0;
+SET LOCAL max_parallel_maintenance_workers = 4;
+CREATE INDEX brintest_expr_idx ON brintest_expr USING brin (brintest_func());
+COMMIT;
+DROP TABLE brintest_expr;
+DROP FUNCTION brintest_func();
+
 -- test an unlogged table, mostly to get coverage of brinbuildempty
 CREATE UNLOGGED TABLE brintest_unlogged (n numrange);
 CREATE INDEX brinidx_unlogged ON brintest_unlogged USING brin (n);
diff --git a/src/test/regress/sql/btree_index.sql b/src/test/regress/sql/btree_index.sql
index 0d2a33f370..2c3b135292 100644
--- a/src/test/regress/sql/btree_index.sql
+++ b/src/test/regress/sql/btree_index.sql
@@ -272,6 +272,17 @@ INSERT INTO delete_test_table SELECT i, 1, 2, 3 FROM generate_series(1,1000) i;
 -- Test unsupported btree opclass parameters
 create index on btree_tall_tbl (id int4_ops(foo=1));
 
+-- test parallel build with immutable function.
+CREATE TABLE btree_test_expr (n int);
+CREATE FUNCTION btree_test_func() RETURNS int LANGUAGE sql IMMUTABLE RETURN 0;
+BEGIN;
+SET LOCAL min_parallel_table_scan_size = 0;
+SET LOCAL max_parallel_maintenance_workers = 4;
+CREATE INDEX btree_test_expr_idx ON btree_test_expr USING btree (btree_test_func());
+COMMIT;
+DROP TABLE btree_test_expr;
+DROP FUNCTION btree_test_func();
+
 -- Test case of ALTER INDEX with abuse of column names for indexes.
 -- This grammar is not officially supported, but the parser allows it.
 CREATE INDEX btree_tall_idx2 ON btree_tall_tbl (id);
-- 
2.45.2

