diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index 2d49a65..0982434 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -427,6 +427,8 @@ sql_fn_make_param(SQLFunctionParseInfoPtr pinfo, param->paramtypmod = -1; param->paramcollid = get_typcollation(param->paramtype); param->location = location; + param->parallel_safe = false; + param->is_correlated = false; /* * If we have a function input collation, allow it to override the diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 12324ab..5d57138 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1237,6 +1237,8 @@ _copyParam(const Param *from) COPY_SCALAR_FIELD(paramtypmod); COPY_SCALAR_FIELD(paramcollid); COPY_LOCATION_FIELD(location); + COPY_SCALAR_FIELD(parallel_safe); + COPY_SCALAR_FIELD(is_correlated); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 6d1dabe..46f7c5b 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -183,6 +183,8 @@ _equalParam(const Param *a, const Param *b) COMPARE_SCALAR_FIELD(paramtypmod); COMPARE_SCALAR_FIELD(paramcollid); COMPARE_LOCATION_FIELD(location); + COMPARE_SCALAR_FIELD(parallel_safe); + COMPARE_SCALAR_FIELD(is_correlated); return true; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index b3802b4..69e1919 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1016,6 +1016,8 @@ _outParam(StringInfo str, const Param *node) WRITE_INT_FIELD(paramtypmod); WRITE_OID_FIELD(paramcollid); WRITE_LOCATION_FIELD(location); + WRITE_BOOL_FIELD(parallel_safe); + WRITE_BOOL_FIELD(is_correlated); } static void @@ -1616,6 +1618,7 @@ _outPathInfo(StringInfo str, const Path *node) outBitmapset(str, NULL); WRITE_BOOL_FIELD(parallel_aware); WRITE_BOOL_FIELD(parallel_safe); + WRITE_BOOL_FIELD(contain_correl_param); 
WRITE_INT_FIELD(parallel_workers); WRITE_FLOAT_FIELD(rows, "%.0f"); WRITE_FLOAT_FIELD(startup_cost, "%.2f"); diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index d2f69fe..2184f9e 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -536,6 +536,8 @@ _readParam(void) READ_INT_FIELD(paramtypmod); READ_OID_FIELD(paramcollid); READ_LOCATION_FIELD(location); + READ_BOOL_FIELD(parallel_safe); + READ_BOOL_FIELD(is_correlated); READ_DONE(); } diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 5c18987..5d11751 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -501,6 +501,9 @@ static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) { + bool contain_correl_param = false; + bool save_correl_param = false; + /* * The flag has previously been initialized to false, so we can just * return if it becomes clear that we can't safely set it. @@ -541,9 +544,9 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, char proparallel = func_parallel(rte->tablesample->tsmhandler); if (proparallel != PROPARALLEL_SAFE) - return; - if (!is_parallel_safe(root, (Node *) rte->tablesample->args)) - return; + goto parallel_unsafe; + if (!is_parallel_safe(root, (Node *) rte->tablesample->args, &contain_correl_param)) + goto parallel_unsafe; } /* @@ -558,9 +561,9 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, { Assert(rel->fdwroutine); if (!rel->fdwroutine->IsForeignScanParallelSafe) - return; + goto parallel_unsafe; if (!rel->fdwroutine->IsForeignScanParallelSafe(root, rel, rte)) - return; + goto parallel_unsafe; } /* @@ -591,18 +594,18 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_JOIN: /* Shouldn't happen; we're only considering baserels here. */ Assert(false); - return; + goto parallel_unsafe; case RTE_FUNCTION: /* Check for parallel-restricted functions. 
*/ - if (!is_parallel_safe(root, (Node *) rte->functions)) - return; + if (!is_parallel_safe(root, (Node *) rte->functions, &contain_correl_param)) + goto parallel_unsafe; break; case RTE_VALUES: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->values_lists)) - return; + if (!is_parallel_safe(root, (Node *) rte->values_lists, &contain_correl_param)) + goto parallel_unsafe; break; case RTE_CTE: @@ -617,6 +620,8 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, return; } + save_correl_param = contain_correl_param; + /* * If there's anything in baserestrictinfo that's parallel-restricted, we * give up on parallelizing access to this relation. We could consider @@ -626,18 +631,23 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, * outer join clauses work correctly. It would likely break equivalence * classes, too. */ - if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo)) - return; + if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo, &contain_correl_param)) + goto parallel_unsafe; + + save_correl_param = save_correl_param || contain_correl_param; /* * Likewise, if the relation's outputs are not parallel-safe, give up. * (Usually, they're just Vars, but sometimes they're not.) */ - if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs)) - return; + if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs, &contain_correl_param)) + goto parallel_unsafe; /* We have a winner. */ rel->consider_parallel = true; + +parallel_unsafe: + rel->contain_correl_param = save_correl_param || contain_correl_param; } /* diff --git a/src/backend/optimizer/plan/planmain.c b/src/backend/optimizer/plan/planmain.c index 3c58d05..ac797b3 100644 --- a/src/backend/optimizer/plan/planmain.c +++ b/src/backend/optimizer/plan/planmain.c @@ -76,8 +76,13 @@ query_planner(PlannerInfo *root, List *tlist, * the query tlist will be dealt with later.) 
*/ if (root->glob->parallelModeOK) + { + bool contain_correl_param = false; + final_rel->consider_parallel = - is_parallel_safe(root, parse->jointree->quals); + is_parallel_safe(root, parse->jointree->quals, &contain_correl_param); + final_rel->contain_correl_param = contain_correl_param; + } /* The only path for it is a trivial Result path */ add_path(final_rel, (Path *) diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 881742f..caa3a4e 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -1394,6 +1394,8 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, RelOptInfo *current_rel; RelOptInfo *final_rel; ListCell *lc; + bool offset_correl_param = false; + bool count_correl_param = false; /* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */ if (parse->limitCount || parse->limitOffset) @@ -1814,7 +1816,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, * computed by partial paths. */ if (current_rel->partial_pathlist && - is_parallel_safe(root, (Node *) scanjoin_target->exprs)) + is_parallel_safe(root, (Node *) scanjoin_target->exprs, NULL)) { /* Apply the scan/join target to each partial path */ foreach(lc, current_rel->partial_pathlist) @@ -1951,11 +1953,20 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, * query. */ if (current_rel->consider_parallel && - is_parallel_safe(root, parse->limitOffset) && - is_parallel_safe(root, parse->limitCount)) + is_parallel_safe(root, parse->limitOffset, &offset_correl_param) && + is_parallel_safe(root, parse->limitCount, &count_correl_param)) final_rel->consider_parallel = true; /* + * mark final_rel to indicate that it contains correlated params if there + * is any reference to correlated param in input rel or limit clause. 
+ */ + if (current_rel->contain_correl_param || + offset_correl_param || + count_correl_param) + final_rel->contain_correl_param = true; + + /* * If the current_rel belongs to a single FDW, so does the final_rel. */ final_rel->serverid = current_rel->serverid; @@ -3314,6 +3325,8 @@ create_grouping_paths(PlannerInfo *root, bool can_hash; bool can_sort; bool try_parallel_aggregation; + bool target_correl_param = false; + bool having_correl_param = false; ListCell *lc; @@ -3326,11 +3339,21 @@ create_grouping_paths(PlannerInfo *root, * target list and HAVING quals are parallel-safe. */ if (input_rel->consider_parallel && - is_parallel_safe(root, (Node *) target->exprs) && - is_parallel_safe(root, (Node *) parse->havingQual)) + is_parallel_safe(root, (Node *) target->exprs, &target_correl_param) && + is_parallel_safe(root, (Node *) parse->havingQual, &having_correl_param)) grouped_rel->consider_parallel = true; /* + * mark grouped_rel to indicate that it contains correlated params if + * there is any reference to correlated param in input rel, target list or + * having clause. + */ + if (input_rel->contain_correl_param || + target_correl_param || + having_correl_param) + grouped_rel->contain_correl_param = true; + + /* * If the input rel belongs to a single FDW, so does the grouped rel. */ grouped_rel->serverid = input_rel->serverid; @@ -3871,6 +3894,8 @@ create_window_paths(PlannerInfo *root, { RelOptInfo *window_rel; ListCell *lc; + bool target_correl_param = false; + bool activeWindows_correl_param = false; /* For now, do all work in the (WINDOW, NULL) upperrel */ window_rel = fetch_upper_rel(root, UPPERREL_WINDOW, NULL); @@ -3881,11 +3906,21 @@ create_window_paths(PlannerInfo *root, * target list and active windows for non-parallel-safe constructs. 
*/ if (input_rel->consider_parallel && - is_parallel_safe(root, (Node *) output_target->exprs) && - is_parallel_safe(root, (Node *) activeWindows)) + is_parallel_safe(root, (Node *) output_target->exprs, &target_correl_param) && + is_parallel_safe(root, (Node *) activeWindows, &activeWindows_correl_param)) window_rel->consider_parallel = true; /* + * mark window_rel to indicate that it contains correlated params if there + * is any reference to correlated param in input rel, target list or + * activeWindows. + */ + if (input_rel->contain_correl_param || + target_correl_param || + activeWindows_correl_param) + window_rel->contain_correl_param = true; + + /* * If the input rel belongs to a single FDW, so does the window rel. */ window_rel->serverid = input_rel->serverid; @@ -4262,6 +4297,7 @@ create_ordered_paths(PlannerInfo *root, Path *cheapest_input_path = input_rel->cheapest_total_path; RelOptInfo *ordered_rel; ListCell *lc; + bool target_correl_param = false; /* For now, do all work in the (ORDERED, NULL) upperrel */ ordered_rel = fetch_upper_rel(root, UPPERREL_ORDERED, NULL); @@ -4272,10 +4308,17 @@ create_ordered_paths(PlannerInfo *root, * target list is parallel-safe. */ if (input_rel->consider_parallel && - is_parallel_safe(root, (Node *) target->exprs)) + is_parallel_safe(root, (Node *) target->exprs, &target_correl_param)) ordered_rel->consider_parallel = true; /* + * mark ordered_rel to indicate that it contains correlated params if + * there is any reference to correlated param in input rel or target list. + */ + if (input_rel->contain_correl_param || target_correl_param) + ordered_rel->contain_correl_param = true; + + /* * If the input rel belongs to a single FDW, so does the ordered_rel. 
*/ ordered_rel->serverid = input_rel->serverid; diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 4130bba..41743e4 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -160,6 +160,10 @@ replace_outer_var(PlannerInfo *root, Var *var) retval->paramcollid = var->varcollid; retval->location = var->location; + /* mark correlated params as parallel unsafe */ + retval->parallel_safe = false; + retval->is_correlated = true; + return retval; } @@ -188,6 +192,10 @@ assign_nestloop_param_var(PlannerInfo *root, Var *var) retval->paramcollid = var->varcollid; retval->location = var->location; + /* mark correlated params as parallel unsafe */ + retval->parallel_safe = false; + retval->is_correlated = true; + return retval; } @@ -265,6 +273,10 @@ replace_outer_placeholdervar(PlannerInfo *root, PlaceHolderVar *phv) retval->paramcollid = exprCollation((Node *) phv->phexpr); retval->location = -1; + /* mark correlated params as parallel unsafe */ + retval->parallel_safe = false; + retval->is_correlated = true; + return retval; } @@ -292,6 +304,10 @@ assign_nestloop_param_placeholdervar(PlannerInfo *root, PlaceHolderVar *phv) retval->paramcollid = exprCollation((Node *) phv->phexpr); retval->location = -1; + /* mark correlated params as parallel unsafe */ + retval->parallel_safe = false; + retval->is_correlated = true; + return retval; } @@ -334,6 +350,10 @@ replace_outer_agg(PlannerInfo *root, Aggref *agg) retval->paramcollid = agg->aggcollid; retval->location = agg->location; + /* mark correlated params as parallel unsafe */ + retval->parallel_safe = false; + retval->is_correlated = true; + return retval; } @@ -376,6 +396,10 @@ replace_outer_grouping(PlannerInfo *root, GroupingFunc *grp) retval->paramcollid = InvalidOid; retval->location = grp->location; + /* mark correlated params as parallel unsafe */ + retval->parallel_safe = false; + retval->is_correlated = true; + return retval; } 
@@ -399,6 +423,8 @@ generate_new_param(PlannerInfo *root, Oid paramtype, int32 paramtypmod, retval->paramtypmod = paramtypmod; retval->paramcollid = paramcollation; retval->location = -1; + retval->parallel_safe = false; + retval->is_correlated = false; return retval; } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index c7be99c..27b6f85 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -93,6 +93,8 @@ typedef struct { char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ + bool contain_correl_param; /* true, if expr contains param for + * correlated var */ } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); @@ -1056,6 +1058,7 @@ max_parallel_hazard(Query *parse) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; + context.contain_correl_param = false; (void) max_parallel_hazard_walker((Node *) parse, &context); return context.max_hazard; } @@ -1068,9 +1071,10 @@ max_parallel_hazard(Query *parse) * result of max_parallel_hazard() on the whole query. 
*/ bool -is_parallel_safe(PlannerInfo *root, Node *node) +is_parallel_safe(PlannerInfo *root, Node *node, bool *contain_correl_param) { max_parallel_hazard_context context; + bool parallel_hazard; /* * Even if the original querytree contained nothing unsafe, we need to @@ -1084,7 +1088,11 @@ is_parallel_safe(PlannerInfo *root, Node *node) /* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */ context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; - return !max_parallel_hazard_walker(node, &context); + context.contain_correl_param = false; + parallel_hazard = max_parallel_hazard_walker(node, &context); + if (contain_correl_param) + *contain_correl_param = context.contain_correl_param; + return !parallel_hazard; } /* core logic for all parallel-hazard checks */ @@ -1186,13 +1194,21 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) return false; } - /* - * We can't pass Params to workers at the moment either, so they are also - * parallel-restricted. - */ + /* + * Only parallel-safe Params can be passed to workers; any other Param is + * parallel-restricted. Report it through max_parallel_hazard_test(), as + * before this change, so the context's hazard tracking and its + * max_interesting cutoff still apply. The correlated-Param flag is sticky: + * a later non-correlated Param must not clear it. + */ else if (IsA(node, Param)) { - if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) + Param *prm = (Param *) node; + + if (prm->is_correlated) + context->contain_correl_param = true; + if (!prm->parallel_safe && + max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) return true; } diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index f440875..43afa75 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -952,6 +952,7 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, required_outer); pathnode->parallel_aware = parallel_workers > 0 ? 
true : false; pathnode->parallel_safe = rel->consider_parallel; + pathnode->contain_correl_param = rel->contain_correl_param; pathnode->parallel_workers = parallel_workers; pathnode->pathkeys = NIL; /* seqscan has unordered result */ @@ -976,6 +977,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer required_outer); pathnode->parallel_aware = false; pathnode->parallel_safe = rel->consider_parallel; + pathnode->contain_correl_param = rel->contain_correl_param; pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* samplescan has unordered result */ @@ -1033,6 +1035,7 @@ create_index_path(PlannerInfo *root, required_outer); pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; @@ -1082,6 +1085,7 @@ create_bitmap_heap_path(PlannerInfo *root, required_outer); pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1118,6 +1122,7 @@ create_bitmap_and_path(PlannerInfo *root, */ pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1154,6 +1159,7 @@ create_bitmap_or_path(PlannerInfo *root, */ pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1183,6 +1189,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, required_outer); 
pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1215,6 +1222,7 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, required_outer); pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = NIL; /* result is always considered * unsorted */ @@ -1274,6 +1282,7 @@ create_merge_append_path(PlannerInfo *root, required_outer); pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; pathnode->subpaths = subpaths; @@ -1357,6 +1366,7 @@ create_result_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = NULL; /* there are no other rels... 
*/ pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; pathnode->quals = resconstantqual; @@ -1398,6 +1408,8 @@ create_material_path(RelOptInfo *rel, Path *subpath) pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = subpath->pathkeys; @@ -1463,6 +1475,8 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; /* @@ -1681,6 +1695,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, required_outer); pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = false; + pathnode->path.contain_correl_param = subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = NIL; /* Gather has unordered result */ @@ -1718,6 +1733,8 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; pathnode->subpath = subpath; @@ -1745,6 +1762,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel, required_outer); 
pathnode->parallel_aware = false; pathnode->parallel_safe = rel->consider_parallel; + pathnode->contain_correl_param = rel->contain_correl_param; pathnode->parallel_workers = 0; pathnode->pathkeys = pathkeys; @@ -1771,6 +1789,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel, required_outer); pathnode->parallel_aware = false; pathnode->parallel_safe = rel->consider_parallel; + pathnode->contain_correl_param = rel->contain_correl_param; pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -1796,6 +1815,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer) required_outer); pathnode->parallel_aware = false; pathnode->parallel_safe = rel->consider_parallel; + pathnode->contain_correl_param = rel->contain_correl_param; pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* XXX for now, result is always unordered */ @@ -1822,6 +1842,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel, required_outer); pathnode->parallel_aware = false; pathnode->parallel_safe = rel->consider_parallel; + pathnode->contain_correl_param = rel->contain_correl_param; pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -1861,6 +1882,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, required_outer); pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.startup_cost = startup_cost; @@ -2001,6 +2023,8 @@ create_nestloop_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = joinrel->consider_parallel && outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->path.contain_correl_param = joinrel->contain_correl_param || + outer_path->contain_correl_param || inner_path->contain_correl_param; /* This is a 
foolish way to estimate parallel_workers, but for now... */ pathnode->path.parallel_workers = outer_path->parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -2064,6 +2088,8 @@ create_mergejoin_path(PlannerInfo *root, pathnode->jpath.path.parallel_aware = false; pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.contain_correl_param = joinrel->contain_correl_param || + outer_path->contain_correl_param || inner_path->contain_correl_param; /* This is a foolish way to estimate parallel_workers, but for now... */ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; pathnode->jpath.path.pathkeys = pathkeys; @@ -2126,6 +2152,8 @@ create_hashjoin_path(PlannerInfo *root, pathnode->jpath.path.parallel_aware = false; pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.contain_correl_param = joinrel->contain_correl_param || + outer_path->contain_correl_param || inner_path->contain_correl_param; /* This is a foolish way to estimate parallel_workers, but for now... 
*/ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; @@ -2169,6 +2197,7 @@ create_projection_path(PlannerInfo *root, { ProjectionPath *pathnode = makeNode(ProjectionPath); PathTarget *oldtarget = subpath->pathtarget; + bool contain_correl_param = false; pathnode->path.pathtype = T_Result; pathnode->path.parent = rel; @@ -2178,7 +2207,9 @@ create_projection_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe && - is_parallel_safe(root, (Node *) target->exprs); + is_parallel_safe(root, (Node *) target->exprs, &contain_correl_param); + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param || contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; /* Projection does not change the sort order */ pathnode->path.pathkeys = subpath->pathkeys; @@ -2259,6 +2290,8 @@ apply_projection_to_path(PlannerInfo *root, PathTarget *target) { QualCost oldcost; + bool contain_correl_param = false; + bool save_correl_param; /* * If given path can't project, we might need a Result node, so make a @@ -2278,6 +2311,8 @@ apply_projection_to_path(PlannerInfo *root, path->total_cost += target->cost.startup - oldcost.startup + (target->cost.per_tuple - oldcost.per_tuple) * path->rows; + save_correl_param = path->contain_correl_param; + /* * If the path happens to be a Gather path, we'd like to arrange for the * subpath to return the required target list so that workers can help @@ -2285,7 +2320,7 @@ apply_projection_to_path(PlannerInfo *root, * target expressions, then we can't. 
*/ if (IsA(path, GatherPath) && - is_parallel_safe(root, (Node *) target->exprs)) + is_parallel_safe(root, (Node *) target->exprs, &contain_correl_param)) { GatherPath *gpath = (GatherPath *) path; @@ -2306,7 +2341,7 @@ apply_projection_to_path(PlannerInfo *root, target); } else if (path->parallel_safe && - !is_parallel_safe(root, (Node *) target->exprs)) + !is_parallel_safe(root, (Node *) target->exprs, &contain_correl_param)) { /* * We're inserting a parallel-restricted target list into a path @@ -2316,6 +2351,8 @@ apply_projection_to_path(PlannerInfo *root, path->parallel_safe = false; } + path->contain_correl_param = save_correl_param || contain_correl_param; + return path; } @@ -2337,6 +2374,7 @@ create_set_projection_path(PlannerInfo *root, ProjectSetPath *pathnode = makeNode(ProjectSetPath); double tlist_rows; ListCell *lc; + bool contain_correl_param = false; pathnode->path.pathtype = T_ProjectSet; pathnode->path.parent = rel; @@ -2346,7 +2384,9 @@ create_set_projection_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe && - is_parallel_safe(root, (Node *) target->exprs); + is_parallel_safe(root, (Node *) target->exprs, &contain_correl_param); + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param || contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; /* Projection does not change the sort order XXX? 
*/ pathnode->path.pathkeys = subpath->pathkeys; @@ -2413,6 +2453,8 @@ create_sort_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -2458,6 +2500,8 @@ create_group_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; /* Group doesn't change sort ordering */ pathnode->path.pathkeys = subpath->pathkeys; @@ -2515,6 +2559,8 @@ create_upper_unique_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; /* Unique doesn't change the input ordering */ pathnode->path.pathkeys = subpath->pathkeys; @@ -2571,6 +2617,8 @@ create_agg_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; if (aggstrategy == AGG_SORTED) pathnode->path.pathkeys = subpath->pathkeys; /* preserves order */ @@ -2637,6 +2685,8 @@ create_groupingsets_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + 
subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->subpath = subpath; @@ -2752,6 +2802,7 @@ create_minmaxagg_path(PlannerInfo *root, pathnode->path.parallel_aware = false; /* A MinMaxAggPath implies use of subplans, so cannot be parallel-safe */ pathnode->path.parallel_safe = false; + pathnode->path.contain_correl_param = rel->contain_correl_param; pathnode->path.parallel_workers = 0; /* Result is one unordered row */ pathnode->path.rows = 1; @@ -2810,6 +2861,8 @@ create_windowagg_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; /* WindowAgg preserves the input sort order */ pathnode->path.pathkeys = subpath->pathkeys; @@ -2878,6 +2931,8 @@ create_setop_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; /* SetOp preserves the input sort order if in sort mode */ pathnode->path.pathkeys = @@ -2937,6 +2992,8 @@ create_recursiveunion_path(PlannerInfo *root, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && leftpath->parallel_safe && rightpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + leftpath->contain_correl_param || rightpath->contain_correl_param; /* Foolish, but we'll do it like joins for now: */ pathnode->path.parallel_workers = leftpath->parallel_workers; /* RecursiveUnion result is always unsorted */ @@ -2976,6 +3033,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = NULL; 
pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = false; + pathnode->path.contain_correl_param = false; pathnode->path.parallel_workers = 0; pathnode->path.rows = subpath->rows; @@ -3047,6 +3105,7 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = false; + pathnode->path.contain_correl_param = false; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; @@ -3134,6 +3193,8 @@ create_limit_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = rel->consider_parallel && subpath->parallel_safe; + pathnode->path.contain_correl_param = rel->contain_correl_param || + subpath->contain_correl_param; pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.rows = subpath->rows; pathnode->path.startup_cost = subpath->startup_cost; diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c index adc1db9..f5548ed 100644 --- a/src/backend/optimizer/util/relnode.c +++ b/src/backend/optimizer/util/relnode.c @@ -352,6 +352,8 @@ build_join_rel(PlannerInfo *root, { RelOptInfo *joinrel; List *restrictlist; + bool restrict_correl_param = false; + bool target_correl_param = false; /* * See if we already have a joinrel for this set of base rels. @@ -527,11 +529,22 @@ build_join_rel(PlannerInfo *root, * here. 
*/ if (inner_rel->consider_parallel && outer_rel->consider_parallel && - is_parallel_safe(root, (Node *) restrictlist) && - is_parallel_safe(root, (Node *) joinrel->reltarget->exprs)) + is_parallel_safe(root, (Node *) restrictlist, &restrict_correl_param) && + is_parallel_safe(root, (Node *) joinrel->reltarget->exprs, &target_correl_param)) joinrel->consider_parallel = true; /* + * mark join_rel to indicate that it contains correlated params if there + * is any reference to correlated param in inner rel or outer rel or quals + * or targetlist. + */ + if (inner_rel->contain_correl_param || + outer_rel->contain_correl_param || + restrict_correl_param || + target_correl_param) + joinrel->contain_correl_param = true; + + /* * Add the joinrel to the query's joinrel list, and store it into the * auxiliary hashtable if there is one. NB: GEQO requires us to append * the new joinrel to the end of the list! diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index efe1c37..a55721a 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -2051,5 +2051,7 @@ make_agg_arg(Oid argtype, Oid argcollation) argp->paramtypmod = -1; argp->paramcollid = argcollation; argp->location = -1; + argp->parallel_safe = false; + argp->is_correlated = false; return (Node *) argp; } diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 4b73272..15f3303 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -1590,6 +1590,8 @@ transformMultiAssignRef(ParseState *pstate, MultiAssignRef *maref) param->paramtypmod = exprTypmod((Node *) tle->expr); param->paramcollid = exprCollation((Node *) tle->expr); param->location = exprLocation((Node *) tle->expr); + param->parallel_safe = false; + param->is_correlated = false; return (Node *) param; } @@ -1947,6 +1949,8 @@ transformSubLink(ParseState *pstate, SubLink *sublink) param->paramtypmod = exprTypmod((Node *) tent->expr); param->paramcollid = 
exprCollation((Node *) tent->expr); param->location = -1; + param->parallel_safe = false; + param->is_correlated = false; right_list = lappend(right_list, param); } diff --git a/src/backend/parser/parse_param.c b/src/backend/parser/parse_param.c index 2575e02..b247bf8 100644 --- a/src/backend/parser/parse_param.c +++ b/src/backend/parser/parse_param.c @@ -117,6 +117,8 @@ fixed_paramref_hook(ParseState *pstate, ParamRef *pref) param->paramtypmod = -1; param->paramcollid = get_typcollation(param->paramtype); param->location = pref->location; + param->parallel_safe = false; + param->is_correlated = false; return (Node *) param; } @@ -170,6 +172,8 @@ variable_paramref_hook(ParseState *pstate, ParamRef *pref) param->paramtypmod = -1; param->paramcollid = get_typcollation(param->paramtype); param->location = pref->location; + param->parallel_safe = false; + param->is_correlated = false; return (Node *) param; } diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 235bc75..16860e9 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -225,6 +225,8 @@ typedef struct Param int32 paramtypmod; /* typmod value, if known */ Oid paramcollid; /* OID of collation, or InvalidOid if none */ int location; /* token location, or -1 if unknown */ + bool parallel_safe; /* OK to use as part of parallel plan? */ + bool is_correlated; /* is param a correlated var? */ } Param; /* diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 643be54..fa94c92 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -496,6 +496,8 @@ typedef struct RelOptInfo bool consider_startup; /* keep cheap-startup-cost paths? */ bool consider_param_startup; /* ditto, for parameterized paths? */ bool consider_parallel; /* consider parallel paths? */ + bool contain_correl_param; /* contains reference to correlated + * param? 
*/ /* default result targetlist for Paths scanning this relation */ struct PathTarget *reltarget; /* list of Vars/Exprs, cost, width */ @@ -897,6 +899,8 @@ typedef struct Path bool parallel_aware; /* engage parallel-aware logic? */ bool parallel_safe; /* OK to use as part of parallel plan? */ + bool contain_correl_param; /* contain reference to correlated + * param */ int parallel_workers; /* desired # of workers; 0 = not * parallel */ @@ -1313,7 +1317,7 @@ typedef struct ProjectSetPath { Path path; Path *subpath; /* path representing input source */ -} ProjectSetPath; +} ProjectSetPath; /* * SortPath represents an explicit sort step diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index cc0d7b0..0b6ace5 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -61,7 +61,7 @@ extern bool contain_mutable_functions(Node *clause); extern bool contain_volatile_functions(Node *clause); extern bool contain_volatile_functions_not_nextval(Node *clause); extern char max_parallel_hazard(Query *parse); -extern bool is_parallel_safe(PlannerInfo *root, Node *node); +extern bool is_parallel_safe(PlannerInfo *root, Node *node, bool *contain_correl_param); extern bool contain_nonstrict_functions(Node *clause); extern bool contain_leaked_vars(Node *clause); diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c index b25b3f1..8111b3c 100644 --- a/src/pl/plpgsql/src/pl_comp.c +++ b/src/pl/plpgsql/src/pl_comp.c @@ -1328,6 +1328,7 @@ make_datum_param(PLpgSQL_expr *expr, int dno, int location) &param->paramtypmod, &param->paramcollid); param->location = location; + param->parallel_safe = false; return (Node *) param; }