Possible incorrect row estimation for Gather paths

Started by Anthonin Bonnefoy · over 1 year ago · 7 messages
#1 Anthonin Bonnefoy
anthonin.bonnefoy@datadoghq.com
2 attachment(s)

Hi,

While experimenting on an explain option to display all plan candidates
(very rough prototype here [1]), I've noticed some discrepancies in some
generated plans.

EXPLAIN (ALL_CANDIDATES) SELECT * FROM pgbench_accounts order by aid;
Plan 1
-> Gather Merge (cost=11108.32..22505.38 rows=100000 width=97)
Workers Planned: 1
-> Sort (cost=10108.31..10255.37 rows=58824 width=97)
Sort Key: aid
-> Parallel Seq Scan on pgbench_accounts (cost=0.00..2228.24 rows=58824 width=97)
Plan 2
-> Gather Merge (cost=11108.32..17873.08 rows=58824 width=97)
Workers Planned: 1
-> Sort (cost=10108.31..10255.37 rows=58824 width=97)
Sort Key: aid
-> Parallel Seq Scan on pgbench_accounts (cost=0.00..2228.24 rows=58824 width=97)

The two plans are identical except that one Gather Merge has a lower row
estimate of 58K.

The first plan is created via generate_useful_gather_paths with
override_rows=false, then create_gather_merge_path, and uses rel->rows as
the row count (so the 100K rows of pgbench_accounts).
#0: create_gather_merge_path(...) at pathnode.c:1885:30
#1: generate_useful_gather_paths(... override_rows=false) at
allpaths.c:3286:11
#2: apply_scanjoin_target_to_paths(...) at planner.c:7744:3
#3: grouping_planner(...) at planner.c:1638:3

The second plan is created through create_ordered_paths, then
create_gather_merge_path, and the number of rows is estimated as
(worker_rows * parallel_workers). Since we only have 1 parallel worker,
this yields 58K rows.
#0: create_gather_merge_path(...) at pathnode.c:1885:30
#1: create_ordered_paths(...) at planner.c:5287:5
#2: grouping_planner(...) at planner.c:1717:17

The 58K row estimate looks possibly incorrect. A worker's row count is
estimated as total_rows/parallel_divisor. The parallel_divisor includes
the possible leader participation and will be 1.7 for 1 worker, thus the
58K rows (100K/1.7=58K).
However, the gather merge estimate only computes 58K*1 (worker rows times
the number of workers), dropping the leader participation component.
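
For reference, the divisor comes from get_parallel_divisor() in
costsize.c, which looks roughly like this (simplified excerpt for
illustration, as of recent versions):

static double
get_parallel_divisor(Path *path)
{
	double		parallel_divisor = path->parallel_workers;

	if (parallel_leader_participation)
	{
		double		leader_contribution;

		/* the leader is assumed to help less as workers are added */
		leader_contribution = 1.0 - (0.3 * path->parallel_workers);
		if (leader_contribution > 0)
			parallel_divisor += leader_contribution;
	}

	return parallel_divisor;
}

With 1 worker and parallel_leader_participation on, this gives
1 + (1 - 0.3) = 1.7.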

I have a tentative patch split in two changes:
1: A related refactoring to remove an unnecessary and confusing
assignment of rows in create_gather_merge_path. This value is never used
and is immediately overwritten in cost_gather_merge.
2: This changes the row estimation of gather paths to use
worker_rows*parallel_divisor to get a more accurate estimation.
Additionally, when creating a gather merge path in create_ordered_paths, we
try to use the source rel's rows when available.

The patch triggered a small change in the join_hash regression test.
Pre-patch, we had the following candidates:
Plan 4
-> Aggregate (cost=511.01..511.02 rows=1 width=8)
-> Gather (cost=167.02..511.00 rows=2 width=0)
Workers Planned: 1
-> Parallel Hash Join (cost=167.02..510.80 rows=1 width=0)
Hash Cond: (r.id = s.id)
-> Parallel Seq Scan on simple r (cost=0.00..299.65 rows=11765 width=4)
-> Parallel Hash (cost=167.01..167.01 rows=1 width=4)
-> Parallel Seq Scan on extremely_skewed s (cost=0.00..167.01 rows=1 width=4)
Plan 5
-> Finalize Aggregate (cost=510.92..510.93 rows=1 width=8)
-> Gather (cost=510.80..510.91 rows=1 width=8)
Workers Planned: 1
-> Partial Aggregate (cost=510.80..510.81 rows=1 width=8)
-> Parallel Hash Join (cost=167.02..510.80 rows=1 width=0)
Hash Cond: (r.id = s.id)
-> Parallel Seq Scan on simple r (cost=0.00..299.65 rows=11765 width=4)
-> Parallel Hash (cost=167.01..167.01 rows=1 width=4)
-> Parallel Seq Scan on extremely_skewed s (cost=0.00..167.01 rows=1 width=4)

Post-patch, the plan candidates became:
Plan 4
-> Finalize Aggregate (cost=511.02..511.03 rows=1 width=8)
-> Gather (cost=510.80..511.01 rows=2 width=8)
Workers Planned: 1
-> Partial Aggregate (cost=510.80..510.81 rows=1 width=8)
-> Parallel Hash Join (cost=167.02..510.80 rows=1 width=0)
Hash Cond: (r.id = s.id)
-> Parallel Seq Scan on simple r (cost=0.00..299.65 rows=11765 width=4)
-> Parallel Hash (cost=167.01..167.01 rows=1 width=4)
-> Parallel Seq Scan on extremely_skewed s (cost=0.00..167.01 rows=1 width=4)
Plan 5
-> Aggregate (cost=511.01..511.02 rows=1 width=8)
-> Gather (cost=167.02..511.00 rows=2 width=0)
Workers Planned: 1
-> Parallel Hash Join (cost=167.02..510.80 rows=1 width=0)
Hash Cond: (r.id = s.id)
-> Parallel Seq Scan on simple r (cost=0.00..299.65 rows=11765 width=4)
-> Parallel Hash (cost=167.01..167.01 rows=1 width=4)
-> Parallel Seq Scan on extremely_skewed s (cost=0.00..167.01 rows=1 width=4)

The Finalize Aggregate plan's cost increased post-patch (510.93 to 511.03)
because the Gather node's row estimate went from 1 to 2 (rint(1 * 1.7)=2).
This was enough to make the Aggregate plan cheaper. Since the test's
purpose is to check the parallel hash join, updating it with the new
cheapest plan looked fine.
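(For what it's worth, Gather charges parallel_tuple_cost, 0.1 by default,
per emitted row, so the one extra estimated row accounts for the 0.10
increase from 510.93 to 511.03.)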

Regards,
Anthonin

[1]: https://github.com/bonnefoa/postgres/tree/plan-candidates

Attachments:

v1-0002-Fix-row-estimation-in-gather-paths.patch (application/octet-stream)
From 95558d304f8880ef2f1f0700b9d62a6f9617d248 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Thu, 23 May 2024 11:24:44 +0200
Subject: Fix row estimation in gather paths

In parallel plans, the row count of a partial plan is estimated to
(rows/parallel_divisor). The parallel_divisor is the number of
parallel_workers plus a possible leader contribution.

When creating a gather path, we currently estimate the sum of gathered
rows to worker_rows*parallel_workers which leads to a lower estimated
row count.

This patch changes the gather path row estimation to
worker_rows*parallel_divisor to get a more accurate estimation.
Additionally, when creating a gather merge path in create_ordered_paths,
we try to use the source's rel rows when available.
---
 src/backend/optimizer/path/allpaths.c   |  7 +++----
 src/backend/optimizer/path/costsize.c   | 19 +++++++++++++++++++
 src/backend/optimizer/plan/planner.c    | 12 +++++++++---
 src/include/optimizer/optimizer.h       |  2 ++
 src/test/regress/expected/join_hash.out | 19 +++++++++----------
 5 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 4895cee994..c1244a9b83 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3071,8 +3071,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	 * of partial_pathlist because of the way add_partial_path works.
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
-	rows =
-		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
+	rows = gather_rows_estimate(cheapest_partial_path);
 	simple_gather_path = (Path *)
 		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
 						   NULL, rowsp);
@@ -3090,7 +3089,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
-		rows = subpath->rows * subpath->parallel_workers;
+		rows = gather_rows_estimate(subpath);
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
 										subpath->pathkeys, NULL, rowsp);
 		add_path(rel, &path->path);
@@ -3274,7 +3273,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 													subpath,
 													useful_pathkeys,
 													-1.0);
-				rows = subpath->rows * subpath->parallel_workers;
+				rows = gather_rows_estimate(subpath);
 			}
 			else
 				subpath = (Path *) create_incremental_sort_path(root,
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ee23ed7835..24feb513ce 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -217,6 +217,25 @@ clamp_row_est(double nrows)
 	return nrows;
 }
 
+/*
+ * gather_rows_estimate
+ *		Estimate the number of rows for gather nodes.
+ *
+ * When creating a gather (merge) path, we need to estimate the sum of rows
+ * distributed to all workers. A worker will have an estimated row set to
+ * (rows / parallel_divisor). Since parallel_divisor may include the leader
+ * contribution, we can't simply multiply workers' rows by the number of
+ * parallel_workers and instead need to reuse the parallel_divisor to get a
+ * more accurate estimation.
+ */
+double
+gather_rows_estimate(Path *partial_path)
+{
+	double		parallel_divisor = get_parallel_divisor(partial_path);
+
+	return clamp_row_est(partial_path->rows * parallel_divisor);
+}
+
 /*
  * clamp_width_est
  *		Force a tuple-width estimate to a sane value.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 032818423f..1f0a78be76 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -5281,8 +5281,14 @@ create_ordered_paths(PlannerInfo *root,
 																	root->sort_pathkeys,
 																	presorted_keys,
 																	limit_tuples);
-			total_groups = input_path->rows *
-				input_path->parallel_workers;
+			total_groups = input_rel->rows;
+
+			/*
+			 * If the number of rows is unknown, fallback to gather rows
+			 * estimation
+			 */
+			if (total_groups == 0)
+				total_groups = gather_rows_estimate(input_path);
 			sorted_path = (Path *)
 				create_gather_merge_path(root, ordered_rel,
 										 sorted_path,
@@ -7453,7 +7459,7 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel)
 			(presorted_keys == 0 || !enable_incremental_sort))
 			continue;
 
-		total_groups = path->rows * path->parallel_workers;
+		total_groups = gather_rows_estimate(path);
 
 		/*
 		 * We've no need to consider both a sort and incremental sort. We'll
diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h
index 7b63c5cf71..8e602cf95f 100644
--- a/src/include/optimizer/optimizer.h
+++ b/src/include/optimizer/optimizer.h
@@ -23,6 +23,7 @@
 #define OPTIMIZER_H
 
 #include "nodes/parsenodes.h"
+#include "nodes/pathnodes.h"
 
 /*
  * We don't want to include nodes/pathnodes.h here, because non-planner
@@ -92,6 +93,7 @@ extern PGDLLIMPORT int effective_cache_size;
 extern double clamp_row_est(double nrows);
 extern int32 clamp_width_est(int64 tuple_width);
 extern long clamp_cardinality_to_long(Cardinality x);
+extern double gather_rows_estimate(Path *partial_path);
 
 /* in path/indxpath.c: */
 
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 262fa71ed8..4fc34a0e72 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -508,18 +508,17 @@ set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join extremely_skewed s using (id);
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Finalize Aggregate
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
    ->  Gather
          Workers Planned: 1
-         ->  Partial Aggregate
-               ->  Parallel Hash Join
-                     Hash Cond: (r.id = s.id)
-                     ->  Parallel Seq Scan on simple r
-                     ->  Parallel Hash
-                           ->  Parallel Seq Scan on extremely_skewed s
-(9 rows)
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
 
 select count(*) from simple r join extremely_skewed s using (id);
  count 
-- 
2.39.3 (Apple Git-146)

v1-0001-Remove-unnecessary-assignment-of-path-rows-in-gat.patch (application/octet-stream)
From 0c878b8cb3684f7673026d350a7aa9d29ef5033e Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Thu, 23 May 2024 09:24:04 +0200
Subject: Remove unnecessary assignment of path rows in gather merge

The number of rows of a gather merge node is set inside
cost_gather_merge, making the assignment in create_gather_merge_path
unnecessary and slightly misleading.

This patch removes the unnecessary assignment from
create_gather_merge_path, mirroring what's done in create_gather_path.
---
 src/backend/optimizer/util/pathnode.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 3491c3af1c..b4736d4986 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1899,7 +1899,6 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->num_workers = subpath->parallel_workers;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->path.pathtarget = target ? target : rel->reltarget;
-	pathnode->path.rows += subpath->rows;
 
 	if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
 	{
-- 
2.39.3 (Apple Git-146)

#2 Rafia Sabih
rafia.pghackers@gmail.com
In reply to: Anthonin Bonnefoy (#1)
Re: Possible incorrect row estimation for Gather paths

Hello Anthonin,

I spent some time on this problem and your proposed solution.
As I understand it, this is the correction for the row count when the
number of parallel workers < 4.
Once the number of workers is 4 or more, the output from
parallel_divisor is the same as the number of parallel workers.
And then the number of rows for such cases would be the same with or
without the proposed patch.
So that way I think it is good to fix this case for a smaller number of workers.
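(Concretely, with leader participation the divisor is
workers + max(0, 1 - 0.3 * workers): 1.7 for one worker, 2.4 for two,
3.1 for three, and exactly the worker count from four onwards.)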

But I don't quite understand the purpose of this:
+ total_groups = input_rel->rows;
+
+ /*
+ * If the number of rows is unknown, fallback to gather rows
+ * estimation
+ */
+ if (total_groups == 0)
+ total_groups = gather_rows_estimate(input_path);

Why not just use total_groups = gather_rows_estimate(input_path)? What
is the importance of having total_groups = input_rel->rows?

With respect to the change introduced by the patch in the regression
test, I wonder if we should test it on tables of a larger scale
and check for performance issues.
Imagine the case where Parallel Seq Scan on extremely_skewed s
(cost=0.00..167.01 rows=1 width=4) returns 1000 rows instead of 1:
I wonder which plan would perform better then, or whether there would
be a totally different plan.

However, my hunch is that there should not be serious problems,
because before this patch the number of estimated rows was incorrect
anyway.

I don't see a problem in merging the two patches.

--
Regards,
Rafia Sabih

#3 Anthonin Bonnefoy
anthonin.bonnefoy@datadoghq.com
In reply to: Rafia Sabih (#2)
2 attachment(s)
Re: Possible incorrect row estimation for Gather paths

Hi Rafia,

Thanks for reviewing.

On Wed, Jul 10, 2024 at 4:54 PM Rafia Sabih <rafia.pghackers@gmail.com> wrote:

But I don't quite understand the purpose of this:
+ total_groups = input_rel->rows;
+
+ /*
+ * If the number of rows is unknown, fallback to gather rows
+ * estimation
+ */
+ if (total_groups == 0)
+ total_groups = gather_rows_estimate(input_path);

Why not just use total_groups = gather_rows_estimate(input_path)? What
is the importance of having total_groups = input_rel->rows?

The initial goal was to use the source tuples if available and avoid
possible rounding errors, though I realise that the difference would
be minimal. For example, 200K tuples and 2 workers (a parallel_divisor
of 2.4) would yield int(int(200000 / 2.4) * 2.4)=199999. That is
probably not worth the additional complexity, so I've updated the patch
to just use gather_rows_estimate.
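
As a standalone illustration of that round-trip (a minimal sketch, not
PostgreSQL code; it assumes 2 workers with leader participation, hence
the 2.4 divisor, and mimics clamp_row_est's rint rounding):

#include <math.h>
#include <stdio.h>

int
main(void)
{
	double		total_rows = 200000;
	double		parallel_divisor = 2.4; /* 2 workers + 0.4 leader */
	double		worker_rows = rint(total_rows / parallel_divisor);	/* 83333 */
	double		gathered = rint(worker_rows * parallel_divisor);	/* 199999 */

	printf("worker_rows=%.0f gathered=%.0f\n", worker_rows, gathered);
	return 0;
}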

With respect to the change introduced by the patch in the regression
test, I wonder if we should test it on tables of a larger scale
and check for performance issues.
Imagine the case where Parallel Seq Scan on extremely_skewed s
(cost=0.00..167.01 rows=1 width=4) returns 1000 rows instead of 1:
I wonder which plan would perform better then, or whether there would
be a totally different plan.

For the extremely_skewed test, having the parallel seq scan return
more rows won't impact the Gather since the Hash Join will reduce the
number of rows to 1. I've found an example where we can see the plan
change with the default settings:

CREATE TABLE simple (id SERIAL PRIMARY KEY, other bigint);
insert into simple select generate_series(1,500000), ceil(random()*100);
analyze simple;
EXPLAIN SELECT * FROM simple where other < 10 order by id;

Unpatched:
Gather Merge (cost=9377.85..12498.60 rows=27137 width=12)
Workers Planned: 1
-> Sort (cost=8377.84..8445.68 rows=27137 width=12)
Sort Key: id
-> Parallel Seq Scan on simple (cost=0.00..6379.47 rows=27137 width=12)
Filter: (other < 10)

Patched:
Sort (cost=12381.73..12492.77 rows=44417 width=12)
Sort Key: id
-> Seq Scan on simple (cost=0.00..8953.00 rows=44417 width=12)
Filter: (other < 10)

Looking at the candidates, the previous Gather Merge now has an
estimated row count of 44418. The difference of 1 compared to the
other Gather Merge plan is due to rounding (26128 * 1.7 = 44417.6).

Plan 3
-> Gather Merge (cost=9296.40..14358.75 rows=44418 width=12)
Workers Planned: 1
-> Sort (cost=8296.39..8361.71 rows=26128 width=12)
Sort Key: id
-> Parallel Seq Scan on simple (cost=0.00..6379.47 rows=26128 width=12)
Filter: (other < 10)
Plan 4
-> Gather Merge (cost=9296.40..14358.63 rows=44417 width=12)
Workers Planned: 1
-> Sort (cost=8296.39..8361.71 rows=26128 width=12)
Sort Key: id
-> Parallel Seq Scan on simple (cost=0.00..6379.47 rows=26128 width=12)
Filter: (other < 10)
Plan 5
-> Sort (cost=12381.73..12492.77 rows=44417 width=12)
Sort Key: id
-> Seq Scan on simple (cost=0.00..8953.00 rows=44417 width=12)
Filter: (other < 10)

The Sort plan is slightly slower than the Gather Merge plan, 100ms
average against 83ms, but the Gather Merge comes at the additional cost
of creating and using a parallel worker. The unpatched row estimation
makes the parallel plan look cheaper, and running a parallel query for
a 17ms improvement doesn't seem like a good trade.

I've also realised from the comments in optimizer.h that
nodes/pathnodes.h should not be included there and fixed it.

Regards,
Anthonin

Attachments:

v2-0001-Remove-unnecessary-assignment-of-path-rows-in-gat.patch (application/octet-stream)
From 2206c1cde09778aa589691d2a16696ee13140619 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Thu, 23 May 2024 09:24:04 +0200
Subject: Remove unnecessary assignment of path rows in gather merge

The number of rows of a gather merge node is set inside
cost_gather_merge, making the assignment in create_gather_merge_path
unnecessary and slightly misleading.

This patch removes the unnecessary assignment from
create_gather_merge_path, mirroring what's done in create_gather_path.
---
 src/backend/optimizer/util/pathnode.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index c42742d2c7..d1c4e1a6aa 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1899,7 +1899,6 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->num_workers = subpath->parallel_workers;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->path.pathtarget = target ? target : rel->reltarget;
-	pathnode->path.rows += subpath->rows;
 
 	if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
 	{
-- 
2.39.3 (Apple Git-146)

v2-0002-Fix-row-estimation-in-gather-paths.patch (application/octet-stream)
From fa7fc19c8d021fe314cd780af7ae724c1dd53096 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Thu, 23 May 2024 11:24:44 +0200
Subject: Fix row estimation in gather paths

In parallel plans, the row count of a partial plan is estimated to
(rows/parallel_divisor). The parallel_divisor is the number of
parallel_workers plus a possible leader contribution.

When creating a gather path, we currently estimate the sum of gathered
rows to worker_rows*parallel_workers which leads to a lower estimated
row count.

This patch changes the gather path row estimation to
worker_rows*parallel_divisor to get a more accurate estimation.
---
 src/backend/optimizer/path/allpaths.c   |  7 +++----
 src/backend/optimizer/path/costsize.c   | 19 +++++++++++++++++++
 src/backend/optimizer/plan/planner.c    |  6 +++---
 src/include/nodes/pathnodes.h           |  5 +++++
 src/include/optimizer/optimizer.h       |  7 ++++++-
 src/test/regress/expected/join_hash.out | 19 +++++++++----------
 6 files changed, 45 insertions(+), 18 deletions(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 4895cee994..c1244a9b83 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3071,8 +3071,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	 * of partial_pathlist because of the way add_partial_path works.
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
-	rows =
-		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
+	rows = gather_rows_estimate(cheapest_partial_path);
 	simple_gather_path = (Path *)
 		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
 						   NULL, rowsp);
@@ -3090,7 +3089,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
-		rows = subpath->rows * subpath->parallel_workers;
+		rows = gather_rows_estimate(subpath);
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
 										subpath->pathkeys, NULL, rowsp);
 		add_path(rel, &path->path);
@@ -3274,7 +3273,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 													subpath,
 													useful_pathkeys,
 													-1.0);
-				rows = subpath->rows * subpath->parallel_workers;
+				rows = gather_rows_estimate(subpath);
 			}
 			else
 				subpath = (Path *) create_incremental_sort_path(root,
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ee23ed7835..24feb513ce 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -217,6 +217,25 @@ clamp_row_est(double nrows)
 	return nrows;
 }
 
+/*
+ * gather_rows_estimate
+ *		Estimate the number of rows for gather nodes.
+ *
+ * When creating a gather (merge) path, we need to estimate the sum of rows
+ * distributed to all workers. A worker will have an estimated row set to
+ * (rows / parallel_divisor). Since parallel_divisor may include the leader
+ * contribution, we can't simply multiply workers' rows by the number of
+ * parallel_workers and instead need to reuse the parallel_divisor to get a
+ * more accurate estimation.
+ */
+double
+gather_rows_estimate(Path *partial_path)
+{
+	double		parallel_divisor = get_parallel_divisor(partial_path);
+
+	return clamp_row_est(partial_path->rows * parallel_divisor);
+}
+
 /*
  * clamp_width_est
  *		Force a tuple-width estimate to a sane value.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 4711f91239..c3e234902e 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -5370,8 +5370,8 @@ create_ordered_paths(PlannerInfo *root,
 																	root->sort_pathkeys,
 																	presorted_keys,
 																	limit_tuples);
-			total_groups = input_path->rows *
-				input_path->parallel_workers;
+			total_groups = gather_rows_estimate(input_path);
+
 			sorted_path = (Path *)
 				create_gather_merge_path(root, ordered_rel,
 										 sorted_path,
@@ -7543,7 +7543,7 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel)
 			(presorted_keys == 0 || !enable_incremental_sort))
 			continue;
 
-		total_groups = path->rows * path->parallel_workers;
+		total_groups = gather_rows_estimate(path);
 
 		/*
 		 * We've no need to consider both a sort and incremental sort. We'll
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 14ccfc1ac1..f460539d6c 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -1616,6 +1616,11 @@ typedef struct ParamPathInfo
  * between RelOptInfo and Path nodes can't be handled easily in a simple
  * depth-first traversal.  We also don't have read support at the moment.
  */
+#ifndef HAVE_PATH_TYPEDEF
+typedef struct Path Path;
+#define HAVE_PATH_TYPEDEF 1
+#endif
+
 typedef struct Path
 {
 	pg_node_attr(no_copy_equal, no_read, no_query_jumble)
diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h
index 7b63c5cf71..d1b16d1258 100644
--- a/src/include/optimizer/optimizer.h
+++ b/src/include/optimizer/optimizer.h
@@ -35,7 +35,7 @@ typedef struct PlannerInfo PlannerInfo;
 #define HAVE_PLANNERINFO_TYPEDEF 1
 #endif
 
-/* Likewise for IndexOptInfo and SpecialJoinInfo. */
+/* Likewise for IndexOptInfo, SpecialJoinInfo and Path. */
 #ifndef HAVE_INDEXOPTINFO_TYPEDEF
 typedef struct IndexOptInfo IndexOptInfo;
 #define HAVE_INDEXOPTINFO_TYPEDEF 1
@@ -44,6 +44,10 @@ typedef struct IndexOptInfo IndexOptInfo;
 typedef struct SpecialJoinInfo SpecialJoinInfo;
 #define HAVE_SPECIALJOININFO_TYPEDEF 1
 #endif
+#ifndef HAVE_PATH_TYPEDEF
+typedef struct Path Path;
+#define HAVE_PATH_TYPEDEF 1
+#endif
 
 /* It also seems best not to include plannodes.h, params.h, or htup.h here */
 struct PlannedStmt;
@@ -92,6 +96,7 @@ extern PGDLLIMPORT int effective_cache_size;
 extern double clamp_row_est(double nrows);
 extern int32 clamp_width_est(int64 tuple_width);
 extern long clamp_cardinality_to_long(Cardinality x);
+extern double gather_rows_estimate(Path *partial_path);
 
 /* in path/indxpath.c: */
 
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 262fa71ed8..4fc34a0e72 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -508,18 +508,17 @@ set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join extremely_skewed s using (id);
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Finalize Aggregate
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
    ->  Gather
          Workers Planned: 1
-         ->  Partial Aggregate
-               ->  Parallel Hash Join
-                     Hash Cond: (r.id = s.id)
-                     ->  Parallel Seq Scan on simple r
-                     ->  Parallel Hash
-                           ->  Parallel Seq Scan on extremely_skewed s
-(9 rows)
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
 
 select count(*) from simple r join extremely_skewed s using (id);
  count 
-- 
2.39.3 (Apple Git-146)

#4 Richard Guo
guofenglinux@gmail.com
In reply to: Anthonin Bonnefoy (#3)
Re: Possible incorrect row estimation for Gather paths

I can reproduce this problem with the query below.

explain (costs on) select * from tenk1 order by twenty;
QUERY PLAN
---------------------------------------------------------------------------------
Gather Merge (cost=772.11..830.93 rows=5882 width=244)
Workers Planned: 1
-> Sort (cost=772.10..786.80 rows=5882 width=244)
Sort Key: twenty
-> Parallel Seq Scan on tenk1 (cost=0.00..403.82 rows=5882 width=244)
(5 rows)

On Tue, Jul 16, 2024 at 3:56 PM Anthonin Bonnefoy
<anthonin.bonnefoy@datadoghq.com> wrote:

The initial goal was to use the source tuples if available and avoid
possible rounding errors, though I realise that the difference would
be minimal. For example, 200K tuples and 2 workers (a parallel_divisor
of 2.4) would yield int(int(200000 / 2.4) * 2.4)=199999. That is
probably not worth the additional complexity, so I've updated the patch
to just use gather_rows_estimate.

I wonder if the changes in create_ordered_paths should also be reduced
to 'total_groups = gather_rows_estimate(path);'.

I've also realised from the comments in optimizer.h that
nodes/pathnodes.h should not be included there and fixed it.

I think perhaps it's better to declare gather_rows_estimate() in
cost.h rather than optimizer.h.
(BTW, I wonder if compute_gather_rows() would be a better name?)

I noticed another issue in generate_useful_gather_paths() -- *rowsp
would have a random value if override_rows is true and we use
incremental sort for gather merge. I think we should fix this too.
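
For illustration, a minimal standalone sketch of the hazard (made-up
names, not the actual PostgreSQL code): rows is only assigned on the
plain-sort branch, so the value later read through rowsp is
indeterminate when the incremental-sort branch is taken.

#include <stdio.h>

static void
build_sorted_gather_merge(int use_incremental_sort, double *rowsp)
{
	double		rows;	/* never initialized */

	if (!use_incremental_sort)
		rows = 100000;	/* only the plain-sort branch sets it */

	if (rowsp)
		*rowsp = rows;	/* indeterminate when the other branch ran */
}

int
main(void)
{
	double		out;

	/* mimics override_rows=true together with incremental sort */
	build_sorted_gather_merge(1, &out);
	printf("rows = %g\n", out); /* may print garbage */
	return 0;
}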

Thanks
Richard

#5 Anthonin Bonnefoy
anthonin.bonnefoy@datadoghq.com
In reply to: Richard Guo (#4)
2 attachment(s)
Re: Possible incorrect row estimation for Gather paths

On Wed, Jul 17, 2024 at 3:59 AM Richard Guo <guofenglinux@gmail.com> wrote:

I can reproduce this problem with the query below.

explain (costs on) select * from tenk1 order by twenty;
QUERY PLAN
---------------------------------------------------------------------------------
Gather Merge (cost=772.11..830.93 rows=5882 width=244)
Workers Planned: 1
-> Sort (cost=772.10..786.80 rows=5882 width=244)
Sort Key: twenty
-> Parallel Seq Scan on tenk1 (cost=0.00..403.82 rows=5882 width=244)
(5 rows)

I was looking for a test to add to the regression checks that wouldn't
rely on the explain costs, since those are disabled there. However,
I've realised I could do something similar to what's done in stats_ext
with the check_estimated_rows function. I've added a get_estimated_rows
test function that extracts the estimated rows from the top node and
uses it to check the gather nodes' estimates. get_estimated_rows uses a
plain explain, whereas check_estimated_rows relies on an explain
analyze.

I wonder if the changes in create_ordered_paths should also be reduced
to 'total_groups = gather_rows_estimate(path);'.

It should already be the case with the v2 patch. It does create
rounding errors that are visible in the tests, but they shouldn't
exceed +1 or -1.
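(As a concrete check, tenk1's 10000 rows with one worker give
rint(10000 / 1.7) = 5882 rows per worker, and rint(5882 * 1.7) = 9999
gathered rows: hence the 9999 estimate visible in the test output with
max_parallel_workers_per_gather=1.)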

I think perhaps it's better to declare gather_rows_estimate() in
cost.h rather than optimizer.h.
(BTW, I wonder if compute_gather_rows() would be a better name?)

Good point, I've moved the declaration and renamed it.

I noticed another issue in generate_useful_gather_paths() -- *rowsp
would have a random value if override_rows is true and we use
incremental sort for gather merge. I think we should fix this too.

That seems to be the case. I've tried to find a query that could
trigger this codepath, without success. All grouping and distinct paths
I've tried were fully sorted and didn't trigger an incremental sort.
I will need a bit more time to check this.

In the meantime, I've updated the patches with the review comments.

Regards,
Anthonin

Attachments:

v3-0001-Remove-unnecessary-assignment-of-path-rows-in-gat.patch (application/octet-stream)
From 8559e5fe2e7697d96c86d5c00ab54b789bbc6c8c Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Thu, 23 May 2024 09:24:04 +0200
Subject: Remove unnecessary assignment of path rows in gather merge

The number of rows of a gather merge node is set inside
cost_gather_merge, making the assignment in create_gather_merge_path
unnecessary and slightly misleading.

This patch removes the unnecessary assignment from
create_gather_merge_path, mirroring what's done in create_gather_path.
---
 src/backend/optimizer/util/pathnode.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index c42742d2c7..d1c4e1a6aa 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1899,7 +1899,6 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->num_workers = subpath->parallel_workers;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->path.pathtarget = target ? target : rel->reltarget;
-	pathnode->path.rows += subpath->rows;
 
 	if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
 	{
-- 
2.39.3 (Apple Git-146)

v3-0002-Fix-row-estimation-in-gather-paths.patch (application/octet-stream)
From 2dca72bb66c086ef77a27f7d7ff0bb524b4b9108 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Thu, 23 May 2024 11:24:44 +0200
Subject: Fix row estimation in gather paths

In parallel plans, the row count of a partial plan is estimated to
(rows/parallel_divisor). The parallel_divisor is the number of
parallel_workers plus a possible leader contribution.

When creating a gather path, we currently estimate the sum of gathered
rows to worker_rows*parallel_workers which leads to a lower estimated
row count.

This patch changes the gather path row estimation to
worker_rows*parallel_divisor to get a more accurate estimation.
---
 src/backend/optimizer/path/allpaths.c         |  7 ++--
 src/backend/optimizer/path/costsize.c         | 19 +++++++++
 src/backend/optimizer/plan/planner.c          |  6 +--
 src/include/optimizer/cost.h                  |  1 +
 src/test/regress/expected/join_hash.out       | 19 +++++----
 src/test/regress/expected/select_parallel.out | 39 +++++++++++++++++++
 src/test/regress/expected/test_setup.out      | 20 ++++++++++
 src/test/regress/sql/select_parallel.sql      | 11 ++++++
 src/test/regress/sql/test_setup.sql           | 21 ++++++++++
 9 files changed, 126 insertions(+), 17 deletions(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 4895cee994..fc72dfdeab 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3071,8 +3071,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	 * of partial_pathlist because of the way add_partial_path works.
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
-	rows =
-		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
+	rows = compute_gather_rows(cheapest_partial_path);
 	simple_gather_path = (Path *)
 		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
 						   NULL, rowsp);
@@ -3090,7 +3089,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
-		rows = subpath->rows * subpath->parallel_workers;
+		rows = compute_gather_rows(subpath);
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
 										subpath->pathkeys, NULL, rowsp);
 		add_path(rel, &path->path);
@@ -3274,7 +3273,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 													subpath,
 													useful_pathkeys,
 													-1.0);
-				rows = subpath->rows * subpath->parallel_workers;
+				rows = compute_gather_rows(subpath);
 			}
 			else
 				subpath = (Path *) create_incremental_sort_path(root,
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ee23ed7835..c197d3f9e4 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -217,6 +217,25 @@ clamp_row_est(double nrows)
 	return nrows;
 }
 
+/*
+ * compute_gather_rows
+ *		Compute the number of rows for gather nodes.
+ *
+ * When creating a gather (merge) path, we need to estimate the sum of rows
+ * distributed to all workers. A worker will have an estimated row set to
+ * (rows / parallel_divisor). Since parallel_divisor may include the leader
+ * contribution, we can't simply multiply workers' rows by the number of
+ * parallel_workers and instead need to reuse the parallel_divisor to get a
+ * more accurate estimation.
+ */
+double
+compute_gather_rows(Path *partial_path)
+{
+	double		parallel_divisor = get_parallel_divisor(partial_path);
+
+	return clamp_row_est(partial_path->rows * parallel_divisor);
+}
+
 /*
  * clamp_width_est
  *		Force a tuple-width estimate to a sane value.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 4711f91239..c7aea3db9f 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -5370,8 +5370,8 @@ create_ordered_paths(PlannerInfo *root,
 																	root->sort_pathkeys,
 																	presorted_keys,
 																	limit_tuples);
-			total_groups = input_path->rows *
-				input_path->parallel_workers;
+			total_groups = compute_gather_rows(input_path);
+
 			sorted_path = (Path *)
 				create_gather_merge_path(root, ordered_rel,
 										 sorted_path,
@@ -7543,7 +7543,7 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel)
 			(presorted_keys == 0 || !enable_incremental_sort))
 			continue;
 
-		total_groups = path->rows * path->parallel_workers;
+		total_groups = compute_gather_rows(path);
 
 		/*
 		 * We've no need to consider both a sort and incremental sort. We'll
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index b1c51a4e70..393fc8a9e5 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -212,5 +212,6 @@ extern PathTarget *set_pathtarget_cost_width(PlannerInfo *root, PathTarget *targ
 extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel,
 								   Path *bitmapqual, double loop_count,
 								   Cost *cost_p, double *tuples_p);
+extern double compute_gather_rows(Path *partial_path);
 
 #endif							/* COST_H */
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 262fa71ed8..4fc34a0e72 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -508,18 +508,17 @@ set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join extremely_skewed s using (id);
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Finalize Aggregate
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
    ->  Gather
          Workers Planned: 1
-         ->  Partial Aggregate
-               ->  Parallel Hash Join
-                     Hash Cond: (r.id = s.id)
-                     ->  Parallel Seq Scan on simple r
-                     ->  Parallel Hash
-                           ->  Parallel Seq Scan on extremely_skewed s
-(9 rows)
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
 
 select count(*) from simple r join extremely_skewed s using (id);
  count 
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 5a603f86b7..f95f882704 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -1328,4 +1328,43 @@ SELECT 1 FROM tenk1_vw_sec
                  Filter: (f1 < tenk1_vw_sec.unique1)
 (9 rows)
 
+-- test estimated rows in gather nodes with different numbers of workers
+EXPLAIN (COSTS OFF)
+SELECT * FROM tenk1 ORDER BY twenty;
+               QUERY PLAN               
+----------------------------------------
+ Gather Merge
+   Workers Planned: 4
+   ->  Sort
+         Sort Key: twenty
+         ->  Parallel Seq Scan on tenk1
+(5 rows)
+
+SELECT * FROM get_estimated_rows('SELECT * FROM tenk1 ORDER BY twenty');
+ estimated 
+-----------
+     10000
+(1 row)
+
+set max_parallel_workers_per_gather=3;
+SELECT * FROM get_estimated_rows('SELECT * FROM tenk1 ORDER BY twenty');
+ estimated 
+-----------
+     10000
+(1 row)
+
+set max_parallel_workers_per_gather=2;
+SELECT * FROM get_estimated_rows('SELECT * FROM tenk1 ORDER BY twenty');
+ estimated 
+-----------
+     10000
+(1 row)
+
+set max_parallel_workers_per_gather=1;
+SELECT * FROM get_estimated_rows('SELECT * FROM tenk1 ORDER BY twenty');
+ estimated 
+-----------
+      9999
+(1 row)
+
 rollback;
diff --git a/src/test/regress/expected/test_setup.out b/src/test/regress/expected/test_setup.out
index 3d0eeec996..8f2d863b9c 100644
--- a/src/test/regress/expected/test_setup.out
+++ b/src/test/regress/expected/test_setup.out
@@ -239,3 +239,23 @@ create function fipshash(text)
     returns text
     strict immutable parallel safe leakproof
     return substr(encode(sha256($1::bytea), 'hex'), 1, 32);
+-- get the number of estimated rows in the top node
+create function get_estimated_rows(text) returns table (estimated int)
+language plpgsql as
+$$
+declare
+    ln text;
+    tmp text[];
+    first_row bool := true;
+begin
+    for ln in
+        execute format('explain %s', $1)
+    loop
+        if first_row then
+            first_row := false;
+            tmp := regexp_match(ln, 'rows=(\d*)');
+            return query select tmp[1]::int;
+        end if;
+    end loop;
+end;
+$$;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index c7df8f775c..b162cab7e9 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -510,4 +510,15 @@ EXPLAIN (COSTS OFF)
 SELECT 1 FROM tenk1_vw_sec
   WHERE (SELECT sum(f1) FROM int4_tbl WHERE f1 < unique1) < 100;
 
+-- test estimated rows in gather nodes with different numbers of workers
+EXPLAIN (COSTS OFF)
+SELECT * FROM tenk1 ORDER BY twenty;
+SELECT * FROM get_estimated_rows('SELECT * FROM tenk1 ORDER BY twenty');
+set max_parallel_workers_per_gather=3;
+SELECT * FROM get_estimated_rows('SELECT * FROM tenk1 ORDER BY twenty');
+set max_parallel_workers_per_gather=2;
+SELECT * FROM get_estimated_rows('SELECT * FROM tenk1 ORDER BY twenty');
+set max_parallel_workers_per_gather=1;
+SELECT * FROM get_estimated_rows('SELECT * FROM tenk1 ORDER BY twenty');
+
 rollback;
diff --git a/src/test/regress/sql/test_setup.sql b/src/test/regress/sql/test_setup.sql
index 06b0e2121f..937d1619c8 100644
--- a/src/test/regress/sql/test_setup.sql
+++ b/src/test/regress/sql/test_setup.sql
@@ -294,3 +294,24 @@ create function fipshash(text)
     returns text
     strict immutable parallel safe leakproof
     return substr(encode(sha256($1::bytea), 'hex'), 1, 32);
+
+-- get the number of estimated rows in the top node
+create function get_estimated_rows(text) returns table (estimated int)
+language plpgsql as
+$$
+declare
+    ln text;
+    tmp text[];
+    first_row bool := true;
+begin
+    for ln in
+        execute format('explain %s', $1)
+    loop
+        if first_row then
+            first_row := false;
+            tmp := regexp_match(ln, 'rows=(\d*)');
+            return query select tmp[1]::int;
+        end if;
+    end loop;
+end;
+$$;
-- 
2.39.3 (Apple Git-146)

#6 Richard Guo
guofenglinux@gmail.com
In reply to: Anthonin Bonnefoy (#5)
1 attachment(s)
Re: Possible incorrect row estimation for Gather paths

On Mon, Jul 22, 2024 at 4:47 PM Anthonin Bonnefoy
<anthonin.bonnefoy@datadoghq.com> wrote:

On Wed, Jul 17, 2024 at 3:59 AM Richard Guo <guofenglinux@gmail.com> wrote:

I can reproduce this problem with the query below.

explain (costs on) select * from tenk1 order by twenty;
QUERY PLAN
---------------------------------------------------------------------------------
Gather Merge (cost=772.11..830.93 rows=5882 width=244)
Workers Planned: 1
-> Sort (cost=772.10..786.80 rows=5882 width=244)
Sort Key: twenty
-> Parallel Seq Scan on tenk1 (cost=0.00..403.82 rows=5882 width=244)
(5 rows)

I was looking for a test to add to the regression checks that wouldn't
rely on the explain costs, since those are disabled there. However,
I've realised I could do something similar to what's done in stats_ext
with the check_estimated_rows function. I've added a get_estimated_rows
test function that extracts the estimated rows from the top node and
uses it to check the gather nodes' estimates. get_estimated_rows uses a
plain explain, whereas check_estimated_rows relies on an explain
analyze.

Hmm, I'm hesitant about adding the tests that verify the number of
estimated rows in this patch. The table 'tenk1' isn't created with
autovacuum_enabled = off, so we may have unstable results from
auto-analyze happening. I think the plan change in join_hash.out is
sufficient to verify the changes in this patch.

I wonder if the changes in create_ordered_paths should also be reduced
to 'total_groups = gather_rows_estimate(path);'.

It should already be the case with the v2 patch. It does create
rounding errors that are visible in the tests, but they shouldn't
exceed +1 or -1.

I think perhaps it's better to declare gather_rows_estimate() in
cost.h rather than optimizer.h.
(BTW, I wonder if compute_gather_rows() would be a better name?)

Good point, I've moved the declaration and renamed it.

I noticed another issue in generate_useful_gather_paths() -- *rowsp
would have a random value if override_rows is true and we use
incremental sort for gather merge. I think we should fix this too.

That seems to be the case. I've tried to find a query that could
trigger this codepath, without success. All grouping and distinct paths
I've tried were fully sorted and didn't trigger an incremental sort.
I will need a bit more time to check this.

In the meantime, I've updated the patches with the review comments.

Otherwise I think the v3 patch looks good to me.

Attached is an updated version of this patch with cosmetic changes and
comment updates. It also squishes the two patches together into one.
I'm planning to push it soon, barring any objections or comments.

Thanks
Richard

Attachments:

v4-0001-Fix-rowcount-estimate-for-gather-merge-paths.patch (application/octet-stream)
From 0d770256af8056e1ef4051afbb65bdd7b201c216 Mon Sep 17 00:00:00 2001
From: Richard Guo <guofenglinux@gmail.com>
Date: Wed, 17 Jul 2024 15:31:11 +0900
Subject: [PATCH v4] Fix rowcount estimate for gather (merge) paths

In the case of a parallel plan, when computing the number of tuples
processed per worker, we divide the total number of tuples by the
parallel_divisor obtained from get_parallel_divisor(), which accounts
for the leader's contribution in addition to the number of workers.

Accordingly, when estimating the number of tuples for gather (merge)
nodes, we should multiply the number of tuples per worker by the same
parallel_divisor to reverse the division.  However, currently we use
parallel_workers rather than parallel_divisor for the multiplication.
This could result in an underestimation of the number of tuples for
gather (merge) nodes, especially when there are fewer than four
workers.

This patch fixes this issue by using the same parallel_divisor for the
multiplication.  There is one ensuing plan change in the regression
tests, but it looks reasonable and does not compromise its original
purpose of testing parallel-aware hash join.

In passing, this patch removes an unnecessary assignment for path.rows
in create_gather_merge_path, and fixes an uninitialized-variable issue
in generate_useful_gather_paths.

No backpatch as this could result in plan changes.

Author: Anthonin Bonnefoy
Reviewed-by: Rafia Sabih, Richard Guo
Discussion: https://postgr.es/m/CAO6_Xqr9+51NxgO=XospEkUeAg-p=EjAWmtpdcZwjRgGKJ53iA@mail.gmail.com
---
 src/backend/optimizer/path/allpaths.c   |  7 +++----
 src/backend/optimizer/path/costsize.c   | 18 ++++++++++++++++++
 src/backend/optimizer/plan/planner.c    |  7 ++-----
 src/backend/optimizer/util/pathnode.c   |  1 -
 src/include/optimizer/cost.h            |  1 +
 src/test/regress/expected/join_hash.out | 19 +++++++++----------
 6 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 4895cee994..b07b106938 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3071,8 +3071,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	 * of partial_pathlist because of the way add_partial_path works.
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
-	rows =
-		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
+	rows = compute_gather_rows(cheapest_partial_path);
 	simple_gather_path = (Path *)
 		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
 						   NULL, rowsp);
@@ -3090,7 +3089,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
-		rows = subpath->rows * subpath->parallel_workers;
+		rows = compute_gather_rows(subpath);
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
 										subpath->pathkeys, NULL, rowsp);
 		add_path(rel, &path->path);
@@ -3274,7 +3273,6 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 													subpath,
 													useful_pathkeys,
 													-1.0);
-				rows = subpath->rows * subpath->parallel_workers;
 			}
 			else
 				subpath = (Path *) create_incremental_sort_path(root,
@@ -3283,6 +3281,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 																useful_pathkeys,
 																presorted_keys,
 																-1);
+			rows = compute_gather_rows(subpath);
 			path = create_gather_merge_path(root, rel,
 											subpath,
 											rel->reltarget,
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ee23ed7835..353728874a 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -6499,3 +6499,21 @@ compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel,
 
 	return pages_fetched;
 }
+
+/*
+ * compute_gather_rows
+ *	  Estimate number of rows for gather (merge) nodes.
+ *
+ * In a parallel plan, each worker's row estimate is determined by dividing the
+ * total number of rows by parallel_divisor, which accounts for the leader's
+ * contribution in addition to the number of workers.  Accordingly, when
+ * estimating the number of rows for gather (merge) nodes, we multiply the rows
+ * per worker by the same parallel_divisor to undo the division.
+ */
+double
+compute_gather_rows(Path *path)
+{
+	Assert(path->parallel_workers > 0);
+
+	return clamp_row_est(path->rows * get_parallel_divisor(path));
+}
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 4711f91239..948afd9094 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -5370,8 +5370,7 @@ create_ordered_paths(PlannerInfo *root,
 																	root->sort_pathkeys,
 																	presorted_keys,
 																	limit_tuples);
-			total_groups = input_path->rows *
-				input_path->parallel_workers;
+			total_groups = compute_gather_rows(sorted_path);
 			sorted_path = (Path *)
 				create_gather_merge_path(root, ordered_rel,
 										 sorted_path,
@@ -7543,8 +7542,6 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel)
 			(presorted_keys == 0 || !enable_incremental_sort))
 			continue;
 
-		total_groups = path->rows * path->parallel_workers;
-
 		/*
 		 * We've no need to consider both a sort and incremental sort. We'll
 		 * just do a sort if there are no presorted keys and an incremental
@@ -7561,7 +7558,7 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel)
 														 groupby_pathkeys,
 														 presorted_keys,
 														 -1.0);
-
+		total_groups = compute_gather_rows(path);
 		path = (Path *)
 			create_gather_merge_path(root,
 									 rel,
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index c42742d2c7..d1c4e1a6aa 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1899,7 +1899,6 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->num_workers = subpath->parallel_workers;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->path.pathtarget = target ? target : rel->reltarget;
-	pathnode->path.rows += subpath->rows;
 
 	if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
 	{
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index b1c51a4e70..57861bfb44 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -212,5 +212,6 @@ extern PathTarget *set_pathtarget_cost_width(PlannerInfo *root, PathTarget *targ
 extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel,
 								   Path *bitmapqual, double loop_count,
 								   Cost *cost_p, double *tuples_p);
+extern double compute_gather_rows(Path *path);
 
 #endif							/* COST_H */
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 262fa71ed8..4fc34a0e72 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -508,18 +508,17 @@ set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join extremely_skewed s using (id);
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Finalize Aggregate
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
    ->  Gather
          Workers Planned: 1
-         ->  Partial Aggregate
-               ->  Parallel Hash Join
-                     Hash Cond: (r.id = s.id)
-                     ->  Parallel Seq Scan on simple r
-                     ->  Parallel Hash
-                           ->  Parallel Seq Scan on extremely_skewed s
-(9 rows)
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
 
 select count(*) from simple r join extremely_skewed s using (id);
  count 
-- 
2.43.0

#7 Richard Guo
guofenglinux@gmail.com
In reply to: Richard Guo (#6)
Re: Possible incorrect row estimation for Gather paths

On Mon, Jul 22, 2024 at 5:55 PM Richard Guo <guofenglinux@gmail.com> wrote:

Otherwise I think the v3 patch looks good to me.

Attached is an updated version of this patch with cosmetic changes and
comment updates. It also squishes the two patches together into one.
I'm planning to push it soon, barring any objections or comments.

Pushed.

Thanks
Richard