diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d4a8..813a80e42d 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,16 +47,22 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED		UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
+typedef struct FixedParallelExecutorState
+{
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
 /*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
@@ -381,12 +387,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+					 int64 tuples_needed)
 {
 	ParallelExecutorInfo *pei;
 	ParallelContext *pcxt;
 	ExecParallelEstimateContext e;
 	ExecParallelInitializeDSMContext d;
+	FixedParallelExecutorState *fpes;
 	char	   *pstmt_data;
 	char	   *pstmt_space;
 	char	   *param_space;
@@ -418,6 +426,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * for the various things we need to store.
 	 */
 
+	/* Estimate space for fixed-size state. */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   sizeof(FixedParallelExecutorState));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for query text. */
 	query_len = strlen(estate->es_sourceText);
 	shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +500,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * asked for has been allocated or initialized yet, though, so do that.
 	 */
 
+	/* Store fixed-size state. */
+	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+	fpes->tuples_needed = tuples_needed;
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
 	/* Store query string */
 	query_string = shm_toc_allocate(pcxt->toc, query_len);
 	memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +851,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
@@ -841,6 +860,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 
+	/* Get fixed-size state. */
+	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
 	receiver = ExecParallelGetReceiver(seg, toc);
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,6 +890,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	queryDesc->planstate->state->es_query_dsa = area;
 	ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
+	/* Pass down any tuple bound */
+	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
 	/* Run the plan */
 	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
 
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 36d2914249..d1abd20cd1 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -757,3 +757,116 @@ ExecShutdownNode(PlanState *node)
 
 	return false;
 }
+
+/*
+ * Set a tuple bound for a planstate node. This lets us optimize based on
+ * the knowledge that the maximum number of tuples which must be returned is
+ * limited. A negative tuples_needed means there is no bound.
+ *
+ * This is a bit of a kluge, but we don't have any more-abstract way of
+ * communicating between the two nodes; and it doesn't seem worth trying
+ * to invent one without some more examples of special communication needs.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+	/*
+	 * If the child is a subquery that does no filtering (no predicates)
+	 * and does not have any SRFs in the target list then we can potentially
+	 * push the tuple bound through the subquery. It is possible that we could
+	 * have multiple subqueries, so tunnel through them all.
+	 */
+	while (IsA(child_node, SubqueryScanState))
+	{
+		SubqueryScanState *subqueryScanState;
+
+		subqueryScanState = (SubqueryScanState *) child_node;
+
+		/*
+		 * Non-empty predicates or an SRF means we cannot push down the bound.
+		 */
+		if (subqueryScanState->ss.ps.qual != NULL ||
+			expression_returns_set((Node *) child_node->plan->targetlist))
+			return;
+
+		/* Use the child in the following checks */
+		child_node = subqueryScanState->subplan;
+	}
+
+	if (IsA(child_node, SortState))
+	{
+		SortState  *sortState = castNode(SortState, child_node);
+
+		/*
+		 * If our input is a Sort node, notify it that it can use bounded sort.
+		 *
+		 * Note: it is the responsibility of nodeSort.c to react properly to
+		 * changes of these parameters. If we ever do redesign this, it'd be a
+		 * good idea to integrate this signaling with the parameter-change
+		 * mechanism.
+		 */
+
+		/* negative means no bound (no COUNT, or the caller's sum overflowed) */
+		if (tuples_needed < 0)
+		{
+			/* make sure flag gets reset if needed upon rescan */
+			sortState->bounded = false;
+		}
+		else
+		{
+			sortState->bounded = true;
+			sortState->bound = tuples_needed;
+		}
+	}
+	else if (IsA(child_node, MergeAppendState))
+	{
+		MergeAppendState *maState = castNode(MergeAppendState, child_node);
+		int			i;
+
+		/*
+		 * Also, if our input is a MergeAppend, we can apply the same bound to
+		 * any Sorts that are direct children of the MergeAppend, since the
+		 * MergeAppend surely need read no more than that many tuples from
+		 * any one input.
+		 */
+		for (i = 0; i < maState->ms_nplans; i++)
+			ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+	}
+	else if (IsA(child_node, ResultState))
+	{
+		/*
+		 * We also have to be prepared to look through a Result, since the
+		 * planner might stick one atop MergeAppend for projection purposes.
+		 *
+		 * If Result supported qual checking, we'd have to punt on seeing a
+		 * qual. Note that having a resconstantqual is not a showstopper: if
+		 * that fails we're not getting any rows at all.
+		 */
+		if (outerPlanState(child_node))
+			ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+	else if (IsA(child_node, GatherState))
+	{
+		GatherState *gstate = castNode(GatherState, child_node);
+
+		/*
+		 * We might have a Gather node, which can propagate the bound to
+		 * workers.
+		 *
+		 * Note: As with Sort, the Gather node is responsible for reacting
+		 * properly to changes to this parameter.
+		 */
+		gstate->tuples_needed = tuples_needed;
+
+		/* we should also pass this down to our own copy of the child plan */
+		ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+	else if (IsA(child_node, GatherMergeState))
+	{
+		GatherMergeState *gstate = castNode(GatherMergeState, child_node);
+
+		/* Same idea here as for Gather */
+		gstate->tuples_needed = tuples_needed;
+		ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index e8d94ee6f3..a0f5a60d93 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.state = estate;
 	gatherstate->ps.ExecProcNode = ExecGather;
 	gatherstate->need_to_scan_locally = !node->single_copy;
+	gatherstate->tuples_needed = -1;
 
 	/*
 	 * Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
 												 estate,
-												 gather->num_workers);
+												 gather->num_workers,
+												 node->tuples_needed);
 
 			/*
 			 * Register backend workers. We might not get as many as we
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 64c62398bb..2526c584fd 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
 	gm_state->ps.plan = (Plan *) node;
 	gm_state->ps.state = estate;
 	gm_state->ps.ExecProcNode = ExecGatherMerge;
+	gm_state->tuples_needed = -1;
 
 	/*
 	 * Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
 												 estate,
-												 gm->num_workers);
+												 gm->num_workers,
+												 node->tuples_needed);
 
 			/* Try to launch workers. */
 			pcxt = node->pei->pcxt;
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index 09af1a5d8b..1ee035b5e5 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -27,7 +27,7 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);
 
 
 /* ----------------------------------------------------------------
@@ -298,89 +298,20 @@ recompute_limits(LimitState *node)
 		node->lstate = LIMIT_RESCAN;
 
 	/* Notify child node about limit, if useful */
-	pass_down_bound(node, outerPlanState(node));
+	ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }
 
 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort. Also, if our input is a MergeAppend, we can apply the
- * same bound to any Sorts that are direct children of the MergeAppend,
- * since the MergeAppend surely need read no more than that many tuples from
- * any one input. We also have to be prepared to look through a Result,
- * since the planner might stick one atop MergeAppend for projection purposes.
- * We can also accept one or more levels of subqueries that have no quals or
- * SRFs (that is, each subquery is just projecting columns) between the LIMIT
- * and any of the above.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters. If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
  */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
 {
-	/*
-	 * If the child is a subquery that does no filtering (no predicates)
-	 * and does not have any SRFs in the target list then we can potentially
-	 * push the limit through the subquery. It is possible that we could have
-	 * multiple subqueries, so tunnel through them all.
-	 */
-	while (IsA(child_node, SubqueryScanState))
-	{
-		SubqueryScanState *subqueryScanState;
-
-		subqueryScanState = (SubqueryScanState *) child_node;
-
-		/*
-		 * Non-empty predicates or an SRF means we cannot push down the limit.
-		 */
-		if (subqueryScanState->ss.ps.qual != NULL ||
-			expression_returns_set((Node *) child_node->plan->targetlist))
-			return;
-
-		/* Use the child in the following checks */
-		child_node = subqueryScanState->subplan;
-	}
-
-	if (IsA(child_node, SortState))
-	{
-		SortState  *sortState = (SortState *) child_node;
-		int64		tuples_needed = node->count + node->offset;
-
-		/* negative test checks for overflow in sum */
-		if (node->noCount || tuples_needed < 0)
-		{
-			/* make sure flag gets reset if needed upon rescan */
-			sortState->bounded = false;
-		}
-		else
-		{
-			sortState->bounded = true;
-			sortState->bound = tuples_needed;
-		}
-	}
-	else if (IsA(child_node, MergeAppendState))
-	{
-		MergeAppendState *maState = (MergeAppendState *) child_node;
-		int			i;
-
-		for (i = 0; i < maState->ms_nplans; i++)
-			pass_down_bound(node, maState->mergeplans[i]);
-	}
-	else if (IsA(child_node, ResultState))
-	{
-		/*
-		 * If Result supported qual checking, we'd have to punt on seeing a
-		 * qual. Note that having a resconstantqual is not a showstopper: if
-		 * that fails we're not getting any rows at all.
-		 */
-		if (outerPlanState(child_node))
-			pass_down_bound(node, outerPlanState(child_node));
-	}
+	if (node->noCount)
+		return -1;
+	/* Note: if this sum overflows to a negative value, it means "no bound" */
+	return node->count + node->offset;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index bd0a87fa04..79b886706f 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-					 EState *estate, int nworkers);
+					 EState *estate, int nworkers, int64 tuples_needed);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index eacbea3c36..f48a603dae 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
 /* ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3272c4b315..17ae657022 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1919,6 +1919,7 @@ typedef struct GatherState
 	struct TupleQueueReader **reader;
 	TupleTableSlot *funnel_slot;
 	bool		need_to_scan_locally;
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
 } GatherState;
 
 /* ----------------
@@ -1944,6 +1945,7 @@ typedef struct GatherMergeState
 	struct binaryheap *gm_heap; /* binary heap of slot indices */
 	bool		gm_initialized; /* gather merge initilized ? */
 	bool		need_to_scan_locally;
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
 	int			gm_nkeys;
 	SortSupport gm_sortkeys;	/* array of length ms_nkeys */
 	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* tuple buffer per reader */