From ba785f2abca7c9f5199f0fc27c3b71f6d1d8010b Mon Sep 17 00:00:00 2001
From: jcoleman <jtc331@gmail.com>
Date: Fri, 21 Jan 2022 22:38:46 -0500
Subject: [PATCH v4 2/4] Parallelize correlated subqueries

When params are provided at the current query level (i.e., are generated
within a single worker and not shared across workers) we can safely
execute these in parallel.

Alternative approach using just relids subset check
---
 doc/src/sgml/parallel.sgml                    |   3 +-
 src/backend/optimizer/path/allpaths.c         |  18 ++-
 src/backend/optimizer/path/joinpath.c         |  16 ++-
 src/backend/optimizer/util/clauses.c          |   3 +
 src/backend/optimizer/util/pathnode.c         |   2 +
 src/backend/utils/misc/guc.c                  |   1 +
 src/include/nodes/pathnodes.h                 |   2 +-
 .../regress/expected/incremental_sort.out     |  28 ++--
 src/test/regress/expected/select_parallel.out | 125 ++++++++++++++++++
 src/test/regress/sql/select_parallel.sql      |  25 ++++
 10 files changed, 197 insertions(+), 26 deletions(-)

diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 13479d7e5e..2d924dd2ac 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -517,7 +517,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
 
     <listitem>
       <para>
-        Plan nodes that reference a correlated <literal>SubPlan</literal>.
+        Plan nodes that reference a correlated <literal>SubPlan</literal> where
+        the result is shared between workers.
       </para>
     </listitem>
   </itemizedlist>
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 6a581e20fa..776f002054 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -556,7 +556,8 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	 * (see grouping_planner).
 	 */
 	if (rel->reloptkind == RELOPT_BASEREL &&
-		bms_membership(root->all_baserels) != BMS_SINGLETON)
+		bms_membership(root->all_baserels) != BMS_SINGLETON
+		&& (rel->subplan_params == NIL || rte->rtekind != RTE_SUBQUERY))
 		generate_useful_gather_paths(root, rel, false);
 
 	/* Now find the cheapest of the paths for this rel */
@@ -2700,7 +2701,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
 	simple_gather_path = (Path *)
 		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
-						   NULL, rowsp);
+						   rel->lateral_relids, rowsp);
 	add_path(rel, simple_gather_path);
 
 	/*
@@ -2717,7 +2718,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 
 		rows = subpath->rows * subpath->parallel_workers;
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
-										subpath->pathkeys, NULL, rowsp);
+										subpath->pathkeys, rel->lateral_relids, rowsp);
 		add_path(rel, &path->path);
 	}
 }
@@ -2819,11 +2820,15 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 	double	   *rowsp = NULL;
 	List	   *useful_pathkeys_list = NIL;
 	Path	   *cheapest_partial_path = NULL;
+	Relids		required_outer = rel->lateral_relids;
 
 	/* If there are no partial paths, there's nothing to do here. */
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	if (!bms_is_subset(required_outer, rel->relids))
+		return;
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -2895,7 +2900,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 												tmp,
 												rel->reltarget,
 												tmp->pathkeys,
-												NULL,
+												required_outer,
 												rowsp);
 
 				add_path(rel, &path->path);
@@ -2929,7 +2934,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 												tmp,
 												rel->reltarget,
 												tmp->pathkeys,
-												NULL,
+												required_outer,
 												rowsp);
 
 				add_path(rel, &path->path);
@@ -3108,7 +3113,8 @@ standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels)
 			/*
 			 * Except for the topmost scan/join rel, consider gathering
 			 * partial paths.  We'll do the same for the topmost scan/join rel
-			 * once we know the final targetlist (see grouping_planner).
+			 * once we know the final targetlist (see
+			 * apply_scanjoin_target_to_paths).
 			 */
 			if (lev < levels_needed)
 				generate_useful_gather_paths(root, rel, false);
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index f96fc9fd28..e85b5449ea 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1791,16 +1791,24 @@ match_unsorted_outer(PlannerInfo *root,
 	 * partial path and the joinrel is parallel-safe.  However, we can't
 	 * handle JOIN_UNIQUE_OUTER, because the outer path will be partial, and
 	 * therefore we won't be able to properly guarantee uniqueness.  Nor can
-	 * we handle joins needing lateral rels, since partial paths must not be
-	 * parameterized. Similarly, we can't handle JOIN_FULL and JOIN_RIGHT,
-	 * because they can produce false null extended rows.
+	 * we handle JOIN_FULL and JOIN_RIGHT, because they can produce false null
+	 * extended rows.
+	 *
+	 * Partial paths may only have parameters in limited cases
+	 * where the parameterization is fully satisfied without sharing state
+	 * between workers, so we only allow lateral rels on inputs to the join
+	 * if the resulting join contains no lateral rels, the inner rel's laterals
+	 * are fully satisfied by the outer rel, and the outer rel doesn't depend
+	 * on the inner rel to produce any laterals.
 	 */
 	if (joinrel->consider_parallel &&
 		save_jointype != JOIN_UNIQUE_OUTER &&
 		save_jointype != JOIN_FULL &&
 		save_jointype != JOIN_RIGHT &&
 		outerrel->partial_pathlist != NIL &&
-		bms_is_empty(joinrel->lateral_relids))
+		bms_is_empty(joinrel->lateral_relids) &&
+		bms_is_subset(innerrel->lateral_relids, outerrel->relids) &&
+		(bms_is_empty(outerrel->lateral_relids) || !bms_is_subset(outerrel->lateral_relids, innerrel->relids)))
 	{
 		if (nestjoinOK)
 			consider_parallel_nestloop(root, joinrel, outerrel, innerrel,
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index a707dc9f26..f0002f6887 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -822,6 +822,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		if (param->paramkind == PARAM_EXTERN)
 			return false;
 
+		if (param->paramkind == PARAM_EXEC)
+			return false;
+
 		if (param->paramkind != PARAM_EXEC ||
 			!list_member_int(context->safe_param_ids, param->paramid))
 		{
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 5c32c96b71..144b2c485d 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -2418,6 +2418,8 @@ create_nestloop_path(PlannerInfo *root,
 	NestPath   *pathnode = makeNode(NestPath);
 	Relids		inner_req_outer = PATH_REQ_OUTER(inner_path);
 
+	/* TODO: Assert lateral relids subset safety? */
+
 	/*
 	 * If the inner path is parameterized by the outer, we must drop any
 	 * restrict_clauses that are due to be moved into the inner path.  We have
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index effb9d03a0..52cd3512b3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -58,6 +58,7 @@
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
 #include "optimizer/cost.h"
 #include "optimizer/geqo.h"
 #include "optimizer/optimizer.h"
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 1f33fe13c1..75681d6fb9 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -2478,7 +2478,7 @@ typedef struct MinMaxAggInfo
  * for conflicting purposes.
  *
  * In addition, PARAM_EXEC slots are assigned for Params representing outputs
- * from subplans (values that are setParam items for those subplans).  These
+ * from subplans (values that are setParam items for those subplans). [TODO: is this true, or only for init plans?]  These
  * IDs need not be tracked via PlannerParamItems, since we do not need any
  * duplicate-elimination nor later processing of the represented expressions.
  * Instead, we just record the assignment of the slot number by appending to
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index 545e301e48..8f9ca05e60 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1614,16 +1614,16 @@ from tenk1 t, generate_series(1, 1000);
                                    QUERY PLAN                                    
 ---------------------------------------------------------------------------------
  Unique
-   ->  Sort
-         Sort Key: t.unique1, ((SubPlan 1))
-         ->  Gather
-               Workers Planned: 2
+   ->  Gather Merge
+         Workers Planned: 2
+         ->  Sort
+               Sort Key: t.unique1, ((SubPlan 1))
                ->  Nested Loop
                      ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                      ->  Function Scan on generate_series
-               SubPlan 1
-                 ->  Index Only Scan using tenk1_unique1 on tenk1
-                       Index Cond: (unique1 = t.unique1)
+                     SubPlan 1
+                       ->  Index Only Scan using tenk1_unique1 on tenk1
+                             Index Cond: (unique1 = t.unique1)
 (11 rows)
 
 explain (costs off) select
@@ -1633,16 +1633,16 @@ from tenk1 t, generate_series(1, 1000)
 order by 1, 2;
                                 QUERY PLAN                                 
 ---------------------------------------------------------------------------
- Sort
-   Sort Key: t.unique1, ((SubPlan 1))
-   ->  Gather
-         Workers Planned: 2
+ Gather Merge
+   Workers Planned: 2
+   ->  Sort
+         Sort Key: t.unique1, ((SubPlan 1))
          ->  Nested Loop
                ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                ->  Function Scan on generate_series
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on tenk1
-                 Index Cond: (unique1 = t.unique1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on tenk1
+                       Index Cond: (unique1 = t.unique1)
 (10 rows)
 
 -- Parallel sort but with expression not available until the upper rel.
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 2303f70d6e..124fe9fec5 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -311,6 +311,131 @@ select count(*) from tenk1 where (two, four) not in
  10000
 (1 row)
 
+-- test parallel plans for queries containing correlated subplans
+-- where the subplan only needs params available from the current
+-- worker's scan.
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t, generate_series(1, 10);
+                                 QUERY PLAN                                 
+----------------------------------------------------------------------------
+ Gather
+   Output: ((SubPlan 1))
+   Workers Planned: 4
+   ->  Nested Loop
+         Output: (SubPlan 1)
+         ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+               Output: t.unique1
+         ->  Function Scan on pg_catalog.generate_series
+               Output: generate_series.generate_series
+               Function Call: generate_series(1, 10)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(14 rows)
+
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
+   Output: ((SubPlan 1))
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(9 rows)
+
+explain (analyze, costs off, summary off, verbose, timing off) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t
+  limit 1;
+                                             QUERY PLAN                                             
+----------------------------------------------------------------------------------------------------
+ Limit (actual rows=1 loops=1)
+   Output: ((SubPlan 1))
+   ->  Gather (actual rows=1 loops=1)
+         Output: ((SubPlan 1))
+         Workers Planned: 4
+         Workers Launched: 4
+         ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t (actual rows=1 loops=5)
+               Output: (SubPlan 1)
+               Heap Fetches: 0
+               Worker 0:  actual rows=1 loops=1
+               Worker 1:  actual rows=1 loops=1
+               Worker 2:  actual rows=1 loops=1
+               Worker 3:  actual rows=1 loops=1
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1 (actual rows=1 loops=5)
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
+                       Heap Fetches: 0
+                       Worker 0:  actual rows=1 loops=1
+                       Worker 1:  actual rows=1 loops=1
+                       Worker 2:  actual rows=1 loops=1
+                       Worker 3:  actual rows=1 loops=1
+(22 rows)
+
+explain (costs off, verbose) select t.unique1
+  from tenk1 t
+  where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
+   Output: t.unique1
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1
+         Filter: (t.unique1 = (SubPlan 1))
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(10 rows)
+
+explain (costs off, verbose) select *
+  from tenk1 t
+  order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+                                                                                                QUERY PLAN                                                                                                
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Gather Merge
+   Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
+   Workers Planned: 4
+   ->  Sort
+         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
+         Sort Key: ((SubPlan 1))
+         ->  Parallel Seq Scan on public.tenk1 t
+               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
+(12 rows)
+
+-- test subplan in join/lateral join
+explain (costs off, verbose, timing off) select t.unique1, l.*
+  from tenk1 t
+  join lateral (
+    select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0)
+  ) l on true;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
+   Output: t.unique1, ((SubPlan 1))
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1, (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(9 rows)
+
 -- this is not parallel-safe due to use of random() within SubLink's testexpr:
 explain (costs off)
 	select * from tenk1 where (unique1 + random())::integer not in
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 019e17e751..c49799b6d4 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -111,6 +111,31 @@ explain (costs off)
 	(select hundred, thousand from tenk2 where thousand > 100);
 select count(*) from tenk1 where (two, four) not in
 	(select hundred, thousand from tenk2 where thousand > 100);
+-- test parallel plans for queries containing correlated subplans
+-- where the subplan only needs params available from the current
+-- worker's scan.
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t, generate_series(1, 10);
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t;
+explain (analyze, costs off, summary off, verbose, timing off) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t
+  limit 1;
+explain (costs off, verbose) select t.unique1
+  from tenk1 t
+  where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+explain (costs off, verbose) select *
+  from tenk1 t
+  order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+-- test subplan in join/lateral join
+explain (costs off, verbose, timing off) select t.unique1, l.*
+  from tenk1 t
+  join lateral (
+    select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0)
+  ) l on true;
 -- this is not parallel-safe due to use of random() within SubLink's testexpr:
 explain (costs off)
 	select * from tenk1 where (unique1 + random())::integer not in
-- 
2.17.1

