diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index c9b55ea..8c16370 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -106,6 +106,7 @@ static void show_tidbitmap_info(BitmapHeapScanState *planstate, static void show_instrumentation_count(const char *qlabel, int which, PlanState *planstate, ExplainState *es); static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es); +static void show_eval_params(Bitmapset *bms_params, ExplainState *es); static const char *explain_get_index_name(Oid indexId); static void show_buffer_usage(ExplainState *es, const BufferUsage *usage); static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir, @@ -631,7 +632,17 @@ ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc) */ ps = queryDesc->planstate; if (IsA(ps, GatherState) &&((Gather *) ps->plan)->invisible) + { + List *initPlanState = NULL; + PlanState *save_ps; + + /* initplans are always attached to top node (cf standard_planner) */ + save_ps = ps; + initPlanState = ps->initPlan; ps = outerPlanState(ps); + ps->initPlan = initPlanState; + save_ps->initPlan = NIL; + } ExplainNode(ps, NIL, NULL, NULL, es); } @@ -1402,6 +1413,11 @@ ExplainNode(PlanState *planstate, List *ancestors, planstate, es); ExplainPropertyInteger("Workers Planned", gather->num_workers, es); + + /* Show params evaluated at gather node */ + if (gather->initParam) + show_eval_params(gather->initParam, es); + if (es->analyze) { int nworkers; @@ -1424,6 +1440,11 @@ ExplainNode(PlanState *planstate, List *ancestors, planstate, es); ExplainPropertyInteger("Workers Planned", gm->num_workers, es); + + /* Show params evaluated at gather-merge node */ + if (gm->initParam) + show_eval_params(gm->initParam, es); + if (es->analyze) { int nworkers; @@ -2385,6 +2406,28 @@ show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es) } /* + * Show initplan params evaluated at gather or gather merge node. 
+ */ +static void +show_eval_params(Bitmapset *bms_params, ExplainState *es) +{ + int paramid = -1; + List *params = NIL; + + Assert(bms_params); + + while ((paramid = bms_next_member(bms_params, paramid)) >= 0) + { + char param[32]; + + snprintf(param, sizeof(param), "$%d", paramid); + params = lappend(params, pstrdup(param)); + } + + ExplainPropertyList("Params Evaluated", params, es); +} + +/* * Fetch the name of an index in an EXPLAIN * * We allow plugins to get control here so that plans involving hypothetical diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index a1289e5..09bb721 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -29,6 +29,7 @@ #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" #include "executor/nodeSeqscan.h" +#include "executor/nodeSubplan.h" #include "executor/nodeIndexscan.h" #include "executor/nodeIndexonlyscan.h" #include "executor/tqueue.h" @@ -54,6 +55,7 @@ #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005) #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007) +#define PARALLEL_KEY_INITPLAN_PARAMS UINT64CONST(0xE000000000000008) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -364,7 +366,8 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei) * execution and return results to the main backend. 
*/ ParallelExecutorInfo * -ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) +ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, + Bitmapset *initParam) { ParallelExecutorInfo *pei; ParallelContext *pcxt; @@ -373,10 +376,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) char *pstmt_data; char *pstmt_space; char *param_space; + char *initplan_param_space; BufferUsage *bufusage_space; SharedExecutorInstrumentation *instrumentation = NULL; int pstmt_len; int param_len; + int initplan_param_len; int instrumentation_len = 0; int instrument_offset = 0; Size dsa_minsize = dsa_minimum_size(); @@ -388,6 +393,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) pei->finished = false; pei->planstate = planstate; + ExecEvalParamExecParams(initParam, estate); + /* Fix up and serialize plan to be sent to workers. */ pstmt_data = ExecSerializePlan(planstate->plan, estate); @@ -416,6 +423,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) shm_toc_estimate_chunk(&pcxt->estimator, param_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for initplan params. */ + initplan_param_len = EstimateInitPlanParamsSpace(estate->es_param_exec_vals, initParam); + shm_toc_estimate_chunk(&pcxt->estimator, initplan_param_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* * Estimate space for BufferUsage. * @@ -485,6 +497,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space); SerializeParamList(estate->es_param_list_info, &param_space); + /* Store serialized initplan params. 
*/ + initplan_param_space = shm_toc_allocate(pcxt->toc, initplan_param_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INITPLAN_PARAMS, initplan_param_space); + SerializeInitPlanParams(estate->es_param_exec_vals, initParam, &initplan_param_space); + /* Allocate space for each worker's BufferUsage; no need to initialize. */ bufusage_space = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); @@ -675,6 +692,21 @@ ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc) } /* + * Copy the ParamExecData params corresponding to initplans from dynamic + * shared memory. This has to be done once the params are allocated by + * executor; that is after ExecutorStart(). + */ +static void +ExecParallelInitializeInitPlanParams(shm_toc *toc, ParamExecData *params) +{ + char *paramspace; + + /* Reconstruct initplan params. */ + paramspace = shm_toc_lookup(toc, PARALLEL_KEY_INITPLAN_PARAMS); + RestoreInitPlanParams(&paramspace, params); +} + +/* * Create a QueryDesc for the PlannedStmt we are to execute, and return it. */ static QueryDesc * @@ -849,6 +881,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Special executor initialization steps for parallel workers */ queryDesc->planstate->state->es_query_dsa = area; + ExecParallelInitializeInitPlanParams(toc, queryDesc->estate->es_param_exec_vals); ExecParallelInitializeWorker(queryDesc->planstate, toc); /* Run the plan */ diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c index 90bef6f..72a1dcb 100644 --- a/src/backend/executor/execQual.c +++ b/src/backend/executor/execQual.c @@ -1094,6 +1094,34 @@ ExecEvalParamExec(ExprState *exprstate, ExprContext *econtext, } /* ---------------------------------------------------------------- + * ExecEvalParamExecParams + * + * Execute the subplan stored in PARAM_EXEC initplans params, if not executed + * till now. 
+ * ---------------------------------------------------------------- + */ +void +ExecEvalParamExecParams(Bitmapset *params, EState *estate) +{ + ParamExecData *prm; + int paramid; + + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + prm = &(estate->es_param_exec_vals[paramid]); + + if (prm->execPlan != NULL) + { + /* Parameter not evaluated yet, so go do it */ + ExecSetParamPlan(prm->execPlan, GetPerTupleExprContext(estate)); + /* ExecSetParamPlan should have processed this param... */ + Assert(prm->execPlan == NULL); + } + } +} + +/* ---------------------------------------------------------------- * ExecEvalParamExtern * * Returns the value of a PARAM_EXTERN parameter. diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 32c97d3..31d4bf5 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -155,7 +155,8 @@ ExecGather(GatherState *node) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, - gather->num_workers); + gather->num_workers, + gather->initParam); /* * Register backend workers. We might not get as many as we diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 72f30ab..6df2226 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -189,7 +189,8 @@ ExecGatherMerge(GatherMergeState *node) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, - gm->num_workers); + gm->num_workers, + gm->initParam); /* Try to launch workers. 
*/ pcxt = node->pei->pcxt; diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c index cac7ba1..11aeaaf 100644 --- a/src/backend/executor/nodeNestloop.c +++ b/src/backend/executor/nodeNestloop.c @@ -126,6 +126,7 @@ ExecNestLoop(NestLoopState *node) { NestLoopParam *nlp = (NestLoopParam *) lfirst(lc); int paramno = nlp->paramno; + TupleDesc tdesc = outerTupleSlot->tts_tupleDescriptor; ParamExecData *prm; prm = &(econtext->ecxt_param_exec_vals[paramno]); @@ -136,6 +137,7 @@ ExecNestLoop(NestLoopState *node) prm->value = slot_getattr(outerTupleSlot, nlp->paramval->varattno, &(prm->isnull)); + prm->ptype = tdesc->attrs[nlp->paramval->varattno - 1]->atttypid; /* Flag parameter value as changed */ innerPlan->chgParam = bms_add_member(innerPlan->chgParam, paramno); diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c index 8f419a1..b75fae6 100644 --- a/src/backend/executor/nodeSubplan.c +++ b/src/backend/executor/nodeSubplan.c @@ -30,11 +30,15 @@ #include <math.h> #include "access/htup_details.h" +#include "catalog/pg_type.h" #include "executor/executor.h" #include "executor/nodeSubplan.h" #include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" +#include "storage/shmem.h" #include "utils/array.h" +#include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -276,11 +280,13 @@ ExecScanSubPlan(SubPlanState *node, forboth(l, subplan->parParam, pvar, node->args) { int paramid = lfirst_int(l); + ExprState *exprstate = (ExprState *) lfirst(pvar); ParamExecData *prm = &(econtext->ecxt_param_exec_vals[paramid]); - prm->value = ExecEvalExprSwitchContext((ExprState *) lfirst(pvar), + prm->value = ExecEvalExprSwitchContext(exprstate, econtext, &(prm->isnull)); + prm->ptype = exprType((Node *) exprstate->expr); planstate->chgParam = bms_add_member(planstate->chgParam, paramid); } @@ -393,6 +399,7 @@ ExecScanSubPlan(SubPlanState *node, prmdata = 
&(econtext->ecxt_param_exec_vals[paramid]); Assert(prmdata->execPlan == NULL); prmdata->value = slot_getattr(slot, col, &(prmdata->isnull)); + prmdata->ptype = tdesc->attrs[col - 1]->atttypid; col++; } @@ -559,11 +566,13 @@ buildSubPlanHash(SubPlanState *node, ExprContext *econtext) { int paramid = lfirst_int(plst); ParamExecData *prmdata; + TupleDesc tdesc = slot->tts_tupleDescriptor; prmdata = &(innerecontext->ecxt_param_exec_vals[paramid]); Assert(prmdata->execPlan == NULL); prmdata->value = slot_getattr(slot, col, &(prmdata->isnull)); + prmdata->ptype = tdesc->attrs[col - 1]->atttypid; col++; } slot = ExecProject(node->projRight); @@ -943,6 +952,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) ListCell *l; bool found = false; ArrayBuildStateAny *astate = NULL; + Oid ptype; if (subLinkType == ANY_SUBLINK || subLinkType == ALL_SUBLINK) @@ -950,6 +960,8 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) if (subLinkType == CTE_SUBLINK) elog(ERROR, "CTE subplans should not be executed via ExecSetParamPlan"); + ptype = exprType((Node *) node->xprstate.expr); + /* Initialize ArrayBuildStateAny in caller's context, if needed */ if (subLinkType == ARRAY_SUBLINK) astate = initArrayResultAny(subplan->firstColType, @@ -972,11 +984,13 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) forboth(l, subplan->parParam, pvar, node->args) { int paramid = lfirst_int(l); + ExprState *exprstate = (ExprState *) lfirst(pvar); ParamExecData *prm = &(econtext->ecxt_param_exec_vals[paramid]); - prm->value = ExecEvalExprSwitchContext((ExprState *) lfirst(pvar), + prm->value = ExecEvalExprSwitchContext(exprstate, econtext, &(prm->isnull)); + prm->ptype = exprType((Node *) exprstate->expr); planstate->chgParam = bms_add_member(planstate->chgParam, paramid); } @@ -999,6 +1013,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) prm->execPlan = NULL; prm->value = BoolGetDatum(true); + prm->ptype = ptype; prm->isnull = false; found = true; 
break; @@ -1050,6 +1065,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) prm->execPlan = NULL; prm->value = heap_getattr(node->curTuple, i, tdesc, &(prm->isnull)); + prm->ptype = tdesc->attrs[i - 1]->atttypid; i++; } } @@ -1072,6 +1088,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) true); prm->execPlan = NULL; prm->value = node->curArray; + prm->ptype = ptype; prm->isnull = false; } else if (!found) @@ -1084,6 +1101,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) prm->execPlan = NULL; prm->value = BoolGetDatum(false); + prm->ptype = ptype; prm->isnull = false; } else @@ -1096,6 +1114,7 @@ ExecSetParamPlan(SubPlanState *node, ExprContext *econtext) prm->execPlan = NULL; prm->value = (Datum) 0; + prm->ptype = VOIDOID; prm->isnull = true; } } @@ -1220,3 +1239,136 @@ ExecAlternativeSubPlan(AlternativeSubPlanState *node, return ExecSubPlan(activesp, econtext, isNull); } + +/* + * Estimate the amount of space required to serialize the InitPlan params. + */ +Size +EstimateInitPlanParamsSpace(ParamExecData *paramExecVals, Bitmapset *params) +{ + int paramid; + Size sz = sizeof(int); + ParamExecData *prm; + + if (params == NULL) + return sz; + + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + Oid typeOid; + int16 typLen; + bool typByVal; + + prm = &(paramExecVals[paramid]); + typeOid = prm->ptype; + + sz = add_size(sz, sizeof(int)); /* space for paramid */ + sz = add_size(sz, sizeof(Oid)); /* space for type OID */ + + /* space for datum/isnull */ + if (OidIsValid(typeOid)) + get_typlenbyval(typeOid, &typLen, &typByVal); + else + { + /* If no type OID, assume by-value, like copyParamList does. */ + typLen = sizeof(Datum); + typByVal = true; + } + sz = add_size(sz, + datumEstimateSpace(prm->value, prm->isnull, typByVal, typLen)); + } + return sz; +} + +/* + * Serialize ParamExecData params corresponding to initplans. 
+ * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte paramid (location of param in execution time internal + * parameter array), 4-byte type OID, and then the datum as serialized by + * datumSerialize(). + * + * The above format is quite similar to the format used to serialize + * paramListInfo structure, so if either format is changed, consider making + * the same change in both places. + */ +void +SerializeInitPlanParams(ParamExecData *paramExecVals, Bitmapset *params, + char **start_address) +{ + int nparams; + int paramid; + ParamExecData *prm; + + if (params == NULL) + nparams = 0; + else + nparams = bms_num_members(params); + memcpy(*start_address, &nparams, sizeof(int)); + *start_address += sizeof(int); + + paramid = -1; + while ((paramid = bms_next_member(params, paramid)) >= 0) + { + Oid typeOid; + int16 typLen; + bool typByVal; + + prm = &(paramExecVals[paramid]); + typeOid = prm->ptype; + + /* Write paramid. */ + memcpy(*start_address, &paramid, sizeof(int)); + *start_address += sizeof(int); + + /* Write OID. */ + memcpy(*start_address, &typeOid, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Write datum/isnull. */ + if (OidIsValid(typeOid)) + get_typlenbyval(typeOid, &typLen, &typByVal); + else + { + /* If no type OID, assume by-value, like copyParamList does. */ + typLen = sizeof(Datum); + typByVal = true; + } + datumSerialize(prm->value, prm->isnull, typByVal, typLen, + start_address); + } +} + +/* + * Restore ParamExecData params corresponding to initplans. 
+ */ +void +RestoreInitPlanParams(char **start_address, ParamExecData *params) +{ + int nparams; + int i; + int paramid; + + memcpy(&nparams, *start_address, sizeof(int)); + *start_address += sizeof(int); + + for (i = 0; i < nparams; i++) + { + ParamExecData *prm; + + /* Read paramid */ + memcpy(&paramid, *start_address, sizeof(int)); + *start_address += sizeof(int); + prm = &params[paramid]; + + /* Read type OID. */ + memcpy(&prm->ptype, *start_address, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Read datum/isnull. */ + prm->value = datumRestore(start_address, &prm->isnull); + prm->execPlan = NULL; + } +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index bfc2ac1..6a0ca65 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -356,6 +356,7 @@ _copyGather(const Gather *from) COPY_SCALAR_FIELD(num_workers); COPY_SCALAR_FIELD(single_copy); COPY_SCALAR_FIELD(invisible); + COPY_BITMAPSET_FIELD(initParam); return newnode; } @@ -382,6 +383,7 @@ _copyGatherMerge(const GatherMerge *from) COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid)); COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid)); COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool)); + COPY_BITMAPSET_FIELD(initParam); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 7418fbe..7437a65 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -455,6 +455,7 @@ _outGather(StringInfo str, const Gather *node) WRITE_INT_FIELD(num_workers); WRITE_BOOL_FIELD(single_copy); WRITE_BOOL_FIELD(invisible); + WRITE_BITMAPSET_FIELD(initParam); } static void @@ -484,6 +485,8 @@ _outGatherMerge(StringInfo str, const GatherMerge *node) appendStringInfoString(str, " :nullsFirst"); for (i = 0; i < node->numCols; i++) appendStringInfo(str, " %s", booltostr(node->nullsFirst[i])); + + WRITE_BITMAPSET_FIELD(initParam); } static void diff --git a/src/backend/nodes/readfuncs.c 
b/src/backend/nodes/readfuncs.c index d3bbc02..152bb1b 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2133,6 +2133,7 @@ _readGather(void) READ_INT_FIELD(num_workers); READ_BOOL_FIELD(single_copy); READ_BOOL_FIELD(invisible); + READ_BITMAPSET_FIELD(initParam); READ_DONE(); } @@ -2153,6 +2154,7 @@ _readGatherMerge(void) READ_OID_ARRAY(sortOperators, local_node->numCols); READ_OID_ARRAY(collations, local_node->numCols); READ_BOOL_ARRAY(nullsFirst, local_node->numCols); + READ_BITMAPSET_FIELD(initParam); READ_DONE(); } diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index b263359..fb240b8 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -38,6 +38,7 @@ #include "optimizer/planner.h" #include "optimizer/prep.h" #include "optimizer/restrictinfo.h" +#include "optimizer/subselect.h" #include "optimizer/tlist.h" #include "optimizer/var.h" #include "parser/parse_clause.h" @@ -2103,6 +2104,14 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel) return; /* + * We don't want to generate a gather or gather merge node if there are + * initplans at some query level below the current query level, as those + * plans could be parallel-unsafe or indirectly correlated. + */ + if (is_initplan_below_current_query_level(root)) + return; + + /* * The output of Gather is always unsorted, so there's only one partial * path of interest: the cheapest one. That will be the one at the front * of partial_pathlist because of the way add_partial_path works. 
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index d002e6d..3fe1e3b 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -6161,6 +6161,7 @@ make_gather(List *qptlist, node->num_workers = nworkers; node->single_copy = single_copy; node->invisible = false; + node->initParam = NULL; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 02286d9..8767e69 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -331,6 +331,14 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { Gather *gather = makeNode(Gather); + /* + * If there are any initPlans attached to the formerly-top plan node, + * move them up to the Gather node; same as we do for Material node + * in materialize_finished_plan. + */ + gather->plan.initPlan = top_plan->initPlan; + top_plan->initPlan = NIL; + gather->plan.targetlist = top_plan->targetlist; gather->plan.qual = NIL; gather->plan.lefttree = top_plan; @@ -3441,6 +3449,14 @@ create_grouping_paths(PlannerInfo *root, /* Insufficient support for partial mode. */ try_parallel_aggregation = false; } + else if (is_initplan_below_current_query_level(root)) + { + /* + * Don't parallelize the plan if there is an initplan below current + * query level. See generate_gather_paths() for detailed reason. + */ + try_parallel_aggregation = false; + } else { /* Everything looks good. */ @@ -4357,9 +4373,13 @@ create_ordered_paths(PlannerInfo *root, * above will have handled those as well. However, there's one more * possibility: it may make sense to sort the cheapest partial path * according to the required output order and then use Gather Merge. + * + * Don't parallelize the plan if there is an initplan below current + * query level. See generate_gather_paths() for detailed reason. 
*/ if (ordered_rel->consider_parallel && root->sort_pathkeys != NIL && - input_rel->partial_pathlist != NIL) + input_rel->partial_pathlist != NIL && + !is_initplan_below_current_query_level(root)) { Path *cheapest_partial_path; diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 5f3027e..4ee8da8 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -104,6 +104,7 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context); static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context); static void set_join_references(PlannerInfo *root, Join *join, int rtoffset); static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset); +static void set_param_references(PlannerInfo *root, Plan *plan); static Node *convert_combining_aggrefs(Node *node, void *context); static void set_dummy_tlist_references(Plan *plan, int rtoffset); static indexed_tlist *build_tlist_index(List *tlist); @@ -617,7 +618,10 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_Gather: case T_GatherMerge: - set_upper_references(root, plan, rtoffset); + { + set_upper_references(root, plan, rtoffset); + set_param_references(root, plan); + } break; case T_Hash: @@ -1740,6 +1744,47 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) } /* + * set_param_references + * Initialize the initParam list in gather or gather merge node such that + * it contains reference of all the params that needs to be evaluated + * before execution of the node. It contains the initplan params that are + * being passed to the plan nodes below it. 
+ */ +static void +set_param_references(PlannerInfo *root, Plan *plan) +{ + Assert(IsA(plan, Gather) || IsA(plan, GatherMerge)); + + if (plan->lefttree->extParam) + { + PlannerInfo *proot; + Bitmapset *initSetParam = NULL; + ListCell *l; + + for (proot = root; proot != NULL; proot = proot->parent_root) + { + foreach(l, proot->init_plans) + { + SubPlan *initsubplan = (SubPlan *) lfirst(l); + ListCell *l2; + + foreach(l2, initsubplan->setParam) + { + initSetParam = bms_add_member(initSetParam, lfirst_int(l2)); + } + } + } + + if (IsA(plan, Gather)) + ((Gather *) plan)->initParam = bms_intersect(plan->lefttree->extParam, initSetParam); + else if (IsA(plan, GatherMerge)) + ((GatherMerge *) plan)->initParam = bms_intersect(plan->lefttree->extParam, initSetParam); + else + elog(ERROR, "unrecognized node type: %d", nodeTag(plan)); + } +} + +/* * Recursively scan an expression tree and convert Aggrefs to the proper * intermediate form for combining aggregates. This means (1) replacing each * one's argument list with a single argument that is the original Aggref diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 6fa6540..a4527ff 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2136,13 +2136,11 @@ SS_identify_outer_params(PlannerInfo *root) } /* - * SS_charge_for_initplans - account for initplans in Path costs & parallelism + * SS_charge_for_initplans - account for initplans in Path costs * * If any initPlans have been created in the current query level, they will * get attached to the Plan tree created from whichever Path we select from - * the given rel. Increment all that rel's Paths' costs to account for them, - * and make sure the paths get marked as parallel-unsafe, since we can't - * currently transmit initPlans to parallel workers. + * the given rel. Increment all that rel's Paths' costs to account for them. 
* * This is separate from SS_attach_initplans because we might conditionally * create more initPlans during create_plan(), depending on which Path we @@ -2174,7 +2172,7 @@ SS_charge_for_initplans(PlannerInfo *root, RelOptInfo *final_rel) } /* - * Now adjust the costs and parallel_safe flags. + * Now adjust the costs. */ foreach(lc, final_rel->pathlist) { @@ -2182,7 +2180,6 @@ SS_charge_for_initplans(PlannerInfo *root, RelOptInfo *final_rel) path->startup_cost += initplan_cost; path->total_cost += initplan_cost; - path->parallel_safe = false; } /* We needn't do set_cheapest() here, caller will do it */ @@ -2932,3 +2929,29 @@ SS_make_initplan_from_plan(PlannerInfo *root, /* Set costs of SubPlan using info from the plan tree */ cost_subplan(subroot, node, plan); } + +/* + * is_initplan_below_current_query_level - is there any initplan present below + * current query level. + */ +bool +is_initplan_below_current_query_level(PlannerInfo *root) +{ + ListCell *lc; + + /* + * If the subplan corresponding to the subroot is an initPlan, it'll + * be attached to its parent root. Hence, we check the query level + * of its parent root and if any init_plans are attached there. 
+ */ + foreach(lc, root->glob->subroots) + { + PlannerInfo *subroot = (PlannerInfo *) lfirst(lc); + PlannerInfo *proot = subroot->parent_root; + + if (proot->query_level > root->query_level && proot->init_plans) + return true; + } + + return false; +} diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index b19380e..10cb255 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -91,6 +91,7 @@ typedef struct typedef struct { + PlannerInfo *root; char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ } max_parallel_hazard_context; @@ -1054,6 +1055,7 @@ max_parallel_hazard(Query *parse) { max_parallel_hazard_context context; + context.root = NULL; context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; (void) max_parallel_hazard_walker((Node *) parse, &context); @@ -1082,6 +1084,7 @@ is_parallel_safe(PlannerInfo *root, Node *node) root->glob->nParamExec == 0) return true; /* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */ + context.root = root; context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; return !max_parallel_hazard_walker(node, &context); @@ -1176,13 +1179,40 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) return !((SubPlan *) node)->parallel_safe; /* - * We can't pass Params to workers at the moment either, so they are also - * parallel-restricted. + * As of now, we can only pass initplan Params that refer to the same or + * parent query level. 
For the detailed reason, see generate_gather_paths */ else if (IsA(node, Param)) { - if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) - return true; + int paramid; + PlannerInfo *root; + + root = context->root; + paramid = ((Param *) node)->paramid; + + if (root) + { + PlannerInfo *proot; + ListCell *l; + + for (proot = root; proot != NULL; proot = proot->parent_root) + { + foreach(l, proot->init_plans) + { + SubPlan *initsubplan = (SubPlan *) lfirst(l); + ListCell *l2; + + foreach(l2, initsubplan->setParam) + { + int initparam = lfirst_int(l2); + if (paramid == initparam) + return false; + } + } + } + } + + return true; } /* diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 8bc4270..5c42160 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo } ParallelExecutorInfo; extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, - EState *estate, int nworkers); + EState *estate, int nworkers, Bitmapset *initParam); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 02dbe7b..1c7e0b9 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -243,6 +243,7 @@ extern bool ExecShutdownNode(PlanState *node); /* * prototypes from functions in execQual.c */ +extern void ExecEvalParamExecParams(Bitmapset *params, EState *estate); extern Datum GetAttributeByNum(HeapTupleHeader tuple, AttrNumber attrno, bool *isNull); extern Datum GetAttributeByName(HeapTupleHeader tuple, const char *attname, diff --git a/src/include/executor/nodeSubplan.h b/src/include/executor/nodeSubplan.h index 0f821dc..a59c671 100644 --- a/src/include/executor/nodeSubplan.h +++ 
b/src/include/executor/nodeSubplan.h @@ -24,4 +24,10 @@ extern void ExecReScanSetParamPlan(SubPlanState *node, PlanState *parent); extern void ExecSetParamPlan(SubPlanState *node, ExprContext *econtext); +extern Size EstimateInitPlanParamsSpace(ParamExecData *paramExecVals, Bitmapset *params); + +extern void SerializeInitPlanParams(ParamExecData *paramExecVals, Bitmapset *params, char **start_address); + +extern void RestoreInitPlanParams(char **start_address, ParamExecData *params); + #endif /* NODESUBPLAN_H */ diff --git a/src/include/nodes/params.h b/src/include/nodes/params.h index e19ac24..b0732d2 100644 --- a/src/include/nodes/params.h +++ b/src/include/nodes/params.h @@ -98,6 +98,16 @@ typedef struct ParamExecData { void *execPlan; /* should be "SubPlanState *" */ Datum value; + + /* + * parameter's datatype, or 0. This is required so that datum value can + * be read and used for other purposes like passing it to worker backend + * via shared memory. This is required only for initPlan's evaluation, + * however for consistency we set this for Subplan as well. We left it + * for other cases like CTE or RecursiveUnion cases where this structure + * is not used for evaluation of subplans. + */ + Oid ptype; bool isnull; } ParamExecData; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index b880dc1..c56f4b0 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -795,6 +795,8 @@ typedef struct Gather int num_workers; bool single_copy; bool invisible; /* suppress EXPLAIN display (for testing)? 
*/ + Bitmapset *initParam; /* param IDs of initplans that are referenced at gather or + * one of its child nodes */ } Gather; /* ------------ @@ -811,6 +813,8 @@ typedef struct GatherMerge Oid *sortOperators; /* OIDs of operators to sort them by */ Oid *collations; /* OIDs of collations */ bool *nullsFirst; /* NULLS FIRST/LAST directions */ + Bitmapset *initParam; /* param IDs of initplans that are referenced at gather + * merge or one of its child nodes */ } GatherMerge; /* ---------------- diff --git a/src/include/optimizer/subselect.h b/src/include/optimizer/subselect.h index 56dc237..62ff0a9 100644 --- a/src/include/optimizer/subselect.h +++ b/src/include/optimizer/subselect.h @@ -35,6 +35,7 @@ extern Param *SS_make_initplan_output_param(PlannerInfo *root, extern void SS_make_initplan_from_plan(PlannerInfo *root, PlannerInfo *subroot, Plan *plan, Param *prm); +extern bool is_initplan_below_current_query_level(PlannerInfo *root); extern Param *assign_nestloop_param_var(PlannerInfo *root, Var *var); extern Param *assign_nestloop_param_placeholdervar(PlannerInfo *root, PlaceHolderVar *phv);