Allow parallel DISTINCT

Started by David Rowley over 4 years ago · 11 messages
#1 David Rowley
dgrowleyml@gmail.com
1 attachment(s)

Back in March 2016, e06a38965 added support for parallel aggregation.
IIRC, because it was fairly late in the release cycle, I dropped
parallel DISTINCT to reduce the scope a little. It's been on my list
of things to fix since then. I just didn't get around to it until
today.

The patch is just some plumbing work to connect all the correct paths
up to make it work. It's all fairly trivial.

I thought about refactoring things a bit more to get rid of the
additional calls to grouping_is_sortable() and grouping_is_hashable(),
but I just don't think it's worth making the code ugly for. We'll
only call them again if we're considering a parallel plan, in which
case it's most likely not a trivial query. Those functions are pretty
cheap anyway.

I understand that there's another patch in the September commitfest
that does some stuff with Parallel DISTINCT, but that goes about
things a completely different way by creating multiple queues to
distribute values by hash. I don't think there's any overlap here.
We'd likely want to still have the planner consider both methods if we
get that patch sometime.

David

Attachments:

parallel_distinct.patch (application/octet-stream)
diff --git a/src/backend/optimizer/README b/src/backend/optimizer/README
index 2339347c24..41c120e0cd 100644
--- a/src/backend/optimizer/README
+++ b/src/backend/optimizer/README
@@ -1015,6 +1015,7 @@ UPPERREL_SETOP		result of UNION/INTERSECT/EXCEPT, if any
 UPPERREL_PARTIAL_GROUP_AGG	result of partial grouping/aggregation, if any
 UPPERREL_GROUP_AGG	result of grouping/aggregation, if any
 UPPERREL_WINDOW		result of window functions, if any
+UPPERREL_PARTIAL_DISTINCT	result of partial "SELECT DISTINCT", if any
 UPPERREL_DISTINCT	result of "SELECT DISTINCT", if any
 UPPERREL_ORDERED	result of ORDER BY, if any
 UPPERREL_FINAL		result of any remaining top-level actions
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 2cd691191c..d89d215133 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -187,8 +187,11 @@ static void create_one_window_path(PlannerInfo *root,
 								   PathTarget *output_target,
 								   WindowFuncLists *wflists,
 								   List *activeWindows);
+static void create_partial_distinct_paths(PlannerInfo *root,
+										  RelOptInfo *input_rel);
 static RelOptInfo *create_distinct_paths(PlannerInfo *root,
-										 RelOptInfo *input_rel);
+										 RelOptInfo *input_rel,
+										 bool parallel_paths);
 static RelOptInfo *create_ordered_paths(PlannerInfo *root,
 										RelOptInfo *input_rel,
 										PathTarget *target,
@@ -1570,6 +1573,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
 		 */
 		root->upper_targets[UPPERREL_FINAL] = final_target;
 		root->upper_targets[UPPERREL_ORDERED] = final_target;
+		root->upper_targets[UPPERREL_PARTIAL_DISTINCT] = sort_input_target;
 		root->upper_targets[UPPERREL_DISTINCT] = sort_input_target;
 		root->upper_targets[UPPERREL_WINDOW] = sort_input_target;
 		root->upper_targets[UPPERREL_GROUP_AGG] = grouping_target;
@@ -1619,8 +1623,15 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
 		 */
 		if (parse->distinctClause)
 		{
-			current_rel = create_distinct_paths(root,
-												current_rel);
+			RelOptInfo	   *distinct_rel;
+
+			/* handle normal paths */
+			distinct_rel = create_distinct_paths(root, current_rel, false);
+
+			/* handle partial paths */
+			create_partial_distinct_paths(root, current_rel);
+
+			current_rel = distinct_rel;
 		}
 	}							/* end of if (setOperations) */
 
@@ -4216,6 +4227,102 @@ create_one_window_path(PlannerInfo *root,
 	add_path(window_rel, path);
 }
 
+/*
+ * create_partial_distinct_paths
+ *
+ * Process 'input_rel' partial paths and add unique / aggregate paths to the
+ * UPPERREL_PARTIAL_DISTINCT rel.  For any paths created, add Gather /
+ * GatherMerge paths on top and add a final unique / aggregate path to remove
+ * any duplicates produced from	combining the results from parallel workers.
+ */
+static void
+create_partial_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel)
+{
+	RelOptInfo *partial_distinct_rel;
+	Query	   *parse;
+	List	   *distinctExprs;
+	double		numDistinctRows;
+	Path	   *cheapest_partial_path;
+	ListCell   *lc;
+
+	/* nothing to do when there are no partial paths in the input rel */
+	if (!input_rel->consider_parallel || input_rel->partial_pathlist == NIL)
+		return;
+
+	parse = root->parse;
+
+	/* can't do parallel DISTINCT ON */
+	if (parse->hasDistinctOn)
+		return;
+
+	partial_distinct_rel = fetch_upper_rel(root, UPPERREL_PARTIAL_DISTINCT, NULL);
+	partial_distinct_rel->reltarget = root->upper_targets[UPPERREL_PARTIAL_DISTINCT];
+	partial_distinct_rel->consider_parallel = input_rel->consider_parallel;
+
+	cheapest_partial_path = linitial(input_rel->partial_pathlist);
+
+	distinctExprs = get_sortgrouplist_exprs(parse->distinctClause,
+											parse->targetList);
+
+	/* estimate how many distinct rows we'll get from each worker */
+	numDistinctRows = estimate_num_groups(root, distinctExprs,
+										  cheapest_partial_path->rows,
+										  NULL, NULL);
+
+	/* first try adding unique paths atop of sorted paths */
+	if (grouping_is_sortable(parse->distinctClause))
+	{
+		foreach(lc, input_rel->partial_pathlist)
+		{
+			Path	   *path = (Path *)lfirst(lc);
+
+			if (pathkeys_contained_in(root->distinct_pathkeys, path->pathkeys))
+			{
+				add_partial_path(partial_distinct_rel, (Path *)
+					create_upper_unique_path(root,
+											 partial_distinct_rel,
+											 path,
+											 list_length(root->distinct_pathkeys),
+											 numDistinctRows));
+			}
+		}
+	}
+
+	/* now try hash aggregate paths, if enabled and hashing is possible */
+	if (enable_hashagg && grouping_is_hashable(parse->distinctClause))
+	{
+		add_partial_path(partial_distinct_rel, (Path *)
+						 create_agg_path(root,
+										 partial_distinct_rel,
+										 cheapest_partial_path,
+										 cheapest_partial_path->pathtarget,
+										 AGG_HASHED,
+										 AGGSPLIT_SIMPLE,
+										 parse->distinctClause,
+										 NIL,
+										 NULL,
+										 numDistinctRows));
+	}
+
+	/* Let extensions possibly add some more paths */
+	if (create_upper_paths_hook)
+		(*create_upper_paths_hook) (root, UPPERREL_PARTIAL_DISTINCT,
+									input_rel, partial_distinct_rel, NULL);
+
+	if (partial_distinct_rel->partial_pathlist != NIL)
+	{
+		generate_gather_paths(root, partial_distinct_rel, true);
+		set_cheapest(partial_distinct_rel);
+
+		/*
+		 * Finally, create Paths to distinctify the final result.  This
+		 * just requires removing any results that are duplicated due to
+		 * combining results from parallel workers.
+		 */
+		create_distinct_paths(root, partial_distinct_rel, true);
+	}
+}
+
 /*
  * create_distinct_paths
  *
@@ -4223,12 +4330,15 @@ create_one_window_path(PlannerInfo *root,
  *
  * input_rel: contains the source-data Paths
  *
+ * parallel_paths: true if we're processing a set of Gather/GatherMerge paths,
+ * false if we're processing normal paths.
+ *
  * Note: input paths should already compute the desired pathtarget, since
  * Sort/Unique won't project anything.
  */
 static RelOptInfo *
-create_distinct_paths(PlannerInfo *root,
-					  RelOptInfo *input_rel)
+create_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel,
+					  bool parallel_paths)
 {
 	Query	   *parse = root->parse;
 	Path	   *cheapest_input_path = input_rel->cheapest_total_path;
@@ -4392,19 +4502,27 @@ create_distinct_paths(PlannerInfo *root,
 				 errdetail("Some of the datatypes only support hashing, while others only support sorting.")));
 
 	/*
-	 * If there is an FDW that's responsible for all baserels of the query,
-	 * let it consider adding ForeignPaths.
+	 * Skip calling the FDW method and the hook when doing parallel paths.  No
+	 * need to call it again as we should already have done so when working on
+	 * the serial paths.
 	 */
-	if (distinct_rel->fdwroutine &&
-		distinct_rel->fdwroutine->GetForeignUpperPaths)
-		distinct_rel->fdwroutine->GetForeignUpperPaths(root, UPPERREL_DISTINCT,
-													   input_rel, distinct_rel,
-													   NULL);
-
-	/* Let extensions possibly add some more paths */
-	if (create_upper_paths_hook)
-		(*create_upper_paths_hook) (root, UPPERREL_DISTINCT,
-									input_rel, distinct_rel, NULL);
+	if (!parallel_paths)
+	{
+		/*
+		 * If there is an FDW that's responsible for all baserels of the
+		 * query, let it consider adding ForeignPaths.
+		 */
+		if (distinct_rel->fdwroutine &&
+			distinct_rel->fdwroutine->GetForeignUpperPaths)
+			distinct_rel->fdwroutine->GetForeignUpperPaths(root, UPPERREL_DISTINCT,
+														   input_rel, distinct_rel,
+														   NULL);
+
+		/* Let extensions possibly add some more paths */
+		if (create_upper_paths_hook)
+			(*create_upper_paths_hook) (root, UPPERREL_DISTINCT,
+										input_rel, distinct_rel, NULL);
+	}
 
 	/* Now choose the best path(s) */
 	set_cheapest(distinct_rel);
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 6e068f2c8b..60afc2c4ae 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -71,6 +71,8 @@ typedef enum UpperRelationKind
 								 * any */
 	UPPERREL_GROUP_AGG,			/* result of grouping/aggregation, if any */
 	UPPERREL_WINDOW,			/* result of window functions, if any */
+	UPPERREL_PARTIAL_DISTINCT,	/* result of partial "SELECT DISTINCT", if
+								 * any */
 	UPPERREL_DISTINCT,			/* result of "SELECT DISTINCT", if any */
 	UPPERREL_ORDERED,			/* result of ORDER BY, if any */
 	UPPERREL_FINAL				/* result of any remaining top-level actions */
diff --git a/src/test/regress/expected/select_distinct.out b/src/test/regress/expected/select_distinct.out
index 11c6f50fbf..8da8999954 100644
--- a/src/test/regress/expected/select_distinct.out
+++ b/src/test/regress/expected/select_distinct.out
@@ -210,6 +210,54 @@ DROP TABLE distinct_hash_1;
 DROP TABLE distinct_hash_2;
 DROP TABLE distinct_group_1;
 DROP TABLE distinct_group_2;
+-- Test parallel DISTINCT
+SET parallel_tuple_cost=0;
+SET parallel_setup_cost=0;
+SET min_parallel_table_scan_size=0;
+-- Ensure we get a parallel plan
+EXPLAIN (costs off)
+SELECT DISTINCT twenty FROM tenk1;
+                     QUERY PLAN                     
+----------------------------------------------------
+ Unique
+   ->  Sort
+         Sort Key: twenty
+         ->  Gather
+               Workers Planned: 2
+               ->  HashAggregate
+                     Group Key: twenty
+                     ->  Parallel Seq Scan on tenk1
+(8 rows)
+
+-- Ensure the parallel plan produces the correct results
+SELECT DISTINCT twenty FROM tenk1;
+ twenty 
+--------
+      0
+      1
+      2
+      3
+      4
+      5
+      6
+      7
+      8
+      9
+     10
+     11
+     12
+     13
+     14
+     15
+     16
+     17
+     18
+     19
+(20 rows)
+
+RESET parallel_setup_cost;
+RESET parallel_tuple_cost;
+RESET min_parallel_table_scan_size;
 --
 -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
 -- very own regression file.
diff --git a/src/test/regress/sql/select_distinct.sql b/src/test/regress/sql/select_distinct.sql
index 33102744eb..a589e08941 100644
--- a/src/test/regress/sql/select_distinct.sql
+++ b/src/test/regress/sql/select_distinct.sql
@@ -107,6 +107,22 @@ DROP TABLE distinct_hash_2;
 DROP TABLE distinct_group_1;
 DROP TABLE distinct_group_2;
 
+-- Test parallel DISTINCT
+SET parallel_tuple_cost=0;
+SET parallel_setup_cost=0;
+SET min_parallel_table_scan_size=0;
+
+-- Ensure we get a parallel plan
+EXPLAIN (costs off)
+SELECT DISTINCT twenty FROM tenk1;
+
+-- Ensure the parallel plan produces the correct results
+SELECT DISTINCT twenty FROM tenk1;
+
+RESET parallel_setup_cost;
+RESET parallel_tuple_cost;
+RESET min_parallel_table_scan_size;
+
 --
 -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
 -- very own regression file.
#2 David Rowley
dgrowleyml@gmail.com
In reply to: David Rowley (#1)
Re: Allow parallel DISTINCT

On Wed, 11 Aug 2021 at 16:51, David Rowley <dgrowleyml@gmail.com> wrote:

The patch is just some plumbing work to connect all the correct paths
up to make it work. It's all fairly trivial.

I looked at this patch again and realise that it could be done a bit
better. For example, the previous version set the distinct_rel's FDW
fields twice, once when making the serial paths and once when
finalizing the partial paths.

I've now added two new functions; create_final_distinct_paths and
create_partial_distinct_paths. The responsibility of
create_distinct_paths has changed. Instead of it creating the
non-parallel DISTINCT paths, it calls the two new functions and also
takes charge of calling the create_upper_paths_hook for
UPPERREL_DISTINCT plus the FDW GetForeignUpperPaths() call. I think
this is nicer as I'd previously added a new parameter to
create_distinct_paths() so I could tell it not to call the hook as I
didn't want to call that twice on the same relation as it would no
doubt result in some plugin just creating the same paths again.

I've also changed my mind about the previous choice I'd made not to
call GetForeignUpperPaths for the UPPERREL_PARTIAL_DISTINCT. I now
think that's ok.

I think this is a fairly trivial patch that just does a bit of wiring
up of paths. Unless anyone has anything to say about it in the next
few days, I'll be looking at it again with intentions to push it.

David

#3 Zhihong Yu
zyu@yugabyte.com
In reply to: David Rowley (#2)
Re: Allow parallel DISTINCT

On Mon, Aug 16, 2021 at 10:07 PM David Rowley <dgrowleyml@gmail.com> wrote:

On Wed, 11 Aug 2021 at 16:51, David Rowley <dgrowleyml@gmail.com> wrote:

The patch is just some plumbing work to connect all the correct paths
up to make it work. It's all fairly trivial.

I looked at this patch again and realise that it could be done a bit
better. For example, the previous version set the distinct_rel's FDW
fields twice, once when making the serial paths and once when
finalizing the partial paths.

I've now added two new functions; create_final_distinct_paths and
create_partial_distinct_paths. The responsibility of
create_distinct_paths has changed. Instead of it creating the
non-parallel DISTINCT paths, it calls the two new functions and also
takes charge of calling the create_upper_paths_hook for
UPPERREL_DISTINCT plus the FDW GetForeignUpperPaths() call. I think
this is nicer as I'd previously added a new parameter to
create_distinct_paths() so I could tell it not to call the hook as I
didn't want to call that twice on the same relation as it would no
doubt result in some plugin just creating the same paths again.

I've also changed my mind about the previous choice I'd made not to
call GetForeignUpperPaths for the UPPERREL_PARTIAL_DISTINCT. I now
think that's ok.

I think this is a fairly trivial patch that just does a bit of wiring
up of paths. Unless anyone has anything to say about it in the next
few days, I'll be looking at it again with intentions to push it.

David

Hi, David:

Can you attach the updated patch so that we can see more detail about the two
new functions, create_final_distinct_paths and create_partial_distinct_paths?

Thanks

#4 David Rowley
dgrowleyml@gmail.com
In reply to: Zhihong Yu (#3)
1 attachment(s)
Re: Allow parallel DISTINCT

On Tue, 17 Aug 2021 at 20:07, Zhihong Yu <zyu@yugabyte.com> wrote:

Can you attach the updated patch so that we can see more detail about the two new functions, create_final_distinct_paths and create_partial_distinct_paths?

Must've fallen off in transit :)

David

Attachments:

parallel_distinct_v2.patch (application/octet-stream)
diff --git a/src/backend/optimizer/README b/src/backend/optimizer/README
index 2339347c24..41c120e0cd 100644
--- a/src/backend/optimizer/README
+++ b/src/backend/optimizer/README
@@ -1015,6 +1015,7 @@ UPPERREL_SETOP		result of UNION/INTERSECT/EXCEPT, if any
 UPPERREL_PARTIAL_GROUP_AGG	result of partial grouping/aggregation, if any
 UPPERREL_GROUP_AGG	result of grouping/aggregation, if any
 UPPERREL_WINDOW		result of window functions, if any
+UPPERREL_PARTIAL_DISTINCT	result of partial "SELECT DISTINCT", if any
 UPPERREL_DISTINCT	result of "SELECT DISTINCT", if any
 UPPERREL_ORDERED	result of ORDER BY, if any
 UPPERREL_FINAL		result of any remaining top-level actions
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 2cd691191c..1e42d75465 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -189,6 +189,12 @@ static void create_one_window_path(PlannerInfo *root,
 								   List *activeWindows);
 static RelOptInfo *create_distinct_paths(PlannerInfo *root,
 										 RelOptInfo *input_rel);
+static void create_partial_distinct_paths(PlannerInfo *root,
+										  RelOptInfo *input_rel,
+										  RelOptInfo *final_distinct_rel);
+static RelOptInfo *create_final_distinct_paths(PlannerInfo *root,
+											   RelOptInfo *input_rel,
+											   RelOptInfo *distinct_rel);
 static RelOptInfo *create_ordered_paths(PlannerInfo *root,
 										RelOptInfo *input_rel,
 										PathTarget *target,
@@ -1570,6 +1576,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
 		 */
 		root->upper_targets[UPPERREL_FINAL] = final_target;
 		root->upper_targets[UPPERREL_ORDERED] = final_target;
+		root->upper_targets[UPPERREL_PARTIAL_DISTINCT] = sort_input_target;
 		root->upper_targets[UPPERREL_DISTINCT] = sort_input_target;
 		root->upper_targets[UPPERREL_WINDOW] = sort_input_target;
 		root->upper_targets[UPPERREL_GROUP_AGG] = grouping_target;
@@ -4227,16 +4234,9 @@ create_one_window_path(PlannerInfo *root,
  * Sort/Unique won't project anything.
  */
 static RelOptInfo *
-create_distinct_paths(PlannerInfo *root,
-					  RelOptInfo *input_rel)
+create_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel)
 {
-	Query	   *parse = root->parse;
-	Path	   *cheapest_input_path = input_rel->cheapest_total_path;
 	RelOptInfo *distinct_rel;
-	double		numDistinctRows;
-	bool		allow_hash;
-	Path	   *path;
-	ListCell   *lc;
 
 	/* For now, do all work in the (DISTINCT, NULL) upperrel */
 	distinct_rel = fetch_upper_rel(root, UPPERREL_DISTINCT, NULL);
@@ -4258,6 +4258,184 @@ create_distinct_paths(PlannerInfo *root,
 	distinct_rel->useridiscurrent = input_rel->useridiscurrent;
 	distinct_rel->fdwroutine = input_rel->fdwroutine;
 
+	/* build distinct paths based on input_rel's pathlist */
+	create_final_distinct_paths(root, input_rel, distinct_rel);
+
+	/* now build distinct paths based on input_rel's partial_pathlist */
+	create_partial_distinct_paths(root, input_rel, distinct_rel);
+
+	/* Give a helpful error if we failed to create any paths */
+	if (distinct_rel->pathlist == NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("could not implement DISTINCT"),
+				 errdetail("Some of the datatypes only support hashing, while others only support sorting.")));
+
+	/*
+	 * If there is an FDW that's responsible for all baserels of the query,
+	 * let it consider adding ForeignPaths.
+	 */
+	if (distinct_rel->fdwroutine &&
+		distinct_rel->fdwroutine->GetForeignUpperPaths)
+		distinct_rel->fdwroutine->GetForeignUpperPaths(root,
+													   UPPERREL_DISTINCT,
+													   input_rel,
+													   distinct_rel,
+													   NULL);
+
+	/* Let extensions possibly add some more paths */
+	if (create_upper_paths_hook)
+		(*create_upper_paths_hook) (root, UPPERREL_DISTINCT, input_rel,
+									distinct_rel, NULL);
+
+	/* Now choose the best path(s) */
+	set_cheapest(distinct_rel);
+
+	return distinct_rel;
+}
+
+/*
+ * create_partial_distinct_paths
+ *
+ * Process 'input_rel' partial paths and add unique/aggregate paths to the
+ * UPPERREL_PARTIAL_DISTINCT rel.  For paths created, add Gather/GatherMerge
+ * paths on top and add a final unique/aggregate path to remove any duplicate
+ * produced from combining rows from parallel workers.
+ */
+static void
+create_partial_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel,
+							  RelOptInfo *final_distinct_rel)
+{
+	RelOptInfo *partial_distinct_rel;
+	Query	   *parse;
+	List	   *distinctExprs;
+	double		numDistinctRows;
+	Path	   *cheapest_partial_path;
+	ListCell   *lc;
+
+	/* nothing to do when there are no partial paths in the input rel */
+	if (!input_rel->consider_parallel || input_rel->partial_pathlist == NIL)
+		return;
+
+	parse = root->parse;
+
+	/* can't do parallel DISTINCT ON */
+	if (parse->hasDistinctOn)
+		return;
+
+	partial_distinct_rel = fetch_upper_rel(root, UPPERREL_PARTIAL_DISTINCT,
+										   NULL);
+	partial_distinct_rel->reltarget = root->upper_targets[UPPERREL_PARTIAL_DISTINCT];
+	partial_distinct_rel->consider_parallel = input_rel->consider_parallel;
+
+	/*
+	 * If input_rel belongs to a single FDW, so does the partial_distinct_rel.
+	 */
+	partial_distinct_rel->serverid = input_rel->serverid;
+	partial_distinct_rel->userid = input_rel->userid;
+	partial_distinct_rel->useridiscurrent = input_rel->useridiscurrent;
+	partial_distinct_rel->fdwroutine = input_rel->fdwroutine;
+
+	cheapest_partial_path = linitial(input_rel->partial_pathlist);
+
+	distinctExprs = get_sortgrouplist_exprs(parse->distinctClause,
+											parse->targetList);
+
+	/* estimate how many distinct rows we'll get from each worker */
+	numDistinctRows = estimate_num_groups(root, distinctExprs,
+										  cheapest_partial_path->rows,
+										  NULL, NULL);
+
+	/* first try adding unique paths atop of sorted paths */
+	if (grouping_is_sortable(parse->distinctClause))
+	{
+		foreach(lc, input_rel->partial_pathlist)
+		{
+			Path	   *path = (Path *) lfirst(lc);
+
+			if (pathkeys_contained_in(root->distinct_pathkeys, path->pathkeys))
+			{
+				add_partial_path(partial_distinct_rel, (Path *)
+								 create_upper_unique_path(root,
+														  partial_distinct_rel,
+														  path,
+														  list_length(root->distinct_pathkeys),
+														  numDistinctRows));
+			}
+		}
+	}
+
+	/*
+	 * Now try hash aggregate paths, if enabled and hashing is possible. Since
+	 * we're not on the hook to ensure we do our best to create at least one
+	 * path here, we treat enable_hashagg as a hard off-switch rather than the
+	 * slightly softer variant in create_final_distinct_paths.
+	 */
+	if (enable_hashagg && grouping_is_hashable(parse->distinctClause))
+	{
+		add_partial_path(partial_distinct_rel, (Path *)
+						 create_agg_path(root,
+										 partial_distinct_rel,
+										 cheapest_partial_path,
+										 cheapest_partial_path->pathtarget,
+										 AGG_HASHED,
+										 AGGSPLIT_SIMPLE,
+										 parse->distinctClause,
+										 NIL,
+										 NULL,
+										 numDistinctRows));
+	}
+
+	/*
+	 * If there is an FDW that's responsible for all baserels of the query,
+	 * let it consider adding ForeignPaths.
+	 */
+	if (partial_distinct_rel->fdwroutine &&
+		partial_distinct_rel->fdwroutine->GetForeignUpperPaths)
+		partial_distinct_rel->fdwroutine->GetForeignUpperPaths(root,
+															   UPPERREL_PARTIAL_DISTINCT,
+															   input_rel,
+															   partial_distinct_rel,
+															   NULL);
+
+	/* Let extensions possibly add some more partial paths */
+	if (create_upper_paths_hook)
+		(*create_upper_paths_hook) (root, UPPERREL_PARTIAL_DISTINCT,
+									input_rel, partial_distinct_rel, NULL);
+
+	if (partial_distinct_rel->partial_pathlist != NIL)
+	{
+		generate_gather_paths(root, partial_distinct_rel, true);
+		set_cheapest(partial_distinct_rel);
+
+		/*
+		 * Finally, create paths to distinctify the final result.  This step
+		 * is needed to remove any duplicates due to combining rows from
+		 * parallel workers.
+		 */
+		create_final_distinct_paths(root, partial_distinct_rel,
+									final_distinct_rel);
+	}
+}
+
+/*
+ * create_final_distinct_paths
+ *		Create distinct paths in 'distinct_rel' based on 'input_rel' pathlist
+ *
+ * input_rel: contains the source-data paths
+ * distinct_rel: destination relation for storing created paths
+ */
+static RelOptInfo *
+create_final_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel,
+							RelOptInfo *distinct_rel)
+{
+	Query	   *parse = root->parse;
+	Path	   *cheapest_input_path = input_rel->cheapest_total_path;
+	double		numDistinctRows;
+	bool		allow_hash;
+	Path	   *path;
+	ListCell   *lc;
+
 	/* Estimate number of distinct rows there will be */
 	if (parse->groupClause || parse->groupingSets || parse->hasAggs ||
 		root->hasHavingQual)
@@ -4384,31 +4562,6 @@ create_distinct_paths(PlannerInfo *root,
 								 numDistinctRows));
 	}
 
-	/* Give a helpful error if we failed to find any implementation */
-	if (distinct_rel->pathlist == NIL)
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("could not implement DISTINCT"),
-				 errdetail("Some of the datatypes only support hashing, while others only support sorting.")));
-
-	/*
-	 * If there is an FDW that's responsible for all baserels of the query,
-	 * let it consider adding ForeignPaths.
-	 */
-	if (distinct_rel->fdwroutine &&
-		distinct_rel->fdwroutine->GetForeignUpperPaths)
-		distinct_rel->fdwroutine->GetForeignUpperPaths(root, UPPERREL_DISTINCT,
-													   input_rel, distinct_rel,
-													   NULL);
-
-	/* Let extensions possibly add some more paths */
-	if (create_upper_paths_hook)
-		(*create_upper_paths_hook) (root, UPPERREL_DISTINCT,
-									input_rel, distinct_rel, NULL);
-
-	/* Now choose the best path(s) */
-	set_cheapest(distinct_rel);
-
 	return distinct_rel;
 }
 
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 6e068f2c8b..1abe233db2 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -71,6 +71,7 @@ typedef enum UpperRelationKind
 								 * any */
 	UPPERREL_GROUP_AGG,			/* result of grouping/aggregation, if any */
 	UPPERREL_WINDOW,			/* result of window functions, if any */
+	UPPERREL_PARTIAL_DISTINCT,	/* result of partial "SELECT DISTINCT", if any */
 	UPPERREL_DISTINCT,			/* result of "SELECT DISTINCT", if any */
 	UPPERREL_ORDERED,			/* result of ORDER BY, if any */
 	UPPERREL_FINAL				/* result of any remaining top-level actions */
diff --git a/src/test/regress/expected/select_distinct.out b/src/test/regress/expected/select_distinct.out
index 11c6f50fbf..419ce4aed5 100644
--- a/src/test/regress/expected/select_distinct.out
+++ b/src/test/regress/expected/select_distinct.out
@@ -210,6 +210,73 @@ DROP TABLE distinct_hash_1;
 DROP TABLE distinct_hash_2;
 DROP TABLE distinct_group_1;
 DROP TABLE distinct_group_2;
+-- Test parallel DISTINCT
+SET parallel_tuple_cost=0;
+SET parallel_setup_cost=0;
+SET min_parallel_table_scan_size=0;
+-- Ensure we get a parallel plan
+EXPLAIN (costs off)
+SELECT DISTINCT four FROM tenk1;
+                     QUERY PLAN                     
+----------------------------------------------------
+ Unique
+   ->  Sort
+         Sort Key: four
+         ->  Gather
+               Workers Planned: 2
+               ->  HashAggregate
+                     Group Key: four
+                     ->  Parallel Seq Scan on tenk1
+(8 rows)
+
+-- Ensure the parallel plan produces the correct results
+SELECT DISTINCT four FROM tenk1;
+ four 
+------
+    0
+    1
+    2
+    3
+(4 rows)
+
+CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
+  BEGIN
+    RETURN a;
+  END;
+$$ LANGUAGE plpgsql PARALLEL UNSAFE;
+-- Ensure we don't do parallel distinct with a parallel unsafe function
+EXPLAIN (COSTS OFF)
+SELECT DISTINCT distinct_func(1) FROM tenk1;
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Unique
+   ->  Sort
+         Sort Key: (distinct_func(1))
+         ->  Index Only Scan using tenk1_hundred on tenk1
+(4 rows)
+
+-- make the function parallel safe
+CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
+  BEGIN
+    RETURN a;
+  END;
+$$ LANGUAGE plpgsql PARALLEL SAFE;
+-- Ensure we do parallel distinct now that the function is parallel safe
+EXPLAIN (COSTS OFF)
+SELECT DISTINCT distinct_func(1) FROM tenk1;
+                  QUERY PLAN                  
+----------------------------------------------
+ Unique
+   ->  Sort
+         Sort Key: (distinct_func(1))
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+RESET parallel_setup_cost;
+RESET parallel_tuple_cost;
+RESET min_parallel_table_scan_size;
 --
 -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
 -- very own regression file.
diff --git a/src/test/regress/sql/select_distinct.sql b/src/test/regress/sql/select_distinct.sql
index 33102744eb..95f287a83f 100644
--- a/src/test/regress/sql/select_distinct.sql
+++ b/src/test/regress/sql/select_distinct.sql
@@ -107,6 +107,43 @@ DROP TABLE distinct_hash_2;
 DROP TABLE distinct_group_1;
 DROP TABLE distinct_group_2;
 
+-- Test parallel DISTINCT
+SET parallel_tuple_cost=0;
+SET parallel_setup_cost=0;
+SET min_parallel_table_scan_size=0;
+
+-- Ensure we get a parallel plan
+EXPLAIN (costs off)
+SELECT DISTINCT four FROM tenk1;
+
+-- Ensure the parallel plan produces the correct results
+SELECT DISTINCT four FROM tenk1;
+
+CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
+  BEGIN
+    RETURN a;
+  END;
+$$ LANGUAGE plpgsql PARALLEL UNSAFE;
+
+-- Ensure we don't do parallel distinct with a parallel unsafe function
+EXPLAIN (COSTS OFF)
+SELECT DISTINCT distinct_func(1) FROM tenk1;
+
+-- make the function parallel safe
+CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
+  BEGIN
+    RETURN a;
+  END;
+$$ LANGUAGE plpgsql PARALLEL SAFE;
+
+-- Ensure we do parallel distinct now that the function is parallel safe
+EXPLAIN (COSTS OFF)
+SELECT DISTINCT distinct_func(1) FROM tenk1;
+
+RESET parallel_setup_cost;
+RESET parallel_tuple_cost;
+RESET min_parallel_table_scan_size;
+
 --
 -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
 -- very own regression file.
#5 Zhihong Yu
zyu@yugabyte.com
In reply to: David Rowley (#4)
Re: Allow parallel DISTINCT

On Tue, Aug 17, 2021 at 3:59 AM David Rowley <dgrowleyml@gmail.com> wrote:

On Tue, 17 Aug 2021 at 20:07, Zhihong Yu <zyu@yugabyte.com> wrote:

Can you attach the updated patch so that we can see more detail about the two new functions, create_final_distinct_paths and create_partial_distinct_paths?

Must've fallen off in transit :)

David

Hi,
Since create_partial_distinct_paths() calls create_final_distinct_paths(),
I wonder if numDistinctRows can be passed to create_final_distinct_paths()
so that the latter doesn't need to call estimate_num_groups().

Cheers

#6 David Rowley
dgrowleyml@gmail.com
In reply to: Zhihong Yu (#5)
Re: Allow parallel DISTINCT

On Wed, 18 Aug 2021 at 02:42, Zhihong Yu <zyu@yugabyte.com> wrote:

Since create_partial_distinct_paths() calls create_final_distinct_paths(), I wonder if numDistinctRows can be passed to create_final_distinct_paths() so that the latter doesn't need to call estimate_num_groups().

That can't be done. The two calls to estimate_num_groups() are passing
in a different number of input rows. In
create_partial_distinct_paths() the number of rows is the number of
expected input rows from a partial path. In
create_final_distinct_paths() when called to complete the final
distinct step, that's the number of distinct values multiplied by the
number of workers.

It might be possible to do something like cache the value of
distinctExprs, but I just don't feel the need. If there are partial
paths in the input_rel then it's most likely that planning time is not
going to dominate much between planning and execution. Also, if we
were to calculate the value of distinctExprs in create_distinct_paths
always, then we might end up calculating it for nothing as
create_final_distinct_paths() does not always need it. I don't feel
the need to clutter up the code by doing any lazy calculating of it
either.

David

#7Zhihong Yu
zyu@yugabyte.com
In reply to: David Rowley (#6)
Re: Allow parallel DISTINCT

On Tue, Aug 17, 2021 at 1:47 PM David Rowley <dgrowleyml@gmail.com> wrote:

On Wed, 18 Aug 2021 at 02:42, Zhihong Yu <zyu@yugabyte.com> wrote:

Since create_partial_distinct_paths() calls
create_final_distinct_paths(), I wonder if numDistinctRows can be passed to
create_final_distinct_paths() so that the latter doesn't need to call
estimate_num_groups().

That can't be done. The two calls to estimate_num_groups() are passing
in a different number of input rows. In
create_partial_distinct_paths() the number of rows is the number of
expected input rows from a partial path. In
create_final_distinct_paths() when called to complete the final
distinct step, that's the number of distinct values multiplied by the
number of workers.

It might be possible to do something like cache the value of
distinctExprs, but I just don't feel the need. If there are partial
paths in the input_rel then it's most likely that planning time is not
going to dominate much between planning and execution. Also, if we
were to calculate the value of distinctExprs in create_distinct_paths
always, then we might end up calculating it for nothing as
create_final_distinct_paths() does not always need it. I don't feel
the need to clutter up the code by doing any lazy calculating of it
either.

David

Hi,
Thanks for your explanation.

The patch is good from my point of view.

#8David Rowley
dgrowleyml@gmail.com
In reply to: Zhihong Yu (#7)
Re: Allow parallel DISTINCT

On Wed, 18 Aug 2021 at 08:50, Zhihong Yu <zyu@yugabyte.com> wrote:

The patch is good from my point of view.

Thanks for the review. I looked over the patch again and the only
thing I adjusted was the order of the RESETs in the regression tests.

I left the "if (distinct_rel->pathlist == NIL)" ERROR case check so
that the ERROR is raised before we call the FDW function and hook
function to add more paths. Part of me thinks it should probably go
afterwards, but I didn't want to change the behaviour there. The
other part of me thinks that if you can't do distinct by sorting or
hashing then there's not much hope for the hook to add any paths
either.

I've pushed this to master now.

David

#9Tom Lane
tgl@sss.pgh.pa.us
In reply to: David Rowley (#8)
Re: Allow parallel DISTINCT

David Rowley <dgrowleyml@gmail.com> writes:

I've pushed this to master now.

... and the buildfarm is pushing back, eg

https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=hoverfly&dt=2021-08-22%2011%3A31%3A45

diff -U3 /scratch/nm/farm/xlc64v16/HEAD/pgsql.build/src/test/regress/expected/select_distinct.out /scratch/nm/farm/xlc64v16/HEAD/pgsql.build/src/test/regress/results/select_distinct.out
--- /scratch/nm/farm/xlc64v16/HEAD/pgsql.build/src/test/regress/expected/select_distinct.out	2021-08-22 11:26:22.000000000 +0000
+++ /scratch/nm/farm/xlc64v16/HEAD/pgsql.build/src/test/regress/results/select_distinct.out	2021-08-22 11:38:43.000000000 +0000
@@ -223,7 +223,7 @@
    ->  Sort
          Sort Key: four
          ->  Gather
-               Workers Planned: 2
+               Workers Planned: 5
                ->  HashAggregate
                      Group Key: four
                      ->  Parallel Seq Scan on tenk1
@@ -270,7 +270,7 @@
    ->  Sort
          Sort Key: (distinct_func(1))
          ->  Gather
-               Workers Planned: 2
+               Workers Planned: 5
                ->  Parallel Seq Scan on tenk1
 (6 rows)

regards, tom lane

#10David Rowley
dgrowleyml@gmail.com
In reply to: Tom Lane (#9)
Re: Allow parallel DISTINCT

On Mon, 23 Aug 2021 at 01:58, Tom Lane <tgl@sss.pgh.pa.us> wrote:

David Rowley <dgrowleyml@gmail.com> writes:

I've pushed this to master now.

... and the buildfarm is pushing back, eg

Thanks. I pushed a fix for that.

David

#11Tom Lane
tgl@sss.pgh.pa.us
In reply to: David Rowley (#10)
Re: Allow parallel DISTINCT

David Rowley <dgrowleyml@gmail.com> writes:

Thanks. I pushed a fix for that.

Yeah, I saw your commit just after complaining. Sorry for the noise.

regards, tom lane