diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 0a1bb9d..7a782c9 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -427,6 +427,7 @@ sql_fn_make_param(SQLFunctionParseInfoPtr pinfo,
 	param->paramtypmod = -1;
 	param->paramcollid = get_typcollation(param->paramtype);
 	param->location = location;
+	param->parallel_safe = false;
 
 	/*
 	 * If we have a function input collation, allow it to override the
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 77ef898..d16f4cb 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -1219,6 +1219,7 @@ _copyParam(const Param *from)
 	COPY_SCALAR_FIELD(paramtypmod);
 	COPY_SCALAR_FIELD(paramcollid);
 	COPY_LOCATION_FIELD(location);
+	COPY_SCALAR_FIELD(parallel_safe);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 266519f..0f52fa3 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -183,6 +183,7 @@ _equalParam(const Param *a, const Param *b)
 	COMPARE_SCALAR_FIELD(paramtypmod);
 	COMPARE_SCALAR_FIELD(paramcollid);
 	COMPARE_LOCATION_FIELD(location);
+	COMPARE_SCALAR_FIELD(parallel_safe);
 
 	return true;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 2389a11..db2149c 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1006,6 +1006,7 @@ _outParam(StringInfo str, const Param *node)
 	WRITE_INT_FIELD(paramtypmod);
 	WRITE_OID_FIELD(paramcollid);
 	WRITE_LOCATION_FIELD(location);
+	WRITE_BOOL_FIELD(parallel_safe);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index d74eff3..671bd20 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -533,6 +533,7 @@ _readParam(void)
 	READ_INT_FIELD(paramtypmod);
 	READ_OID_FIELD(paramcollid);
 	READ_LOCATION_FIELD(location);
+	READ_BOOL_FIELD(parallel_safe);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index 32ce254..8e0f517 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -160,6 +160,9 @@ replace_outer_var(PlannerInfo *root, Var *var)
 	retval->paramcollid = var->varcollid;
 	retval->location = var->location;
 
+	/* mark vars from same or immediate outer level as parallel safe */
+	retval->parallel_safe = (var->varlevelsup <= 1);
+
 	return retval;
 }
 
@@ -188,6 +191,9 @@ assign_nestloop_param_var(PlannerInfo *root, Var *var)
 	retval->paramcollid = var->varcollid;
 	retval->location = var->location;
 
+	/* mark vars from same or immediate outer level as parallel safe */
+	retval->parallel_safe = (var->varlevelsup <= 1);
+
 	return retval;
 }
 
@@ -265,6 +271,9 @@ replace_outer_placeholdervar(PlannerInfo *root, PlaceHolderVar *phv)
 	retval->paramcollid = exprCollation((Node *) phv->phexpr);
 	retval->location = -1;
 
+	/* mark phvs from same or immediate outer level as parallel safe */
+	retval->parallel_safe = (phv->phlevelsup <= 1);
+
 	return retval;
 }
 
@@ -292,6 +301,9 @@ assign_nestloop_param_placeholdervar(PlannerInfo *root, PlaceHolderVar *phv)
 	retval->paramcollid = exprCollation((Node *) phv->phexpr);
 	retval->location = -1;
 
+	/* mark phvs from same or immediate outer level as parallel safe */
+	retval->parallel_safe = (phv->phlevelsup <= 1);
+
 	return retval;
 }
 
@@ -334,6 +346,9 @@ replace_outer_agg(PlannerInfo *root, Aggref *agg)
 	retval->paramcollid = agg->aggcollid;
 	retval->location = agg->location;
 
+	/* mark aggrefs from same or immediate outer level as parallel safe */
+	retval->parallel_safe = (agg->agglevelsup <= 1);
+
 	return retval;
 }
 
@@ -376,6 +391,9 @@ replace_outer_grouping(PlannerInfo *root, GroupingFunc *grp)
 	retval->paramcollid = InvalidOid;
 	retval->location = grp->location;
 
+	/* mark grouping functions from same or immediate outer level as parallel safe */
+	retval->parallel_safe = (grp->agglevelsup <= 1);
+
 	return retval;
 }
 
@@ -399,6 +417,7 @@ generate_new_param(PlannerInfo *root, Oid paramtype, int32 paramtypmod,
 	retval->paramtypmod = paramtypmod;
 	retval->paramcollid = paramcollation;
 	retval->location = -1;
+	retval->parallel_safe = false;
 
 	return retval;
 }
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 60b9b92..997e6d8 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -1264,13 +1264,14 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		return false;
 	}
 
 	/*
-	 * We can't pass Params to workers at the moment either, so they are also
-	 * parallel-restricted.
+	 * Only Params marked parallel_safe can be passed to workers; any other
+	 * Param is still parallel-restricted, so record that hazard in the context.
 	 */
 	else if (IsA(node, Param))
 	{
-		if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
+		if (!((Param *) node)->parallel_safe &&
+			max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
 			return true;
 	}
 
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index d977709..e41cc44 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -2051,5 +2051,6 @@ make_agg_arg(Oid argtype, Oid argcollation)
 	argp->paramtypmod = -1;
 	argp->paramcollid = argcollation;
 	argp->location = -1;
+	argp->parallel_safe = false;
 	return (Node *) argp;
 }
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index f62e45f..45a475f 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -1592,6 +1592,7 @@ transformMultiAssignRef(ParseState *pstate, MultiAssignRef *maref)
 		param->paramtypmod = exprTypmod((Node *) tle->expr);
 		param->paramcollid = exprCollation((Node *) tle->expr);
 		param->location = exprLocation((Node *) tle->expr);
+		param->parallel_safe = false;
 
 		return (Node *) param;
 	}
@@ -1949,6 +1950,7 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 			param->paramtypmod = exprTypmod((Node *) tent->expr);
 			param->paramcollid = exprCollation((Node *) tent->expr);
 			param->location = -1;
+			param->parallel_safe = false;
 
 			right_list = lappend(right_list, param);
 		}
diff --git a/src/backend/parser/parse_param.c b/src/backend/parser/parse_param.c
index 2575e02..f3a8350 100644
--- a/src/backend/parser/parse_param.c
+++ b/src/backend/parser/parse_param.c
@@ -117,6 +117,7 @@ fixed_paramref_hook(ParseState *pstate, ParamRef *pref)
 	param->paramtypmod = -1;
 	param->paramcollid = get_typcollation(param->paramtype);
 	param->location = pref->location;
+	param->parallel_safe = false;
 
 	return (Node *) param;
 }
@@ -170,6 +171,7 @@ variable_paramref_hook(ParseState *pstate, ParamRef *pref)
 	param->paramtypmod = -1;
 	param->paramcollid = get_typcollation(param->paramtype);
 	param->location = pref->location;
+	param->parallel_safe = false;
 
 	return (Node *) param;
 }
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 235bc75..4ade0e5 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -225,6 +225,7 @@ typedef struct Param
 	int32		paramtypmod;	/* typmod value, if known */
 	Oid			paramcollid;	/* OID of collation, or InvalidOid if none */
 	int			location;		/* token location, or -1 if unknown */
+	bool		parallel_safe;	/* OK to use as part of parallel plan? */
 } Param;
 
 /*
diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c
index 3c52d71..7452443 100644
--- a/src/pl/plpgsql/src/pl_comp.c
+++ b/src/pl/plpgsql/src/pl_comp.c
@@ -1327,6 +1327,7 @@ make_datum_param(PLpgSQL_expr *expr, int dno, int location)
 							 &param->paramtypmod,
 							 &param->paramcollid);
 	param->location = location;
+	param->parallel_safe = false;
 
 	return (Node *) param;
 }
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index de8eb88..0c1003c 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -124,6 +124,30 @@ select count(*)from tenk1 where (two, four) not in
  10000
 (1 row)
 
+-- test parallel plans for queries containing correlated subplans.
+explain (costs off)
+	select count(*)from tenk1 where (two, four) in
+	(select hundred, thousand from tenk2 where tenk2.thousand = tenk1.hundred);
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (SubPlan 1)
+                     SubPlan 1
+                       ->  Seq Scan on tenk2
+                             Filter: (thousand = tenk1.hundred)
+(9 rows)
+
+select count(*)from tenk1 where (two, four) in
+	(select hundred, thousand from tenk2 where tenk2.thousand = tenk1.hundred);
+ count 
+-------
+   200
+(1 row)
+
 alter table tenk2 reset (parallel_workers);
 set force_parallel_mode=1;
 explain (costs off)
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 69d7e72..f8cbd9a 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -46,6 +46,14 @@ explain (costs off)
 	(select hundred, thousand from tenk2 where thousand > 100);
 select count(*)from tenk1 where (two, four) not in
 	(select hundred, thousand from tenk2 where thousand > 100);
+
+-- test parallel plans for queries containing correlated subplans.
+explain (costs off)
+	select count(*)from tenk1 where (two, four) in
+	(select hundred, thousand from tenk2 where tenk2.thousand = tenk1.hundred);
+select count(*)from tenk1 where (two, four) in
+	(select hundred, thousand from tenk2 where tenk2.thousand = tenk1.hundred);
+
 alter table tenk2 reset (parallel_workers);
 
 set force_parallel_mode=1;