From 54ba12269794d7c714c7396da6a41fb47a442b06 Mon Sep 17 00:00:00 2001 From: bucoo Date: Wed, 28 Oct 2020 16:29:06 +0800 Subject: [PATCH 4/4] Parallel distinct union aggregate and grouping sets support using batch hash aggregate --- src/backend/commands/explain.c | 4 + src/backend/executor/nodeAgg.c | 543 ++++++++++++++++++---- src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/outfuncs.c | 1 + src/backend/nodes/readfuncs.c | 1 + src/backend/optimizer/path/costsize.c | 31 +- src/backend/optimizer/plan/createplan.c | 12 +- src/backend/optimizer/plan/planner.c | 131 ++++++ src/backend/optimizer/prep/prepunion.c | 27 ++ src/backend/postmaster/pgstat.c | 3 + src/backend/utils/misc/guc.c | 9 + src/include/executor/nodeAgg.h | 2 + src/include/nodes/execnodes.h | 3 + src/include/nodes/nodes.h | 3 +- src/include/nodes/plannodes.h | 1 + src/include/optimizer/cost.h | 1 + src/include/optimizer/pathnode.h | 2 + src/include/pgstat.h | 3 +- src/test/regress/expected/groupingsets.out | 65 +++ src/test/regress/expected/partition_aggregate.out | 64 +++ src/test/regress/expected/select_distinct.out | 35 ++ src/test/regress/expected/sysviews.out | 3 +- src/test/regress/expected/union.out | 30 ++ src/test/regress/sql/groupingsets.sql | 10 + src/test/regress/sql/partition_aggregate.sql | 20 + src/test/regress/sql/select_distinct.sql | 9 + src/test/regress/sql/union.sql | 13 + 27 files changed, 941 insertions(+), 86 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 16a1fb035d..d4b336aa82 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1302,6 +1302,10 @@ ExplainNode(PlanState *planstate, List *ancestors, pname = "MixedAggregate"; strategy = "Mixed"; break; + case AGG_BATCH_HASH: + pname = "BatchHashAggregate"; + strategy = "BatchHashed"; + break; default: pname = "Aggregate ???"; strategy = "???"; diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 75e5bbf209..08024532a3 
100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -256,7 +256,10 @@ #include "optimizer/optimizer.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" +#include "pgstat.h" +#include "storage/barrier.h" #include "utils/acl.h" +#include "utils/batchstore.h" #include "utils/builtins.h" #include "utils/datum.h" #include "utils/dynahash.h" @@ -311,6 +314,11 @@ */ #define CHUNKHDRSZ 16 +#define SHARED_AGG_MAGIC UINT64CONST(0x4141bbcd61518e52) +#define SHARED_AGG_KEY_INFO UINT64CONST(0xD000000000000001) +#define SHARED_AGG_KEY_BARRIER UINT64CONST(0xD000000000000002) +#define SHARED_AGG_KEY_FILE_SET UINT64CONST(0xD000000000000003) + /* * Track all tapes needed for a HashAgg that spills. We don't know the maximum * number of tapes needed at the start of the algorithm (because it can @@ -446,7 +454,7 @@ static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset, int input_tapenum, int setno, int64 input_tuples, double input_card, int used_bits); -static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); +static bool hashagg_batch_read(void *userdata, TupleTableSlot *slot, uint32 *hashp); static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, double input_groups, double hashentrysize); @@ -473,6 +481,13 @@ static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, List *transnos); +static TupleTableSlot *ExecBatchHashAggPrepare(PlanState *pstate); +static TupleTableSlot *ExecBatchHashAgg(PlanState *pstate); +static bool ExecBatchHashAggNextBatch(AggState *node); +#if 0 +static void agg_batch_fill_hash_table(AggState *aggstate); +static void ClearBatchAgg(AggState *node); +#endif /* @@ -1338,7 +1353,8 @@ finalize_aggregates(AggState *aggstate, if (pertrans->numSortCols > 0) { Assert(aggstate->aggstrategy != AGG_HASHED && - aggstate->aggstrategy != AGG_MIXED); + aggstate->aggstrategy != 
AGG_MIXED && + aggstate->aggstrategy != AGG_BATCH_HASH); if (pertrans->numInputs == 1) process_ordered_aggregate_single(aggstate, @@ -1510,8 +1526,10 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets) MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory; MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory; Size additionalsize; + bool use_hash_iv; Assert(aggstate->aggstrategy == AGG_HASHED || + aggstate->aggstrategy == AGG_BATCH_HASH || aggstate->aggstrategy == AGG_MIXED); /* @@ -1521,6 +1539,10 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets) * tuple of each group. */ additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData); + if (aggstate->aggstrategy == AGG_BATCH_HASH) + use_hash_iv = false; + else + use_hash_iv = DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit); perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps, perhash->hashslot->tts_tupleDescriptor, @@ -1534,7 +1556,7 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets) metacxt, hashcxt, tmpcxt, - DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); + use_hash_iv); } /* @@ -1633,10 +1655,15 @@ find_hash_columns(AggState *aggstate) palloc(maxCols * sizeof(AttrNumber)); perhash->hashGrpColIdxHash = palloc(perhash->numCols * sizeof(AttrNumber)); + perhash->colnos_needed = bms_copy(aggregated_colnos); /* Add all the grouping columns to colnos */ for (i = 0; i < perhash->numCols; i++) + { colnos = bms_add_member(colnos, grpColIdx[i]); + perhash->colnos_needed = bms_add_member(perhash->colnos_needed, + grpColIdx[i]); + } /* * First build mapping for columns directly hashed. These are the @@ -1746,9 +1773,11 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) int j = nullcheck ? 
1 : 0; Assert(aggstate->aggstrategy == AGG_HASHED || + aggstate->aggstrategy == AGG_BATCH_HASH || aggstate->aggstrategy == AGG_MIXED); - if (aggstate->aggstrategy == AGG_HASHED) + if (aggstate->aggstrategy == AGG_HASHED || + aggstate->aggstrategy == AGG_BATCH_HASH) phase = &aggstate->phases[0]; else /* AGG_MIXED */ phase = &aggstate->phases[1]; @@ -2170,6 +2199,9 @@ ExecAgg(PlanState *pstate) case AGG_SORTED: result = agg_retrieve_direct(node); break; + case AGG_BATCH_HASH: + elog(ERROR, "batch hash should not run in function ExecAgg"); + break; } if (!TupIsNull(result)) @@ -2570,35 +2602,20 @@ agg_fill_hash_table(AggState *aggstate) &aggstate->perhash[0].hashiter); } -/* - * If any data was spilled during hash aggregation, reset the hash table and - * reprocess one batch of spilled data. After reprocessing a batch, the hash - * table will again contain data, ready to be consumed by - * agg_retrieve_hash_table_in_memory(). - * - * Should only be called after all in memory hash table entries have been - * finalized and emitted. - * - * Return false when input is exhausted and there's no more work to be done; - * otherwise return true. 
- */ -static bool -agg_refill_hash_table(AggState *aggstate) +static void +agg_refill_hash_table_ex(AggState *aggstate, + bool (*read_tup)(void *userdata, TupleTableSlot *slot, uint32 *hash), + void *userdata, + int used_bits, + double input_groups, + int setno) { - HashAggBatch *batch; AggStatePerHash perhash; HashAggSpill spill; - HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; bool spill_initialized = false; - if (aggstate->hash_batches == NIL) - return false; - - batch = linitial(aggstate->hash_batches); - aggstate->hash_batches = list_delete_first(aggstate->hash_batches); - - hash_agg_set_limits(aggstate->hashentrysize, batch->input_card, - batch->used_bits, &aggstate->hash_mem_limit, + hash_agg_set_limits(aggstate->hashentrysize, input_groups, + used_bits, &aggstate->hash_mem_limit, &aggstate->hash_ngroups_limit, NULL); /* there could be residual pergroup pointers; clear them */ @@ -2626,7 +2643,7 @@ agg_refill_hash_table(AggState *aggstate) aggstate->phase = &aggstate->phases[aggstate->current_phase]; } - select_current_set(aggstate, batch->setno, true); + select_current_set(aggstate, setno, true); perhash = &aggstate->perhash[aggstate->current_set]; @@ -2644,31 +2661,27 @@ agg_refill_hash_table(AggState *aggstate) TupleTableSlot *spillslot = aggstate->hash_spill_rslot; TupleTableSlot *hashslot = perhash->hashslot; TupleHashEntry entry; - MinimalTuple tuple; uint32 hash; bool isnew = false; bool *p_isnew = aggstate->hash_spill_mode ? 
NULL : &isnew; CHECK_FOR_INTERRUPTS(); - tuple = hashagg_batch_read(batch, &hash); - if (tuple == NULL) + if ((*read_tup)(userdata, spillslot, &hash) == false) break; - ExecStoreMinimalTuple(tuple, spillslot, true); aggstate->tmpcontext->ecxt_outertuple = spillslot; prepare_hash_slot(perhash, - aggstate->tmpcontext->ecxt_outertuple, + spillslot, hashslot); - entry = LookupTupleHashEntryHash( - perhash->hashtable, hashslot, p_isnew, hash); + entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew, hash); if (entry != NULL) { if (isnew) initialize_hash_entry(aggstate, perhash->hashtable, entry); - aggstate->hash_pergroup[batch->setno] = entry->additional; + aggstate->hash_pergroup[setno] = entry->additional; advance_aggregates(aggstate); } else @@ -2680,13 +2693,13 @@ agg_refill_hash_table(AggState *aggstate) * that we don't assign tapes that will never be used. */ spill_initialized = true; - hashagg_spill_init(&spill, tapeinfo, batch->used_bits, - batch->input_card, aggstate->hashentrysize); + hashagg_spill_init(&spill, aggstate->hash_tapeinfo, used_bits, + input_groups, aggstate->hashentrysize); } /* no memory for a new group, spill */ hashagg_spill_tuple(aggstate, &spill, spillslot, hash); - aggstate->hash_pergroup[batch->setno] = NULL; + aggstate->hash_pergroup[setno] = NULL; } /* @@ -2696,15 +2709,13 @@ agg_refill_hash_table(AggState *aggstate) ResetExprContext(aggstate->tmpcontext); } - hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum); - /* change back to phase 0 */ aggstate->current_phase = 0; aggstate->phase = &aggstate->phases[aggstate->current_phase]; if (spill_initialized) { - hashagg_spill_finish(aggstate, &spill, batch->setno); + hashagg_spill_finish(aggstate, &spill, setno); hash_agg_update_metrics(aggstate, true, spill.npartitions); } else @@ -2713,9 +2724,43 @@ agg_refill_hash_table(AggState *aggstate) aggstate->hash_spill_mode = false; /* prepare to walk the first hash table */ - select_current_set(aggstate, batch->setno, 
true); - ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable, - &aggstate->perhash[batch->setno].hashiter); + select_current_set(aggstate, setno, true); + ResetTupleHashIterator(aggstate->perhash[setno].hashtable, + &aggstate->perhash[setno].hashiter); +} + +/* + * If any data was spilled during hash aggregation, reset the hash table and + * reprocess one batch of spilled data. After reprocessing a batch, the hash + * table will again contain data, ready to be consumed by + * agg_retrieve_hash_table_in_memory(). + * + * Should only be called after all in memory hash table entries have been + * finalized and emitted. + * + * Return false when input is exhausted and there's no more work to be done; + * otherwise return true. + */ +static bool +agg_refill_hash_table(AggState *aggstate) +{ + HashAggBatch *batch; + + if (aggstate->hash_batches == NIL) + return false; + + batch = linitial(aggstate->hash_batches); + aggstate->hash_batches = list_delete_first(aggstate->hash_batches); + + agg_refill_hash_table_ex(aggstate, + hashagg_batch_read, + batch, + batch->used_bits, + batch->input_card, + batch->setno); + + hashagg_tapeinfo_release(aggstate->hash_tapeinfo, + batch->input_tapenum); pfree(batch); @@ -3056,9 +3101,10 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, * read_spilled_tuple * read the next tuple from a batch's tape. Return NULL if no more. 
*/ -static MinimalTuple -hashagg_batch_read(HashAggBatch *batch, uint32 *hashp) +static bool +hashagg_batch_read(void *userdata, TupleTableSlot *slot, uint32 *hashp) { + HashAggBatch *batch = userdata; LogicalTapeSet *tapeset = batch->tapeset; int tapenum = batch->input_tapenum; MinimalTuple tuple; @@ -3068,7 +3114,7 @@ hashagg_batch_read(HashAggBatch *batch, uint32 *hashp) nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32)); if (nread == 0) - return NULL; + return false; if (nread != sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), @@ -3096,7 +3142,8 @@ hashagg_batch_read(HashAggBatch *batch, uint32 *hashp) errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", tapenum, t_len - sizeof(uint32), nread))); - return tuple; + ExecStoreMinimalTuple(tuple, slot, true); + return true; } /* @@ -3257,6 +3304,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) int i = 0; int j = 0; bool use_hashing = (node->aggstrategy == AGG_HASHED || + node->aggstrategy == AGG_BATCH_HASH || node->aggstrategy == AGG_MIXED); /* check for unsupported flags */ @@ -3268,7 +3316,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate = makeNode(AggState); aggstate->ss.ps.plan = (Plan *) node; aggstate->ss.ps.state = estate; - aggstate->ss.ps.ExecProcNode = ExecAgg; + if (node->aggstrategy == AGG_BATCH_HASH) + aggstate->ss.ps.ExecProcNode = ExecBatchHashAggPrepare; + else + aggstate->ss.ps.ExecProcNode = ExecAgg; aggstate->aggs = NIL; aggstate->numaggs = 0; @@ -3315,7 +3366,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * additional AGG_HASHED aggs become part of phase 0, but all * others add an extra phase. 
*/ - if (agg->aggstrategy != AGG_HASHED) + if (agg->aggstrategy != AGG_HASHED && + agg->aggstrategy != AGG_BATCH_HASH) ++numPhases; else ++numHashes; @@ -3362,7 +3414,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * If we are doing a hashed aggregation then the child plan does not need * to handle REWIND efficiently; see ExecReScanAgg. */ - if (node->aggstrategy == AGG_HASHED) + if (node->aggstrategy == AGG_HASHED || + node->aggstrategy == AGG_BATCH_HASH) eflags &= ~EXEC_FLAG_REWIND; outerPlan = outerPlan(node); outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags); @@ -3370,9 +3423,16 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* * initialize source tuple type. */ - aggstate->ss.ps.outerops = - ExecGetResultSlotOps(outerPlanState(&aggstate->ss), - &aggstate->ss.ps.outeropsfixed); + if (node->aggstrategy == AGG_BATCH_HASH) + { + aggstate->ss.ps.outerops = &TTSOpsMinimalTuple; + aggstate->ss.ps.outeropsfixed = true; + }else + { + aggstate->ss.ps.outerops = + ExecGetResultSlotOps(outerPlanState(&aggstate->ss), + &aggstate->ss.ps.outeropsfixed); + } aggstate->ss.ps.outeropsset = true; ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss, @@ -3470,6 +3530,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) Assert(phase <= 1 || sortnode); if (aggnode->aggstrategy == AGG_HASHED + || aggnode->aggstrategy == AGG_BATCH_HASH || aggnode->aggstrategy == AGG_MIXED) { AggStatePerPhase phasedata = &aggstate->phases[0]; @@ -3678,7 +3739,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * hashing is being done too, then phase 0 is processed last); but if only * hashing is being done, then phase 0 is all there is. 
*/ - if (node->aggstrategy == AGG_HASHED) + if (node->aggstrategy == AGG_HASHED || + node->aggstrategy == AGG_BATCH_HASH) { aggstate->current_phase = 0; initialize_phase(aggstate, 0); @@ -4066,7 +4128,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) dohash = false; dosort = true; } - else if (phase->aggstrategy == AGG_HASHED) + else if (phase->aggstrategy == AGG_HASHED || + phase->aggstrategy == AGG_BATCH_HASH) { dohash = true; dosort = false; @@ -4666,6 +4729,14 @@ ExecReScanAgg(AggState *node) return; } } + else if (node->aggstrategy == AGG_BATCH_HASH) + { + if (!node->batch_filled) + return; + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("batch hash not support rescan yet!"))); + } /* Make sure we have closed any open tuplesorts */ for (transno = 0; transno < node->numtrans; transno++) @@ -4958,6 +5029,54 @@ aggregate_dummy(PG_FUNCTION_ARGS) * ---------------------------------------------------------------- */ +static Size ExecAggEstimateToc(AggState *node, ParallelContext *pcxt) +{ + Size size; + shm_toc_estimator estimator; + ListCell *lc; + + /* don't need this if no workers */ + if (pcxt->nworkers == 0) + return 0; + /* don't need this if not instrumenting and not batch hash agg */ + if (!node->ss.ps.instrument && + node->aggstrategy != AGG_BATCH_HASH) + return 0; + + shm_toc_initialize_estimator(&estimator); + if (node->ss.ps.instrument) + { + size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation)); + size = add_size(size, offsetof(SharedAggInfo, sinstrument)); + shm_toc_estimate_chunk(&estimator, size); + shm_toc_estimate_keys(&estimator, 1); + } + + if (node->aggstrategy == AGG_BATCH_HASH) + { + int nparticipants = pcxt->nworkers + 1; + shm_toc_estimate_chunk(&estimator, sizeof(Barrier)); + shm_toc_estimate_chunk(&estimator, sizeof(SharedFileSet)); + shm_toc_estimate_keys(&estimator, 2); + + size = bs_parallel_hash_estimate(castNode(Agg, node->ss.ps.plan)->numBatches, + nparticipants); + 
shm_toc_estimate_chunk(&estimator, size); + shm_toc_estimate_keys(&estimator, 1); + + foreach (lc, castNode(Agg, node->ss.ps.plan)->chain) + { + Agg *agg = lfirst_node(Agg, lc); + Assert(agg->aggstrategy == AGG_BATCH_HASH); + size = bs_parallel_hash_estimate(agg->numBatches, nparticipants); + shm_toc_estimate_chunk(&estimator, size); + shm_toc_estimate_keys(&estimator, 1); + } + } + + return shm_toc_estimate(&estimator); +} + /* ---------------------------------------------------------------- * ExecAggEstimate * @@ -4967,14 +5086,7 @@ aggregate_dummy(PG_FUNCTION_ARGS) void ExecAggEstimate(AggState *node, ParallelContext *pcxt) { - Size size; - - /* don't need this if not instrumenting or no workers */ - if (!node->ss.ps.instrument || pcxt->nworkers == 0) - return; - - size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation)); - size = add_size(size, offsetof(SharedAggInfo, sinstrument)); + Size size = ExecAggEstimateToc(node, pcxt); shm_toc_estimate_chunk(&pcxt->estimator, size); shm_toc_estimate_keys(&pcxt->estimator, 1); } @@ -4989,19 +5101,77 @@ void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt) { Size size; + shm_toc *toc; + void *addr; - /* don't need this if not instrumenting or no workers */ - if (!node->ss.ps.instrument || pcxt->nworkers == 0) + size = ExecAggEstimateToc(node, pcxt); + if (size == 0) return; - size = offsetof(SharedAggInfo, sinstrument) - + pcxt->nworkers * sizeof(AggregateInstrumentation); - node->shared_info = shm_toc_allocate(pcxt->toc, size); - /* ensure any unfilled slots will contain zeroes */ - memset(node->shared_info, 0, size); - node->shared_info->num_workers = pcxt->nworkers; - shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, - node->shared_info); + addr = shm_toc_allocate(pcxt->toc, size); + toc = shm_toc_create(SHARED_AGG_MAGIC, addr, size); + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, addr); + + if (node->ss.ps.instrument) + { + size = offsetof(SharedAggInfo, sinstrument) + + 
pcxt->nworkers * sizeof(AggregateInstrumentation); + node->shared_info = shm_toc_allocate(toc, size); + /* ensure any unfilled slots will contain zeroes */ + memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; + shm_toc_insert(toc, SHARED_AGG_KEY_INFO, node->shared_info); + } + + if (node->aggstrategy == AGG_BATCH_HASH) + { + int nparticipants = pcxt->nworkers + 1; + int i = 0; + ListCell *lc; + Agg *agg; + SharedFileSet *fset = shm_toc_allocate(toc, sizeof(SharedFileSet)); + SharedFileSetInit(fset, pcxt->seg); + shm_toc_insert(toc, SHARED_AGG_KEY_FILE_SET, fset); + + node->batch_barrier = shm_toc_allocate(toc, sizeof(Barrier)); + BarrierInit(node->batch_barrier, 0); + shm_toc_insert(toc, SHARED_AGG_KEY_BARRIER, node->batch_barrier); + + agg = castNode(Agg, node->ss.ps.plan); + Assert(agg->numBatches > 0); + size = bs_parallel_hash_estimate(agg->numBatches, nparticipants); + addr = shm_toc_allocate(toc, size); + shm_toc_insert(toc, 0, addr); + node->perhash[0].batch_store = bs_init_parallel_hash(agg->numBatches, + nparticipants, + 0, + addr, + pcxt->seg, + fset, + "BatchHashAgg"); + + i = 1; + foreach (lc, agg->chain) + { + Agg *subagg = lfirst_node(Agg, lc); + char name[30]; + Assert(subagg->aggstrategy == AGG_BATCH_HASH && + subagg->numBatches > 0); + Assert(i < node->num_hashes); + size = bs_parallel_hash_estimate(subagg->numBatches, nparticipants); + addr = shm_toc_allocate(toc, size); + shm_toc_insert(toc, i, addr); + sprintf(name, "BatchHashAgg%d", i); + node->perhash[i].batch_store = bs_init_parallel_hash(subagg->numBatches, + nparticipants, + 0, + addr, + pcxt->seg, + fset, + name); + ++i; + } + } } /* ---------------------------------------------------------------- @@ -5013,8 +5183,43 @@ ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt) void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt) { - node->shared_info = - shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); + shm_toc 
*toc; + void *addr = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); + if (addr == NULL) + { + Assert(node->aggstrategy != AGG_BATCH_HASH); + return; + } + toc = shm_toc_attach(SHARED_AGG_MAGIC, addr); + node->shared_info = shm_toc_lookup(toc, SHARED_AGG_KEY_INFO, true); + + if (node->aggstrategy == AGG_BATCH_HASH) + { + int i; + ListCell *lc; + Agg *agg = castNode(Agg, node->ss.ps.plan); + SharedFileSet *fset = shm_toc_lookup(toc, SHARED_AGG_KEY_FILE_SET, false); + + node->batch_barrier = shm_toc_lookup(toc, SHARED_AGG_KEY_BARRIER, false); + node->perhash[0].batch_store = + bs_attach_parallel_hash(shm_toc_lookup(toc, 0, false), + pwcxt->seg, + fset, + ParallelWorkerNumber+1); + + i = 1; + foreach (lc, agg->chain) + { + Assert(lfirst_node(Agg, lc)->aggstrategy == AGG_BATCH_HASH); + Assert (i<node->num_hashes); + node->perhash[i].batch_store = + bs_attach_parallel_hash(shm_toc_lookup(toc, i, false), + pwcxt->seg, + fset, + ParallelWorkerNumber+1); + ++i; + } + } } /* ---------------------------------------------------------------- @@ -5038,3 +5243,185 @@ ExecAggRetrieveInstrumentation(AggState *node) memcpy(si, node->shared_info, size); node->shared_info = si; } + +static TupleTableSlot *ExecBatchHashAggPrepare(PlanState *pstate) +{ + int i,x,max_colno_needed; + MinimalTuple mtup; + TupleTableSlot *inputslot; + PlanState *outer = outerPlanState(pstate); + AggState *node = castNode(AggState, pstate); + ExprContext *tmpcontext = node->tmpcontext; + bool *isnull; + Bitmapset *colnos_needed; + Bitmapset **colnos_neededs; + Assert(node->aggstrategy == AGG_BATCH_HASH); + Assert(node->perhash[0].batch_store == NULL || + node->batch_barrier != NULL); + + if (node->agg_done) + return NULL; + + /* create batch store if not parallel */ + if (node->perhash[0].batch_store == NULL) + { + MemoryContext oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(pstate)); + Agg *agg = castNode(Agg, pstate->plan); + ListCell *lc; + + node->perhash[0].batch_store = 
bs_begin_hash(agg->numBatches); + + i = 1; + foreach (lc, agg->chain) + { + Agg *subagg = lfirst_node(Agg, lc); + Assert(subagg->aggstrategy == AGG_BATCH_HASH); + Assert(i < node->num_hashes); + node->perhash[i].batch_store = bs_begin_hash(subagg->numBatches); + ++i; + } + + MemoryContextSwitchTo(oldcontext); + } + + if (node->batch_barrier && + BarrierAttach(node->batch_barrier) > 0) + { + BarrierDetach(node->batch_barrier); + goto batches_already_done_; + } + + /* read for make minimal tuple */ + isnull = palloc(sizeof(isnull[0]) * node->hash_spill_wslot->tts_tupleDescriptor->natts); + memset(isnull, true, sizeof(isnull[0]) * node->hash_spill_wslot->tts_tupleDescriptor->natts); + max_colno_needed = node->max_colno_needed; + + /* convert Attribute numbers to index(start with 0) */ + colnos_neededs = palloc(sizeof(colnos_neededs[0]) * node->num_hashes); + for (i=0;i<node->num_hashes;++i) + { + AggStatePerHash perhash = &node->perhash[i]; + colnos_needed = NULL; + x = -1; + while ((x=bms_next_member(perhash->colnos_needed, x)) >= 0) + { + Assert(x > 0); + colnos_needed = bms_add_member(colnos_needed, x-1); + } + colnos_neededs[i] = colnos_needed; + } + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + inputslot = ExecProcNode(outer); + if (TupIsNull(inputslot)) + break; + + tmpcontext->ecxt_outertuple = inputslot; + slot_getsomeattrs(inputslot, max_colno_needed); + + for (i=0;i<node->num_hashes;++i) + { + AggStatePerHash perhash = &node->perhash[i]; + TupleTableSlot *hashslot = perhash->hashslot; + + CHECK_FOR_INTERRUPTS(); + + /* mark unneeded columns as null */ + memset(isnull, true, sizeof(isnull[0]) * max_colno_needed); + colnos_needed = colnos_neededs[i]; + x = -1; + while ((x = bms_next_member(colnos_needed, x)) >= 0) + isnull[x] = inputslot->tts_isnull[x]; + /* make minimal tuple from we needed columns for this set */ + mtup = heap_form_minimal_tuple(inputslot->tts_tupleDescriptor, + inputslot->tts_values, + isnull); + + prepare_hash_slot(perhash, inputslot, hashslot); + + 
bs_write_hash(perhash->batch_store, + mtup, + TupleHashTableHash(perhash->hashtable, hashslot)); + pfree(mtup); + ResetExprContext(tmpcontext); + } + } + + for (i=0;i<node->num_hashes;++i) + bs_end_write(node->perhash[i].batch_store); + if (node->batch_barrier) + { + BarrierArriveAndWait(node->batch_barrier, WAIT_EVENT_BATCH_HASH_BUILD); + BarrierDetach(node->batch_barrier); + } + + /* clear temp memory */ + for (i=0;i<node->num_hashes;++i) + bms_free(colnos_neededs[i]); + pfree(colnos_neededs); + pfree(isnull); + +batches_already_done_: + node->batch_filled = true; + node->current_batch = 0; + if (ExecBatchHashAggNextBatch(node) == false) + return NULL; + + ExecSetExecProcNode(pstate, ExecBatchHashAgg); + return ExecBatchHashAgg(pstate); +} + +static TupleTableSlot *ExecBatchHashAgg(PlanState *pstate) +{ + AggState *node = castNode(AggState, pstate); + TupleTableSlot *result; + +reloop: + result = agg_retrieve_hash_table_in_memory(node); + if (unlikely(result == NULL)) + { + if (agg_refill_hash_table(node) == false && + ExecBatchHashAggNextBatch(node) == false) + { + return NULL; + }else + { + goto reloop; + } + } + + return result; +} + +static bool +batchstore_read(void *userdata, TupleTableSlot *slot, uint32 *hashp) +{ + MinimalTuple mtup = bs_read_hash(userdata, hashp); + if (unlikely(mtup == NULL)) + return false; + ExecStoreMinimalTuple(mtup, slot, false); + return true; +} + +static bool ExecBatchHashAggNextBatch(AggState *node) +{ + while (bs_next_batch(node->perhash[node->current_batch].batch_store, false) == false) + { + ++node->current_batch; + if (node->current_batch >= node->num_hashes) + { + node->agg_done = true; + return false; + } + } + + agg_refill_hash_table_ex(node, + batchstore_read, + node->perhash[node->current_batch].batch_store, + 0, + node->perhash[node->current_batch].aggnode->numGroups, + node->current_batch); + return true; +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 958964f1fa..8649e7d610 100644 --- 
a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1042,6 +1042,7 @@ _copyAgg(const Agg *from) COPY_BITMAPSET_FIELD(aggParams); COPY_NODE_FIELD(groupingSets); COPY_NODE_FIELD(chain); + COPY_SCALAR_FIELD(numBatches); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index a8dd7ef23f..8893bfab29 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -787,6 +787,7 @@ _outAgg(StringInfo str, const Agg *node) WRITE_BITMAPSET_FIELD(aggParams); WRITE_NODE_FIELD(groupingSets); WRITE_NODE_FIELD(chain); + WRITE_UINT_FIELD(numBatches); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 2c6eb4362c..03ef6bc5da 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2249,6 +2249,7 @@ _readAgg(void) READ_BITMAPSET_FIELD(aggParams); READ_NODE_FIELD(groupingSets); READ_NODE_FIELD(chain); + READ_UINT_FIELD(numBatches); READ_DONE(); } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 32d0dc8ce5..4143b69178 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -141,6 +141,7 @@ bool enable_parallel_append = true; bool enable_parallel_hash = true; bool enable_partition_pruning = true; bool enable_batch_sort = true; +bool enable_batch_hashagg = false; typedef struct { @@ -2404,7 +2405,7 @@ cost_agg(Path *path, PlannerInfo *root, /* Use all-zero per-aggregate costs if NULL is passed */ if (aggcosts == NULL) { - Assert(aggstrategy == AGG_HASHED); + Assert(aggstrategy == AGG_HASHED || aggstrategy == AGG_BATCH_HASH); MemSet(&dummy_aggcosts, 0, sizeof(AggClauseCosts)); aggcosts = &dummy_aggcosts; } @@ -2463,10 +2464,13 @@ cost_agg(Path *path, PlannerInfo *root, } else { - /* must be AGG_HASHED */ + /* must be AGG_HASHED or AGG_BATCH_HASH */ startup_cost = input_total_cost; - if (!enable_hashagg) + if ((aggstrategy == AGG_HASHED && !enable_hashagg) 
|| + (aggstrategy == AGG_BATCH_HASH && !enable_batch_hashagg)) + { startup_cost += disable_cost; + } startup_cost += aggcosts->transCost.startup; startup_cost += aggcosts->transCost.per_tuple * input_tuples; /* cost of computing hash value */ @@ -2478,6 +2482,15 @@ cost_agg(Path *path, PlannerInfo *root, /* cost of retrieving from hash table */ total_cost += cpu_tuple_cost * numGroups; output_tuples = numGroups; + + if (aggstrategy == AGG_BATCH_HASH) + { + double nbytes = relation_byte_size(input_tuples, input_width); + double npages = ceil(nbytes / BLCKSZ); + double material_cost = (seq_page_cost * npages); + startup_cost += material_cost; + total_cost += material_cost; + } } /* @@ -2493,7 +2506,9 @@ cost_agg(Path *path, PlannerInfo *root, * Accrue writes (spilled tuples) to startup_cost and to total_cost; * accrue reads only to total_cost. */ - if (aggstrategy == AGG_HASHED || aggstrategy == AGG_MIXED) + if (aggstrategy == AGG_HASHED || + aggstrategy == AGG_BATCH_HASH || + aggstrategy == AGG_MIXED) { double pages; double pages_written = 0.0; @@ -2506,6 +2521,14 @@ cost_agg(Path *path, PlannerInfo *root, int num_partitions; int depth; + if (aggstrategy == AGG_BATCH_HASH && + numGroups > BATCH_STORE_MAX_BATCH) + { + numGroups /= BATCH_STORE_MAX_BATCH; + if (numGroups < 1.0) + numGroups = 1.0; + } + /* * Estimate number of batches based on the computed limits. 
If less * than or equal to one, all groups are expected to fit in memory; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 85969388c2..a87dd633dc 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -30,6 +30,7 @@ #include "optimizer/cost.h" #include "optimizer/optimizer.h" #include "optimizer/paramassign.h" +#include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/placeholder.h" #include "optimizer/plancat.h" @@ -2327,7 +2328,9 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) if (!rollup->is_hashed) is_first_sort = false; - if (rollup->is_hashed) + if (best_path->aggstrategy == AGG_BATCH_HASH) + strat = AGG_BATCH_HASH; + else if (rollup->is_hashed) strat = AGG_HASHED; else if (list_length(linitial(rollup->gsets)) == 0) strat = AGG_PLAIN; @@ -6417,6 +6420,13 @@ make_agg(List *tlist, List *qual, plan->lefttree = lefttree; plan->righttree = NULL; + if (aggstrategy == AGG_BATCH_HASH) + { + node->numBatches = (int32)numGroups; + if (node->numBatches > BATCH_STORE_MAX_BATCH) + node->numBatches = BATCH_STORE_MAX_BATCH; + } + return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 27680dbeb3..5d854b26b5 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -176,6 +176,12 @@ static void consider_groupingsets_paths(PlannerInfo *root, grouping_sets_data *gd, const AggClauseCosts *agg_costs, double dNumGroups); +static void consider_parallel_hash_groupingsets_paths(PlannerInfo *root, + RelOptInfo *grouped_rel, + Path *path, + grouping_sets_data *gd, + const AggClauseCosts *agg_costs, + double dNumGroups); static RelOptInfo *create_window_paths(PlannerInfo *root, RelOptInfo *input_rel, PathTarget *input_target, @@ -4538,6 +4544,77 @@ consider_groupingsets_paths(PlannerInfo *root, dNumGroups)); } +static void 
+consider_parallel_hash_groupingsets_paths(PlannerInfo *root, + RelOptInfo *grouped_rel, + Path *path, + grouping_sets_data *gd, + const AggClauseCosts *agg_costs, + double dNumGroups) +{ + int hash_mem = get_hash_mem(); + List *new_rollups = NIL; + List *sets_data; + ListCell *lc; + RollupData *rollup; + GroupingSetData *gs; + double hashsize; + double numGroups; + + sets_data = list_copy(gd->unsortable_sets); + foreach (lc, gd->rollups) + { + rollup = lfirst_node(RollupData, lc); + if (rollup->hashable == false) + { + list_free(sets_data); + return; + } + sets_data = list_concat(sets_data, rollup->gsets_data); + } + foreach (lc, sets_data) + { + gs = lfirst_node(GroupingSetData, lc); + numGroups = gs->numGroups / BATCH_STORE_MAX_BATCH; + if (numGroups < 1.0) + numGroups = 1.0; + hashsize = estimate_hashagg_tablesize(path, + agg_costs, + numGroups); + if (hashsize > hash_mem * 1024L) + { + list_free(sets_data); + list_free_deep(new_rollups); + return; + } + + rollup = makeNode(RollupData); + rollup->groupClause = preprocess_groupclause(root, gs->set); + rollup->gsets_data = list_make1(gs); + rollup->gsets = remap_to_groupclause_idx(rollup->groupClause, + rollup->gsets_data, + gd->tleref_to_colnum_map); + rollup->numGroups = gs->numGroups; + rollup->hashable = true; + rollup->is_hashed = true; + new_rollups = lappend(new_rollups, rollup); + } + + numGroups = dNumGroups / path->parallel_workers; + if (numGroups < list_length(new_rollups)) + numGroups = list_length(new_rollups); + path = (Path*)create_groupingsets_path(root, + grouped_rel, + path, + (List*) root->parse->havingQual, + AGG_BATCH_HASH, + new_rollups, + agg_costs, + numGroups); + path->parallel_aware = true; + add_partial_path(grouped_rel, path); +} + /* * create_window_paths * @@ -4952,6 +5029,30 @@ create_distinct_paths(PlannerInfo *root, NIL, NULL, numDistinctRows)); +#if 1 + /* Generate parallel batch hashed aggregate path */ + if (distinct_rel->consider_parallel && + input_rel->partial_pathlist != 
NIL && + numDistinctRows > 1.0) + { + Path *path = linitial(input_rel->partial_pathlist); + double numRows = numDistinctRows / path->parallel_workers; + if (numRows < 1.0) + numRows = 1.0; + path = (Path *)create_agg_path(root, + distinct_rel, + path, + path->pathtarget, + AGG_BATCH_HASH, + AGGSPLIT_SIMPLE, + parse->distinctClause, + NIL, + NULL, + numRows); + path->parallel_aware = true; + add_partial_path(distinct_rel, path); + } +#endif } generate_useful_gather_paths(root, distinct_rel, false); @@ -6874,6 +6975,14 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, consider_groupingsets_paths(root, grouped_rel, cheapest_path, false, true, gd, agg_costs, dNumGroups); + if (grouped_rel->consider_parallel && + input_rel->partial_pathlist != NIL) + consider_parallel_hash_groupingsets_paths(root, + grouped_rel, + linitial(input_rel->partial_pathlist), + gd, + agg_costs, + dNumGroups); } else { @@ -6891,6 +7000,28 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, havingQual, agg_costs, dNumGroups)); + + if (grouped_rel->consider_parallel && + input_rel->partial_pathlist != NIL && + dNumGroups >= 2.0) + { + Path *path = linitial(input_rel->partial_pathlist); + double numGroups = dNumGroups / path->parallel_workers; + if (numGroups < 1.0) + numGroups = 1.0; + path = (Path*)create_agg_path(root, + grouped_rel, + path, + grouped_rel->reltarget, + AGG_BATCH_HASH, + AGGSPLIT_SIMPLE, + parse->groupClause, + havingQual, + agg_costs, + numGroups); + path->parallel_aware = true; + add_partial_path(grouped_rel, path); + } } /* diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index fa1053f077..efe438524c 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -714,6 +714,33 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, partial_path->rows); add_partial_path(result_rel, partial_path); } + + /* create parallel batch hashed union */ + if 
(!op->all && + ppath->rows > 1.0 && + grouping_is_hashable(groupList)) + { + Path *partial_path; + double dNumGroups = ppath->rows / ppath->parallel_workers; + if (dNumGroups < 1.0) + dNumGroups = 1.0; + if (ppath->pathtarget->width * dNumGroups <= get_hash_mem() * 1024L) + { + partial_path = (Path*)create_agg_path(root, + result_rel, + ppath, + create_pathtarget(root, tlist), + AGG_BATCH_HASH, + AGGSPLIT_SIMPLE, + groupList, + NIL, + NULL, + dNumGroups); + partial_path->parallel_aware = true; + add_partial_path(result_rel, partial_path); + } + } + ppath = (Path *) create_gather_path(root, result_rel, ppath, result_rel->reltarget, NULL, NULL); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index cacb7d13e6..04113bae7d 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4024,6 +4024,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_BATCH_SORT_BUILD: event_name = "Batch/Sort/Building"; break; + case WAIT_EVENT_BATCH_HASH_BUILD: + event_name = "Batch/Hash/Building"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 43a4e36d78..ed2a4369eb 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -996,6 +996,15 @@ static struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"enable_batch_hashagg", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("enable batch hash agg method"), + NULL + }, + &enable_batch_hashagg, + false, + NULL, NULL, NULL + }, { {"enable_incremental_sort", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of incremental sort steps."), diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index b955169538..c6968c8301 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -310,6 +310,8 @@ typedef struct AggStatePerHashData int largestGrpColIdx; /* largest col required for 
hashing */ AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */ AttrNumber *hashGrpColIdxHash; /* indices in hash table tuples */ + Bitmapset *colnos_needed; /* all columns needed from the outer plan */ + struct BatchStoreData *batch_store; /* grouping set batch store hash */ Agg *aggnode; /* original Agg node, for numGroups etc. */ } AggStatePerHashData; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 14dde9fca3..ac53d0723e 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2218,6 +2218,9 @@ typedef struct AggState * ->hash_pergroup */ ProjectionInfo *combinedproj; /* projection machinery */ SharedAggInfo *shared_info; /* one entry per worker */ + struct Barrier *batch_barrier; /* for parallel batch */ + int current_batch; + bool batch_filled; } AggState; /* ---------------- diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index ace4c98939..1b3365c241 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -762,7 +762,8 @@ typedef enum AggStrategy AGG_PLAIN, /* simple agg across all input rows */ AGG_SORTED, /* grouped agg, input must be sorted */ AGG_HASHED, /* grouped agg, use internal hashtable */ - AGG_MIXED /* grouped agg, hash and sort both used */ + AGG_MIXED, /* grouped agg, hash and sort both used */ + AGG_BATCH_HASH /* grouped agg, use batch hash */ } AggStrategy; /* diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index f7ad7881dc..941eb3a23b 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -840,6 +840,7 @@ typedef struct Agg /* Note: planner provides numGroups & aggParams only in HASHED/MIXED case */ List *groupingSets; /* grouping sets to use */ List *chain; /* chained Agg/Sort nodes */ + uint32 numBatches; /* number of batches; set only for AGG_BATCH_HASH */ } Agg; /* ---------------- diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 37e6a12a6f..dc0cd825f0 100644 --- 
a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -54,6 +54,7 @@ extern PGDLLIMPORT bool enable_bitmapscan; extern PGDLLIMPORT bool enable_tidscan; extern PGDLLIMPORT bool enable_sort; extern PGDLLIMPORT bool enable_batch_sort; +extern PGDLLIMPORT bool enable_batch_hashagg; extern PGDLLIMPORT bool enable_incremental_sort; extern PGDLLIMPORT bool enable_hashagg; extern PGDLLIMPORT bool enable_nestloop; diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 816fc37739..0e8eb26111 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -19,6 +19,8 @@ #define BATCH_SORT_MIN_BATCHES 2 #define BATCH_SORT_MAX_BATCHES 512 +#define BATCH_STORE_MIN_BATCH 2 +#define BATCH_STORE_MAX_BATCH 1024 /* * prototypes for pathnode.c diff --git a/src/include/pgstat.h b/src/include/pgstat.h index f0b6dae97b..2826a0b38c 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -953,7 +953,8 @@ typedef enum WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, WAIT_EVENT_XACT_GROUP_UPDATE, - WAIT_EVENT_BATCH_SORT_BUILD + WAIT_EVENT_BATCH_SORT_BUILD, + WAIT_EVENT_BATCH_HASH_BUILD } WaitEventIPC; /* ---------- diff --git a/src/test/regress/expected/groupingsets.out b/src/test/regress/expected/groupingsets.out index 701d52b465..bcac70894f 100644 --- a/src/test/regress/expected/groupingsets.out +++ b/src/test/regress/expected/groupingsets.out @@ -1739,4 +1739,69 @@ set work_mem to default; drop table gs_group_1; drop table gs_hash_1; +-- parallel grouping sets +BEGIN; +set enable_batch_hashagg = on; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 10; +explain (costs off) +select sum(unique1),count(unique1),two,four,ten,twenty from tenk1 group by grouping sets(two,four,ten,(two,twenty),()) order by 3,4,5,6; + QUERY PLAN +---------------------------------------------- + Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: two, four, ten, twenty + -> Parallel BatchHashAggregate + Group Key: two, 
twenty + Group Key: two + Group Key: () + Group Key: four + Group Key: ten + -> Parallel Seq Scan on tenk1 +(11 rows) + +select sum(unique1),count(unique1),two,four,ten,twenty from tenk1 group by grouping sets(two,four,ten,(two,twenty),()) order by 3,4,5,6; + sum | count | two | four | ten | twenty +----------+-------+-----+------+-----+-------- + 2495000 | 500 | 0 | | | 0 + 2496000 | 500 | 0 | | | 2 + 2497000 | 500 | 0 | | | 4 + 2498000 | 500 | 0 | | | 6 + 2499000 | 500 | 0 | | | 8 + 2500000 | 500 | 0 | | | 10 + 2501000 | 500 | 0 | | | 12 + 2502000 | 500 | 0 | | | 14 + 2503000 | 500 | 0 | | | 16 + 2504000 | 500 | 0 | | | 18 + 24995000 | 5000 | 0 | | | + 2495500 | 500 | 1 | | | 1 + 2496500 | 500 | 1 | | | 3 + 2497500 | 500 | 1 | | | 5 + 2498500 | 500 | 1 | | | 7 + 2499500 | 500 | 1 | | | 9 + 2500500 | 500 | 1 | | | 11 + 2501500 | 500 | 1 | | | 13 + 2502500 | 500 | 1 | | | 15 + 2503500 | 500 | 1 | | | 17 + 2504500 | 500 | 1 | | | 19 + 25000000 | 5000 | 1 | | | + 12495000 | 2500 | | 0 | | + 12497500 | 2500 | | 1 | | + 12500000 | 2500 | | 2 | | + 12502500 | 2500 | | 3 | | + 4995000 | 1000 | | | 0 | + 4996000 | 1000 | | | 1 | + 4997000 | 1000 | | | 2 | + 4998000 | 1000 | | | 3 | + 4999000 | 1000 | | | 4 | + 5000000 | 1000 | | | 5 | + 5001000 | 1000 | | | 6 | + 5002000 | 1000 | | | 7 | + 5003000 | 1000 | | | 8 | + 5004000 | 1000 | | | 9 | + 49995000 | 10000 | | | | +(37 rows) + +ABORT; -- end diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out index b187c1080b..ba327d7583 100644 --- a/src/test/regress/expected/partition_aggregate.out +++ b/src/test/regress/expected/partition_aggregate.out @@ -1523,6 +1523,7 @@ SET min_parallel_table_scan_size = 0; SET parallel_tuple_cost = 0; SET parallel_setup_cost = 0; SET enable_indexonlyscan = OFF; +-- using batch sort EXPLAIN (COSTS OFF) SELECT unique2,count(*) FROM tenk1 GROUP BY 1; QUERY PLAN @@ -1588,4 +1589,67 @@ SELECT count(*) FROM 10000 (1 row) +-- using batch 
hash +SET enable_batch_sort = OFF; +SET enable_batch_hashagg = ON; +EXPLAIN (COSTS OFF) +SELECT unique2,count(*) FROM tenk1 GROUP BY 1; + QUERY PLAN +---------------------------------------- + Gather + Workers Planned: 2 + -> Parallel BatchHashAggregate + Group Key: unique2 + -> Parallel Seq Scan on tenk1 +(5 rows) + +EXPLAIN (COSTS OFF) +SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo; + QUERY PLAN +---------------------------------------------- + Aggregate + -> Gather + Workers Planned: 2 + -> Parallel BatchHashAggregate + Group Key: tenk1.unique2 + -> Parallel Seq Scan on tenk1 +(6 rows) + +SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo; + count +------- + 10000 +(1 row) + +EXPLAIN (COSTS OFF) +SELECT count(*) FROM + (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1 + INNER JOIN + (SELECT unique2 id FROM tenk1) t2 + USING(id); + QUERY PLAN +------------------------------------------------------------------ + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: ((count(*)) = tenk1.unique2) + -> Parallel BatchHashAggregate + Group Key: tenk1_1.unique2 + -> Parallel Seq Scan on tenk1 tenk1_1 + -> Parallel Hash + -> Parallel Seq Scan on tenk1 +(11 rows) + +SELECT count(*) FROM + (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1 + INNER JOIN + (SELECT unique2 id FROM tenk1) t2 + USING(id); + count +------- + 10000 +(1 row) + ABORT; diff --git a/src/test/regress/expected/select_distinct.out b/src/test/regress/expected/select_distinct.out index c200e38d12..8c7f3381be 100644 --- a/src/test/regress/expected/select_distinct.out +++ b/src/test/regress/expected/select_distinct.out @@ -313,6 +313,7 @@ SET min_parallel_table_scan_size =0; SET parallel_tuple_cost = 0; SET parallel_setup_cost = 0; SET enable_indexonlyscan = OFF; +-- using batch sort EXPLAIN (costs off) SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; QUERY PLAN @@ -347,4 +348,38 @@ 
SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo; -> Parallel Seq Scan on tenk1 (9 rows) +-- using batch hash +SET enable_batch_sort = OFF; +SET enable_batch_hashagg = ON; +EXPLAIN (costs off) +SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; + QUERY PLAN +---------------------------------------------- + Aggregate + -> Gather + Workers Planned: 2 + -> Parallel BatchHashAggregate + Group Key: tenk1.unique2 + -> Parallel Seq Scan on tenk1 +(6 rows) + +SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; + count +------- + 10000 +(1 row) + +explain (costs off) +SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo; + QUERY PLAN +---------------------------------------------- + Gather + Workers Planned: 2 + -> Parallel BatchHashAggregate + Group Key: tenk1.unique2 + -> Parallel BatchHashAggregate + Group Key: tenk1.unique2 + -> Parallel Seq Scan on tenk1 +(7 rows) + ABORT; diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 8ed047e520..a7219644a8 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -88,6 +88,7 @@ select count(*) = 1 as ok from pg_stat_wal; select name, setting from pg_settings where name like 'enable%'; name | setting --------------------------------+--------- + enable_batch_hashagg | off enable_batch_sort | off enable_bitmapscan | on enable_gathermerge | on @@ -107,7 +108,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(19 rows) +(20 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. 
We can't test their contents in any great detail diff --git a/src/test/regress/expected/union.out b/src/test/regress/expected/union.out index 5a2be9aec9..519a50bb20 100644 --- a/src/test/regress/expected/union.out +++ b/src/test/regress/expected/union.out @@ -1059,6 +1059,7 @@ SET min_parallel_table_scan_size =0; SET parallel_tuple_cost = 0; SET parallel_setup_cost = 0; SET enable_indexonlyscan = OFF; +-- using batch sort EXPLAIN (costs off) SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo; QUERY PLAN @@ -1106,4 +1107,33 @@ SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) 10000 (1 row) +-- using batch hash +SET enable_batch_sort = OFF; +SET enable_batch_hashagg = ON; +EXPLAIN (costs off) +SELECT count(*) from ( + SELECT hundred from tenk1 + union + SELECT hundred from tenk1) foo; + QUERY PLAN +------------------------------------------------------------ + Aggregate + -> Gather + Workers Planned: 2 + -> Parallel BatchHashAggregate + Group Key: tenk1.hundred + -> Parallel Append + -> Parallel Seq Scan on tenk1 + -> Parallel Seq Scan on tenk1 tenk1_1 +(8 rows) + +SELECT count(*) from ( + SELECT hundred from tenk1 + union + SELECT hundred from tenk1) foo; + count +------- + 100 +(1 row) + ABORT; diff --git a/src/test/regress/sql/groupingsets.sql b/src/test/regress/sql/groupingsets.sql index d4e5628eba..2eff77af47 100644 --- a/src/test/regress/sql/groupingsets.sql +++ b/src/test/regress/sql/groupingsets.sql @@ -511,4 +511,14 @@ set work_mem to default; drop table gs_group_1; drop table gs_hash_1; +-- parallel grouping sets +BEGIN; +set enable_batch_hashagg = on; +set min_parallel_table_scan_size = 0; +set parallel_setup_cost = 10; +explain (costs off) +select sum(unique1),count(unique1),two,four,ten,twenty from tenk1 group by grouping sets(two,four,ten,(two,twenty),()) order by 3,4,5,6; +select sum(unique1),count(unique1),two,four,ten,twenty from tenk1 group by grouping 
sets(two,four,ten,(two,twenty),()) order by 3,4,5,6; +ABORT; + -- end diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql index 3e50a48d37..7ecbe3ed56 100644 --- a/src/test/regress/sql/partition_aggregate.sql +++ b/src/test/regress/sql/partition_aggregate.sql @@ -338,6 +338,26 @@ SET min_parallel_table_scan_size = 0; SET parallel_tuple_cost = 0; SET parallel_setup_cost = 0; SET enable_indexonlyscan = OFF; +-- using batch sort +EXPLAIN (COSTS OFF) +SELECT unique2,count(*) FROM tenk1 GROUP BY 1; +EXPLAIN (COSTS OFF) +SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo; +SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo; +EXPLAIN (COSTS OFF) +SELECT count(*) FROM + (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1 + INNER JOIN + (SELECT unique2 id FROM tenk1) t2 + USING(id); +SELECT count(*) FROM + (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1 + INNER JOIN + (SELECT unique2 id FROM tenk1) t2 + USING(id); +-- using batch hash +SET enable_batch_sort = OFF; +SET enable_batch_hashagg = ON; EXPLAIN (COSTS OFF) SELECT unique2,count(*) FROM tenk1 GROUP BY 1; EXPLAIN (COSTS OFF) diff --git a/src/test/regress/sql/select_distinct.sql b/src/test/regress/sql/select_distinct.sql index 3ff7acf64d..2a16d9b23d 100644 --- a/src/test/regress/sql/select_distinct.sql +++ b/src/test/regress/sql/select_distinct.sql @@ -143,6 +143,15 @@ SET min_parallel_table_scan_size =0; SET parallel_tuple_cost = 0; SET parallel_setup_cost = 0; SET enable_indexonlyscan = OFF; +-- using batch sort +EXPLAIN (costs off) +SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; +SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; +explain (costs off) +SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo; +-- using batch hash +SET enable_batch_sort = OFF; +SET enable_batch_hashagg = ON; EXPLAIN (costs off) SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; SELECT 
COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo; diff --git a/src/test/regress/sql/union.sql b/src/test/regress/sql/union.sql index a1cb1bb7ac..9fd50db549 100644 --- a/src/test/regress/sql/union.sql +++ b/src/test/regress/sql/union.sql @@ -448,10 +448,23 @@ SET min_parallel_table_scan_size =0; SET parallel_tuple_cost = 0; SET parallel_setup_cost = 0; SET enable_indexonlyscan = OFF; +-- using batch sort EXPLAIN (costs off) SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo; SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo; EXPLAIN (costs off) SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2); SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2); +-- using batch hash +SET enable_batch_sort = OFF; +SET enable_batch_hashagg = ON; +EXPLAIN (costs off) +SELECT count(*) from ( + SELECT hundred from tenk1 + union + SELECT hundred from tenk1) foo; +SELECT count(*) from ( + SELECT hundred from tenk1 + union + SELECT hundred from tenk1) foo; ABORT; \ No newline at end of file -- 2.16.3