Fix BUG #17335: Duplicate result rows in Gather node
Good day, hackers.
Problem:
- Append path is created with explicitly parallel_aware = true
- It has two children: one is trivial, the other has parallel_aware = false.
The trivial child is dropped.
- Gather/GatherMerge path takes the Append path as a child and thinks
its child is parallel_aware = true.
- But the Append path is removed at the end, since it has only one child.
- Now Gather/GatherMerge thinks its child is parallel_aware, but it
is not.
Gather/GatherMerge runs its child twice, in a worker and in the leader,
and gathers the same rows twice.
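The effect can be illustrated with a toy model in C. This is only a sketch under stated assumptions: `ToyScan` and `toy_gather_rows` are hypothetical names invented for this illustration and are not PostgreSQL code. A parallel-aware child splits its rows between the participating processes, while a child that is not parallel aware returns every row in every process.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for a scan node; not a PostgreSQL structure. */
typedef struct ToyScan
{
    int  nrows;          /* rows the scan returns when executed once */
    bool parallel_aware; /* true: processes share a cursor and split the rows */
} ToyScan;

/*
 * Rows collected by a Gather-like parent when `nprocs` processes
 * (workers plus a participating leader) each execute the child.
 */
static int
toy_gather_rows(const ToyScan *child, int nprocs)
{
    int shared_next = 0; /* shared cursor, used only by parallel-aware children */
    int gathered = 0;

    assert(child != 0 && nprocs > 0);

    for (int p = 0; p < nprocs; p++)
    {
        if (child->parallel_aware)
        {
            /* Each process claims an even share of the rows still unclaimed. */
            int remaining_procs = nprocs - p;
            int share = (child->nrows - shared_next + remaining_procs - 1)
                        / remaining_procs;

            shared_next += share;
            gathered += share;
        }
        else
        {
            /* A child that is not parallel aware scans everything each time. */
            gathered += child->nrows;
        }
    }
    return gathered;
}
```

With 200 rows and two processes, the parallel-aware child yields 200 rows in total, while the child that is not parallel aware yields 400, which is the duplication described in the report.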
Reproduction code is attached (repro.sql; it is also included as a test).
Suggested quick (and valid) fix in the patch attached:
- If Append has single child, then copy its parallel awareness.
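A minimal sketch of that idea, assuming a simplified path structure: `ToyPath` and `toy_create_append_path` are hypothetical and only mirror the three fields the attached patch copies (`parallel_aware`, `parallel_safe`, `parallel_workers`). The real change is the three-line addition to create_append_path() in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, reduced version of a planner path; not PostgreSQL's Path. */
typedef struct ToyPath
{
    bool parallel_aware;
    bool parallel_safe;
    int  parallel_workers;
} ToyPath;

/*
 * Build an Append-like path over `children`. The caller's requested
 * parallel_aware / parallel_workers are kept unless there is exactly one
 * child, in which case the child's parallel fields are copied, because the
 * Append level will later be removed from the plan.
 */
static ToyPath
toy_create_append_path(const ToyPath *children, size_t nchildren,
                       bool parallel_aware, int parallel_workers)
{
    ToyPath append = { parallel_aware, true, parallel_workers };

    assert(children != NULL && nchildren > 0);

    /* The Append is parallel safe only if every child is. */
    for (size_t i = 0; i < nchildren; i++)
        append.parallel_safe = append.parallel_safe && children[i].parallel_safe;

    if (nchildren == 1)
    {
        append.parallel_aware = children[0].parallel_aware;
        append.parallel_safe = children[0].parallel_safe;
        append.parallel_workers = children[0].parallel_workers;
    }
    return append;
}
```

In this model a single child that is not parallel aware produces an Append with parallel_workers = 0, which is why the patch also has to relax the assertion in cost_gather_merge().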
The bug was introduced with commit 8edd0e79460b414b1d971895312e549e95e12e4f
"Suppress Append and MergeAppend plan nodes that have a single child."
During the discussion, it was suggested [1] that those fields should be copied:
I haven't looked into whether this does the right things for parallel
planning --- possibly create_[merge]append_path need to propagate up
parallel-related path fields from the single child?
But it was not so obvious [2].
A better fix could also remove the Gather/GatherMerge node if
its child is not parallel aware.
The bug is reported in /messages/by-id/flat/17335-4dc92e1aea3a78af@postgresql.org
Since there is no way to add a thread from pgsql-bugs to the commitfest, I am writing here.
[1]: /messages/by-id/17500.1551669976@sss.pgh.pa.us
[2]: /messages/by-id/CAKJS1f_Wt_tL3S32R3wpU86zQjuHfbnZbFt0eqm=qcRFcdbLvw@mail.gmail.com
----
regards
Yura Sokolov
y.sokolov@postgrespro.ru
funny.falcon@gmail.com
Attachments:
v1-0001-Quick-fix-to-duplicate-result-rows-after-Append-p.patch (text/x-patch; charset=UTF-8)
From 47c6e161de4fc9d2d6eff45f427ebf49b4c9d11c Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.sokolov@postgrespro.ru>
Date: Mon, 20 Dec 2021 11:48:10 +0300
Subject: [PATCH v1] Quick fix to duplicate result rows after Append path
removal.
It can happen that an Append path is created with the "parallel_aware" flag
set, but its single child is not parallel_aware. The Append path's parent
(Gather or Gather Merge) thinks its child is parallel_aware, but after the
Append path is removed, Gather's child is no longer parallel_aware. Then,
when Gather/Gather Merge decides to run the child in several workers, or in
a worker plus the participating leader, it gathers duplicate result rows
from the several invocations of the child path.
With this fix, an Append path copies the parallel_aware / cost / workers
values of its single child.
A copied `num_workers == 0` triggers the assert `num_workers > 0` in the
cost_gather_merge function. But changing this assert to `num_workers >= 0`
doesn't lead to any runtime and/or logical error.
Fixes bug 17335
https://postgr.es/m/flat/17335-4dc92e1aea3a78af%40postgresql.org
---
src/backend/optimizer/path/costsize.c | 2 +-
src/backend/optimizer/util/pathnode.c | 3 +
.../expected/gather_removed_append.out | 131 ++++++++++++++++++
src/test/regress/parallel_schedule | 1 +
.../regress/sql/gather_removed_append.sql | 82 +++++++++++
5 files changed, 218 insertions(+), 1 deletion(-)
create mode 100644 src/test/regress/expected/gather_removed_append.out
create mode 100644 src/test/regress/sql/gather_removed_append.sql
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 1e4d404f024..9949c3ab555 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -440,7 +440,7 @@ cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
* be overgenerous since the leader will do less work than other workers
* in typical cases, but we'll go with it for now.
*/
- Assert(path->num_workers > 0);
+ Assert(path->num_workers >= 0);
N = (double) path->num_workers + 1;
logN = LOG2(N);
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index af5e8df26b4..2ff4678937a 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1340,6 +1340,9 @@ create_append_path(PlannerInfo *root,
pathnode->path.startup_cost = child->startup_cost;
pathnode->path.total_cost = child->total_cost;
pathnode->path.pathkeys = child->pathkeys;
+ pathnode->path.parallel_aware = child->parallel_aware;
+ pathnode->path.parallel_safe = child->parallel_safe;
+ pathnode->path.parallel_workers = child->parallel_workers;
}
else
cost_append(pathnode);
diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out
new file mode 100644
index 00000000000..f6e861ce59d
--- /dev/null
+++ b/src/test/regress/expected/gather_removed_append.out
@@ -0,0 +1,131 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+NOTICE: table "gather_append_1" does not exist, skipping
+NOTICE: table "gather_append_2" does not exist, skipping
+CREATE TABLE gather_append_1 (
+ fk int,
+ f bool
+);
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+CREATE TABLE gather_append_2 (
+ fk int,
+ val serial
+);
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+ANALYZE gather_append_1, gather_append_2;
+SET max_parallel_workers_per_gather = 0;
+-- Find correct rows count
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+ 200
+(1 row)
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+ 200
+(1 row)
+
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ QUERY PLAN
+---------------------------------------------------------------------------------------------------------
+ Finalize Aggregate (actual rows=1 loops=1)
+ -> Gather (actual rows=1 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ Single Copy: true
+ -> Partial Aggregate (actual rows=1 loops=1)
+ -> Parallel Hash Left Join (actual rows=200 loops=1)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=10000 loops=1)
+ Buckets: 16384 Batches: 1 Memory Usage: 544kB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1)
+(13 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+ QUERY PLAN
+---------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+ Workers Planned: 0
+ Workers Launched: 0
+ -> Sort (actual rows=200 loops=1)
+ Sort Key: gather_append_2.val
+ Sort Method: quicksort Memory: 25kB
+ -> Parallel Hash Left Join (actual rows=200 loops=1)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=10000 loops=1)
+ Buckets: 16384 Batches: 1 Memory Usage: 519kB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1)
+(13 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+ QUERY PLAN
+---------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+ Workers Planned: 0
+ Workers Launched: 0
+ -> Sort (actual rows=200 loops=1)
+ Sort Key: gather_append_2.val
+ Sort Method: quicksort Memory: 25kB
+ -> Parallel Hash Left Join (actual rows=200 loops=1)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=10000 loops=1)
+ Buckets: 16384 Batches: 1 Memory Usage: 519kB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1)
+(13 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5b0c73d7e37..84f2f81255d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
test: select_parallel
test: write_parallel
test: vacuum_parallel
+test: gather_removed_append
# no relation related tests can be put in this group
test: publication subscription
diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql
new file mode 100644
index 00000000000..df1b796a4f6
--- /dev/null
+++ b/src/test/regress/sql/gather_removed_append.sql
@@ -0,0 +1,82 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+
+CREATE TABLE gather_append_1 (
+ fk int,
+ f bool
+);
+
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+
+CREATE TABLE gather_append_2 (
+ fk int,
+ val serial
+);
+
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+
+ANALYZE gather_append_1, gather_append_2;
+
+SET max_parallel_workers_per_gather = 0;
+
+-- Find correct rows count
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
--
2.34.1
On Thu, Dec 30, 2021 at 4:44 PM Yura Sokolov <y.sokolov@postgrespro.ru>
wrote:
Good day, hackers.
Problem:
- Append path is created with explicitly parallel_aware = true
- It has two children: one is trivial, the other has parallel_aware = false.
The trivial child is dropped.
- Gather/GatherMerge path takes the Append path as a child and thinks
its child is parallel_aware = true.
- But the Append path is removed at the end, since it has only one child.
- Now Gather/GatherMerge thinks its child is parallel_aware, but it
is not.
Gather/GatherMerge runs its child twice, in a worker and in the leader,
and gathers the same rows twice.
Reproduction code is attached (repro.sql; it is also included as a test).
Yeah, this is a problem.
Suggested quick (and valid) fix in the patch attached:
- If Append has single child, then copy its parallel awareness.
The bug was introduced with commit 8edd0e79460b414b1d971895312e549e95e12e4f
"Suppress Append and MergeAppend plan nodes that have a single child."
During the discussion, it was suggested [1] that those fields should be copied:
I haven't looked into whether this does the right things for parallel
planning --- possibly create_[merge]append_path need to propagate up
parallel-related path fields from the single child?
But it was not so obvious [2].
A better fix could also remove the Gather/GatherMerge node if
its child is not parallel aware.
The Gather path will only be created if we have an underlying partial path.
So I think that when we are generating the append path only from non-partial
paths, we can check whether the number of child nodes is just 1 and, if so,
not generate the partial append path. That way you will not generate the
partial join, and eventually the Gather will be avoided.
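One way to picture this suggestion is as a guard in front of partial Append generation. The sketch below is purely illustrative: `toy_want_partial_append` and its parameters are hypothetical and do not correspond to any existing planner function.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Decide whether a partial (parallel) Append is worth generating, given how
 * many of its would-be children are partial paths and how many are not.
 * Hypothetical helper; the names are assumptions made for this example.
 */
static bool
toy_want_partial_append(int n_partial, int n_nonpartial)
{
    assert(n_partial >= 0 && n_nonpartial >= 0);

    /* Nothing to append at all. */
    if (n_partial + n_nonpartial == 0)
        return false;

    /*
     * A lone non-partial child would leave the Append as the only
     * parallel-aware node, and the Append is removed later. Skipping it
     * means no partial join and no Gather are built on top of it.
     */
    if (n_partial == 0 && n_nonpartial == 1)
        return false;

    return true;
}
```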
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Fri, 31 Dec 2021 at 00:14, Yura Sokolov <y.sokolov@postgrespro.ru> wrote:
Problem:
- Append path is created with explicitly parallel_aware = true
- It has two children: one is trivial, the other has parallel_aware = false.
The trivial child is dropped.
- Gather/GatherMerge path takes the Append path as a child and thinks
its child is parallel_aware = true.
- But the Append path is removed at the end, since it has only one child.
- Now Gather/GatherMerge thinks its child is parallel_aware, but it
is not.
Gather/GatherMerge runs its child twice, in a worker and in the leader,
and gathers the same rows twice.
Thanks for the report. I can confirm that I can recreate the problem
with your script.
I will look into this further later next week.
David
On Sat, 01/01/2022 at 15:19 +1300, David Rowley wrote:
On Fri, 31 Dec 2021 at 00:14, Yura Sokolov <y.sokolov@postgrespro.ru> wrote:
Problem:
- Append path is created with explicitly parallel_aware = true
- It has two children: one is trivial, the other has parallel_aware = false.
The trivial child is dropped.
- Gather/GatherMerge path takes the Append path as a child and thinks
its child is parallel_aware = true.
- But the Append path is removed at the end, since it has only one child.
- Now Gather/GatherMerge thinks its child is parallel_aware, but it
is not.
Gather/GatherMerge runs its child twice, in a worker and in the leader,
and gathers the same rows twice.
Thanks for the report. I can confirm that I can recreate the problem
with your script.
I will look into this further later next week.
Good day, David.
Sorry to bother you.
Is there any update on this?
Is there any chance of it being fixed in the next minor release?
Could this simple fix be merged before further improvements?
Yura.
On Fri, 31 Dec 2021 at 00:14, Yura Sokolov <y.sokolov@postgrespro.ru> wrote:
Suggested quick (and valid) fix in the patch attached:
- If Append has single child, then copy its parallel awareness.
I've been looking at this and I've gone through changing my mind about
what's the right fix quite a number of times.
My current thoughts are that I don't really like the fact that we can
have plans in the following shape:
Finalize Aggregate
  ->  Gather
        Workers Planned: 1
        ->  Partial Aggregate
              ->  Parallel Hash Left Join
                    Hash Cond: (gather_append_1.fk = gather_append_2.fk)
                    ->  Index Scan using gather_append_1_ix on gather_append_1
                          Index Cond: (f = true)
                    ->  Parallel Hash
                          ->  Parallel Seq Scan on gather_append_2
It's only made safe by the fact that Gather will only use 1 worker.
To me, it just seems too fragile to assume that's always going to be
the case. I feel like this fix just relies on the fact that
create_gather_path() and create_gather_merge_path() do
"pathnode->num_workers = subpath->parallel_workers;". If someone
decided that was to work a different way, then we risk this breaking
again. Additionally, today we have Gather and GatherMerge, but we may
one day end up with more node types that gather results from parallel
workers, or even a completely different way of executing plans.
I think a safer way to fix this is to just not remove the
Append/MergeAppend node if the parallel_aware flag of the only-child
and the Append/MergeAppend don't match. I've done that in the
attached.
I believe the code at the end of add_paths_to_append_rel() can remain as is.
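The rule can be sketched in isolation as follows. This is a toy model, not the attached patch: `ToyPlan`, `toy_set_append_references` and `toy_single_child_append_removed` are hypothetical names, and the real check lives in set_append_references() and set_mergeappend_references().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, reduced plan node; not PostgreSQL's Plan structure. */
typedef struct ToyPlan
{
    bool             parallel_aware;
    int              nchildren;
    struct ToyPlan **children;
} ToyPlan;

/*
 * Return the node that should stay in the plan: the only child when the
 * Append can be removed safely, otherwise the Append itself.
 */
static ToyPlan *
toy_set_append_references(ToyPlan *append)
{
    assert(append != NULL);

    /* Remove the Append only if parallel awareness matches the sole child. */
    if (append->nchildren == 1 &&
        append->children[0]->parallel_aware == append->parallel_aware)
        return append->children[0];

    return append;
}

/* Convenience wrapper: build a one-child Append and report if it was removed. */
static bool
toy_single_child_append_removed(bool append_aware, bool child_aware)
{
    ToyPlan  child = { child_aware, 0, NULL };
    ToyPlan *children[] = { &child };
    ToyPlan  append = { append_aware, 1, children };

    return toy_set_append_references(&append) == &child;
}
```

In this model a parallel-aware Append over a child that is not parallel aware is kept, so the parent above it still sees a parallel-aware node.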
David
Attachments:
dont_remove_singlechild_appends_with_mismatching_parallel_awareness.patch (application/octet-stream)
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index e44ae971b4..a7b11b7f03 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1512,8 +1512,16 @@ set_append_references(PlannerInfo *root,
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
- /* Now, if there's just one, forget the Append and return that child */
- if (list_length(aplan->appendplans) == 1)
+ /*
+ * See if it's safe to get rid of the Append entirely. For this to be
+ * safe, there must be only one child plan and that child plan's parallel
+ * awareness must match that of the Append's. The reason for the latter
+ * is that the if the Append is parallel aware and the child is not then
+ * the calling plan may execute the non-parallel aware child multiple
+ * times.
+ */
+ if (list_length(aplan->appendplans) == 1 &&
+ ((Plan *) linitial(aplan->appendplans))->parallel_aware == aplan->plan.parallel_aware)
return clean_up_removed_plan_level((Plan *) aplan,
(Plan *) linitial(aplan->appendplans));
@@ -1576,8 +1584,16 @@ set_mergeappend_references(PlannerInfo *root,
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
- /* Now, if there's just one, forget the MergeAppend and return that child */
- if (list_length(mplan->mergeplans) == 1)
+ /*
+ * See if it's safe to get rid of the MergeAppend entirely. For this to
+ * be safe, there must be only one child plan and that child plan's
+ * parallel awareness must match that of the MergeAppend's. The reason
+ * for the latter is that the if the MergeAppend is parallel aware and the
+ * child is not then the calling plan may execute the non-parallel aware
+ * child multiple times.
+ */
+ if (list_length(mplan->mergeplans) == 1 &&
+ ((Plan *) linitial(mplan->mergeplans))->parallel_aware == mplan->plan.parallel_aware)
return clean_up_removed_plan_level((Plan *) mplan,
(Plan *) linitial(mplan->mergeplans));
On Thu, 20/01/2022 at 09:32 +1300, David Rowley wrote:
On Fri, 31 Dec 2021 at 00:14, Yura Sokolov <y.sokolov@postgrespro.ru> wrote:
Suggested quick (and valid) fix in the patch attached:
- If Append has single child, then copy its parallel awareness.
I've been looking at this and I've gone through changing my mind about
what's the right fix quite a number of times.
My current thoughts are that I don't really like the fact that we can
have plans in the following shape:
Finalize Aggregate
  ->  Gather
        Workers Planned: 1
        ->  Partial Aggregate
              ->  Parallel Hash Left Join
                    Hash Cond: (gather_append_1.fk = gather_append_2.fk)
                    ->  Index Scan using gather_append_1_ix on gather_append_1
                          Index Cond: (f = true)
                    ->  Parallel Hash
                          ->  Parallel Seq Scan on gather_append_2
It's only made safe by the fact that Gather will only use 1 worker.
To me, it just seems too fragile to assume that's always going to be
the case. I feel like this fix just relies on the fact that
create_gather_path() and create_gather_merge_path() do
"pathnode->num_workers = subpath->parallel_workers;". If someone
decided that was to work a different way, then we risk this breaking
again. Additionally, today we have Gather and GatherMerge, but we may
one day end up with more node types that gather results from parallel
workers, or even a completely different way of executing plans.
It seems strange that the parallel_aware and parallel_safe flags neither
affect execution nor are properly checked.
The exception is that parallel_safe is checked in ExecSerializePlan, which is
called from ExecInitParallelPlan, which is called from ExecGather and
ExecGatherMerge. But it looks like this check doesn't affect execution either.
I think a safer way to fix this is to just not remove the
Append/MergeAppend node if the parallel_aware flag of the only-child
and the Append/MergeAppend don't match. I've done that in the
attached.
I believe the code at the end of add_paths_to_append_rel() can remain as is.
I found that clean_up_removed_plan_level is also called from
set_subqueryscan_references.
Is there a need to patch there as well?
And there is a strange state:
- in the loop over subpaths, pathnode->path.parallel_safe is set to the AND of
all its subpaths' parallel_safe
(therefore there was a need to copy it in my patch version),
- that means our AppendPath is parallel_aware but not parallel_safe.
That is a bit ridiculous.
And it is strange that an AppendPath could have more parallel_workers than
the sum of its children's parallel_workers.
So it looks like the whole machinery around parallel_aware/parallel_safe is
not consistent enough.
Either way, I attach your version of the fix with my tests as a new patch version.
regards,
Yura Sokolov
Attachments:
v2-0001-Fix-duplicate-result-rows-after-Append-path-remov.patch (text/x-patch; charset=UTF-8)
From 359df37ae76170a4621cafd3ad8b318473c94a46 Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.sokolov@postgrespro.ru>
Date: Sun, 23 Jan 2022 14:53:21 +0300
Subject: [PATCH v2] Fix duplicate result rows after Append path removal.
It can happen that an Append path is created with the "parallel_aware" flag
set, but its single child is not parallel_aware. The Append path's parent
(Gather or Gather Merge) thinks its child is parallel_aware, but after the
Append path is removed, Gather's child is no longer parallel_aware. Then,
when Gather/Gather Merge decides to run the child in several workers, or in
a worker plus the participating leader, it gathers duplicate result rows
from the several invocations of the child path.
To fix it, don't remove the Append/MergeAppend node if its parallel_aware
differs from its single child's parallel_aware.
Authors: David Rowley, Sokolov Yura.
---
src/backend/optimizer/plan/setrefs.c | 24 +++-
.../expected/gather_removed_append.out | 135 ++++++++++++++++++
src/test/regress/parallel_schedule | 1 +
.../regress/sql/gather_removed_append.sql | 82 +++++++++++
4 files changed, 238 insertions(+), 4 deletions(-)
create mode 100644 src/test/regress/expected/gather_removed_append.out
create mode 100644 src/test/regress/sql/gather_removed_append.sql
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index e44ae971b4b..a7b11b7f03a 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1512,8 +1512,16 @@ set_append_references(PlannerInfo *root,
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
- /* Now, if there's just one, forget the Append and return that child */
- if (list_length(aplan->appendplans) == 1)
+ /*
+ * See if it's safe to get rid of the Append entirely. For this to be
+ * safe, there must be only one child plan and that child plan's parallel
+ * awareness must match that of the Append's. The reason for the latter
+ * is that the if the Append is parallel aware and the child is not then
+ * the calling plan may execute the non-parallel aware child multiple
+ * times.
+ */
+ if (list_length(aplan->appendplans) == 1 &&
+ ((Plan *) linitial(aplan->appendplans))->parallel_aware == aplan->plan.parallel_aware)
return clean_up_removed_plan_level((Plan *) aplan,
(Plan *) linitial(aplan->appendplans));
@@ -1576,8 +1584,16 @@ set_mergeappend_references(PlannerInfo *root,
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
- /* Now, if there's just one, forget the MergeAppend and return that child */
- if (list_length(mplan->mergeplans) == 1)
+ /*
+ * See if it's safe to get rid of the MergeAppend entirely. For this to
+ * be safe, there must be only one child plan and that child plan's
+ * parallel awareness must match that of the MergeAppend's. The reason
+ * for the latter is that the if the MergeAppend is parallel aware and the
+ * child is not then the calling plan may execute the non-parallel aware
+ * child multiple times.
+ */
+ if (list_length(mplan->mergeplans) == 1 &&
+ ((Plan *) linitial(mplan->mergeplans))->parallel_aware == mplan->plan.parallel_aware)
return clean_up_removed_plan_level((Plan *) mplan,
(Plan *) linitial(mplan->mergeplans));
diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out
new file mode 100644
index 00000000000..1c2d40d7c76
--- /dev/null
+++ b/src/test/regress/expected/gather_removed_append.out
@@ -0,0 +1,135 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+NOTICE: table "gather_append_1" does not exist, skipping
+NOTICE: table "gather_append_2" does not exist, skipping
+CREATE TABLE gather_append_1 (
+ fk int,
+ f bool
+);
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+CREATE TABLE gather_append_2 (
+ fk int,
+ val serial
+);
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+ANALYZE gather_append_1, gather_append_2;
+SET max_parallel_workers_per_gather = 0;
+-- Find correct rows count
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+ 200
+(1 row)
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+ 200
+(1 row)
+
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ QUERY PLAN
+---------------------------------------------------------------------------------------------------------------
+ Finalize Aggregate (actual rows=1 loops=1)
+ -> Gather (actual rows=2 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ -> Partial Aggregate (actual rows=1 loops=2)
+ -> Parallel Hash Left Join (actual rows=100 loops=2)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Parallel Append (actual rows=20 loops=2)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=5000 loops=2)
+ Buckets: 16384 Batches: 1 Memory Usage: 544kB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(13 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+ QUERY PLAN
+---------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ -> Sort (actual rows=100 loops=2)
+ Sort Key: gather_append_2.val
+ Sort Method: quicksort Memory: 25kB
+ Worker 0: Sort Method: quicksort Memory: 25kB
+ -> Parallel Hash Left Join (actual rows=100 loops=2)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Parallel Append (actual rows=20 loops=2)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=5000 loops=2)
+ Buckets: 16384 Batches: 1 Memory Usage: 576kB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(15 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+ QUERY PLAN
+---------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ -> Sort (actual rows=100 loops=2)
+ Sort Key: gather_append_2.val
+ Sort Method: quicksort Memory: 25kB
+ Worker 0: Sort Method: quicksort Memory: 25kB
+ -> Parallel Hash Left Join (actual rows=100 loops=2)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Parallel Append (actual rows=20 loops=2)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=5000 loops=2)
+ Buckets: 16384 Batches: 1 Memory Usage: 544kB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(15 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5b0c73d7e37..84f2f81255d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
test: select_parallel
test: write_parallel
test: vacuum_parallel
+test: gather_removed_append
# no relation related tests can be put in this group
test: publication subscription
diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql
new file mode 100644
index 00000000000..df1b796a4f6
--- /dev/null
+++ b/src/test/regress/sql/gather_removed_append.sql
@@ -0,0 +1,82 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+
+CREATE TABLE gather_append_1 (
+ fk int,
+ f bool
+);
+
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+
+CREATE TABLE gather_append_2 (
+ fk int,
+ val serial
+);
+
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+
+ANALYZE gather_append_1, gather_append_2;
+
+SET max_parallel_workers_per_gather = 0;
+
+-- Find correct rows count
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
--
2.34.1
On Sun, 23/01/2022 at 14:56 +0300, Yura Sokolov wrote:
On Thu, 20/01/2022 at 09:32 +1300, David Rowley wrote:
On Fri, 31 Dec 2021 at 00:14, Yura Sokolov <y.sokolov@postgrespro.ru> wrote:
Suggested quick (and valid) fix in the patch attached:
- If Append has single child, then copy its parallel awareness.

I've been looking at this and I've gone through changing my mind about
what's the right fix quite a number of times.

My current thoughts are that I don't really like the fact that we can
have plans in the following shape:

Finalize Aggregate
  ->  Gather
        Workers Planned: 1
        ->  Partial Aggregate
              ->  Parallel Hash Left Join
                    Hash Cond: (gather_append_1.fk = gather_append_2.fk)
                    ->  Index Scan using gather_append_1_ix on gather_append_1
                          Index Cond: (f = true)
                    ->  Parallel Hash
                          ->  Parallel Seq Scan on gather_append_2

It's only made safe by the fact that Gather will only use 1 worker.
To me, it just seems too fragile to assume that's always going to be
the case. I feel like this fix just relies on the fact that
create_gather_path() and create_gather_merge_path() do
"pathnode->num_workers = subpath->parallel_workers;". If someone
decided that was to work a different way, then we risk this breaking
again. Additionally, today we have Gather and GatherMerge, but we may
one day end up with more node types that gather results from parallel
workers, or even a completely different way of executing plans.

It seems strange that the parallel_aware and parallel_safe flags neither
affect execution nor are properly checked.

Except that parallel_safe is checked in ExecSerializePlan, which is called from
ExecInitParallelPlan, which is called from ExecGather and ExecGatherMerge.
But it looks like this check doesn't affect execution either.

I think a safer way to fix this is to just not remove the
Append/MergeAppend node if the parallel_aware flag of the only-child
and the Append/MergeAppend don't match. I've done that in the
attached.

I believe the code at the end of add_paths_to_append_rel() can remain as is.

I found that clean_up_removed_plan_level is also called from
set_subqueryscan_references. Is there a need to patch there as well?

And there is a strange state:
- in the loop over subpaths, pathnode->node.parallel_safe is set to the AND of
all its subpaths' parallel_safe
(therefore there was a need to copy it in my patch version),
- that means our AppendPath is parallel_aware but not parallel_safe.
It is a bit ridiculous.

And it is strange that an AppendPath could have more parallel_workers than the
sum of its children's parallel_workers.

So it looks like the whole machinery around parallel_aware/parallel_safe is
not consistent enough.

Either way, I attach your version of the fix with my tests as a new patch version.
It looks like the volatile "Memory Usage:" value in EXPLAIN output
sporadically breaks 'make check'.
I applied a replacement in the style of the memoize.sql test.
Why is there no way to disable the "Buckets: %d Batches: %d Memory Usage: %dkB"
output in show_hash_info?
regards,
Yura Sokolov
Attachments:
v3-0001-Fix-duplicate-result-rows-after-Append-path-remov.patch (text/x-patch; charset=UTF-8)
From 9ed2139495b2026433ff5e7c4092fcfa8f10e4d1 Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.sokolov@postgrespro.ru>
Date: Sun, 23 Jan 2022 14:53:21 +0300
Subject: [PATCH v3] Fix duplicate result rows after Append path removal.
It can happen that an Append path is created with the "parallel_aware" flag
set while its single child is not parallel aware. The Append path's parent
(Gather or Gather Merge) assumes its child is parallel_aware, but after the
Append path is removed, Gather's child is no longer parallel_aware. Then, when
Gather/Gather Merge runs the child in several workers, or in a worker plus the
participating leader, it gathers duplicate result rows from the repeated child
invocations.
To fix this, don't remove the Append/MergeAppend node if its parallel_aware
flag differs from that of its single child.
Authors: David Rowley, Sokolov Yura.
---
src/backend/optimizer/plan/setrefs.c | 24 ++-
.../expected/gather_removed_append.out | 154 ++++++++++++++++++
src/test/regress/parallel_schedule | 1 +
.../regress/sql/gather_removed_append.sql | 102 ++++++++++++
4 files changed, 277 insertions(+), 4 deletions(-)
create mode 100644 src/test/regress/expected/gather_removed_append.out
create mode 100644 src/test/regress/sql/gather_removed_append.sql
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index e44ae971b4b..a7b11b7f03a 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1512,8 +1512,16 @@ set_append_references(PlannerInfo *root,
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
- /* Now, if there's just one, forget the Append and return that child */
- if (list_length(aplan->appendplans) == 1)
+ /*
+ * See if it's safe to get rid of the Append entirely. For this to be
+ * safe, there must be only one child plan and that child plan's parallel
+ * awareness must match that of the Append's. The reason for the latter
+ * is that if the Append is parallel aware and the child is not then
+ * the calling plan may execute the non-parallel aware child multiple
+ * times.
+ */
+ if (list_length(aplan->appendplans) == 1 &&
+ ((Plan *) linitial(aplan->appendplans))->parallel_aware == aplan->plan.parallel_aware)
return clean_up_removed_plan_level((Plan *) aplan,
(Plan *) linitial(aplan->appendplans));
@@ -1576,8 +1584,16 @@ set_mergeappend_references(PlannerInfo *root,
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
- /* Now, if there's just one, forget the MergeAppend and return that child */
- if (list_length(mplan->mergeplans) == 1)
+ /*
+ * See if it's safe to get rid of the MergeAppend entirely. For this to
+ * be safe, there must be only one child plan and that child plan's
+ * parallel awareness must match that of the MergeAppend's. The reason
+ * for the latter is that if the MergeAppend is parallel aware and the
+ * child is not then the calling plan may execute the non-parallel aware
+ * child multiple times.
+ */
+ if (list_length(mplan->mergeplans) == 1 &&
+ ((Plan *) linitial(mplan->mergeplans))->parallel_aware == mplan->plan.parallel_aware)
return clean_up_removed_plan_level((Plan *) mplan,
(Plan *) linitial(mplan->mergeplans));
diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out
new file mode 100644
index 00000000000..1676b00d9dc
--- /dev/null
+++ b/src/test/regress/expected/gather_removed_append.out
@@ -0,0 +1,154 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+NOTICE: table "gather_append_1" does not exist, skipping
+NOTICE: table "gather_append_2" does not exist, skipping
+CREATE TABLE gather_append_1 (
+ fk int,
+ f bool
+);
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+CREATE TABLE gather_append_2 (
+ fk int,
+ val serial
+);
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+ANALYZE gather_append_1, gather_append_2;
+SET max_parallel_workers_per_gather = 0;
+-- Find correct rows count
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+ 200
+(1 row)
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+ 200
+(1 row)
+
+-- The buckets/batches/memory values from the Parallel Hash node can vary between
+-- machines. Let's just replace the number with an 'N'.
+create function explain_gather(query text) returns setof text
+language plpgsql as
+$$
+declare
+ ln text;
+begin
+ for ln in
+ execute format('explain (analyze, costs off, summary off, timing off, verbose off, buffers off) %s',
+ query)
+ loop
+ ln := regexp_replace(ln, 'Buckets: \d+', 'Buckets: N');
+ ln := regexp_replace(ln, 'Batches: \d+', 'Batches: N');
+ ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N');
+ return next ln;
+ end loop;
+end;
+$$;
+SELECT explain_gather('
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);');
+ explain_gather
+---------------------------------------------------------------------------------------------------------------
+ Finalize Aggregate (actual rows=1 loops=1)
+ -> Gather (actual rows=2 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ -> Partial Aggregate (actual rows=1 loops=2)
+ -> Parallel Hash Left Join (actual rows=100 loops=2)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Parallel Append (actual rows=20 loops=2)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=5000 loops=2)
+ Buckets: N Batches: N Memory Usage: NkB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(13 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
+ explain_gather
+---------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ -> Sort (actual rows=100 loops=2)
+ Sort Key: gather_append_2.val
+ Sort Method: quicksort Memory: 25kB
+ Worker 0: Sort Method: quicksort Memory: 25kB
+ -> Parallel Hash Left Join (actual rows=100 loops=2)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Parallel Append (actual rows=20 loops=2)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=5000 loops=2)
+ Buckets: N Batches: N Memory Usage: NkB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(15 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
+ explain_gather
+---------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ -> Sort (actual rows=100 loops=2)
+ Sort Key: gather_append_2.val
+ Sort Method: quicksort Memory: 25kB
+ Worker 0: Sort Method: quicksort Memory: 25kB
+ -> Parallel Hash Left Join (actual rows=100 loops=2)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Parallel Append (actual rows=20 loops=2)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=5000 loops=2)
+ Buckets: N Batches: N Memory Usage: NkB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(15 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5b0c73d7e37..84f2f81255d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
test: select_parallel
test: write_parallel
test: vacuum_parallel
+test: gather_removed_append
# no relation related tests can be put in this group
test: publication subscription
diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql
new file mode 100644
index 00000000000..8c0374a8f8b
--- /dev/null
+++ b/src/test/regress/sql/gather_removed_append.sql
@@ -0,0 +1,102 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+
+CREATE TABLE gather_append_1 (
+ fk int,
+ f bool
+);
+
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+
+CREATE TABLE gather_append_2 (
+ fk int,
+ val serial
+);
+
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+
+ANALYZE gather_append_1, gather_append_2;
+
+SET max_parallel_workers_per_gather = 0;
+
+-- Find correct rows count
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+-- The buckets/batches/memory values from the Parallel Hash node can vary between
+-- machines. Let's just replace the number with an 'N'.
+create function explain_gather(query text) returns setof text
+language plpgsql as
+$$
+declare
+ ln text;
+begin
+ for ln in
+ execute format('explain (analyze, costs off, summary off, timing off, verbose off, buffers off) %s',
+ query)
+ loop
+ ln := regexp_replace(ln, 'Buckets: \d+', 'Buckets: N');
+ ln := regexp_replace(ln, 'Batches: \d+', 'Batches: N');
+ ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N');
+ return next ln;
+ end loop;
+end;
+$$;
+
+SELECT explain_gather('
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);');
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
--
2.34.1
On Mon, 24/01/2022 at 16:24 +0300, Yura Sokolov wrote:
It looks like the volatile "Memory Usage:" value in EXPLAIN output
sporadically breaks 'make check'.
I applied a replacement in the style of the memoize.sql test.
Why is there no way to disable the "Buckets: %d Batches: %d Memory Usage: %dkB"
output in show_hash_info?
And another attempt to fix tests volatility.
Attachments:
v4-0001-Fix-duplicate-result-rows-after-Append-path-remov.patch (text/x-patch; charset=UTF-8)
From fb09491a401f0df828faf6088158f431b2a69381 Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.sokolov@postgrespro.ru>
Date: Sun, 23 Jan 2022 14:53:21 +0300
Subject: [PATCH v4] Fix duplicate result rows after Append path removal.
It can happen that an Append path is created with the "parallel_aware" flag
set while its single child is not parallel aware. The Append path's parent
(Gather or Gather Merge) assumes its child is parallel_aware, but after the
Append path is removed, Gather's child is no longer parallel_aware. Then, when
Gather/Gather Merge runs the child in several workers, or in a worker plus the
participating leader, it gathers duplicate result rows from the repeated child
invocations.
To fix this, don't remove the Append/MergeAppend node if its parallel_aware
flag differs from that of its single child.
Authors: David Rowley, Sokolov Yura.
---
src/backend/optimizer/plan/setrefs.c | 24 +++-
.../expected/gather_removed_append.out | 126 ++++++++++++++++++
src/test/regress/parallel_schedule | 1 +
.../regress/sql/gather_removed_append.sql | 94 +++++++++++++
4 files changed, 241 insertions(+), 4 deletions(-)
create mode 100644 src/test/regress/expected/gather_removed_append.out
create mode 100644 src/test/regress/sql/gather_removed_append.sql
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index e44ae971b4b..a7b11b7f03a 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1512,8 +1512,16 @@ set_append_references(PlannerInfo *root,
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
- /* Now, if there's just one, forget the Append and return that child */
- if (list_length(aplan->appendplans) == 1)
+ /*
+ * See if it's safe to get rid of the Append entirely. For this to be
+ * safe, there must be only one child plan and that child plan's parallel
+ * awareness must match that of the Append's. The reason for the latter
+ * is that if the Append is parallel aware and the child is not then
+ * the calling plan may execute the non-parallel aware child multiple
+ * times.
+ */
+ if (list_length(aplan->appendplans) == 1 &&
+ ((Plan *) linitial(aplan->appendplans))->parallel_aware == aplan->plan.parallel_aware)
return clean_up_removed_plan_level((Plan *) aplan,
(Plan *) linitial(aplan->appendplans));
@@ -1576,8 +1584,16 @@ set_mergeappend_references(PlannerInfo *root,
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
- /* Now, if there's just one, forget the MergeAppend and return that child */
- if (list_length(mplan->mergeplans) == 1)
+ /*
+ * See if it's safe to get rid of the MergeAppend entirely. For this to
+ * be safe, there must be only one child plan and that child plan's
+ * parallel awareness must match that of the MergeAppend's. The reason
+ * for the latter is that if the MergeAppend is parallel aware and the
+ * child is not then the calling plan may execute the non-parallel aware
+ * child multiple times.
+ */
+ if (list_length(mplan->mergeplans) == 1 &&
+ ((Plan *) linitial(mplan->mergeplans))->parallel_aware == mplan->plan.parallel_aware)
return clean_up_removed_plan_level((Plan *) mplan,
(Plan *) linitial(mplan->mergeplans));
diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out
new file mode 100644
index 00000000000..849cf0ae97e
--- /dev/null
+++ b/src/test/regress/expected/gather_removed_append.out
@@ -0,0 +1,126 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+NOTICE: table "gather_append_1" does not exist, skipping
+NOTICE: table "gather_append_2" does not exist, skipping
+CREATE TABLE gather_append_1 (
+ fk int,
+ f bool
+);
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+CREATE TABLE gather_append_2 (
+ fk int,
+ val serial
+);
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+ANALYZE gather_append_1, gather_append_2;
+SET max_parallel_workers_per_gather = 0;
+-- Find correct rows count
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+ 200
+(1 row)
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+ 200
+(1 row)
+
+-- The buckets/batches/memory values from the Parallel Hash node can vary between
+-- machines. Let's just replace the number with an 'N'.
+create function explain_gather(query text) returns setof text
+language plpgsql as
+$$
+declare
+ ln text;
+begin
+ for ln in
+ execute format('explain (analyze, costs off, summary off, timing off, verbose off, buffers off) %s',
+ query)
+ loop
+ if not ln like '%Gather%' then
+ ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=R loops=L');
+ end if;
+ ln := regexp_replace(ln, 'Buckets: \d+', 'Buckets: N');
+ ln := regexp_replace(ln, 'Batches: \d+', 'Batches: N');
+ ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N');
+ return next ln;
+ end loop;
+end;
+$$;
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);');
+ explain_gather
+--------------------------------------------------------------------------------------------------
+ Gather (actual rows=200 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ -> Parallel Hash Left Join (actual rows=R loops=L)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Parallel Append (actual rows=R loops=L)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=R loops=L)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=R loops=L)
+ Buckets: N Batches: N Memory Usage: NkB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=R loops=L)
+(11 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
+ explain_gather
+--------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+ Workers Planned: 1
+ Workers Launched: 1
+ -> Sort (actual rows=R loops=L)
+ Sort Key: gather_append_2.val
+ Sort Method: quicksort Memory: 25kB
+ Worker 0: Sort Method: quicksort Memory: 25kB
+ -> Parallel Hash Left Join (actual rows=R loops=L)
+ Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+ -> Parallel Append (actual rows=R loops=L)
+ -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=R loops=L)
+ Index Cond: (f = true)
+ -> Parallel Hash (actual rows=R loops=L)
+ Buckets: N Batches: N Memory Usage: NkB
+ -> Parallel Seq Scan on gather_append_2 (actual rows=R loops=L)
+(15 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5b0c73d7e37..84f2f81255d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
test: select_parallel
test: write_parallel
test: vacuum_parallel
+test: gather_removed_append
# no relation related tests can be put in this group
test: publication subscription
diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql
new file mode 100644
index 00000000000..3af88b29f0a
--- /dev/null
+++ b/src/test/regress/sql/gather_removed_append.sql
@@ -0,0 +1,94 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+
+CREATE TABLE gather_append_1 (
+ fk int,
+ f bool
+);
+
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+
+CREATE TABLE gather_append_2 (
+ fk int,
+ val serial
+);
+
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+
+ANALYZE gather_append_1, gather_append_2;
+
+SET max_parallel_workers_per_gather = 0;
+
+-- Find correct rows count
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+
+SELECT count(1)
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+-- The buckets/batches/memory values from the Parallel Hash node can vary between
+-- machines. Let's just replace the number with an 'N'.
+create function explain_gather(query text) returns setof text
+language plpgsql as
+$$
+declare
+ ln text;
+begin
+ for ln in
+ execute format('explain (analyze, costs off, summary off, timing off, verbose off, buffers off) %s',
+ query)
+ loop
+ if not ln like '%Gather%' then
+ ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=R loops=L');
+ end if;
+ ln := regexp_replace(ln, 'Buckets: \d+', 'Buckets: N');
+ ln := regexp_replace(ln, 'Batches: \d+', 'Batches: N');
+ ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N');
+ return next ln;
+ end loop;
+end;
+$$;
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);');
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+ SELECT fk FROM gather_append_1 WHERE f
+ UNION ALL
+ SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
--
2.34.1
On Tue, 25 Jan 2022 at 17:35, Yura Sokolov <y.sokolov@postgrespro.ru> wrote:
And another attempt to fix tests volatility.
FWIW, I had not really seen the point in adding a test for this. I
did however see a point in it with your original patch. It seemed
useful there to verify that Gather and GatherMerge did what we
expected with 1 worker.
David
On Tue, 25 Jan 2022 at 20:03, David Rowley <dgrowleyml@gmail.com> wrote:
On Tue, 25 Jan 2022 at 17:35, Yura Sokolov <y.sokolov@postgrespro.ru> wrote:
And another attempt to fix tests volatility.
FWIW, I had not really seen the point in adding a test for this. I
did however see a point in it with your original patch. It seemed
useful there to verify that Gather and GatherMerge did what we
expected with 1 worker.
I ended up pushing just the last patch I sent.
The reason I didn't think it was worth adding a new test was that no
tests were added in the original commit. Existing tests did cover it,
but here we're just restoring the original behaviour for one simple
case. The test in your patch just seemed a bit more hassle than it
was worth. I struggle to imagine how we'll break this again.
David
On Tue, 25/01/2022 at 21:20 +1300, David Rowley wrote:
On Tue, 25 Jan 2022 at 20:03, David Rowley <dgrowleyml@gmail.com> wrote:
On Tue, 25 Jan 2022 at 17:35, Yura Sokolov <y.sokolov@postgrespro.ru> wrote:
And another attempt to fix tests volatility.
FWIW, I had not really seen the point in adding a test for this. I
did however see a point in it with your original patch. It seemed
useful there to verify that Gather and GatherMerge did what we
expected with 1 worker.
I ended up pushing just the last patch I sent.
The reason I didn't think it was worth adding a new test was that no
tests were added in the original commit. Existing tests did cover it,
Existing tests didn't catch the issue. It is a pity the fix was merged
without a test case for what it fixes.
but here we're just restoring the original behaviour for one simple
case. The test in your patch just seemed a bit more hassle than it
was worth. I struggle to imagine how we'll break this again.
Thank you for your attention and for the fix.
regards,
Yura Sokolov.
Yura Sokolov <y.sokolov@postgrespro.ru> writes:
On Tue, 25/01/2022 at 21:20 +1300, David Rowley wrote:
The reason I didn't think it was worth adding a new test was that no
tests were added in the original commit. Existing tests did cover it,
The existing tests didn't catch the issue. It is a pity the fix was merged
without a test case for what it fixes.
I share David's skepticism about the value of a test case. The
failure mode that seems likely to me is some other code path making
the same mistake, which a predetermined test would not catch.
Therefore, what I think could be useful is some very-late-stage
assertion check (probably in createplan.c) verifying that the
child of a Gather is parallel-aware. Or maybe the condition
needs to be more general than that, but anyway the idea is for
the back end of the planner to verify that we didn't build a
silly plan.
regards, tom lane
On Wed, 26 Jan 2022 at 05:32, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Therefore, what I think could be useful is some very-late-stage
assertion check (probably in createplan.c) verifying that the
child of a Gather is parallel-aware. Or maybe the condition
needs to be more general than that, but anyway the idea is for
the back end of the planner to verify that we didn't build a
silly plan.
Yeah, it would be nice to have something like this. I think to do it,
we might need to invent some sort of path traversal function that can
take a custom callback function. The problem is that the parallel
aware path does not need to be directly below the gather/gathermerge.
For example (from select_distinct.out)
 Unique
   ->  Sort
         Sort Key: four
         ->  Gather
               Workers Planned: 2
               ->  HashAggregate
                     Group Key: four
                     ->  Parallel Seq Scan on tenk1
For this case, the custom callback would check that there's at least 1
parallel_aware subpath below the Gather/GatherMerge.
There are probably some other rules that we could Assert are true. I
think any parallel_aware paths (unless they're scans) must contain
only parallel_aware subpaths. For example, parallel hash join must
have a parallel aware inner and outer.
David
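The check described above, that a Gather or GatherMerge has at least one parallel_aware path somewhere beneath it rather than only directly below it, can be sketched on a toy tree. This is a minimal illustration and not planner code: ToyPath, toy_path_walk and the other names are hypothetical, and a real Path tree has many more node types and child layouts than the two child pointers assumed here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for a planner Path node. */
typedef struct ToyPath
{
	const char *name;
	bool		parallel_aware;
	struct ToyPath *left;		/* first child, or NULL */
	struct ToyPath *right;		/* second child, or NULL */
} ToyPath;

typedef void (*toy_walker_fn) (const ToyPath *path, void *context);

/* Visit 'path' and then every node beneath it, depth first. */
static void
toy_path_walk(const ToyPath *path, toy_walker_fn walker, void *context)
{
	if (path == NULL)
		return;

	walker(path, context);
	toy_path_walk(path->left, walker, context);
	toy_path_walk(path->right, walker, context);
}

/* Callback: count the parallel_aware nodes seen so far. */
static void
toy_count_parallel_aware(const ToyPath *path, void *context)
{
	if (path->parallel_aware)
		(*(int *) context)++;
}

/* True if 'subpath' or anything beneath it is parallel_aware. */
static bool
toy_contains_parallel_aware(const ToyPath *subpath)
{
	int			count = 0;

	toy_path_walk(subpath, toy_count_parallel_aware, &count);
	return count > 0;
}

/* Assert-style validation of a Gather whose only child is gather->left. */
static void
toy_validate_gather(const ToyPath *gather)
{
	assert(toy_contains_parallel_aware(gather->left));
}
```

For the select_distinct.out plan quoted above, calling toy_contains_parallel_aware() on the HashAggregate under the Gather returns true, because the Parallel Seq Scan one level further down is parallel_aware. A check that looked only at the Gather's direct child would wrongly reject that plan.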
On Wed, 26 Jan 2022 at 05:32, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Therefore, what I think could be useful is some very-late-stage
assertion check (probably in createplan.c) verifying that the
child of a Gather is parallel-aware. Or maybe the condition
needs to be more general than that, but anyway the idea is for
the back end of the planner to verify that we didn't build a
silly plan.
I had a go at writing something along these lines, but I've ended up
with something I really don't like very much.
I ended up having to write a recursive path traversal function. It's
generic and it can be given a callback function to do whatever we like
with the Path. The problem is, that this seems like quite a bit of
code to maintain just for plan validation in Assert builds.
Currently, the patch validates 3 rules:
1) Ensure a parallel_aware path has only parallel_aware or
parallel_safe subpaths.
2) Ensure Gather is either single_copy or contains at least one
parallel_aware subnode.
3) Ensure GatherMerge contains at least one parallel_aware subnode.
I had to relax rule #1 a little as a Parallel Append can run subnodes
that are only parallel_safe and not parallel_aware. The problem with
relaxing this rule is that it does not catch the case that this bug
report was about. I could maybe tweak that so there's a special case
for Append to allow parallel aware or safe and ensure all other nodes
have only parallel_safe subnodes. I just don't really like that
special case as it's likely to get broken/forgotten over time when we
add new nodes.
I'm unsure if just being able to enforce rules #2 and #3 makes this worthwhile.
Happy to listen to other people's opinions and ideas on this. Without
those, I'm unlikely to try to push this any further.
David
Attachments:
do_some_plan_validation_during_create_plan.patch
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index cd6d72c763..56e28ba739 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -313,6 +313,21 @@ static ModifyTable *make_modifytable(PlannerInfo *root, Plan *subplan,
List *rowMarks, OnConflictExpr *onconflict, int epqParam);
static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
GatherMergePath *best_path);
+static bool contains_a_parallel_path(Path *path);
+static bool contains_only_parallel_aware_and_safe_paths(Path *path);
+
+/*
+ * PathTypeCount
+ * Used for various checks to assert plans are sane in assert enabled
+ * builds.
+ */
+typedef struct PathTypeCount
+{
+ uint64 count;
+ uint64 parallel_safe_count;
+ uint64 parallel_aware_count;
+
+} PathTypeCount;
/*
@@ -389,6 +404,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
/* Guard against stack overflow due to overly complex plans */
check_stack_depth();
+ /* Parallel aware paths should contain only parallel aware subpaths. */
+ Assert(!best_path->parallel_aware ||
+ contains_only_parallel_aware_and_safe_paths(best_path));
+
switch (best_path->pathtype)
{
case T_SeqScan:
@@ -481,6 +500,13 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
case T_Gather:
plan = (Plan *) create_gather_plan(root,
(GatherPath *) best_path);
+
+ /*
+ * We expect a Gather to contain at least one parallel path unless
+ * running in single_copy mode.
+ */
+ Assert(((GatherPath *) best_path)->single_copy ||
+ contains_a_parallel_path(((GatherPath *) best_path)->subpath));
break;
case T_Sort:
plan = (Plan *) create_sort_plan(root,
@@ -537,6 +563,8 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
case T_GatherMerge:
plan = (Plan *) create_gather_merge_plan(root,
(GatherMergePath *) best_path);
+ /* We expect a GatherMerge to contain at least one parallel path */
+ Assert(contains_a_parallel_path(((GatherMergePath *) best_path)->subpath));
break;
default:
elog(ERROR, "unrecognized node type: %d",
@@ -7052,6 +7080,210 @@ make_modifytable(PlannerInfo *root, Plan *subplan,
return node;
}
+/*
+ * path_tree_walker
+ * Walk a path tree beginning with 'path' and call the 'walker' function
+ * for that path and each of its subpaths recursively.
+ */
+static void
+path_tree_walker(Path *path, void (*walker) (), void *context)
+
+{
+ if (path == NULL)
+ return;
+
+ /* Guard against stack overflow due to overly complex path trees */
+ check_stack_depth();
+
+ walker(path, context);
+
+ switch (path->pathtype)
+ {
+ case T_SeqScan:
+ case T_SampleScan:
+ case T_IndexScan:
+ case T_IndexOnlyScan:
+ case T_BitmapHeapScan:
+ case T_TidScan:
+ case T_TidRangeScan:
+ case T_SubqueryScan:
+ case T_FunctionScan:
+ case T_TableFuncScan:
+ case T_ValuesScan:
+ case T_CteScan:
+ case T_WorkTableScan:
+ case T_NamedTuplestoreScan:
+ case T_ForeignScan:
+ case T_CustomScan:
+ /* Scan paths have no subpaths */
+ break;
+ case T_HashJoin:
+ case T_MergeJoin:
+ case T_NestLoop:
+ path_tree_walker(((JoinPath *) path)->outerjoinpath, walker, context);
+ path_tree_walker(((JoinPath *) path)->innerjoinpath, walker, context);
+ break;
+ case T_Append:
+ {
+ AppendPath *apath = (AppendPath *) path;
+ ListCell *lc;
+
+ foreach(lc, apath->subpaths)
+ {
+ Path *subpath = lfirst(lc);
+
+ path_tree_walker(subpath, walker, context);
+ }
+ }
+ break;
+ case T_MergeAppend:
+ {
+ MergeAppendPath *mpath = (MergeAppendPath *) path;
+ ListCell *lc;
+
+ foreach(lc, mpath->subpaths)
+ {
+ Path *subpath = lfirst(lc);
+
+ path_tree_walker(subpath, walker, context);
+ }
+ }
+ break;
+ case T_Result:
+ if (IsA(path, ProjectionPath))
+ {
+ path_tree_walker(((ProjectionPath *) path)->subpath, walker, context);
+ }
+ else if (IsA(path, MinMaxAggPath))
+ {
+ /* MinMaxAggPath has no subpaths */
+ }
+ else if (IsA(path, GroupResultPath))
+ {
+ /* GroupResultPath has no subpaths */
+ }
+ else
+ {
+ /* No subpaths for any other Result type path */
+ }
+ break;
+ case T_ProjectSet:
+ path_tree_walker(((ProjectSetPath *) path)->subpath, walker, context);
+ break;
+ case T_Material:
+ path_tree_walker(((MaterialPath *) path)->subpath, walker, context);
+ break;
+ case T_Memoize:
+ path_tree_walker(((MemoizePath *) path)->subpath, walker, context);
+ break;
+ case T_Unique:
+ if (IsA(path, UpperUniquePath))
+ {
+ path_tree_walker(((UpperUniquePath *) path)->subpath, walker, context);
+ }
+ else
+ {
+ Assert(IsA(path, UniquePath));
+ path_tree_walker(((UniquePath *) path)->subpath, walker, context);
+ }
+ break;
+ case T_Gather:
+ path_tree_walker(((GatherPath *) path)->subpath, walker, context);
+ break;
+ case T_Sort:
+ path_tree_walker(((SortPath *) path)->subpath, walker, context);
+ break;
+ case T_IncrementalSort:
+ path_tree_walker(((IncrementalSortPath *) path)->spath.subpath, walker, context);
+ break;
+ case T_Group:
+ path_tree_walker(((GroupPath *) path)->subpath, walker, context);
+ break;
+ case T_Agg:
+ if (IsA(path, GroupingSetsPath))
+ path_tree_walker(((GroupingSetsPath *) path)->subpath, walker, context);
+ else
+ {
+ Assert(IsA(path, AggPath));
+ path_tree_walker(((AggPath *) path)->subpath, walker, context);
+ }
+ break;
+ case T_WindowAgg:
+ path_tree_walker(((WindowAggPath *) path)->subpath, walker, context);
+ break;
+ case T_SetOp:
+ path_tree_walker(((SetOpPath *) path)->subpath, walker, context);
+ break;
+ case T_RecursiveUnion:
+ path_tree_walker(((RecursiveUnionPath *) path)->leftpath, walker, context);
+ path_tree_walker(((RecursiveUnionPath *) path)->rightpath, walker, context);
+ break;
+ case T_LockRows:
+ path_tree_walker(((LockRowsPath *) path)->subpath, walker, context);
+ break;
+ case T_ModifyTable:
+ path_tree_walker(((ModifyTablePath *) path)->subpath, walker, context);
+ break;
+ case T_Limit:
+ path_tree_walker(((LimitPath *) path)->subpath, walker, context);
+ break;
+ case T_GatherMerge:
+ path_tree_walker(((GatherMergePath *) path)->subpath, walker, context);
+ break;
+ default:
+ elog(ERROR, "unrecognized node type: %d", (int) path->pathtype);
+ break;
+ }
+}
+
+/*
+ * path_type_counter
+ * Determine the total number of paths and the number of paths that are
+ * parallel_aware and the number that are parallel safe.
+ */
+static void
+path_type_counter(Path *path, PathTypeCount *pathcount)
+{
+ pathcount->count++;
+ if (path->parallel_aware)
+ pathcount->parallel_aware_count++;
+ else if (path->parallel_safe)
+ pathcount->parallel_safe_count++;
+}
+
+/*
+ * contains_a_parallel_path
+ * Determine if 'path' or any of its subpaths are parallel aware
+ */
+static bool
+contains_a_parallel_path(Path *path)
+{
+ PathTypeCount pathcount;
+
+ memset(&pathcount, 0, sizeof(pathcount));
+
+ path_tree_walker(path, path_type_counter, (void *) &pathcount);
+
+ return (pathcount.parallel_aware_count > 0);
+}
+
+/*
+ * contains_only_parallel_aware_and_safe_paths
+ * Returns true if 'path' and all of its subpaths are either parallel
+ * aware or parallel safe.
+ */
+static bool
+contains_only_parallel_aware_and_safe_paths(Path *path)
+{
+ PathTypeCount pathcount;
+
+ memset(&pathcount, 0, sizeof(pathcount));
+
+ path_tree_walker(path, path_type_counter, (void *) &pathcount);
+
+ return (pathcount.parallel_aware_count + pathcount.parallel_safe_count == pathcount.count);
+}
+
/*
* is_projection_capable_path
* Check whether a given Path node is able to do projection.
On Thu, Feb 3, 2022 at 7:08 PM David Rowley <dgrowleyml@gmail.com> wrote:
Currently, the patch validates 3 rules:
1) Ensure a parallel_aware path has only parallel_aware or
parallel_safe subpaths.
I think that every path that is parallel_aware must also be
parallel_safe. So checking for either parallel_aware or parallel_safe
should be equivalent to just checking parallel_safe, unless I am
confused.
I think the actual rule is: every path under a Gather or GatherMerge
must be parallel-safe.
I don't think there's any real rule about what has to be under
parallel-aware paths -- except that it would have to be all
parallel-safe stuff, because the whole thing is under a Gather
(Merge). There may seem to be such a rule, but I suspect it's just an
accident of whatever code we have now rather than anything intrinsic.
2) Ensure Gather is either single_copy or contains at least one
parallel_aware subnode.
I agree that this one is a rule which we could check.
3) Ensure GatherMerge contains at least one parallel_aware subnode.
This one, too.
--
Robert Haas
EDB: http://www.enterprisedb.com
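The simplification suggested here, checking only parallel_safe because every parallel_aware path is also parallel_safe, can be sketched as follows. This is a toy model under stated assumptions: FlagPath, FlagCounts and the function names are hypothetical and only mimic the two flags discussed in the thread.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the two parallel flags carried by a Path. */
typedef struct FlagPath
{
	bool		parallel_aware;
	bool		parallel_safe;
	struct FlagPath *outer;		/* first child, or NULL */
	struct FlagPath *inner;		/* second child, or NULL */
} FlagPath;

typedef struct FlagCounts
{
	unsigned	total;
	unsigned	safe;
	unsigned	aware;
} FlagCounts;

/* Tally the flags of 'path' and of everything beneath it. */
static void
flag_tally(const FlagPath *path, FlagCounts *counts)
{
	if (path == NULL)
		return;

	/* The invariant stated above: parallel_aware implies parallel_safe. */
	assert(!path->parallel_aware || path->parallel_safe);

	counts->total++;
	if (path->parallel_aware)
		counts->aware++;
	if (path->parallel_safe)	/* plain "if": aware paths are counted here too */
		counts->safe++;

	flag_tally(path->outer, counts);
	flag_tally(path->inner, counts);
}

/* True if 'path' and every path beneath it is parallel_safe. */
static bool
flag_all_parallel_safe(const FlagPath *path)
{
	FlagCounts	counts = {0, 0, 0};

	flag_tally(path, &counts);
	return counts.safe == counts.total;
}
```

Because the two flags are tallied independently, the "everything under a Gather is parallel-safe" rule becomes a single comparison of the safe count against the total, with no need to add the aware count back in.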
Thanks for having a look at this.
On Fri, 4 Feb 2022 at 13:48, Robert Haas <robertmhaas@gmail.com> wrote:
I think the actual rule is: every path under a Gather or GatherMerge
must be parallel-safe.
I've adjusted the patch so that it counts parallel_aware and
parallel_safe Paths independently and verifies everything below a
Gather[Merge] is parallel_safe.
The diff stat currently looks like:
src/backend/optimizer/plan/createplan.c | 230
1 file changed, 230 insertions(+)
I still feel this is quite a bit of code for what we're getting here.
I'd be more for it if the path traversal function existed for some
other reason and I was just adding the callback functions and Asserts.
I'm keen to hear what others think about that.
David
Attachments:
do_some_plan_validation_during_create_plan_v2.patch
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index cd6d72c763..898046ca07 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -313,6 +313,20 @@ static ModifyTable *make_modifytable(PlannerInfo *root, Plan *subplan,
List *rowMarks, OnConflictExpr *onconflict, int epqParam);
static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
GatherMergePath *best_path);
+static bool contains_a_parallel_aware_path(Path *path);
+static bool contains_only_parallel_safe_paths(Path *path);
+
+/*
+ * PathTypeCount
+ * Used for various checks to assert plans are sane in assert enabled
+ * builds.
+ */
+typedef struct PathTypeCount
+{
+ uint64 count;
+ uint64 parallel_safe_count;
+ uint64 parallel_aware_count;
+} PathTypeCount;
/*
@@ -389,6 +403,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
/* Guard against stack overflow due to overly complex plans */
check_stack_depth();
+ /* Parallel aware paths should contain only parallel safe subpaths. */
+ Assert(!best_path->parallel_aware ||
+ contains_only_parallel_safe_paths(best_path));
+
switch (best_path->pathtype)
{
case T_SeqScan:
@@ -481,6 +499,14 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
case T_Gather:
plan = (Plan *) create_gather_plan(root,
(GatherPath *) best_path);
+
+ /*
+ * We expect a Gather to contain at least one parallel aware path
+ * unless running in single_copy mode.
+ */
+ Assert(((GatherPath *) best_path)->single_copy ||
+ contains_a_parallel_aware_path(((GatherPath *)
+ best_path)->subpath));
break;
case T_Sort:
plan = (Plan *) create_sort_plan(root,
@@ -537,6 +563,9 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
case T_GatherMerge:
plan = (Plan *) create_gather_merge_plan(root,
(GatherMergePath *) best_path);
+ /* GatherMerge must contain at least one parallel aware path */
+ Assert(contains_a_parallel_aware_path(((GatherMergePath *)
+ best_path)->subpath));
break;
default:
elog(ERROR, "unrecognized node type: %d",
@@ -7052,6 +7081,207 @@ make_modifytable(PlannerInfo *root, Plan *subplan,
return node;
}
+/*
+ * path_tree_walker
+ * Walk a path tree beginning with 'path' and call the 'walker' function
+ * for that path and each of its subpaths recursively.
+ */
+static void
+path_tree_walker(Path *path, void (*walker) (), void *context)
+
+{
+ if (path == NULL)
+ return;
+
+ /* Guard against stack overflow due to overly complex path trees */
+ check_stack_depth();
+
+ walker(path, context);
+
+ switch (path->pathtype)
+ {
+ case T_SeqScan:
+ case T_SampleScan:
+ case T_IndexScan:
+ case T_IndexOnlyScan:
+ case T_BitmapHeapScan:
+ case T_TidScan:
+ case T_TidRangeScan:
+ case T_SubqueryScan:
+ case T_FunctionScan:
+ case T_TableFuncScan:
+ case T_ValuesScan:
+ case T_CteScan:
+ case T_WorkTableScan:
+ case T_NamedTuplestoreScan:
+ case T_ForeignScan:
+ case T_CustomScan:
+ /* Scan paths have no subpaths */
+ break;
+ case T_HashJoin:
+ case T_MergeJoin:
+ case T_NestLoop:
+ path_tree_walker(((JoinPath *) path)->outerjoinpath, walker, context);
+ path_tree_walker(((JoinPath *) path)->innerjoinpath, walker, context);
+ break;
+ case T_Append:
+ {
+ AppendPath *apath = (AppendPath *) path;
+ ListCell *lc;
+
+ foreach(lc, apath->subpaths)
+ {
+ Path *subpath = lfirst(lc);
+
+ path_tree_walker(subpath, walker, context);
+ }
+ }
+ break;
+ case T_MergeAppend:
+ {
+ MergeAppendPath *mpath = (MergeAppendPath *) path;
+ ListCell *lc;
+
+ foreach(lc, mpath->subpaths)
+ {
+ Path *subpath = lfirst(lc);
+
+ path_tree_walker(subpath, walker, context);
+ }
+ }
+ break;
+ case T_Result:
+ if (IsA(path, ProjectionPath))
+ {
+ path_tree_walker(((ProjectionPath *) path)->subpath, walker, context);
+ }
+ else if (IsA(path, MinMaxAggPath))
+ {
+ /* MinMaxAggPath has no subpaths */
+ }
+ else if (IsA(path, GroupResultPath))
+ {
+ /* GroupResultPath has no subpaths */
+ }
+ else
+ {
+ /* No subpaths for any other Result type path */
+ }
+ break;
+ case T_ProjectSet:
+ path_tree_walker(((ProjectSetPath *) path)->subpath, walker, context);
+ break;
+ case T_Material:
+ path_tree_walker(((MaterialPath *) path)->subpath, walker, context);
+ break;
+ case T_Memoize:
+ path_tree_walker(((MemoizePath *) path)->subpath, walker, context);
+ break;
+ case T_Unique:
+ if (IsA(path, UpperUniquePath))
+ path_tree_walker(((UpperUniquePath *) path)->subpath, walker, context);
+ else
+ {
+ Assert(IsA(path, UniquePath));
+ path_tree_walker(((UniquePath *) path)->subpath, walker, context);
+ }
+ break;
+ case T_Gather:
+ path_tree_walker(((GatherPath *) path)->subpath, walker, context);
+ break;
+ case T_Sort:
+ path_tree_walker(((SortPath *) path)->subpath, walker, context);
+ break;
+ case T_IncrementalSort:
+ path_tree_walker(((IncrementalSortPath *) path)->spath.subpath, walker, context);
+ break;
+ case T_Group:
+ path_tree_walker(((GroupPath *) path)->subpath, walker, context);
+ break;
+ case T_Agg:
+ if (IsA(path, GroupingSetsPath))
+ path_tree_walker(((GroupingSetsPath *) path)->subpath, walker, context);
+ else
+ {
+ Assert(IsA(path, AggPath));
+ path_tree_walker(((AggPath *) path)->subpath, walker, context);
+ }
+ break;
+ case T_WindowAgg:
+ path_tree_walker(((WindowAggPath *) path)->subpath, walker, context);
+ break;
+ case T_SetOp:
+ path_tree_walker(((SetOpPath *) path)->subpath, walker, context);
+ break;
+ case T_RecursiveUnion:
+ path_tree_walker(((RecursiveUnionPath *) path)->leftpath, walker, context);
+ path_tree_walker(((RecursiveUnionPath *) path)->rightpath, walker, context);
+ break;
+ case T_LockRows:
+ path_tree_walker(((LockRowsPath *) path)->subpath, walker, context);
+ break;
+ case T_ModifyTable:
+ path_tree_walker(((ModifyTablePath *) path)->subpath, walker, context);
+ break;
+ case T_Limit:
+ path_tree_walker(((LimitPath *) path)->subpath, walker, context);
+ break;
+ case T_GatherMerge:
+ path_tree_walker(((GatherMergePath *) path)->subpath, walker, context);
+ break;
+ default:
+ elog(ERROR, "unrecognized node type: %d", (int) path->pathtype);
+ break;
+ }
+}
+
+/*
+ * path_type_counter
+ * Determine the total number of paths and the number of paths that are
+ * parallel_aware and the number that are parallel safe.
+ */
+static void
+path_type_counter(Path *path, PathTypeCount *pathcount)
+{
+ pathcount->count++;
+ if (path->parallel_aware)
+ pathcount->parallel_aware_count++;
+ if (path->parallel_safe)
+ pathcount->parallel_safe_count++;
+}
+
+/*
+ * contains_a_parallel_aware_path
+ * Determine if 'path' or any of its subpaths are parallel aware
+ */
+static bool
+contains_a_parallel_aware_path(Path *path)
+{
+ PathTypeCount pathcount;
+
+ memset(&pathcount, 0, sizeof(pathcount));
+
+ path_tree_walker(path, path_type_counter, (void *) &pathcount);
+
+ return (pathcount.parallel_aware_count > 0);
+}
+
+/*
+ * contains_only_parallel_safe_paths
+ * Returns true if 'path' and all of its subpaths are parallel safe
+ */
+static bool
+contains_only_parallel_safe_paths(Path *path)
+{
+ PathTypeCount pathcount;
+
+ memset(&pathcount, 0, sizeof(pathcount));
+
+ path_tree_walker(path, path_type_counter, (void *) &pathcount);
+
+ return (pathcount.parallel_safe_count == pathcount.count);
+}
+
/*
* is_projection_capable_path
* Check whether a given Path node is able to do projection.
On Tue, Feb 8, 2022 at 1:11 PM David Rowley <dgrowleyml@gmail.com> wrote:
Thanks for having a look at this.
On Fri, 4 Feb 2022 at 13:48, Robert Haas <robertmhaas@gmail.com> wrote:
I think the actual rule is: every path under a Gather or GatherMerge
must be parallel-safe.
I've adjusted the patch so that it counts parallel_aware and
parallel_safe Paths independently and verifies everything below a
Gather[Merge] is parallel_safe.
The diff stat currently looks like:
src/backend/optimizer/plan/createplan.c | 230
1 file changed, 230 insertions(+)
I still feel this is quite a bit of code for what we're getting here.
I'd be more for it if the path traversal function existed for some
other reason and I was just adding the callback functions and Asserts.
I'm keen to hear what others think about that.
David
Hi,
+ break;
+ case T_MergeAppend:
The case for T_MergeAppend should be left indented.
+ case T_Result:
+ if (IsA(path, ProjectionPath))
Since the remaining sub-cases don't have a subpath, they are covered by the
final `else` block - MinMaxAggPath and GroupResultPath don't need to be
checked.
For contains_a_parallel_aware_path(), it seems path_type_counter() could
return a bool indicating whether the walker should return early (when
the parallel aware count reaches 1).
Cheers
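The early-return idea in this review can be sketched as a walker whose callback returns true to stop the traversal. This is a minimal illustration on a toy tree, not a revision of the patch: EarlyPath, early_path_walk and the visit counter are hypothetical names added only to make the short-circuit observable.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for a planner Path node. */
typedef struct EarlyPath
{
	bool		parallel_aware;
	struct EarlyPath *left;		/* first child, or NULL */
	struct EarlyPath *right;	/* second child, or NULL */
} EarlyPath;

/* The callback returns true to tell the walker to stop. */
typedef bool (*early_walker_fn) (const EarlyPath *path, void *context);

/* Depth-first walk that stops as soon as the callback returns true. */
static bool
early_path_walk(const EarlyPath *path, early_walker_fn walker, void *context)
{
	if (path == NULL)
		return false;
	if (walker(path, context))
		return true;

	/* "||" short-circuits, so the right subtree is skipped after a hit. */
	return early_path_walk(path->left, walker, context) ||
		early_path_walk(path->right, walker, context);
}

/* Callback: record the visit, then report whether this node ends the search. */
static bool
early_is_parallel_aware(const EarlyPath *path, void *context)
{
	(*(int *) context)++;
	return path->parallel_aware;
}

/* True if any node is parallel_aware; '*visited' gets the nodes inspected. */
static bool
early_contains_parallel_aware(const EarlyPath *path, int *visited)
{
	assert(visited != NULL);
	*visited = 0;
	return early_path_walk(path, early_is_parallel_aware, visited);
}
```

For a join whose left child is a parallel_aware scan, only the join and that scan are inspected; the right child is never visited. The saving matters little for Assert-only builds, but it also removes the need for a counting struct when the question is simply "is there at least one".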
On Tue, Feb 8, 2022 at 4:11 PM David Rowley <dgrowleyml@gmail.com> wrote:
I still feel this is quite a bit of code for what we're getting here.
I'd be more for it if the path traversal function existed for some
other reason and I was just adding the callback functions and Asserts.
I'm keen to hear what others think about that.
My view is that functions like path_tree_walker are good things to
have on general principle. I find it likely that it will find other
uses, and that if we don't add it as part of this patch, someone will add
it for some other reason in the future. So I would not really count
that in deciding how big this patch is, and the rest of what you have
here is pretty short and to the point.
There is the more difficult philosophical question of whether it's
worth expending any code on this at all. I think it is pretty clear
that this has positive value: it could easily prevent >0 future bugs,
which IMHO is not bad for such a small patch. However, it does feel a
little bit primitive somehow, in the sense that there are a lot of
things you could do wrong which this wouldn't catch. For example, a
Gather with no parallel-aware node under it is probably busted, unless
someone invents new kinds of parallel operators that work differently
from what we have now. But a join beneath a Gather that is not itself
parallel-aware should have a parallel-aware node under exactly one
side of the join. If there's a parallel scan on both sides or neither
side, even with stuff on top of it, that's wrong. But a parallel-aware
join could do something else, e.g. Parallel Hash Join expects a
parallel path on both sides. Some other parallel-aware join type could
expect a parallel path on exactly one side without caring which one,
or on one specific side, or maybe even on neither side.
What we're really reasoning about here is whether the input is going
to be partitioned across multiple executions of the plan in a proper
way. A Gather is going to run the same plan in all of its workers, so
it wants a subplan that when run in all workers will together produce
all output rows. Parallel-aware scans partition the results across
workers, so they behave that way. A non-parallel aware join will work
that way if it joins a partition of the input on one side to all of the
input from the other side, hence the rule I describe above. For
aggregates, you can't safely apply a plain old Aggregate operation
either to a regular scan or to a parallel-aware scan and get the right
answer, which is why we need Partial and Finalize stages for parallel
query. But for a lot of other nodes, like Materialize, their output
will have the same properties as the input: if the subplan of a
Materialize node produces all the rows on each execution, the
Materialize node will too; if it produces a partition of the output
rows each time it's executed, once per worker, the Materialize node
will do the same. And I think it's that kind of case that leads to the
check we have here, that there ought to be a parallel-aware node in
there someplace.
It might be the case that there's some more sophisticated check we
could be doing here that would be more satisfying than the one you've
written, but I'm not sure. Such a check might end up needing to know
the behavior of the existing nodes in a lot of detail, which then
wouldn't help with finding bugs in new functionality we add in the
future. In that sense, the kind of simple check you've got here has
something to recommend it: it won't catch everything people can do
wrong, but when it does trip, chances are good it's found a bug, and
it's got a good chance of continuing to work as well as it does today
even in the face of future additions. So I guess I'm mildly in favor
of it, but I would also find it entirely reasonable if you were to
decide it's not quite worth it.
--
Robert Haas
EDB: http://www.enterprisedb.com
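The "more sophisticated check" discussed above, reasoning about whether each node's output is partitioned across workers, might look roughly like the sketch below. It is a toy model and not a proposal from the thread: RowDist, DistPath and the three node kinds are hypothetical, aggregates are left out entirely, and a parallel-aware join is assumed to behave like Parallel Hash Join (partitioned input on both sides), which the message notes need not hold for other join types.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* How a node's rows are spread over the workers running the same plan. */
typedef enum
{
	ROWS_PARTITIONED,			/* each worker gets a disjoint share */
	ROWS_REPLICATED,			/* every worker produces all the rows */
	ROWS_BROKEN					/* combination that gives wrong results */
} RowDist;

typedef enum { KIND_SCAN, KIND_PASSTHROUGH, KIND_JOIN } NodeKind;

/* Hypothetical, simplified stand-in for a planner Path node. */
typedef struct DistPath
{
	NodeKind	kind;
	bool		parallel_aware;
	struct DistPath *outer;		/* only child of a pass-through node */
	struct DistPath *inner;		/* second child, joins only */
} DistPath;

static RowDist
row_distribution(const DistPath *path)
{
	RowDist		o, i;

	assert(path != NULL);
	switch (path->kind)
	{
		case KIND_SCAN:
			return path->parallel_aware ? ROWS_PARTITIONED : ROWS_REPLICATED;
		case KIND_PASSTHROUGH:	/* e.g. Materialize: same as its input */
			return row_distribution(path->outer);
		case KIND_JOIN:
			o = row_distribution(path->outer);
			i = row_distribution(path->inner);
			if (o == ROWS_BROKEN || i == ROWS_BROKEN)
				return ROWS_BROKEN;
			if (path->parallel_aware)	/* assumed: needs both sides partitioned */
				return (o == ROWS_PARTITIONED && i == ROWS_PARTITIONED) ?
					ROWS_PARTITIONED : ROWS_BROKEN;
			if (o == ROWS_PARTITIONED && i == ROWS_PARTITIONED)
				return ROWS_BROKEN; /* matching rows may sit in different workers */
			return (o == ROWS_PARTITIONED || i == ROWS_PARTITIONED) ?
				ROWS_PARTITIONED : ROWS_REPLICATED;
	}
	return ROWS_BROKEN;
}

/* A Gather wants a child whose workers together produce each row once. */
static bool
gather_child_is_sane(const DistPath *child)
{
	return row_distribution(child) == ROWS_PARTITIONED;
}
```

In this model a replicated child under a Gather corresponds to the duplicate-rows symptom of the original bug report. The sketch also shows the cost the message warns about: every node kind needs its own rule, so a new node type would require updating the checker before it could say anything useful about plans containing it.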