From d91215c25023ec839c4bc8d3116cc3e32f48a3c3 Mon Sep 17 00:00:00 2001
From: bucoo
Date: Mon, 19 Oct 2020 17:54:11 +0800
Subject: [PATCH 1/2] Parallel distinct and union support

---
 src/backend/commands/explain.c                |  15 +
 src/backend/executor/Makefile                 |   1 +
 src/backend/executor/execAmi.c                |   5 +
 src/backend/executor/execParallel.c           |  17 ++
 src/backend/executor/execProcnode.c           |  10 +
 src/backend/executor/nodeBatchSort.c          | 377 ++++++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c                 |  19 ++
 src/backend/nodes/outfuncs.c                  |  15 +
 src/backend/nodes/readfuncs.c                 |  16 ++
 src/backend/optimizer/path/costsize.c         |  87 ++++++
 src/backend/optimizer/plan/createplan.c       |  47 +++-
 src/backend/optimizer/plan/planner.c          |  43 +++
 src/backend/optimizer/plan/setrefs.c          |   1 +
 src/backend/optimizer/plan/subselect.c        |   1 +
 src/backend/optimizer/prep/prepunion.c        |  52 +++-
 src/backend/optimizer/util/pathnode.c         |  38 +++
 src/backend/optimizer/util/tlist.c            |  16 ++
 src/backend/postmaster/pgstat.c               |   3 +
 src/backend/utils/misc/guc.c                  |   9 +
 src/include/executor/nodeBatchSort.h          |  19 ++
 src/include/nodes/execnodes.h                 |  15 +
 src/include/nodes/nodes.h                     |   3 +
 src/include/nodes/pathnodes.h                 |  10 +
 src/include/nodes/plannodes.h                 |  12 +
 src/include/optimizer/cost.h                  |   6 +
 src/include/optimizer/pathnode.h              |   9 +
 src/include/optimizer/tlist.h                 |   1 +
 src/include/pgstat.h                          |   3 +-
 src/test/regress/expected/select_distinct.out |  42 +++
 src/test/regress/expected/sysviews.out        |   3 +-
 src/test/regress/expected/union.out           |  55 ++++
 src/test/regress/sql/select_distinct.sql      |  14 +
 src/test/regress/sql/union.sql                |  15 +
 33 files changed, 965 insertions(+), 14 deletions(-)
 create mode 100644 src/backend/executor/nodeBatchSort.c
 create mode 100644 src/include/executor/nodeBatchSort.h

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index c98c9b5547..16a1fb035d 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1270,6 +1270,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Sort:
 			pname = sname = "Sort";
 			break;
+		case T_BatchSort:
+			pname = sname = "BatchSort";
+			break;
 		case T_IncrementalSort:
 			pname = sname = "Incremental Sort";
 			break;
@@ -1933,6 +1936,18 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_sort_keys(castNode(SortState, planstate), ancestors, es);
 			show_sort_info(castNode(SortState, planstate), es);
 			break;
+		case T_BatchSort:
+			{
+				BatchSort *bsort = (BatchSort*)plan;
+				show_sort_group_keys(planstate, "Sort Key",
+									 bsort->sort.numCols, 0, bsort->sort.sortColIdx,
+									 bsort->sort.sortOperators, bsort->sort.collations,
+									 bsort->sort.nullsFirst,
+									 ancestors, es);
+				if (es->verbose)
+					ExplainPropertyInteger("batches", NULL, bsort->numBatches, es);
+			}
+			break;
 		case T_IncrementalSort:
 			show_incremental_sort_keys(castNode(IncrementalSortState, planstate),
 									   ancestors, es);
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index f990c6473a..a4855a8881 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -33,6 +33,7 @@ OBJS = \
 	instrument.o \
 	nodeAgg.o \
 	nodeAppend.o \
+	nodeBatchSort.o \
 	nodeBitmapAnd.o \
 	nodeBitmapHeapscan.o \
 	nodeBitmapIndexscan.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index e2154ba86a..6eb1fe2424 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -17,6 +17,7 @@
 #include "executor/execdebug.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeBatchSort.h"
 #include "executor/nodeBitmapAnd.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeBitmapIndexscan.h"
@@ -253,6 +254,10 @@ ExecReScan(PlanState *node)
 			ExecReScanSort((SortState *) node);
 			break;
 
+		case T_BatchSortState:
+			ExecReScanBatchSort((BatchSortState *)node);
+			break;
+
 		case T_IncrementalSortState:
 			ExecReScanIncrementalSort((IncrementalSortState *) node);
 			break;
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 382e78fb7f..a5abd48507 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -27,6 +27,7 @@
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeBatchSort.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
@@ -285,6 +286,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
 			break;
+		case T_BatchSortState:
+			if (planstate->plan->parallel_aware)
+				ExecBatchSortEstimate((BatchSortState*)planstate, e->pcxt);
+			break;
 		case T_IncrementalSortState:
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
@@ -505,6 +510,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
 			break;
+		case T_BatchSortState:
+			if (planstate->plan->parallel_aware)
+				ExecBatchSortInitializeDSM((BatchSortState*)planstate, d->pcxt);
+			break;
 		case T_IncrementalSortState:
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
@@ -991,6 +1000,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 		case T_IncrementalSortState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
+		case T_BatchSortState:
+			if (planstate->plan->parallel_aware)
+				ExecBatchSortReInitializeDSM((BatchSortState*)planstate, pcxt);
+			break;
 
 		default:
 			break;
@@ -1341,6 +1354,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
 			break;
+		case T_BatchSortState:
+			if (planstate->plan->parallel_aware)
+				ExecBatchSortInitializeWorker((BatchSortState*)planstate, pwcxt);
+			break;
 		case T_IncrementalSortState:
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 01b7b926bf..c13835ddda 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -75,6 +75,7 @@
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeBatchSort.h"
 #include "executor/nodeBitmapAnd.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeBitmapIndexscan.h"
@@ -314,6 +315,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 												estate, eflags);
 			break;
 
+		case T_BatchSort:
+			result = (PlanState *) ExecInitBatchSort((BatchSort *) node,
+													 estate, eflags);
+			break;
+
 		case T_IncrementalSort:
 			result = (PlanState *) ExecInitIncrementalSort((IncrementalSort *) node,
 														   estate, eflags);
@@ -699,6 +705,10 @@ ExecEndNode(PlanState *node)
 			ExecEndSort((SortState *) node);
 			break;
 
+		case T_BatchSortState:
+			ExecEndBatchSort((BatchSortState *) node);
+			break;
+
 		case T_IncrementalSortState:
 			ExecEndIncrementalSort((IncrementalSortState *) node);
 			break;
diff --git a/src/backend/executor/nodeBatchSort.c b/src/backend/executor/nodeBatchSort.c
new file mode 100644
index 0000000000..b090fdaf43
--- /dev/null
+++ b/src/backend/executor/nodeBatchSort.c
@@ -0,0 +1,377 @@
+#include "postgres.h"
+
+#include "common/hashfn.h"
+#include "executor/executor.h"
+#include "executor/nodeBatchSort.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "port/atomics.h"
+#include "storage/barrier.h"
+#include "utils/builtins.h"
+#include "utils/tuplesort.h"
+#include "utils/typcache.h"
+
+typedef struct ParallelBatchSort
+{
+	Barrier				barrier;
+	pg_atomic_uint32	attached;
+	pg_atomic_uint32	cur_batch;
+	Size				tuplesort_size;	/* MAXIMUM_ALIGNOF*n */
+}ParallelBatchSort;
+
+#define PARALLEL_BATCH_SORT_SIZE		MAXALIGN(sizeof(ParallelBatchSort))
+#define PARALLEL_BATCH_SORT_SHARED(p,n)	\
+	((Sharedsort *) (((char *) (p)) + PARALLEL_BATCH_SORT_SIZE + (p)->tuplesort_size * (n)))
+
+#define BUILD_BATCH_DONE	1
+
+static bool ExecNextParallelBatchSort(BatchSortState *state)
+{
+	ParallelBatchSort  *parallel = state->parallel;
+	BatchSort		   *plan = castNode(BatchSort, state->ps.plan);
+	SortCoordinateData	coord;
+	uint32				cur_batch;
+	Assert(parallel != NULL);
+
+	if (state->curBatch >= 0 &&
+		state->curBatch < plan->numBatches &&
+		state->batches[state->curBatch] != NULL)
+	{
+		tuplesort_end(state->batches[state->curBatch]);
+		state->batches[state->curBatch] = NULL;
+	}
+
+	cur_batch = pg_atomic_fetch_add_u32(&parallel->cur_batch, 1);
+	if (cur_batch >= plan->numBatches)
+	{
+		state->curBatch = plan->numBatches;
+		return false;
+	}
+
+	Assert(state->batches[cur_batch] == NULL);
+	state->curBatch = cur_batch;
+	coord.isWorker = false;
+	coord.nParticipants = pg_atomic_read_u32(&parallel->attached);
+	coord.sharedsort = PARALLEL_BATCH_SORT_SHARED(parallel, cur_batch);
+	state->batches[cur_batch] = tuplesort_begin_heap(ExecGetResultType(outerPlanState(state)),
+													 plan->sort.numCols,
+													 plan->sort.sortColIdx,
+													 plan->sort.sortOperators,
+													 plan->sort.collations,
+													 plan->sort.nullsFirst,
+													 work_mem,
+													 &coord,
+													 false);
+	tuplesort_performsort(state->batches[cur_batch]);
+	return true;
+}
+
+static TupleTableSlot *ExecEmptyBatchSort(PlanState *pstate)
+{
+	return ExecClearTuple(pstate->ps_ResultTupleSlot);
+}
+
+static TupleTableSlot *ExecBatchSort(PlanState *pstate)
+{
+	TupleTableSlot *slot = pstate->ps_ResultTupleSlot;
+	BatchSortState *state = castNode(BatchSortState, pstate);
+	Assert(state->sort_Done);
+
+re_get_:
+	if (tuplesort_gettupleslot(state->batches[state->curBatch],
+							   true,
+							   false,
+							   slot,
+							   NULL) == false &&
+		state->curBatch < castNode(BatchSort, pstate->plan)->numBatches-1)
+	{
+		if (state->parallel)
+		{
+			if (ExecNextParallelBatchSort(state) == false)
+			{
+				ExecSetExecProcNode(pstate, ExecEmptyBatchSort);
+				return ExecClearTuple(slot);
+			}
+		}else
+		{
+			state->curBatch++;
+		}
+		goto re_get_;
+	}
+
+	return slot;
+}
+
+static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
+{
+	BatchSort		   *node = castNode(BatchSort, pstate->plan);
+	BatchSortState	   *state = castNode(BatchSortState, pstate);
+	PlanState		   *outerNode = outerPlanState(pstate);
+	TupleTableSlot	   *slot;
+	ListCell		   *lc;
+	ParallelBatchSort  *parallel = state->parallel;
+	SortCoordinateData	coord;
+	FunctionCallInfo	fcinfo;
+	uint32				hash;
+	int					i;
+	AttrNumber			maxAttr;
+	Assert(state->sort_Done == false);
+	Assert(list_length(state->groupFuns) == node->numGroupCols);
+
+	if (parallel)
+	{
+		if (BarrierAttach(&parallel->barrier) >= BUILD_BATCH_DONE)
+			goto build_already_done_;
+		pg_atomic_add_fetch_u32(&parallel->attached, 1);
+	}
+
+	for (i=node->numBatches;i>0;)
+	{
+		--i;
+		if (parallel)
+		{
+			coord.isWorker = true;
+			coord.nParticipants = -1;
+			coord.sharedsort = PARALLEL_BATCH_SORT_SHARED(parallel, i);
+		}
+		state->batches[i] = tuplesort_begin_heap(ExecGetResultType(outerNode),
+												 node->sort.numCols,
+												 node->sort.sortColIdx,
+												 node->sort.sortOperators,
+												 node->sort.collations,
+												 node->sort.nullsFirst,
+												 work_mem / node->numBatches,
+												 parallel ? &coord : NULL,
+												 false);
+	}
+
+	maxAttr = 0;
+	for (i=node->numGroupCols;i>0;)
+	{
+		if (maxAttr < node->grpColIdx[--i])
+			maxAttr = node->grpColIdx[i];
+	}
+	for (i=node->sort.numCols;i>0;)
+	{
+		if (maxAttr < node->sort.sortColIdx[--i])
+			maxAttr = node->sort.sortColIdx[i];
+	}
+	Assert(maxAttr > 0);
+
+	for (;;)
+	{
+		CHECK_FOR_INTERRUPTS();
+		slot = ExecProcNode(outerNode);
+		if (TupIsNull(slot))
+			break;
+		slot_getsomeattrs(slot, maxAttr);
+
+		hash = 0;
+		i = 0;
+		foreach(lc, state->groupFuns)
+		{
+			AttrNumber att = node->grpColIdx[i++]-1;
+			if (slot->tts_isnull[att] == false)
+			{
+				fcinfo = lfirst(lc);
+				fcinfo->args[0].value = slot->tts_values[att];
+				hash = hash_combine(hash, DatumGetUInt32(FunctionCallInvoke(fcinfo)));
+				Assert(fcinfo->isnull == false);
+			}
+		}
+
+		tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
+	}
+
+	for (i=node->numBatches;i>0;)
+		tuplesort_performsort(state->batches[--i]);
+build_already_done_:
+	if (parallel)
+	{
+		for (i=node->numBatches;i>0;)
+		{
+			--i;
+			if (state->batches[i])
+			{
+				tuplesort_end(state->batches[i]);
+				state->batches[i] = NULL;
+			}
+		}
+		if (BarrierPhase(&parallel->barrier) < BUILD_BATCH_DONE)
+			BarrierArriveAndWait(&parallel->barrier, WAIT_EVENT_BATCH_SORT_BUILD);
+		BarrierDetach(&parallel->barrier);
+
+		if (ExecNextParallelBatchSort(state))
+			ExecSetExecProcNode(pstate, ExecBatchSort);
+		else
+			ExecSetExecProcNode(pstate, ExecEmptyBatchSort);
+	}else
+	{
+		state->curBatch = 0;
+		ExecSetExecProcNode(pstate, ExecBatchSort);
+	}
+	state->sort_Done = true;
+
+	return (*pstate->ExecProcNodeReal)(pstate);
+}
+
+BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
+{
+	BatchSortState *state;
+	TypeCacheEntry *typentry;
+	TupleDesc		desc;
+	int				i;
+
+	if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
+	{
+		/* for now, we only using in group aggregate */
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("not support execute flag(s) %d for group sort", eflags)));
+	}
+
+	state = makeNode(BatchSortState);
+	state->ps.plan = (Plan*) node;
+	state->ps.state = estate;
+	state->ps.ExecProcNode = ExecBatchSortPrepare;
+
+	state->sort_Done = false;
+	state->batches = palloc0(node->numBatches * sizeof(Tuplesortstate*));
+
+	outerPlanState(state) = ExecInitNode(outerPlan(node), estate, eflags);
+
+	/*
+	 * Initialize return slot and type. No need to initialize projection info
+	 * because this node doesn't do projections.
+	 */
+	ExecInitResultTupleSlotTL(&state->ps, &TTSOpsMinimalTuple);
+	state->ps.ps_ProjInfo = NULL;
+
+	Assert(node->numGroupCols > 0);
+	desc = ExecGetResultType(outerPlanState(state));
+	for (i=0;i<node->numGroupCols;++i)
+	{
+		FmgrInfo			   *flinfo;
+		FunctionCallInfo		fcinfo;
+		Form_pg_attribute		attr = TupleDescAttr(desc, node->grpColIdx[i]-1);
+		typentry = lookup_type_cache(attr->atttypid, TYPECACHE_HASH_PROC);
+		if (!OidIsValid(typentry->hash_proc))
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_FUNCTION),
+					 errmsg("could not identify a hash function for type %s",
+							format_type_be(attr->atttypid))));
+		flinfo = palloc0(sizeof(*flinfo));
+		fcinfo = palloc0(SizeForFunctionCallInfo(1));
+		fmgr_info(typentry->hash_proc, flinfo);
+		InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
+		fcinfo->args[0].isnull = false;
+		state->groupFuns = lappend(state->groupFuns, fcinfo);
+	}
+
+	return state;
+}
+
+static void CleanBatchSort(BatchSortState *node)
+{
+	int i;
+
+	ExecClearTuple(node->ps.ps_ResultTupleSlot);
+	if (node->sort_Done)
+	{
+		for (i=castNode(BatchSort, node->ps.plan)->numBatches;i>0;)
+		{
+			if (node->batches[--i] != NULL)
+			{
+				tuplesort_end(node->batches[i]);
+				node->batches[i] = NULL;
+			}
+		}
+		node->sort_Done = false;
+	}
+}
+
+void ExecEndBatchSort(BatchSortState *node)
+{
+	ExecClearTuple(node->ps.ps_ResultTupleSlot);
+	CleanBatchSort(node);
+	ExecEndNode(outerPlanState(node));
+}
+
+void ExecReScanBatchSort(BatchSortState *node)
+{
+	CleanBatchSort(node);
+	if (outerPlanState(node)->chgParam == NULL)
+		ExecReScan(outerPlanState(node));
+	ExecSetExecProcNode(&node->ps, ExecBatchSortPrepare);
+}
+
+void ExecShutdownBatchSort(BatchSortState *node)
+{
+	CleanBatchSort(node);
+}
+
+void ExecBatchSortEstimate(BatchSortState *node, ParallelContext *pcxt)
+{
+	Size size = mul_size(MAXALIGN(tuplesort_estimate_shared(pcxt->nworkers+1)),
+						 castNode(BatchSort, node->ps.plan)->numBatches);
+	size = add_size(size, PARALLEL_BATCH_SORT_SIZE);
+
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+static void InitializeBatchSortParallel(ParallelBatchSort *parallel,
+										int num_batches,
+										int num_workers,
+										dsm_segment *seg)
+{
+	int i;
+	BarrierInit(&parallel->barrier, 0);
+	pg_atomic_init_u32(&parallel->attached, 0);
+	pg_atomic_init_u32(&parallel->cur_batch, 0);
+	for (i=0;i<num_batches;++i)
+	{
+		tuplesort_initialize_shared(PARALLEL_BATCH_SORT_SHARED(parallel, i),
+									num_workers,
+									seg);
+	}
+}
+
+void ExecBatchSortInitializeDSM(BatchSortState *node, ParallelContext *pcxt)
+{
+	ParallelBatchSort *parallel;
+	BatchSort *plan = castNode(BatchSort, node->ps.plan);
+	Size tuplesort_size = MAXALIGN(tuplesort_estimate_shared(pcxt->nworkers+1));
+	Size size = mul_size(tuplesort_size, plan->numBatches);
+	size = add_size(PARALLEL_BATCH_SORT_SIZE, size);
+
+	node->parallel = parallel = shm_toc_allocate(pcxt->toc, size);
+	parallel->tuplesort_size = tuplesort_size;
+	InitializeBatchSortParallel(parallel, plan->numBatches, pcxt->nworkers+1, pcxt->seg);
+	shm_toc_insert(pcxt->toc, plan->sort.plan.plan_node_id, parallel);
+}
+
+void ExecBatchSortReInitializeDSM(BatchSortState *node, ParallelContext *pcxt)
+{
+	InitializeBatchSortParallel(node->parallel,
+								castNode(BatchSort, node->ps.plan)->numBatches,
+								pcxt->nworkers+1,
+								pcxt->seg);
+	ExecSetExecProcNode(&node->ps, ExecBatchSortPrepare);
+}
+
+void ExecBatchSortInitializeWorker(BatchSortState *node, ParallelWorkerContext *pwcxt)
+{
+	uint32 i;
+	BatchSort *plan = castNode(BatchSort, node->ps.plan);
+	ParallelBatchSort *parallel = shm_toc_lookup(pwcxt->toc,
+												 plan->sort.plan.plan_node_id,
+												 false);
+	node->parallel = parallel;
+	for (i=0;i<plan->numBatches;++i)
+	{
+		tuplesort_attach_shared(PARALLEL_BATCH_SORT_SHARED(parallel, i),
+								pwcxt->seg);
+	}
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 0409a40b82..958964f1fa 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -961,6 +961,22 @@ _copySort(const Sort *from)
 	return newnode;
 }
 
+/*
+ * _copyBatchSort
+ */
+static BatchSort *
+_copyBatchSort(const BatchSort *from)
+{
+	BatchSort *newnode = makeNode(BatchSort);
+
+	CopySortFields(&from->sort, &newnode->sort);
+
+	COPY_SCALAR_FIELD(numGroupCols);
+	COPY_SCALAR_FIELD(numBatches);
+	COPY_POINTER_FIELD(grpColIdx, from->numGroupCols * sizeof(AttrNumber));
+
+	return newnode;
+}
 
 /*
  * _copyIncrementalSort
@@ -4939,6 +4955,9 @@ copyObjectImpl(const void *from)
 		case T_Sort:
 			retval = _copySort(from);
 			break;
+		case T_BatchSort:
+			retval = _copyBatchSort(from);
+			break;
 		case T_IncrementalSort:
 			retval = _copyIncrementalSort(from);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f0386480ab..a8dd7ef23f 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -856,6 +856,18 @@ _outSort(StringInfo str, const Sort *node)
 	_outSortInfo(str, node);
 }
 
+static void
+_outBatchSort(StringInfo str, const BatchSort *node)
+{
+	WRITE_NODE_TYPE("BATCHSORT");
+
+	_outSortInfo(str, &node->sort);
+
+	WRITE_INT_FIELD(numGroupCols);
+	WRITE_INT_FIELD(numBatches);
+	WRITE_ATTRNUMBER_ARRAY(grpColIdx, node->numGroupCols);
+}
+
 static void
 _outIncrementalSort(StringInfo str, const IncrementalSort *node)
 {
@@ -3813,6 +3825,9 @@ outNode(StringInfo str, const void *obj)
 			case T_Sort:
 				_outSort(str, obj);
 				break;
+			case T_BatchSort:
+				_outBatchSort(str, obj);
+				break;
 			case T_IncrementalSort:
 				_outIncrementalSort(str, obj);
 				break;
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 42050ab719..2c6eb4362c 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2181,6 +2181,20 @@ _readSort(void)
 	READ_DONE();
 }
 
+static BatchSort *
+_readBatchSort(void)
+{
+	READ_LOCALS(BatchSort);
+
+	ReadCommonSort(&local_node->sort);
+
+	READ_INT_FIELD(numGroupCols);
+	READ_INT_FIELD(numBatches);
+	READ_ATTRNUMBER_ARRAY(grpColIdx, local_node->numGroupCols);
+
+	READ_DONE();
+}
+
 /*
  * _readIncrementalSort
  */
@@ -2834,6 +2848,8 @@ parseNodeString(void)
 		return_value = _readMaterial();
 	else if (MATCH("SORT", 4))
 		return_value = _readSort();
+	else if (MATCH("BATCHSORT", 9))
+		return_value = _readBatchSort();
 	else if (MATCH("INCREMENTALSORT", 15))
 		return_value = _readIncrementalSort();
 	else if (MATCH("GROUP", 5))
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index cd3716d494..32d0dc8ce5 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -140,6 +140,7 @@ bool		enable_partitionwise_aggregate = false;
 bool		enable_parallel_append = true;
 bool		enable_parallel_hash = true;
 bool		enable_partition_pruning = true;
+bool		enable_batch_sort = false;
 
 typedef struct
 {
@@ -1948,6 +1949,92 @@ cost_sort(Path *path, PlannerInfo *root,
 	path->total_cost = startup_cost + run_cost;
 }
 
+void cost_batchsort(Path *path, PlannerInfo *root,
+					List *batchkeys, Cost input_cost,
+					double tuples, int width,
+					Cost comparison_cost, int sort_mem,
+					uint32 numGroupCols, uint32 numBatches)
+{
+	Cost		startup_cost = input_cost;
+	Cost		run_cost = 0;
+	double		input_bytes = relation_byte_size(tuples, width);
+	double		batch_bytes = input_bytes / numBatches;
+	double		batch_tuples = tuples / numBatches;
+	long		sort_mem_bytes = sort_mem * 1024L;
+
+	/* Include the default cost-per-comparison, as cost_sort() does */
+	comparison_cost += 2.0 * cpu_operator_cost;
+
+	if (sort_mem_bytes < (64*1024))
+		sort_mem_bytes = (64*1024);
+
+	if (!enable_batch_sort)
+		startup_cost += disable_cost;
+
+	/* hash cost */
+	startup_cost += cpu_operator_cost * numGroupCols * tuples;
+
+	path->rows = tuples;
+
+	/*
+	 * We want to be sure the cost of a sort is never estimated as zero, even
+	 * if passed-in tuple count is zero.  Besides, mustn't do log(0)...
+	 */
+	if (tuples < 2.0)
+		tuples = 2.0;
+	if (batch_tuples < 2.0)
+		batch_tuples = 2.0;
+
+	if (batch_bytes > sort_mem_bytes)
+	{
+		/*
+		 * We'll have to use a disk-based sort of all the tuples
+		 */
+		double		npages = ceil(batch_bytes / BLCKSZ);
+		double		nruns = batch_bytes / sort_mem_bytes;
+		double		mergeorder = tuplesort_merge_order(sort_mem_bytes);
+		double		log_runs;
+		double		npageaccesses;
+
+		/*
+		 * CPU costs
+		 *
+		 * Assume about N log2 N comparisons
+		 */
+		startup_cost += comparison_cost * batch_tuples * LOG2(batch_tuples) * numBatches;
+
+		/* Disk costs */
+
+		/* Compute logM(r) as log(r) / log(M) */
+		if (nruns > mergeorder)
+			log_runs = ceil(log(nruns) / log(mergeorder));
+		else
+			log_runs = 1.0;
+		npageaccesses = 2.0 * npages * log_runs;
+		/* Assume 3/4ths of accesses are sequential, 1/4th are not */
+		startup_cost += npageaccesses * numBatches *
+			(seq_page_cost * 0.75 + random_page_cost * 0.25);
+
+	}else
+	{
+		/* We'll use plain quicksort on each batch's tuples */
+		startup_cost += comparison_cost * batch_tuples * LOG2(batch_tuples) * numBatches;
+	}
+
+	/*
+	 * Also charge a small amount (arbitrarily set equal to operator cost) per
+	 * extracted tuple. We don't charge cpu_tuple_cost because a Sort node
+	 * doesn't do qual-checking or projection, so it has less overhead than
+	 * most plan nodes.  Note it's correct to use tuples not output_tuples
+	 * here --- the upper LIMIT will pro-rate the run cost so we'd be double
+	 * counting the LIMIT otherwise.
+	 */
+	run_cost += cpu_operator_cost * tuples;
+
+	path->startup_cost = startup_cost;
+	path->total_cost = startup_cost + run_cost;
+}
+
 /*
  * append_nonpartial_cost
  *	  Estimate the cost of the non-partial paths in a Parallel Append.
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 3d7a4e373f..85969388c2 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -98,6 +98,7 @@ static Plan *create_projection_plan(PlannerInfo *root, int flags); static Plan *inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe); static Sort *create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags); +static BatchSort *create_batchsort_plan(PlannerInfo *root, BatchSortPath *best_path, int flags); static IncrementalSort *create_incrementalsort_plan(PlannerInfo *root, IncrementalSortPath *best_path, int flags); static Group *create_group_plan(PlannerInfo *root, GroupPath *best_path); @@ -468,6 +469,11 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags) (SortPath *) best_path, flags); break; + case T_BatchSort: + plan = (Plan *) create_batchsort_plan(root, + (BatchSortPath*) best_path, + flags); + break; case T_IncrementalSort: plan = (Plan *) create_incrementalsort_plan(root, (IncrementalSortPath *) best_path, @@ -2009,6 +2015,39 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags) return plan; } +static BatchSort *create_batchsort_plan(PlannerInfo *root, BatchSortPath *best_path, int flags) +{ + BatchSort *plan; + Plan *subplan; + + subplan = create_plan_recurse(root, best_path->subpath, + flags | CP_SMALL_TLIST); + + plan = makeNode(BatchSort); + subplan = prepare_sort_from_pathkeys(subplan, + best_path->batchkeys, + IS_OTHER_REL(best_path->subpath->parent) ? 
+ best_path->path.parent->relids : NULL, + NULL, + false, + &plan->sort.numCols, + &plan->sort.sortColIdx, + &plan->sort.sortOperators, + &plan->sort.collations, + &plan->sort.nullsFirst); + plan->sort.plan.targetlist = subplan->targetlist; + plan->sort.plan.qual = NIL; + outerPlan(plan) = subplan; + innerPlan(plan) = NULL; + plan->numBatches = best_path->numBatches; + plan->numGroupCols = list_length(best_path->batchgroup); + plan->grpColIdx = extract_grouping_cols(best_path->batchgroup, + subplan->targetlist); + + copy_generic_path_info(&plan->sort.plan, &best_path->path); + return plan; +} + /* * create_incrementalsort_plan * @@ -2085,6 +2124,12 @@ create_upper_unique_plan(PlannerInfo *root, UpperUniquePath *best_path, int flag { Unique *plan; Plan *subplan; + List *pathkeys; + + if (IsA(best_path->subpath, BatchSortPath)) + pathkeys = ((BatchSortPath*)best_path->subpath)->batchkeys; + else + pathkeys = best_path->path.pathkeys; /* * Unique doesn't project, so tlist requirements pass through; moreover we @@ -2094,7 +2139,7 @@ create_upper_unique_plan(PlannerInfo *root, UpperUniquePath *best_path, int flag flags | CP_LABEL_TLIST); plan = make_unique_from_pathkeys(subplan, - best_path->path.pathkeys, + pathkeys, best_path->numkeys); copy_generic_path_info(&plan->plan, (Path *) best_path); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index f331f82a6c..ac7c2a52be 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -4806,6 +4806,8 @@ create_distinct_paths(PlannerInfo *root, cheapest_input_path->rows, NULL); } + distinct_rel->rows = numDistinctRows; + distinct_rel->reltarget = root->upper_targets[UPPERREL_DISTINCT]; /* * Consider sort-based implementations of DISTINCT, if possible. @@ -4825,6 +4827,7 @@ create_distinct_paths(PlannerInfo *root, * the other.) 
*/ List *needed_pathkeys; + List *hashable_clause; if (parse->hasDistinctOn && list_length(root->distinct_pathkeys) < @@ -4871,6 +4874,44 @@ create_distinct_paths(PlannerInfo *root, path, list_length(root->distinct_pathkeys), numDistinctRows)); + + /* add parallel unique */ + if (distinct_rel->consider_parallel && + input_rel->partial_pathlist != NIL && + numDistinctRows >= BATCH_SORT_MIN_BATCHES && + (hashable_clause = grouping_get_hashable(parse->distinctClause)) != NIL) + { + double numPartialDistinctRows; + uint32 num_batchs = (uint32)numDistinctRows; + if (num_batchs > BATCH_SORT_MAX_BATCHES) + { + /* + * too many batchs(files) it is not a good idea, + * limit to BATCH_SORT_MAX_BATCHES + */ + num_batchs = BATCH_SORT_MAX_BATCHES; + } + + foreach (lc, input_rel->partial_pathlist) + { + Path *path = (Path*)create_batchsort_path(root, + distinct_rel, + lfirst(lc), + needed_pathkeys, + hashable_clause, + num_batchs, + true); + numPartialDistinctRows = numDistinctRows / path->parallel_workers; + if (numPartialDistinctRows < 1.0) + numPartialDistinctRows = 1.0; + path = (Path*)create_upper_unique_path(root, + distinct_rel, + path, + list_length(root->distinct_pathkeys), + numPartialDistinctRows); + add_partial_path(distinct_rel, path); + } + } } /* @@ -4908,6 +4949,8 @@ create_distinct_paths(PlannerInfo *root, numDistinctRows)); } + generate_useful_gather_paths(root, distinct_rel, false); + /* Give a helpful error if we failed to find any implementation */ if (distinct_rel->pathlist == NIL) ereport(ERROR, diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index dd8e2e966d..c39eed6b44 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -737,6 +737,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_Material: case T_Sort: + case T_BatchSort: case T_IncrementalSort: case T_Unique: case T_SetOp: diff --git a/src/backend/optimizer/plan/subselect.c 
b/src/backend/optimizer/plan/subselect.c index fcce81926b..cfe2557988 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2752,6 +2752,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, case T_Hash: case T_Material: case T_Sort: + case T_BatchSort: case T_IncrementalSort: case T_Unique: case T_SetOp: diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 745f443e5c..fa1053f077 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -67,7 +67,7 @@ static List *plan_union_children(PlannerInfo *root, List *refnames_tlist, List **tlist_list); static Path *make_union_unique(SetOperationStmt *op, Path *path, List *tlist, - PlannerInfo *root); + PlannerInfo *root, List *groupList, List *sortKeys); static void postprocess_setop_rel(PlannerInfo *root, RelOptInfo *rel); static bool choose_hashed_setop(PlannerInfo *root, List *groupClauses, Path *input_path, @@ -354,6 +354,7 @@ recurse_set_operations(Node *setOp, PlannerInfo *root, rel = generate_nonunion_paths(op, root, refnames_tlist, pTargetList); + generate_useful_gather_paths(root, rel, false); if (pNumGroups) *pNumGroups = rel->rows; @@ -552,6 +553,8 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, List *tlist_list; List *tlist; Path *path; + List *groupList = NIL; + List *sortKeys = NIL; /* * If plain UNION, tell children to fetch all tuples. @@ -587,6 +590,14 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, *pTargetList = tlist; + if (!op->all) + { + /* Identify the grouping semantics */ + groupList = generate_setop_grouplist(op, tlist); + if (grouping_is_sortable(groupList)) + sortKeys = make_pathkeys_for_sortclauses(root, groupList, tlist); + } + /* Build path lists and relid set. */ foreach(lc, rellist) { @@ -627,7 +638,7 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, * node(s) to remove duplicates. 
*/ if (!op->all) - path = make_union_unique(op, path, tlist, root); + path = make_union_unique(op, path, tlist, root, groupList, sortKeys); add_path(result_rel, path); @@ -646,6 +657,7 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, { Path *ppath; ListCell *lc; + List *hashable_list; int parallel_workers = 0; /* Find the highest number of workers requested for any subpath. */ @@ -678,11 +690,35 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, NIL, NULL, parallel_workers, enable_parallel_append, NIL, -1); + if (!op->all && + sortKeys != NIL && + ppath->rows >= BATCH_SORT_MIN_BATCHES && + (hashable_list = grouping_get_hashable(groupList)) != NIL) + { + Path *partial_path; + uint32 numBatches = ppath->rows; + if (numBatches > BATCH_SORT_MAX_BATCHES) + numBatches = BATCH_SORT_MAX_BATCHES; + Assert(list_length(sortKeys) >= list_length(hashable_list)); + partial_path = (Path*)create_batchsort_path(root, + result_rel, + ppath, + sortKeys, + hashable_list, + numBatches, + true); + partial_path = (Path*) create_upper_unique_path(root, + result_rel, + partial_path, + list_length(sortKeys), + partial_path->rows); + add_partial_path(result_rel, partial_path); + } ppath = (Path *) create_gather_path(root, result_rel, ppath, result_rel->reltarget, NULL, NULL); if (!op->all) - ppath = make_union_unique(op, ppath, tlist, root); + ppath = make_union_unique(op, ppath, tlist, root, groupList, sortKeys); add_path(result_rel, ppath); } @@ -933,15 +969,11 @@ plan_union_children(PlannerInfo *root, */ static Path * make_union_unique(SetOperationStmt *op, Path *path, List *tlist, - PlannerInfo *root) + PlannerInfo *root, List *groupList, List *sortKeys) { RelOptInfo *result_rel = fetch_upper_rel(root, UPPERREL_SETOP, NULL); - List *groupList; double dNumGroups; - /* Identify the grouping semantics */ - groupList = generate_setop_grouplist(op, tlist); - /* * XXX for the moment, take the number of distinct groups as equal to the * total input size, ie, the 
worst case. This is too conservative, but @@ -976,9 +1008,7 @@ make_union_unique(SetOperationStmt *op, Path *path, List *tlist, create_sort_path(root, result_rel, path, - make_pathkeys_for_sortclauses(root, - groupList, - tlist), + sortKeys, -1.0); path = (Path *) create_upper_unique_path(root, result_rel, diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index c1fc866cbf..460d2e5faa 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2880,6 +2880,44 @@ create_sort_path(PlannerInfo *root, return pathnode; } +BatchSortPath * +create_batchsort_path(PlannerInfo *root, + RelOptInfo *rel, + Path *subpath, + List *pathkeys, + List *groupClause, + uint32 numBatches, + bool parallel_sort) +{ + BatchSortPath *pathnode = makeNode(BatchSortPath); + + pathnode->path.pathtype = T_BatchSort; + pathnode->path.parent = rel; + /* Sort doesn't project, so use source path's pathtarget */ + pathnode->path.pathtarget = subpath->pathtarget; + /* For now, assume we are above any joins, so no parameterization */ + pathnode->path.param_info = NULL; + pathnode->path.parallel_aware = parallel_sort; + pathnode->path.parallel_safe = rel->consider_parallel && + subpath->parallel_safe; + pathnode->path.parallel_workers = subpath->parallel_workers; + pathnode->batchkeys = pathkeys; + pathnode->batchgroup = groupClause; + pathnode->numBatches = numBatches; + + pathnode->subpath = subpath; + + cost_batchsort(&pathnode->path, root, pathkeys, + subpath->total_cost, subpath->rows, + subpath->pathtarget->width, + 0.0, /* XXX comparison_cost shouldn't be 0? 
*/ + work_mem/numBatches, + list_length(groupClause), + numBatches); + + return pathnode; +} + /* * create_group_path * Creates a pathnode that represents performing grouping of presorted input diff --git a/src/backend/optimizer/util/tlist.c b/src/backend/optimizer/util/tlist.c index 02a3c6b165..949568c672 100644 --- a/src/backend/optimizer/util/tlist.c +++ b/src/backend/optimizer/util/tlist.c @@ -593,6 +593,22 @@ grouping_is_hashable(List *groupClause) return true; } +List * +grouping_get_hashable(List *groupClause) +{ + ListCell *lc; + List *result = NIL; + + foreach (lc, groupClause) + { + SortGroupClause *groupcl = lfirst_node(SortGroupClause, lc); + + if (groupcl->hashable) + result = lappend(result, groupcl); + } + + return result; +} /***************************************************************************** * PathTarget manipulation functions diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 822f0ebc62..cacb7d13e6 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4021,6 +4021,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; + case WAIT_EVENT_BATCH_SORT_BUILD: + event_name = "BatchSortBuild"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 596bcb7b84..43a4e36d78 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -987,6 +987,15 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_batch_sort", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of batch sort steps."), + NULL + }, + &enable_batch_sort, + false, + NULL, NULL, NULL + }, { {"enable_incremental_sort", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of incremental sort steps."), diff --git a/src/include/executor/nodeBatchSort.h b/src/include/executor/nodeBatchSort.h new
file mode 100644 index 0000000000..66c68e0125 --- /dev/null +++ b/src/include/executor/nodeBatchSort.h @@ -0,0 +1,19 @@ +/* nodeBatchSort.h - prototypes for nodeBatchSort.c (BatchSort executor node) */ +#ifndef NODE_BATCH_SORT_H +#define NODE_BATCH_SORT_H + +#include "access/parallel.h" +#include "nodes/execnodes.h" + +extern BatchSortState *ExecInitBatchSort(BatchSort *node, EState *estate, int eflags); +extern void ExecEndBatchSort(BatchSortState *node); +extern void ExecReScanBatchSort(BatchSortState *node); + +/* parallel scan support */ +extern void ExecBatchSortEstimate(BatchSortState *node, ParallelContext *pcxt); +extern void ExecBatchSortInitializeDSM(BatchSortState *node, ParallelContext *pcxt); +extern void ExecBatchSortReInitializeDSM(BatchSortState *node, ParallelContext *pcxt); +extern void ExecBatchSortInitializeWorker(BatchSortState *node, ParallelWorkerContext *pwcxt); +extern void ExecShutdownBatchSort(BatchSortState *node); + +#endif /* NODE_BATCH_SORT_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index ef448d67c7..14dde9fca3 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2012,6 +2012,21 @@ typedef struct SortState SharedSortInfo *shared_info; /* one entry per worker */ } SortState; +/* ---------------- + * BatchSortState information + * ---------------- + */ +typedef struct BatchSortState +{ + PlanState ps; /* its first field is NodeTag */ + void **batches; /* private state of tuplesort.c */ + List *groupFuns; /* hash function call info for each group-key */ + struct ParallelBatchSort + *parallel; /* parallel info, private in nodeBatchSort.c */ + int curBatch; /* current batch index */ + bool sort_Done; /* sort completed yet?
*/ +} BatchSortState; + /* ---------------- * Instrumentation information for IncrementalSort * ---------------- diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 7ddd8c011b..ace4c98939 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -74,6 +74,7 @@ typedef enum NodeTag T_HashJoin, T_Material, T_Sort, + T_BatchSort, T_IncrementalSort, T_Group, T_Agg, @@ -131,6 +132,7 @@ typedef enum NodeTag T_HashJoinState, T_MaterialState, T_SortState, + T_BatchSortState, T_IncrementalSortState, T_GroupState, T_AggState, @@ -246,6 +248,7 @@ typedef enum NodeTag T_ProjectionPath, T_ProjectSetPath, T_SortPath, + T_BatchSortPath, T_IncrementalSortPath, T_GroupPath, T_UpperUniquePath, diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index dbe86e7af6..273bdda452 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -1649,6 +1649,16 @@ typedef struct SortPath Path *subpath; /* path representing input source */ } SortPath; +typedef struct BatchSortPath +{ + Path path; + Path *subpath; /* path representing input source */ + List *batchkeys; /* our result is not all ordered, only for each batch, + * so we can not use Path::pathkeys */ + List *batchgroup; /* a list of SortGroupClause for hash */ + uint32 numBatches; /* number of batches */ +} BatchSortPath; + /* * IncrementalSortPath */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 83e01074ed..f7ad7881dc 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -774,6 +774,18 @@ typedef struct Sort bool *nullsFirst; /* NULLS FIRST/LAST directions */ } Sort; +/* ---------------- + * batch sort node + * ---------------- + */ +typedef struct BatchSort +{ + Sort sort; + int numGroupCols; /* number of group-key columns */ + int numBatches; /* number of batches */ + AttrNumber *grpColIdx; /* their indexes in the target list */ +} BatchSort; + /* ---------------- * incremental sort node * ---------------- diff --git
a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 6141654e47..37e6a12a6f 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -53,6 +53,7 @@ extern PGDLLIMPORT bool enable_indexonlyscan; extern PGDLLIMPORT bool enable_bitmapscan; extern PGDLLIMPORT bool enable_tidscan; extern PGDLLIMPORT bool enable_sort; +extern PGDLLIMPORT bool enable_batch_sort; extern PGDLLIMPORT bool enable_incremental_sort; extern PGDLLIMPORT bool enable_hashagg; extern PGDLLIMPORT bool enable_nestloop; @@ -102,6 +103,11 @@ extern void cost_sort(Path *path, PlannerInfo *root, List *pathkeys, Cost input_cost, double tuples, int width, Cost comparison_cost, int sort_mem, double limit_tuples); +extern void cost_batchsort(Path *path, PlannerInfo *root, + List *batchkeys, Cost input_cost, + double tuples, int width, + Cost comparison_cost, int sort_mem, + uint32 numGroupCols, uint32 numBatchs); extern void cost_incremental_sort(Path *path, PlannerInfo *root, List *pathkeys, int presorted_keys, Cost input_startup_cost, Cost input_total_cost, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 715a24ad29..816fc37739 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -17,6 +17,8 @@ #include "nodes/bitmapset.h" #include "nodes/pathnodes.h" +#define BATCH_SORT_MIN_BATCHES 2 +#define BATCH_SORT_MAX_BATCHES 512 /* * prototypes for pathnode.c @@ -195,6 +197,13 @@ extern SortPath *create_sort_path(PlannerInfo *root, Path *subpath, List *pathkeys, double limit_tuples); +extern BatchSortPath *create_batchsort_path(PlannerInfo *root, + RelOptInfo *rel, + Path *subpath, + List *pathkeys, + List *groupClause, + uint32 numBatches, + bool parallel_sort); extern GroupPath *create_group_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, diff --git a/src/include/optimizer/tlist.h b/src/include/optimizer/tlist.h index 1d4c7da545..9372cebeba 100644 --- a/src/include/optimizer/tlist.h +++ 
b/src/include/optimizer/tlist.h @@ -36,6 +36,7 @@ extern Oid *extract_grouping_collations(List *groupClause, List *tlist); extern AttrNumber *extract_grouping_cols(List *groupClause, List *tlist); extern bool grouping_is_sortable(List *groupClause); extern bool grouping_is_hashable(List *groupClause); +extern List *grouping_get_hashable(List *groupClause); extern PathTarget *make_pathtarget_from_tlist(List *tlist); extern List *make_tlist_from_pathtarget(PathTarget *target); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index a821ff4f15..f0b6dae97b 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -952,7 +952,8 @@ typedef enum WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, - WAIT_EVENT_XACT_GROUP_UPDATE + WAIT_EVENT_XACT_GROUP_UPDATE, + WAIT_EVENT_BATCH_SORT_BUILD } WaitEventIPC; /* ---------- diff --git a/src/test/regress/expected/select_distinct.out b/src/test/regress/expected/select_distinct.out index 11c6f50fbf..c200e38d12 100644 --- a/src/test/regress/expected/select_distinct.out +++ b/src/test/regress/expected/select_distinct.out @@ -306,3 +306,45 @@ SELECT null IS NOT DISTINCT FROM null as "yes"; t (1 row) +-- parallel distinct +BEGIN; +SET enable_batch_sort = ON; +SET min_parallel_table_scan_size =0; +SET parallel_tuple_cost = 0; +SET parallel_setup_cost = 0; +SET enable_indexonlyscan = OFF; +EXPLAIN (costs off) +SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; + QUERY PLAN +---------------------------------------------------- + Aggregate + -> Gather + Workers Planned: 2 + -> Unique + -> Parallel BatchSort + Sort Key: tenk1.unique2 + -> Parallel Seq Scan on tenk1 +(7 rows) + +SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; + count +------- + 10000 +(1 row) + +explain (costs off) +SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo; + QUERY PLAN +---------------------------------------------------------- + Gather + Workers Planned: 2 + -> Unique + -> Parallel 
BatchSort + Sort Key: tenk1.unique2 + -> Unique + -> Parallel BatchSort + Sort Key: tenk1.unique2 + -> Parallel Seq Scan on tenk1 +(9 rows) + +ABORT; diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 81bdacf59d..8ed047e520 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -88,6 +88,7 @@ select count(*) = 1 as ok from pg_stat_wal; select name, setting from pg_settings where name like 'enable%'; name | setting --------------------------------+--------- + enable_batch_sort | off enable_bitmapscan | on enable_gathermerge | on enable_hashagg | on @@ -106,7 +107,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(18 rows) +(19 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail diff --git a/src/test/regress/expected/union.out b/src/test/regress/expected/union.out index 6e72e92d80..5a2be9aec9 100644 --- a/src/test/regress/expected/union.out +++ b/src/test/regress/expected/union.out @@ -1052,3 +1052,58 @@ where (x = 0) or (q1 >= q2 and q1 <= q2); 4567890123456789 | 4567890123456789 | 1 (6 rows) +-- parallel union +BEGIN; +SET enable_batch_sort = ON; +SET min_parallel_table_scan_size =0; +SET parallel_tuple_cost = 0; +SET parallel_setup_cost = 0; +SET enable_indexonlyscan = OFF; +EXPLAIN (costs off) +SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo; + QUERY PLAN +------------------------------------------------------------------ + Aggregate + -> Gather + Workers Planned: 2 + -> Unique + -> Parallel BatchSort + Sort Key: tenk1.unique2 + -> Parallel Append + -> Parallel Seq Scan on tenk1 + -> Parallel Seq Scan on tenk1 tenk1_1 +(9 rows) + +SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo; + count +------- + 10000 +(1 row) + +EXPLAIN 
(costs off) +SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2); + QUERY PLAN +------------------------------------------------------------------------------ + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (tenk1_1.unique2 = tenk1.unique2) + -> Unique + -> Parallel BatchSort + Sort Key: tenk1_1.unique2 + -> Parallel Append + -> Parallel Seq Scan on tenk1 tenk1_1 + -> Parallel Seq Scan on tenk1 tenk1_2 + -> Parallel Hash + -> Parallel Seq Scan on tenk1 +(14 rows) + +SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2); + count +------- + 10000 +(1 row) + +ABORT; diff --git a/src/test/regress/sql/select_distinct.sql b/src/test/regress/sql/select_distinct.sql index 33102744eb..3ff7acf64d 100644 --- a/src/test/regress/sql/select_distinct.sql +++ b/src/test/regress/sql/select_distinct.sql @@ -135,3 +135,17 @@ SELECT 1 IS NOT DISTINCT FROM 2 as "no"; SELECT 2 IS NOT DISTINCT FROM 2 as "yes"; SELECT 2 IS NOT DISTINCT FROM null as "no"; SELECT null IS NOT DISTINCT FROM null as "yes"; + +-- parallel distinct +BEGIN; +SET enable_batch_sort = ON; +SET min_parallel_table_scan_size =0; +SET parallel_tuple_cost = 0; +SET parallel_setup_cost = 0; +SET enable_indexonlyscan = OFF; +EXPLAIN (costs off) +SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; +SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; +explain (costs off) +SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo; +ABORT; \ No newline at end of file diff --git a/src/test/regress/sql/union.sql b/src/test/regress/sql/union.sql index 5f4881d594..a1cb1bb7ac 100644 --- a/src/test/regress/sql/union.sql +++ b/src/test/regress/sql/union.sql @@ -440,3 +440,18 @@ select * from union all select *, 1 as x from int8_tbl b) ss where (x = 0) or (q1 >= q2 and q1 <= q2); + +-- parallel union +BEGIN; +SET 
enable_batch_sort = ON; +SET min_parallel_table_scan_size =0; +SET parallel_tuple_cost = 0; +SET parallel_setup_cost = 0; +SET enable_indexonlyscan = OFF; +EXPLAIN (costs off) +SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo; +SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo; +EXPLAIN (costs off) +SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2); +SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2); +ABORT; \ No newline at end of file -- 2.16.3