diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index c762fb0..43e85fc 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1023,7 +1023,10 @@ ExplainNode(PlanState *planstate, List *ancestors, pname = sname = "Limit"; break; case T_Hash: - pname = sname = "Hash"; + if (((Hash *) plan)->shared_table) + pname = sname = "Shared Hash"; + else + pname = sname = "Hash"; break; default: pname = sname = "???"; diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 86d9fb5..361d56a 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -27,6 +27,7 @@ #include "executor/executor.h" #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" +#include "executor/nodeHashjoin.h" #include "executor/nodeSeqscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" @@ -203,6 +204,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecCustomScanEstimate((CustomScanState *) planstate, e->pcxt); break; + case T_HashJoinState: + ExecHashJoinEstimate((HashJoinState *) planstate, + e->pcxt); + break; default: break; } @@ -255,6 +260,10 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecCustomScanInitializeDSM((CustomScanState *) planstate, d->pcxt); break; + case T_HashJoinState: + ExecHashJoinInitializeDSM((HashJoinState *) planstate, + d->pcxt); + break; default: break; } @@ -731,6 +740,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) ExecCustomScanInitializeWorker((CustomScanState *) planstate, toc); break; + case T_HashJoinState: + ExecHashJoinInitializeWorker((HashJoinState *) planstate, + toc); + break; default: break; } diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index b8edd36..5c402bb 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -806,6 +806,9 @@ ExecShutdownNode(PlanState *node) case 
T_GatherState: ExecShutdownGather((GatherState *) node); break; + case T_HashJoinState: + ExecShutdownHashJoin((HashJoinState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 11db08f..5301bc0 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -25,6 +25,7 @@ #include #include "access/htup_details.h" +#include "access/parallel.h" #include "catalog/pg_statistic.h" #include "commands/tablespace.h" #include "executor/execdebug.h" @@ -32,14 +33,17 @@ #include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" +#include "port/atomics.h" #include "utils/dynahash.h" #include "utils/memutils.h" #include "utils/lsyscache.h" +#include "utils/probes.h" #include "utils/syscache.h" - static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); +static void ExecHashShrink(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse); static void ExecHashSkewTableInsert(HashJoinTable hashtable, @@ -47,8 +51,28 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable, uint32 hashvalue, int bucketNumber); static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); +static void ExecHashTableComputeOptimalBuckets(HashJoinTable hashtable); + +static HashJoinTuple next_tuple_in_bucket(HashJoinTable table, + HashJoinTuple tuple); +static HashJoinTuple first_tuple_in_skew_bucket(HashJoinTable table, + int skew_bucket_no); +static HashJoinTuple first_tuple_in_skew_bucket(HashJoinTable table, + int bucket_no); +static void insert_tuple_into_bucket(HashJoinTable table, int bucket_no, + HashJoinTuple tuple, + dsa_pointer tuple_pointer); +static void insert_tuple_into_skew_bucket(HashJoinTable table, + int bucket_no, + HashJoinTuple tuple, + dsa_pointer tuple_pointer); static void *dense_alloc(HashJoinTable 
hashtable, Size size); +static void *dense_alloc_shared(HashJoinTable hashtable, Size size, + dsa_pointer *chunk_shared, + bool secondary, + bool force); + /* ---------------------------------------------------------------- * ExecHash @@ -64,6 +88,98 @@ ExecHash(HashState *node) } /* ---------------------------------------------------------------- + * ExecHashCheckForEarlyExit + * + * return true if this process needs to abandon work on the + * hash join to avoid a deadlock + * ---------------------------------------------------------------- + */ +bool +ExecHashCheckForEarlyExit(HashJoinTable hashtable) +{ + /* + * The golden rule of leader deadlock avoidance: since leader processes + * have two separate roles, namely reading from worker queues AND executing + * the same plan as workers, we must never allow a leader to wait for + * workers if there is any possibility those workers have emitted tuples. + * Otherwise we could get into a situation where a worker fills up its + * output tuple queue and begins waiting for the leader to read, while + * the leader is busy waiting for the worker. + * + * Parallel hash joins with shared tables are inherently susceptible to + * such deadlocks because there are points at which all participants must + * wait (you can't start checking for unmatched tuples in the hash table until + * probing has completed in all workers, etc). + * + * So we follow these rules: + * + * 1. If there are workers participating, the leader MUST NOT + * participate in any further work after probing the first batch, so + * that it never has to wait for workers that might have emitted + * tuples. + * + * 2. If there are no workers participating, the leader MUST run all the + * batches to completion, because that's the only way for the join + * to complete. There is no deadlock risk if there are no workers. + * + * 3. 
Workers MUST NOT participate if the hashing phase has finished by + * the time they have joined, so that the leader can reliably determine + * whether there are any workers running when it comes to the point + * where it must choose between 1 and 2. + * + * In other words, if the leader makes it all the way through hashing and + * probing before any workers show up, then the leader will run the whole + * hash join on its own. If workers do show up any time before hashing is + * finished, the leader will stop executing the join after helping probe + * the first batch. In the unlikely event of the first worker showing up + * after the leader has finished hashing, it will exit because it's too + * late, the leader has already decided to do all the work alone. + */ + + if (!IsParallelWorker()) + { + /* Running in the leader process. */ + if (BarrierPhase(&hashtable->shared->barrier) >= PHJ_PHASE_PROBING && + hashtable->shared->at_least_one_worker) + { + /* Abandon ship due to rule 1. There are workers running. */ + TRACE_POSTGRESQL_HASH_LEADER_EARLY_EXIT(); + return true; + } + else + { + /* + * Continue processing due to rule 2. There are no workers, and + * any workers that show up later will abandon ship. + */ + } + } + else + { + /* Running in a worker process. */ + if (hashtable->attached_at_phase < PHJ_PHASE_PROBING) + { + /* + * Advertise that there are workers, so that the leader can + * choose between rules 1 and 2. It's OK that several workers can + * write to this variable without immediate memory + * synchronization, because the leader will only read it in a later + * phase (see above). + */ + hashtable->shared->at_least_one_worker = true; + } + else + { + /* Abandon ship due to rule 3. 
*/ + TRACE_POSTGRESQL_HASH_WORKER_EARLY_EXIT(); + return true; + } + } + + return false; +} + +/* ---------------------------------------------------------------- * MultiExecHash * * build hash table for hashjoin, doing partitioning if more @@ -79,6 +195,7 @@ MultiExecHash(HashState *node) TupleTableSlot *slot; ExprContext *econtext; uint32 hashvalue; + Barrier *barrier = NULL; /* must provide our own instrumentation support */ if (node->ps.instrument) @@ -90,6 +207,63 @@ MultiExecHash(HashState *node) outerNode = outerPlanState(node); hashtable = node->hashtable; + if (HashJoinTableIsShared(hashtable)) + { + /* + * Synchronize parallel hash table builds. At this stage we know that + * the shared hash table has been created, but we don't know if our + * peers are still in MultiExecHash and if so how far through. We use + * the phase to synchronize with them. + */ + barrier = &hashtable->shared->barrier; + + switch (BarrierPhase(barrier)) + { + case PHJ_PHASE_BEGINNING: + /* ExecHashTableCreate already handled this phase. */ + Assert(false); + case PHJ_PHASE_CREATING: + /* Wait for serial phase, and then either hash or wait. */ + if (BarrierWait(barrier, WAIT_EVENT_HASH_CREATING)) + goto hash; + else if (node->ps.plan->parallel_aware) + goto hash; + else + goto post_hash; + case PHJ_PHASE_HASHING: + /* Hashing is already underway. Can we join in? */ + if (node->ps.plan->parallel_aware) + goto hash; + else + goto post_hash; + case PHJ_PHASE_RESIZING: + /* Can't help with serial phase. */ + goto post_resize; + case PHJ_PHASE_REBUCKETING: + /* Rebucketing is in progress. Let's help do that. */ + goto rebucket; + default: + /* The hash table building work is already finished. */ + goto finish; + } + } + + hash: + TRACE_POSTGRESQL_HASH_HASHING_START(); + + if (HashJoinTableIsShared(hashtable)) + { + /* Make sure our local hashtable is up-to-date so we can hash. 
*/ + Assert(BarrierPhase(barrier) == PHJ_PHASE_HASHING); + ExecHashUpdate(hashtable); + + /* + * Attach to the second barrier that is just used for coordinating + * shrinking during the hashing phase, in case we run out of work_mem. + */ + BarrierAttach(&hashtable->shared->shrink_barrier); + } + /* * set expression context */ @@ -123,22 +297,106 @@ MultiExecHash(HashState *node) else { /* Not subject to skew optimization, so insert normally */ - ExecHashTableInsert(hashtable, slot, hashvalue); + ExecHashTableInsert(hashtable, slot, hashvalue, false); } - hashtable->totalTuples += 1; + /* + * Shared tuple counters are managed by dense_alloc_shared. For + * private hash tables we maintain the counter here. + */ + if (!HashJoinTableIsShared(hashtable)) + hashtable->totalTuples += 1; } } + if (HashJoinTableIsShared(hashtable)) + { + /* Detach from the shrink barrier. */ + BarrierDetach(&hashtable->shared->shrink_barrier); + } + + TRACE_POSTGRESQL_HASH_HASHING_DONE(); + + post_hash: + + if (HashJoinTableIsShared(hashtable)) + { + bool elected_to_resize; + + /* + * Wait for all backends to finish hashing. If only one worker is + * running the hashing phase because of a non-partial inner plan, the + * other workers will pile up here waiting. If multiple workers are + * hashing, they should finish close to each other in time. + * + * TODO: Even if only one backend is allowed to run the plan, other + * backends might as well stand ready to help with rebatching work if + * the need arises. Maybe we need a way to 'arrive' at a barrier, but + * not block, then a way to loop on another condition variable, + * running ExecHashShrink each time we're woken, and break when all + * participants have arrived at the barrier (ie when + * BarrierPhase(barrier) reports that the phase has advanced). + */ + Assert(BarrierPhase(barrier) == PHJ_PHASE_HASHING); + elected_to_resize = BarrierWait(barrier, WAIT_EVENT_HASH_HASHING); + /* + * Resizing is a serial phase. 
All but one should skip ahead to + * rebucketing, but all workers should update their copy of the shared + * tuple count with the final total first. + */ + /* + hashtable->totalTuples = + pg_atomic_read_u64(&hashtable->shared->total_primary_tuples); + */ + if (!elected_to_resize) + goto post_resize; + Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING); + } + /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ - if (hashtable->nbuckets != hashtable->nbuckets_optimal) - ExecHashIncreaseNumBuckets(hashtable); + ExecHashIncreaseNumBuckets(hashtable); + + post_resize: + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING); + BarrierWait(&hashtable->shared->barrier, + WAIT_EVENT_HASH_RESIZING); + Assert(BarrierPhase(barrier) == PHJ_PHASE_REBUCKETING); + } + + rebucket: + /* If the table was resized, insert tuples into the new buckets. */ + ExecHashUpdate(hashtable); + ExecHashRebucket(hashtable); /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ - hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); + hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinBucketHead); if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(barrier) == PHJ_PHASE_REBUCKETING); + BarrierWait(barrier, WAIT_EVENT_HASH_REBUCKETING); + Assert(BarrierPhase(barrier) == PHJ_PHASE_PROBING); + } + + finish: + if (HashJoinTableIsShared(hashtable)) + { + /* + * All hashing work has finished. The other workers may be probing or + * processing unmatched tuples for the initial batch, or dealing with + * later batches. The next synchronization point is in ExecHashJoin's + * HJ_BUILD_HASHTABLE case, which will figure that out and synchronize + * its local state machine with the parallel processing group's phase. 
+ */ + Assert(BarrierPhase(barrier) >= PHJ_PHASE_PROBING); + ExecHashUpdate(hashtable); + } + /* must provide our own instrumentation support */ + /* TODO: report only the tuples that WE hashed here? */ if (node->ps.instrument) InstrStopNode(node->ps.instrument, hashtable->totalTuples); @@ -243,10 +501,13 @@ ExecEndHash(HashState *node) * ---------------------------------------------------------------- */ HashJoinTable -ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) +ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls) { + Hash *node; HashJoinTable hashtable; + SharedHashJoinTable shared_hashtable; Plan *outerNode; + size_t space_allowed; int nbuckets; int nbatch; int num_skew_mcvs; @@ -261,10 +522,15 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) * "outer" subtree of this node, but the inner relation of the hashjoin). * Compute the appropriate size of the hash table. */ + node = (Hash *) state->ps.plan; outerNode = outerPlan(node); - + shared_hashtable = state->shared_table_data; ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width, OidIsValid(node->skewTable), + shared_hashtable != NULL, + shared_hashtable != NULL ? 
+ shared_hashtable->planned_participants - 1 : 0, + &space_allowed, &nbuckets, &nbatch, &num_skew_mcvs); /* nbuckets must be a power of 2 */ @@ -301,11 +567,19 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; hashtable->spacePeak = 0; - hashtable->spaceAllowed = work_mem * 1024L; + hashtable->spaceAllowed = space_allowed; hashtable->spaceUsedSkew = 0; hashtable->spaceAllowedSkew = hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; - hashtable->chunks = NULL; + hashtable->chunk = NULL; + hashtable->chunk_preload = NULL; + hashtable->chunks_to_rebucket = NULL; + hashtable->chunk_shared = InvalidDsaPointer; + hashtable->chunk_preload_shared = InvalidDsaPointer; + hashtable->area = state->ps.state->es_query_dsa; + hashtable->shared = state->shared_table_data; + hashtable->preloaded_spare_tuple = false; + hashtable->detached_early = false; #ifdef HJDEBUG printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n", @@ -340,7 +614,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) /* * Create temporary memory contexts in which to keep the hashtable working - * storage. See notes in executor/hashjoin.h. + * storage if using private hash table. See notes in executor/hashjoin.h. */ hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext, "HashTableContext", @@ -368,23 +642,95 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) PrepareTempTablespaces(); } - /* - * Prepare context for the first-scan space allocations; allocate the - * hashbucket array therein, and set each bucket "empty". - */ - MemoryContextSwitchTo(hashtable->batchCxt); + MemoryContextSwitchTo(oldcxt); - hashtable->buckets = (HashJoinTuple *) - palloc0(nbuckets * sizeof(HashJoinTuple)); + if (HashJoinTableIsShared(hashtable)) + { + Barrier *barrier; - /* - * Set up for skew optimization, if possible and there's a need for more - * than one batch. 
(In a one-batch join, there's no point in it.) - */ - if (nbatch > 1) - ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + /* + * Attach to the barrier. The corresponding detach operation is in + * ExecHashTableDestroy. + */ + barrier = &hashtable->shared->barrier; + hashtable->attached_at_phase = BarrierAttach(barrier); - MemoryContextSwitchTo(oldcxt); + /* + * So far we have no idea whether there are any other participants, and + * if so, what phase they are working on. The only thing we care about + * at this point is whether someone has already created the shared + * hash table yet. If not, one backend will be elected to do that + * now. + */ + if (BarrierPhase(barrier) == PHJ_PHASE_BEGINNING) + { + if (BarrierWait(barrier, WAIT_EVENT_HASH_BEGINNING)) + { + /* Serial phase: create the hash tables */ + Size bytes; + HashJoinBucketHead *buckets; + int i; + SharedHashJoinTable shared; + dsa_area *area; + + shared = hashtable->shared; + area = hashtable->area; + bytes = nbuckets * sizeof(HashJoinBucketHead); + + /* Allocate the hash table buckets. */ + shared->buckets = dsa_allocate(area, bytes); + if (!DsaPointerIsValid(shared->buckets)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("out of memory"))); + + /* Initialize the hash table buckets to empty. */ + buckets = dsa_get_address(area, shared->buckets); + for (i = 0; i < nbuckets; ++i) + dsa_pointer_atomic_init(&buckets[i].shared, + InvalidDsaPointer); + + /* Initialize the rest of parallel_state. */ + hashtable->shared->nbuckets = nbuckets; + hashtable->shared->nbatch = nbatch; + hashtable->shared->size = bytes; + hashtable->shared->size_preloaded = 0; + ExecHashJoinRewindBatches(hashtable, 0); + + /* TODO: ExecHashBuildSkewHash */ + + /* + * The backend-local pointers in hashtable will be set up by + * ExecHashUpdate, at each point where they might have + * changed. 
+ */ + } + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_CREATING); + /* The next synchronization point is in MultiExecHash. */ + } + } + else + { + /* + * Prepare context for the first-scan space allocations; allocate the + * hashbucket array therein, and set each bucket "empty". + */ + MemoryContextSwitchTo(hashtable->batchCxt); + + hashtable->buckets = (HashJoinBucketHead *) + palloc0(nbuckets * sizeof(HashJoinBucketHead)); + + MemoryContextSwitchTo(oldcxt); + + /* + * Set up for skew optimization, if possible and there's a need for + * more than one batch. (In a one-batch join, there's no point in + * it.) + */ + if (nbatch > 1) + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); + } return hashtable; } @@ -402,6 +748,8 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, + bool shared, int parallel_workers, + size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs) @@ -432,9 +780,15 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, inner_rel_bytes = ntuples * tupsize; /* - * Target in-memory hashtable size is work_mem kilobytes. + * Target in-memory hashtable size is work_mem kilobytes. Shared hash + * tables are allowed to multiply work_mem by the number of participants, + * since other non-shared memory based plans allow each participant to use + * work_mem for the same total. */ hash_table_bytes = work_mem * 1024L; + if (shared && parallel_workers > 0) + hash_table_bytes *= parallel_workers + 1; /* one for the leader */ + *space_allowed = hash_table_bytes; /* * If skew optimization is possible, estimate the number of skew buckets @@ -481,8 +835,8 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * Note that both nbuckets and nbatch must be powers of 2 to make * ExecHashGetBucketAndBatch fast. 
*/ - max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple); - max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple)); + max_pointers = (work_mem * 1024L) / sizeof(HashJoinBucketHead); + max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinBucketHead)); /* If max_pointers isn't a power of 2, must round it down to one */ mppow2 = 1L << my_log2(max_pointers); if (max_pointers != mppow2) @@ -504,7 +858,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * If there's not enough space to store the projected number of tuples and * the required bucket headers, we will need multiple batches. */ - bucket_bytes = sizeof(HashJoinTuple) * nbuckets; + bucket_bytes = sizeof(HashJoinBucketHead) * nbuckets; if (inner_rel_bytes + bucket_bytes > hash_table_bytes) { /* We'll need multiple batches */ @@ -519,12 +873,12 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, * NTUP_PER_BUCKET tuples, whose projected size already includes * overhead for the hash code, pointer to the next tuple, etc. */ - bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)); + bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinBucketHead)); lbuckets = 1L << my_log2(hash_table_bytes / bucket_size); lbuckets = Min(lbuckets, max_pointers); nbuckets = (int) lbuckets; nbuckets = 1 << my_log2(nbuckets); - bucket_bytes = nbuckets * sizeof(HashJoinTuple); + bucket_bytes = nbuckets * sizeof(HashJoinBucketHead); /* * Buckets are simple pointers to hashjoin tuples, while tupsize @@ -564,6 +918,31 @@ ExecHashTableDestroy(HashJoinTable hashtable) { int i; + /* Detached, if we haven't already. */ + if (HashJoinTableIsShared(hashtable) && !hashtable->detached_early) + { + Barrier *barrier = &hashtable->shared->barrier; + + /* + * TODO: Can we just detach if there is only one batch, but wait here + * if there is more than one (to make sure batch files created by this + * participant are not deleted)? 
When detaching, the last one to + * detach should do the cleanup work, and/or leave things in the right + * state for rescanning. + */ + + if (BarrierWait(barrier, WAIT_EVENT_HASH_DESTROY)) + { + /* Serial: free the tables */ + if (DsaPointerIsValid(hashtable->shared->buckets)) + { + dsa_free(hashtable->area, hashtable->shared->buckets); + hashtable->shared->buckets = InvalidDsaPointer; + } + } + BarrierDetach(&hashtable->shared->barrier); + } + /* * Make sure all the temp files are closed. We skip batch 0, since it * can't have any temp files (and the arrays might not even exist if @@ -584,37 +963,13 @@ ExecHashTableDestroy(HashJoinTable hashtable) pfree(hashtable); } -/* - * ExecHashIncreaseNumBatches - * increase the original number of batches in order to reduce - * current memory consumption - */ static void -ExecHashIncreaseNumBatches(HashJoinTable hashtable) +extend_batch_file_arrays(HashJoinTable hashtable, int nbatch) { - int oldnbatch = hashtable->nbatch; - int curbatch = hashtable->curbatch; - int nbatch; MemoryContext oldcxt; - long ninmemory; - long nfreed; - HashMemoryChunk oldchunks; + int oldnbatch = hashtable->nbatch; - /* do nothing if we've decided to shut off growth */ - if (!hashtable->growEnabled) - return; - - /* safety check to avoid overflow */ - if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2))) - return; - - nbatch = oldnbatch * 2; - Assert(nbatch > 1); - -#ifdef HJDEBUG - printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n", - hashtable, nbatch, hashtable->spaceUsed); -#endif + TRACE_POSTGRESQL_HASH_INCREASE_BATCHES(nbatch); oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); @@ -641,9 +996,49 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) (nbatch - oldnbatch) * sizeof(BufFile *)); } + hashtable->nbatch = nbatch; + MemoryContextSwitchTo(oldcxt); +} - hashtable->nbatch = nbatch; +/* + * ExecHashIncreaseNumBatches + * increase the original number of batches in order to reduce + * current memory 
consumption + */ +static void +ExecHashIncreaseNumBatches(HashJoinTable hashtable) +{ + int oldnbatch = hashtable->nbatch; + int curbatch = hashtable->curbatch; + int nbatch; + long ninmemory; + long nfreed; + HashMemoryChunk oldchunks; + + /* + * TODO: Should private hash tables also switch to chunk-based memory + * accounting, done in dense_alloc, and use ExecHashShrink? + */ + Assert(!HashJoinTableIsShared(hashtable)); + + /* do nothing if we've decided to shut off growth */ + if (!hashtable->growEnabled) + return; + + /* safety check to avoid overflow */ + if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2))) + return; + + nbatch = oldnbatch * 2; + Assert(nbatch > 1); + +#ifdef HJDEBUG + printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n", + hashtable, nbatch, hashtable->spaceUsed); +#endif + + extend_batch_file_arrays(hashtable, nbatch); /* * Scan through the existing hash table entries and dump out any that are @@ -661,7 +1056,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; hashtable->buckets = repalloc(hashtable->buckets, - sizeof(HashJoinTuple) * hashtable->nbuckets); + sizeof(HashJoinBucketHead) * hashtable->nbuckets); } /* @@ -669,14 +1064,14 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) * buckets now and not have to keep track which tuples in the buckets have * already been processed. We will free the old chunks as we go. 
*/ - memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets); - oldchunks = hashtable->chunks; - hashtable->chunks = NULL; + memset(hashtable->buckets, 0, sizeof(HashJoinBucketHead) * hashtable->nbuckets); + oldchunks = hashtable->chunk; + hashtable->chunk = NULL; /* so, let's scan through the old chunks, and all tuples in each chunk */ while (oldchunks != NULL) { - HashMemoryChunk nextchunk = oldchunks->next; + HashMemoryChunk nextchunk = oldchunks->next.private; /* position within the buffer (up to oldchunks->used) */ size_t idx = 0; @@ -699,20 +1094,23 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) /* keep tuple in memory - copy it into the new chunk */ HashJoinTuple copyTuple; - copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); + copyTuple = (HashJoinTuple) + dense_alloc(hashtable, hashTupleSize); memcpy(copyTuple, hashTuple, hashTupleSize); /* and add it back to the appropriate bucket */ - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + insert_tuple_into_bucket(hashtable, bucketno, copyTuple, + InvalidDsaPointer); } else { /* dump it out */ Assert(batchno > curbatch); - ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), + ExecHashJoinSaveTuple(hashtable, + HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, - &hashtable->innerBatchFile[batchno]); + batchno, + true); hashtable->spaceUsed -= hashTupleSize; nfreed++; @@ -758,8 +1156,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable) { - HashMemoryChunk chunk; - /* do nothing if not an increase (it's called increase for a reason) */ if (hashtable->nbuckets >= hashtable->nbuckets_optimal) return; @@ -780,45 +1176,412 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) * Just reallocate the proper number of buckets - we don't need to walk * through them - we can walk the dense-allocated chunks (just like in * ExecHashIncreaseNumBatches, but without all the 
copying into new - * chunks) + * chunks): see ExecHashRebucket, which must be called next. + */ + if (HashJoinTableIsShared(hashtable)) + { + Size bytes; + int i; + + /* Serial phase: only one backend reallocates. */ + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_RESIZING); + + /* Free the old hash table. */ + dsa_free(hashtable->area, hashtable->shared->buckets); + + /* Allocate replacement. */ + bytes = hashtable->nbuckets * sizeof(HashJoinBucketHead); + hashtable->shared->buckets = dsa_allocate(hashtable->area, bytes); + if (!DsaPointerIsValid(hashtable->shared->buckets)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("out of memory"))); + + /* Initialize empty hash table buckets. */ + hashtable->buckets = + dsa_get_address(hashtable->area, + hashtable->shared->buckets); + for (i = 0; i < hashtable->nbuckets; ++i) + dsa_pointer_atomic_write(&hashtable->buckets[i].shared, + InvalidDsaPointer); + hashtable->shared->nbuckets = hashtable->nbuckets; + + /* Update size accounting. */ + hashtable->shared->size += bytes / 2; + + /* Move all chunks to the rebucket list. */ + hashtable->shared->chunks_to_rebucket = hashtable->shared->chunks; + hashtable->shared->chunks = InvalidDsaPointer; + } + else + { + hashtable->buckets = + (HashJoinBucketHead *) repalloc(hashtable->buckets, + hashtable->nbuckets * sizeof(HashJoinBucketHead)); + + memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinBucketHead)); + /* Move all chunks to the rebucket list. */ + hashtable->chunks_to_rebucket = hashtable->chunk; + hashtable->chunk = NULL; + } +} + +/* + * Pop a memory chunk from a given list. Returns a backend-local pointer to + * the chunk, or NULL if the list is empty. Also sets *chunk_out to the + * dsa_pointer to the chunk. 
+ */ +static HashMemoryChunk +ExecHashPopChunk(HashJoinTable hashtable, + dsa_pointer *chunk_out, + dsa_pointer *head) +{ + HashMemoryChunk chunk; + + Assert(LWLockHeldByMe(&hashtable->shared->chunk_lock)); + + if (!DsaPointerIsValid(*head)) + return NULL; + + *chunk_out = *head; + chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, *chunk_out); + *head = chunk->next.shared; + + return chunk; +} + +/* + * Push a shared memory chunk onto a given list. + */ +static void +ExecHashPushChunk(HashJoinTable hashtable, + HashMemoryChunk chunk, + dsa_pointer chunk_shared, + dsa_pointer *head) +{ + Assert(LWLockHeldByMeInMode(&hashtable->shared->chunk_lock, LW_EXCLUSIVE)); + Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); + + chunk->next.shared = *head; + *head = chunk_shared; +} + +/* + * ExecHashRebucket + * insert the tuples from hashtable->chunks_to_rebucket into the hashtable + */ +void +ExecHashRebucket(HashJoinTable hashtable) +{ + HashMemoryChunk chunk; + dsa_pointer chunk_shared; + int chunks_processed = 0; + + TRACE_POSTGRESQL_HASH_REBUCKET_START(); + + /* + * Scan through all tuples in all chunks in the rebucket list to rebuild + * the hash table. 
+ */ + if (HashJoinTableIsShared(hashtable)) + { + LWLockAcquire(&hashtable->shared->chunk_lock, LW_EXCLUSIVE); + chunk = + ExecHashPopChunk(hashtable, &chunk_shared, + &hashtable->shared->chunks_to_rebucket); + LWLockRelease(&hashtable->shared->chunk_lock); + } + else + chunk = hashtable->chunks_to_rebucket; + while (chunk != NULL) + { + /* process all tuples stored in this chunk */ + size_t idx = 0; + + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + dsa_pointer hashTuple_shared = chunk_shared + + offsetof(HashMemoryChunkData, data) + idx; + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + + /* add the tuple to the proper bucket */ + insert_tuple_into_bucket(hashtable, bucketno, hashTuple, + hashTuple_shared); + + /* advance index past the tuple */ + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + ++chunks_processed; + + /* Push chunk back onto the chunk list and move to the next. */ + if (HashJoinTableIsShared(hashtable)) + { + LWLockAcquire(&hashtable->shared->chunk_lock, LW_EXCLUSIVE); + ExecHashPushChunk(hashtable, chunk, chunk_shared, + &hashtable->shared->chunks); + chunk = + ExecHashPopChunk(hashtable, &chunk_shared, + &hashtable->shared->chunks_to_rebucket); + LWLockRelease(&hashtable->shared->chunk_lock); + } + else + { + HashMemoryChunk next = chunk->next.private; + + chunk->next.private = hashtable->chunk; + hashtable->chunk = chunk; + chunk = next; + } + } + + TRACE_POSTGRESQL_HASH_REBUCKET_DONE(chunks_processed); +} + +static void +ExecHashTableComputeOptimalBuckets(HashJoinTable hashtable) +{ + double ntuples = (hashtable->totalTuples - hashtable->skewTuples); + + /* + * Guard against integer overflow and alloc size overflow. The + * MaxAllocSize limitation doesn't really apply for shared hash tables, + * since DSA has no such limit, but for now let's apply the same limit. 
*/ - hashtable->buckets = - (HashJoinTuple *) repalloc(hashtable->buckets, - hashtable->nbuckets * sizeof(HashJoinTuple)); + while (ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET) && + hashtable->nbuckets_optimal <= INT_MAX / 2 && + hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinBucketHead)) + { + hashtable->nbuckets_optimal *= 2; + hashtable->log2_nbuckets_optimal += 1; + } +} + +/* + * Process the queue of chunks whose tuples need to be redistributed into the + * correct batches until it is empty. Hopefully this will shrink the hash + * table, keeping about half of the tuples in memory and sending the rest to a + * future batch. + */ +static void +ExecHashShrink(HashJoinTable hashtable) +{ + Size size_before_shrink = 0; + Size tuples_in_memory = 0; + Size tuples_written_out = 0; + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + bool elected_to_decide = false; + + TRACE_POSTGRESQL_HASH_SHRINK_START(hashtable->nbatch); + + if (HashJoinTableIsShared(hashtable)) + { + /* + * Since a newly launched participant could arrive while shrinking is + * already underway, we need to be able to jump to the correct place + * in this function. + */ + switch (BarrierPhase(&hashtable->shared->shrink_barrier)) + { + case PHJ_SHRINK_PHASE_BEGINNING: /* likely case */ + break; + case PHJ_SHRINK_PHASE_CLEARING: + goto clearing; + case PHJ_SHRINK_PHASE_WORKING: + goto working; + case PHJ_SHRINK_PHASE_DECIDING: + goto deciding; + } + + /* + * We wait until all participants have reached this point. We need to + * do that because we can't clear the hash table if any participant is + * still inserting tuples into it, and we can't modify chunks that any + * participant is still writing into. + */ + if (BarrierWait(&hashtable->shared->shrink_barrier, + WAIT_EVENT_HASH_SHRINKING1)) + { + /* TODO: could also resize hash table here! */ + + /* Serial phase: one participant clears the hash table. 
*/ + memset(hashtable->buckets, 0, + hashtable->nbuckets * sizeof(HashJoinBucketHead)); - memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple)); + /* + * This participant will also make the decision about whether to + * disable further attempts to shrink. + */ + size_before_shrink = hashtable->shared->size; + elected_to_decide = true; + } + clearing: + /* Wait until hash table is cleared. */ + BarrierWait(&hashtable->shared->shrink_barrier, + WAIT_EVENT_HASH_SHRINKING2); + + Assert(hashtable->shared->nbatch == hashtable->nbatch); + } + else + { + /* Clear the hash table. */ + memset(hashtable->buckets, 0, + sizeof(HashJoinBucketHead) * hashtable->nbuckets); + } + + /* Pop first chunk from the shrink queue. */ + if (HashJoinTableIsShared(hashtable)) + { + working: + LWLockAcquire(&hashtable->shared->chunk_lock, LW_EXCLUSIVE); + chunk = ExecHashPopChunk(hashtable, &chunk_shared, + &hashtable->shared->chunks_to_shrink); + LWLockRelease(&hashtable->shared->chunk_lock); + } + else + chunk = hashtable->chunks_to_shrink; + + /* Process queue until empty. */ + while (chunk != NULL) + { + Size idx = 0; + + /* Process all tuples stored in this chunk. 
+ */ + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); + dsa_pointer copyTupleShared = InvalidDsaPointer; + int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len); + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + + if (batchno == hashtable->curbatch) + { + /* keep tuple in memory - copy it into the new chunk */ + HashJoinTuple copyTuple; + + if (HashJoinTableIsShared(hashtable)) + copyTuple = (HashJoinTuple) + dense_alloc_shared(hashtable, hashTupleSize, + &copyTupleShared, false, false); + else + copyTuple = (HashJoinTuple) + dense_alloc(hashtable, hashTupleSize); + memcpy(copyTuple, hashTuple, hashTupleSize); + + /* and add it back to the appropriate bucket */ + insert_tuple_into_bucket(hashtable, bucketno, copyTuple, + copyTupleShared); + ++tuples_in_memory; + } + else + { + /* dump it out */ + Assert(batchno > hashtable->curbatch); + ExecHashJoinSaveTuple(hashtable, + HJTUPLE_MINTUPLE(hashTuple), + hashTuple->hashvalue, + batchno, + true); + + hashtable->spaceUsed -= hashTupleSize; + ++tuples_written_out; + } - /* scan through all tuples in all chunks to rebuild the hash table */ - for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next) - { - /* process all tuples stored in this chunk */ - size_t idx = 0; + /* next tuple in this chunk */ + idx += MAXALIGN(hashTupleSize); + } - while (idx < chunk->used) + /* Free chunk and pop next from the shrink queue. 
*/ + if (HashJoinTableIsShared(hashtable)) { - HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); - int bucketno; - int batchno; + Size size = chunk->maxlen + offsetof(HashMemoryChunkData, data); + + TRACE_POSTGRESQL_HASH_FREE_CHUNK(size); + dsa_free(hashtable->area, chunk_shared); + + LWLockAcquire(&hashtable->shared->chunk_lock, LW_EXCLUSIVE); + Assert(hashtable->shared->size > size); + hashtable->shared->size -= size; + hashtable->shared->tuples_in_memory += tuples_in_memory; + hashtable->shared->tuples_written_out += tuples_written_out; + tuples_in_memory = 0; + tuples_written_out = 0; + chunk = ExecHashPopChunk(hashtable, &chunk_shared, + &hashtable->shared->chunks_to_shrink); + LWLockRelease(&hashtable->shared->chunk_lock); + } + else + { + HashMemoryChunk next = chunk->next.private; - ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, - &bucketno, &batchno); + pfree(chunk); + chunk = next; + } + } - /* add the tuple to the proper bucket */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + /* Decide if shrinking actually reduced memory usage. */ + if (HashJoinTableIsShared(hashtable)) + { + /* + * Wait until all have finished shrinking chunks. We need to do that + * because we need the total tuple counts before we can decide whether + * to prevent further attempts at shrinking. + */ + BarrierWait(&hashtable->shared->shrink_barrier, + WAIT_EVENT_HASH_SHRINKING3); - /* advance index past the tuple */ - idx += MAXALIGN(HJTUPLE_OVERHEAD + - HJTUPLE_MINTUPLE(hashTuple)->t_len); + if (elected_to_decide) + { + /* Serial phase: one participant decides. 
+ */ + if (hashtable->shared->tuples_in_memory == 0 || + hashtable->shared->tuples_written_out == 0) + { + TRACE_POSTGRESQL_HASH_SHRINK_DISABLED(); + hashtable->shared->shrinking_enabled = false; + } + + TRACE_POSTGRESQL_HASH_SHRINK_STATS(hashtable->shared->tuples_in_memory, + hashtable->shared->tuples_written_out, + size_before_shrink, + hashtable->shared->size); + } + deciding: + /* Wait for above decision to be made. */ + BarrierWaitSet(&hashtable->shared->shrink_barrier, + PHJ_SHRINK_PHASE_BEGINNING, + WAIT_EVENT_HASH_SHRINKING4); + } + else + { + if (tuples_in_memory == 0 || tuples_written_out == 0) + { + TRACE_POSTGRESQL_HASH_SHRINK_DISABLED(); + hashtable->growEnabled = false; } } -} + TRACE_POSTGRESQL_HASH_SHRINK_DONE(); +} /* * ExecHashTableInsert * insert a tuple into the hash table depending on the hash value - * it may just go to a temp file for later batches + * it may just go to a temp file for later batches; if 'preload' is + * true then it may be loaded into a chunk but not actually inserted yet; + * return true on success, false if we ran out of work_mem * * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual * tuple; the minimal case in particular is certain to happen while reloading @@ -826,10 +1589,11 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) * case by not forcing the slot contents into minimal form; not clear if it's * worth the messiness required. */ -void +bool ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, - uint32 hashvalue) + uint32 hashvalue, + bool preload) { MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); int bucketno; @@ -839,20 +1603,61 @@ ExecHashTableInsert(HashJoinTable hashtable, &bucketno, &batchno); /* - * decide whether to put the tuple in the hash table or a temp file + * decide whether to put the tuple in memory or in a temp file */ - if (batchno == hashtable->curbatch) + if (batchno == hashtable->curbatch + (preload ? 
1 : 0)) { /* * put the tuple in hash table */ HashJoinTuple hashTuple; int hashTupleSize; - double ntuples = (hashtable->totalTuples - hashtable->skewTuples); + dsa_pointer hashTuple_shared = InvalidDsaPointer; /* Create the HashJoinTuple */ hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; - hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); + + retry: + if (HashJoinTableIsShared(hashtable)) + hashTuple = (HashJoinTuple) + dense_alloc_shared(hashtable, hashTupleSize, + &hashTuple_shared, preload, true); + else + hashTuple = (HashJoinTuple) + dense_alloc(hashtable, hashTupleSize); + + /* Check for failure to allocate. */ + if (!hashTuple) + { + if (preload) + { + /* + * There is no more work_mem into which to preload tuples for + * the next batch, so tell caller to stop doing that. + */ + Assert(HashJoinTableIsShared(hashtable)); + return false; + } + else + { + /* + * Either dense_alloc_shared has decided that we should + * increase the number of batches or another participant has + * already decided to do that, so we should go and help shrink + * the hash table by sending tuples to future batches. + */ + Assert(HashJoinTableIsShared(hashtable)); + ExecHashShrink(hashtable); + + /* + * Try again. Hopefully memory has been freed up, or we've + * decided to stop respecting work_mem because increasing the + * number of batches isn't helping (large numbers of tuples + * with the same hash value can't be separated). 
+ */ + goto retry; + } + } hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); @@ -865,33 +1670,32 @@ ExecHashTableInsert(HashJoinTable hashtable, */ HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); - /* Push it onto the front of the bucket's list */ - hashTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = hashTuple; + /* Push it onto the front of the bucket's list, unless preloading */ + if (!preload) + insert_tuple_into_bucket(hashtable, bucketno, hashTuple, + hashTuple_shared); /* * Increase the (optimal) number of buckets if we just exceeded the * NTUP_PER_BUCKET threshold, but only when there's still a single * batch. */ - if (hashtable->nbatch == 1 && - ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)) - { - /* Guard against integer overflow and alloc size overflow */ - if (hashtable->nbuckets_optimal <= INT_MAX / 2 && - hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple)) - { - hashtable->nbuckets_optimal *= 2; - hashtable->log2_nbuckets_optimal += 1; - } - } + if (hashtable->nbatch == 1) + ExecHashTableComputeOptimalBuckets(hashtable); + + /* + * TODO: Get rid of the following code, and use the same pattern as + * above, namely let dense_alloc count chunk size (it's more + * accurate!) and let it tell you when you need to back off and + * ExecHashShrink? 
+ */ /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; if (hashtable->spaceUsed + - hashtable->nbuckets_optimal * sizeof(HashJoinTuple) + hashtable->nbuckets_optimal * sizeof(HashJoinBucketHead) > hashtable->spaceAllowed) ExecHashIncreaseNumBatches(hashtable); } @@ -900,11 +1704,15 @@ ExecHashTableInsert(HashJoinTable hashtable, /* * put the tuple into a temp file for later batches */ - Assert(batchno > hashtable->curbatch); - ExecHashJoinSaveTuple(tuple, + Assert(batchno > hashtable->curbatch + (preload ? 1 : 0)); + ExecHashJoinSaveTuple(hashtable, + tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + batchno, + true); } + + return true; } /* @@ -1047,6 +1855,134 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable, } /* + * Update the local hashtable with the current pointers and sizes from + * hashtable->parallel_state. + */ +void +ExecHashUpdate(HashJoinTable hashtable) +{ + Barrier *barrier; + + if (!HashJoinTableIsShared(hashtable)) + return; + + barrier = &hashtable->shared->barrier; + + /* + * This should only be called in a phase when the hash table is not being + * mutated (ie resized, swapped etc). + */ + Assert(!PHJ_PHASE_MUTATING_TABLE( + BarrierPhase(&hashtable->shared->barrier))); + + /* The hash table. */ + hashtable->buckets = (HashJoinBucketHead *) + dsa_get_address(hashtable->area, hashtable->shared->buckets); + hashtable->nbuckets = hashtable->shared->nbuckets; + /* TODO nbatch? */ + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); + + hashtable->curbatch = PHJ_PHASE_TO_BATCHNO(BarrierPhase(barrier)); +} + +/* + * Get the next tuple in the same bucket as 'tuple'. 
+ */ +static HashJoinTuple +next_tuple_in_bucket(HashJoinTable table, HashJoinTuple tuple) +{ + if (HashJoinTableIsShared(table)) + return (HashJoinTuple) + dsa_get_address(table->area, tuple->next.shared); + else + return tuple->next.private; +} + +/* + * Get the first tuple in a given skew bucket identified by number. + */ +static HashJoinTuple +first_tuple_in_skew_bucket(HashJoinTable table, int skew_bucket_no) +{ + if (HashJoinTableIsShared(table)) + return (HashJoinTuple) + dsa_get_address(table->area, + table->skewBucket[skew_bucket_no]->tuples.shared); + else + return table->skewBucket[skew_bucket_no]->tuples.private; +} + +/* + * Get the first tuple in a given bucket identified by number. + */ +static HashJoinTuple +first_tuple_in_bucket(HashJoinTable table, int bucket_no) +{ + if (HashJoinTableIsShared(table)) + { + dsa_pointer p = + dsa_pointer_atomic_read(&table->buckets[bucket_no].shared); + return (HashJoinTuple) dsa_get_address(table->area, p); + } + else + return table->buckets[bucket_no].private; +} + +/* + * Insert a tuple at the front of a given bucket identified by number. For + * shared hash joins, tuple_shared must be provided, pointing to the tuple in + * the dsa_area backing the table. For private hash joins, it should be + * InvalidDsaPointer. + */ +static void +insert_tuple_into_bucket(HashJoinTable table, int bucket_no, + HashJoinTuple tuple, dsa_pointer tuple_shared) +{ + if (HashJoinTableIsShared(table)) + { + Assert(tuple == dsa_get_address(table->area, tuple_shared)); + for (;;) + { + tuple->next.shared = + dsa_pointer_atomic_read(&table->buckets[bucket_no].shared); + if (dsa_pointer_atomic_compare_exchange(&table->buckets[bucket_no].shared, + &tuple->next.shared, + tuple_shared)) + break; + } + } + else + { + tuple->next.private = table->buckets[bucket_no].private; + table->buckets[bucket_no].private = tuple; + } +} + +/* + * Insert a tuple at the front of a given skew bucket identified by number. 
+ * For shared hash joins, tuple_shared must be provided, pointing to the tuple + * in the dsa_area backing the table. For private hash joins, it should be + * InvalidDsaPointer. + */ +static void +insert_tuple_into_skew_bucket(HashJoinTable table, int skew_bucket_no, + HashJoinTuple tuple, + dsa_pointer tuple_shared) +{ + if (HashJoinTableIsShared(table)) + { + tuple->next.shared = + table->skewBucket[skew_bucket_no]->tuples.shared; + table->skewBucket[skew_bucket_no]->tuples.shared = tuple_shared; + } + else + { + tuple->next.private = table->skewBucket[skew_bucket_no]->tuples.private; + table->skewBucket[skew_bucket_no]->tuples.private = tuple; + } +} + +/* * ExecScanHashBucket * scan a hash bucket for matches to the current outer tuple * @@ -1073,11 +2009,12 @@ ExecScanHashBucket(HashJoinState *hjstate, * otherwise scan the standard hashtable bucket. */ if (hashTuple != NULL) - hashTuple = hashTuple->next; + hashTuple = next_tuple_in_bucket(hashtable, hashTuple); else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) - hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; + hashTuple = first_tuple_in_skew_bucket(hashtable, + hjstate->hj_CurSkewBucketNo); else - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; + hashTuple = first_tuple_in_bucket(hashtable, hjstate->hj_CurBucketNo); while (hashTuple != NULL) { @@ -1101,7 +2038,7 @@ ExecScanHashBucket(HashJoinState *hjstate, } } - hashTuple = hashTuple->next; + hashTuple = next_tuple_in_bucket(hashtable, hashTuple); } /* @@ -1144,6 +2081,81 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) HashJoinTable hashtable = hjstate->hj_HashTable; HashJoinTuple hashTuple = hjstate->hj_CurTuple; + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_UNMATCHED_BATCH(hashtable->curbatch)); + + /* + * For the parallel version, we'll let each participant pull chunks + * from the queue to work on independently. 
+ */ + for (;;) + { + /* Do we need a new chunk? */ + if (hashtable->chunk == NULL) + { + dsa_pointer chunk_shared; + + /* + * Try to pop a chunk from the unmatched queue, and put it + * back on the main chunks list. + */ + LWLockAcquire(&hashtable->shared->chunk_lock, LW_EXCLUSIVE); + hashtable->chunk = + ExecHashPopChunk(hashtable, &chunk_shared, + &hashtable->shared->chunks_unmatched); + if (hashtable->chunk != NULL) + ExecHashPushChunk(hashtable, hashtable->chunk, + chunk_shared, + &hashtable->shared->chunks); + LWLockRelease(&hashtable->shared->chunk_lock); + + /* If no more chunks in the queue: we're done. */ + if (hashtable->chunk == NULL) + return false; + + hashtable->chunk_unmatched_pos = 0; + } + + /* Does the current chunk have any more tuples? */ + if (hashtable->chunk_unmatched_pos >= hashtable->chunk->used) + { + /* Try a new chunk. */ + hashtable->chunk = NULL; + continue; + } + hashTuple = (HashJoinTuple) + hashtable->chunk->data + hashtable->chunk_unmatched_pos; + + /* Move to the next tuple in this chunk. */ + hashtable->chunk_unmatched_pos += + HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len; + + /* Is it unmatched? */ + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) + { + TupleTableSlot *inntuple; + + /* insert hashtable's tuple into exec slot */ + inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); /* do not pfree */ + econtext->ecxt_innertuple = inntuple; + + /* + * Reset temp memory each time; although this function doesn't + * do any qual eval, the caller will, so let's keep it + * parallel to ExecScanHashBucket. + */ + ResetExprContext(econtext); + + hjstate->hj_CurTuple = hashTuple; + return true; + } + } + } + for (;;) { /* @@ -1152,21 +2164,21 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) * bucket. 
*/ if (hashTuple != NULL) - hashTuple = hashTuple->next; - else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + hashTuple = next_tuple_in_bucket(hashtable, hashTuple); { - hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; - hjstate->hj_CurBucketNo++; - } - else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) - { - int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; + if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + hashTuple = first_tuple_in_bucket(hashtable, + hjstate->hj_CurBucketNo++); + else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) + { + int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; - hashTuple = hashtable->skewBucket[j]->tuples; - hjstate->hj_CurSkewBucketNo++; + hashTuple = first_tuple_in_skew_bucket(hashtable, j); + hjstate->hj_CurSkewBucketNo++; + } + else + break; /* finished all buckets */ } - else - break; /* finished all buckets */ while (hashTuple != NULL) { @@ -1191,7 +2203,7 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return true; } - hashTuple = hashTuple->next; + hashTuple = next_tuple_in_bucket(hashtable, hashTuple); } } @@ -1212,6 +2224,59 @@ ExecHashTableReset(HashJoinTable hashtable) MemoryContext oldcxt; int nbuckets = hashtable->nbuckets; + if (HashJoinTableIsShared(hashtable)) + { + /* Wait for all workers to finish accessing the hash table. */ + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_UNMATCHED); + if (BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_UNMATCHED)) + { + /* Serial phase: set up hash table for new batch. */ + int i; + + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_PROMOTING); + + /* Clear the hash table. */ + for (i = 0; i < nbuckets; ++i) + dsa_pointer_atomic_write(&hashtable->buckets[i].shared, + InvalidDsaPointer); + + /* Free all the chunks. */ + /* TODO: Put them on a freelist instead? 
Better than making one backend free them all! */ + while (DsaPointerIsValid(hashtable->shared->chunks)) + { + HashMemoryChunk chunk = (HashMemoryChunk) + dsa_get_address(hashtable->area, hashtable->shared->chunks); + dsa_pointer next = chunk->next.shared; + + dsa_free(hashtable->area, hashtable->shared->chunks); + hashtable->shared->chunks = next; + } + + /* Any preloaded chunks for the next batch need to be bucketed. */ + hashtable->shared->chunks_to_rebucket = + hashtable->shared->chunks_preloaded; + hashtable->shared->chunks_preloaded = InvalidDsaPointer; + + /* Update the hash table size: it now has the preloaded chunks. */ + hashtable->shared->size = + (hashtable->nbuckets * sizeof(HashJoinBucketHead)) + + hashtable->shared->size_preloaded; + hashtable->shared->size_preloaded = 0; + } + /* Wait again, so that all workers now have the new table. */ + BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_PROMOTING); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_LOADING); + ExecHashUpdate(hashtable); + + /* Forget the current chunks. */ + hashtable->chunk = NULL; + hashtable->chunk_preload = NULL; + return; + } + /* * Release all the hash buckets and tuples acquired in the prior pass, and * reinitialize the context for a new pass. @@ -1220,15 +2285,15 @@ ExecHashTableReset(HashJoinTable hashtable) oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); /* Reallocate and reinitialize the hash bucket headers. */ - hashtable->buckets = (HashJoinTuple *) - palloc0(nbuckets * sizeof(HashJoinTuple)); + hashtable->buckets = (HashJoinBucketHead *) + palloc0(nbuckets * sizeof(HashJoinBucketHead)); hashtable->spaceUsed = 0; MemoryContextSwitchTo(oldcxt); /* Forget the chunks (the memory was freed by the context reset above). */ - hashtable->chunks = NULL; + hashtable->chunk = NULL; } /* @@ -1241,10 +2306,14 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) HashJoinTuple tuple; int i; + /* TODO: share this work out? 
*/ + /* Reset all flags in the main table ... */ for (i = 0; i < hashtable->nbuckets; i++) { - for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next) + for (tuple = first_tuple_in_bucket(hashtable, i); + tuple != NULL; + tuple = next_tuple_in_bucket(hashtable, tuple)) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } @@ -1252,9 +2321,10 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable) for (i = 0; i < hashtable->nSkewBuckets; i++) { int j = hashtable->skewBucketNums[i]; - HashSkewBucket *skewBucket = hashtable->skewBucket[j]; - for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next) + for (tuple = first_tuple_in_skew_bucket(hashtable, j); + tuple != NULL; + tuple = next_tuple_in_bucket(hashtable, tuple)) HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); } } @@ -1414,11 +2484,11 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) continue; /* Okay, create a new skew bucket for this hashvalue. */ - hashtable->skewBucket[bucket] = (HashSkewBucket *) + hashtable->skewBucket[bucket] = (HashSkewBucket *) /* TODO */ MemoryContextAlloc(hashtable->batchCxt, sizeof(HashSkewBucket)); hashtable->skewBucket[bucket]->hashvalue = hashvalue; - hashtable->skewBucket[bucket]->tuples = NULL; + hashtable->skewBucket[bucket]->tuples.private = NULL; hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket; hashtable->nSkewBuckets++; hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; @@ -1496,18 +2566,29 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); HashJoinTuple hashTuple; int hashTupleSize; + dsa_pointer tuple_pointer; /* Create the HashJoinTuple */ hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; - hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, - hashTupleSize); + if (HashJoinTableIsShared(hashtable)) + { + tuple_pointer = dsa_allocate(hashtable->area, hashTupleSize); + hashTuple = (HashJoinTuple) dsa_get_address(hashtable->area, + 
tuple_pointer); + } + else + { + tuple_pointer = InvalidDsaPointer; + hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, + hashTupleSize); + } hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the skew bucket's list */ - hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples; - hashtable->skewBucket[bucketNumber]->tuples = hashTuple; + insert_tuple_into_skew_bucket(hashtable, bucketNumber, hashTuple, + tuple_pointer); /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; @@ -1538,6 +2619,9 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) int batchno; HashJoinTuple hashTuple; + /* TODO: skew buckets not yet supported for parallel mode */ + Assert(!HashJoinTableIsShared(hashtable)); + /* Locate the bucket to remove */ bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1]; bucket = hashtable->skewBucket[bucketToRemove]; @@ -1552,10 +2636,10 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); /* Process all tuples in the bucket */ - hashTuple = bucket->tuples; + hashTuple = first_tuple_in_skew_bucket(hashtable, bucketToRemove); while (hashTuple != NULL) { - HashJoinTuple nextHashTuple = hashTuple->next; + HashJoinTuple nextHashTuple = next_tuple_in_bucket(hashtable, hashTuple); MinimalTuple tuple; Size tupleSize; @@ -1581,8 +2665,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) memcpy(copyTuple, hashTuple, tupleSize); pfree(hashTuple); - copyTuple->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = copyTuple; + insert_tuple_into_bucket(hashtable, bucketno, copyTuple, + InvalidDsaPointer); /* We have reduced skew space, but overall space doesn't change */ hashtable->spaceUsedSkew -= tupleSize; @@ -1591,8 +2675,8 @@ 
ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) { /* Put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); - ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno]); + ExecHashJoinSaveTuple(hashtable, tuple, hashvalue, + batchno, true); pfree(hashTuple); hashtable->spaceUsed -= tupleSize; hashtable->spaceUsedSkew -= tupleSize; @@ -1636,6 +2720,198 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } /* + * Allocate 'size' bytes from the currently active shared HashMemoryChunk, or + * create a new chunk if necessary. This is similar to the private memory + * version, but also deals with 'preload' chunks and coordination with other + * participants. + * + * If respect_work_mem is true, then return NULL if the number of batches has + * been increased in order to avoid exceeding work_mem. Pass false to allow + * work_mem to be exceeded (as can be temporarily needed by ExecHashShrink, or + * if increasing the number of batches doesn't seem to be helping us shrink + * the memory usage). + */ +static void * +dense_alloc_shared(HashJoinTable hashtable, + Size size, + dsa_pointer *shared, + bool preload, + bool respect_work_mem) +{ + dsa_pointer chunk_shared; + HashMemoryChunk chunk; + Size chunk_size; + + /* just in case the size is not already aligned properly */ + size = MAXALIGN(size); + + /* + * Fast path: if there is enough space in this backend's current chunk, + * then we can allocate without any locking or work_mem accounting. If + * HASH_CHUNK_SIZE is large enough, this strategy should keep lock + * contention low. It doesn't matter if another participant has decided + * to increase the number of batches; we'll finish filling up this chunk + * and then find out about the increase when we need to allocate a new + * chunk. + */ + chunk = preload ? 
hashtable->chunk_preload : hashtable->chunk; + if (chunk != NULL && + size < HASH_CHUNK_THRESHOLD && + chunk->maxlen - chunk->used >= size) + { + void *result; + + chunk_shared = preload + ? hashtable->chunk_preload_shared + : hashtable->chunk_shared; + Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); + *shared = chunk_shared + + offsetof(HashMemoryChunkData, data) + + chunk->used; + result = chunk->data + chunk->used; + chunk->used += size; + chunk->ntuples += 1; + + Assert(chunk->used <= chunk->maxlen); + Assert(result == dsa_get_address(hashtable->area, *shared)); + + return result; + } + + /* + * Slow path: try to allocate a new chunk, while also coordinating with + * other participants to keep memory usage under work_mem by increasing + * the number of batches as required. + */ + LWLockAcquire(&hashtable->shared->chunk_lock, LW_EXCLUSIVE); + + /* Check if some other participant has increased nbatch. */ + if (hashtable->shared->nbatch > hashtable->nbatch) + { + Assert(!preload); + Assert(respect_work_mem); + extend_batch_file_arrays(hashtable, hashtable->shared->nbatch); + + hashtable->chunk = NULL; + hashtable->chunk_shared = InvalidDsaPointer; + LWLockRelease(&hashtable->shared->chunk_lock); + + /* + * Whenever nbatch changes, every participant attached to + * shrink_barrier must run ExecHashShrink to help shrink the hash + * table. So return NULL to tell caller to go and do that. + */ + return NULL; + } + + /* Oversized tuples get their own chunk. */ + if (size > HASH_CHUNK_THRESHOLD) + chunk_size = size + offsetof(HashMemoryChunkData, data); + else + chunk_size = HASH_CHUNK_SIZE; + + /* If appropriate, check if work_mem would be exceeded by a new chunk. */ + if (respect_work_mem && + hashtable->shared->shrinking_enabled && + (hashtable->shared->size + + hashtable->shared->size_preloaded + + chunk_size) > (work_mem * 1024L)) + { + /* + * It would. 
If allocating for the current batch (ie not preloading + * the next batch), increase number of batches so we can shrink the + * hash table. + */ + if (!preload) + { + hashtable->shared->nbatch *= 2; + extend_batch_file_arrays(hashtable, hashtable->shared->nbatch); + + /* All allocated chunks now need to be shrunk. */ + hashtable->shared->chunks_to_shrink = hashtable->shared->chunks; + hashtable->shared->chunks = InvalidDsaPointer; + hashtable->shared->tuples_in_memory = 0; + hashtable->shared->tuples_written_out = 0; + + hashtable->chunk = NULL; + hashtable->chunk_shared = InvalidDsaPointer; + } + LWLockRelease(&hashtable->shared->chunk_lock); + + /* + * If the caller is preloading, it should now stop doing that because + * there is no more work_mem. If it is loading, it should now run + * ExecHashShrink so we can get some memory back. + */ + return NULL; + } + + /* We are cleared to allocate a new chunk. */ + chunk_shared = dsa_allocate(hashtable->area, chunk_size); + if (!DsaPointerIsValid(chunk_shared)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("out of memory"))); + TRACE_POSTGRESQL_HASH_ALLOCATE_CHUNK(chunk_size); + if (preload) + hashtable->shared->size_preloaded += chunk_size; + else + hashtable->shared->size += chunk_size; + + /* Set up the chunk. */ + chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared); + *shared = chunk_shared + offsetof(HashMemoryChunkData, data); + chunk->maxlen = chunk_size - offsetof(HashMemoryChunkData, data); + chunk->used = size; + chunk->ntuples = 1; + + /* + * Push it onto the appropriate list of chunks, so that it can be found if + * we need to rebucket or shrink the whole hash table. + */ + ExecHashPushChunk(hashtable, chunk, chunk_shared, + preload + ? 
&hashtable->shared->chunks_preloaded + : &hashtable->shared->chunks); + + if (size > HASH_CHUNK_THRESHOLD) + { + /* + * Count oversized tuples immediately, but don't bother making this + * chunk the 'current' chunk because it has no more space in it for + * next time. + */ + if (preload) + ++hashtable->shared->tuples_next_batch; + else + ++hashtable->shared->tuples_this_batch; + } + else + { + /* + * Make this the current chunk so that we can use the fast path to + * fill the rest of it up in future calls. We will count this tuple + * later, when the chunk is full. + */ + if (preload) + { + hashtable->chunk_preload = chunk; + hashtable->chunk_preload_shared = chunk_shared; + } + else + { + hashtable->chunk = chunk; + hashtable->chunk_shared = chunk_shared; + } + } + LWLockRelease(&hashtable->shared->chunk_lock); + + Assert(chunk->data == dsa_get_address(hashtable->area, *shared)); + + return chunk->data; +} + +/* * Allocate 'size' bytes from the currently active HashMemoryChunk */ static void * @@ -1653,26 +2929,28 @@ dense_alloc(HashJoinTable hashtable, Size size) */ if (size > HASH_CHUNK_THRESHOLD) { + /* allocate new chunk and put it at the beginning of the list */ - newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, - offsetof(HashMemoryChunkData, data) + size); + newChunk = (HashMemoryChunk) + MemoryContextAlloc(hashtable->batchCxt, + offsetof(HashMemoryChunkData, data) + size); newChunk->maxlen = size; newChunk->used = 0; - newChunk->ntuples = 0; + newChunk->ntuples= 0; /* * Add this chunk to the list after the first existing chunk, so that * we don't lose the remaining space in the "current" chunk. 
*/ - if (hashtable->chunks != NULL) + if (hashtable->chunk != NULL) { - newChunk->next = hashtable->chunks->next; - hashtable->chunks->next = newChunk; + newChunk->next.private = hashtable->chunk->next.private; + hashtable->chunk->next.private = newChunk; } else { - newChunk->next = hashtable->chunks; - hashtable->chunks = newChunk; + newChunk->next.private = NULL; + hashtable->chunk = newChunk; } newChunk->used += size; @@ -1685,27 +2963,27 @@ dense_alloc(HashJoinTable hashtable, Size size) * See if we have enough space for it in the current chunk (if any). If * not, allocate a fresh chunk. */ - if ((hashtable->chunks == NULL) || - (hashtable->chunks->maxlen - hashtable->chunks->used) < size) + if ((hashtable->chunk == NULL) || + (hashtable->chunk->maxlen - hashtable->chunk->used) < size) { /* allocate new chunk and put it at the beginning of the list */ - newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, - offsetof(HashMemoryChunkData, data) + HASH_CHUNK_SIZE); - + newChunk = (HashMemoryChunk) + MemoryContextAlloc(hashtable->batchCxt, + offsetof(HashMemoryChunkData, data) + + HASH_CHUNK_SIZE); + newChunk->next.private = hashtable->chunk; + hashtable->chunk = newChunk; newChunk->maxlen = HASH_CHUNK_SIZE; newChunk->used = size; newChunk->ntuples = 1; - newChunk->next = hashtable->chunks; - hashtable->chunks = newChunk; - return newChunk->data; } /* There is enough space in the current chunk, let's add the tuple */ - ptr = hashtable->chunks->data + hashtable->chunks->used; - hashtable->chunks->used += size; - hashtable->chunks->ntuples += 1; + ptr = hashtable->chunk->data + hashtable->chunk->used; + hashtable->chunk->used += size; + hashtable->chunk->ntuples += 1; /* return pointer to the start of the tuple memory */ return ptr; diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index b41e4e2..e267bab 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -21,8 +21,10 @@ 
#include "executor/nodeHash.h" #include "executor/nodeHashjoin.h" #include "miscadmin.h" +#include "pgstat.h" +#include "storage/barrier.h" #include "utils/memutils.h" - +#include "utils/probes.h" /* * States of the ExecHashJoin state machine @@ -42,11 +44,16 @@ static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue); -static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, - BufFile *file, +static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinTable hashtable, uint32 *hashvalue, TupleTableSlot *tupleSlot); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static void ExecHashJoinLoadBatch(HashJoinState *hjstate); +static void ExecHashJoinExportAllBatches(HashJoinTable hashtable); +static void ExecHashJoinExportBatch(HashJoinTable hashtable, int batchno, bool inner); +static void ExecHashJoinImportBatch(HashJoinTable hashtable, + HashJoinBatchReader *reader); +static void ExecHashJoinPreloadNextBatch(HashJoinState *hjstate); /* ---------------------------------------------------------------- @@ -147,6 +154,14 @@ ExecHashJoin(HashJoinState *node) /* no chance to not build the hash table */ node->hj_FirstOuterTupleSlot = NULL; } + else if (hashNode->shared_table_data != NULL) + { + /* + * TODO: The empty-outer optimization is not implemented + * for shared hash tables yet. 
+ */ + node->hj_FirstOuterTupleSlot = NULL; + } else if (HJ_FILL_OUTER(node) || (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && !node->hj_OuterNotEmpty)) @@ -166,7 +181,7 @@ ExecHashJoin(HashJoinState *node) /* * create the hash table */ - hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan, + hashtable = ExecHashTableCreate(hashNode, node->hj_HashOperators, HJ_FILL_INNER(node)); node->hj_HashTable = hashtable; @@ -177,12 +192,57 @@ ExecHashJoin(HashJoinState *node) hashNode->hashtable = hashtable; (void) MultiExecProcNode((PlanState *) hashNode); + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(&hashtable->shared->barrier) >= + PHJ_PHASE_HASHING); + + /* + * Check if we are a worker that attached too late to + * avoid deadlock risk with the leader, or a leader that + * arrived here too late. + */ + if (ExecHashCheckForEarlyExit(hashtable)) + { + /* + * Other participants will need to handle all future + * batches written by me. We don't detach until after + * we've exported all batches, otherwise the phase + * might advance and another participant might try to + * import them. + */ + if (BarrierPhase(&hashtable->shared->barrier) <= + PHJ_PHASE_PROBING) + ExecHashJoinExportAllBatches(hashtable); + BarrierDetach(&hashtable->shared->barrier); + hashtable->detached_early = true; + return NULL; + } + + /* + * Export just the next batch, if there is one, because it + * is now read-only and other participants may decide to + * read from it. Future batches can still be written to + * if work_mem is exceeded by any future batch and we + * decide to increase their number, so we can't export + * those yet. We'll export the batch files written by + * each participant only as they become read-only, but + * before any participant reads from them. 
+ */ + if (hashtable->nbatch > 1) + { + ExecHashJoinExportBatch(hashtable, 1, false); + ExecHashJoinExportBatch(hashtable, 1, true); + } + } + /* * If the inner relation is completely empty, and we're not * doing a left outer join, we can quit without scanning the * outer relation. */ - if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node)) + if (!HashJoinTableIsShared(hashtable) && /* TODO:TM */ + hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node)) return NULL; /* @@ -198,12 +258,73 @@ ExecHashJoin(HashJoinState *node) */ node->hj_OuterNotEmpty = false; - node->hj_JoinState = HJ_NEED_NEW_OUTER; + if (HashJoinTableIsShared(hashtable)) + { + Barrier *barrier = &hashtable->shared->barrier; + int phase = BarrierPhase(barrier); + + /* + * Map the current phase to the appropriate initial state + * for this worker, so we can get started. + */ + Assert(BarrierPhase(barrier) >= PHJ_PHASE_PROBING); + hashtable->curbatch = PHJ_PHASE_TO_BATCHNO(phase); + switch (PHJ_PHASE_TO_SUBPHASE(phase)) + { + case PHJ_SUBPHASE_PROMOTING: + /* Wait for serial phase to finish. */ + BarrierWait(barrier, WAIT_EVENT_HASHJOIN_PROMOTING); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(barrier)) == + PHJ_SUBPHASE_LOADING); + /* fall through */ + case PHJ_SUBPHASE_LOADING: + /* Help load the current batch. */ + ExecHashUpdate(hashtable); + ExecHashJoinOpenBatch(hashtable, hashtable->curbatch, + true); + ExecHashJoinLoadBatch(node); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(barrier)) == + PHJ_SUBPHASE_PROBING); + /* fall through */ + case PHJ_SUBPHASE_PREPARING: + /* Wait for serial phase to finish. */ + BarrierWait(barrier, WAIT_EVENT_HASHJOIN_PROMOTING); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(barrier)) == + PHJ_SUBPHASE_PROBING); + /* fall through */ + case PHJ_SUBPHASE_PROBING: + /* Help probe the current batch. 
*/ + ExecHashUpdate(hashtable); + ExecHashJoinOpenBatch(hashtable, hashtable->curbatch, + false); + node->hj_JoinState = HJ_NEED_NEW_OUTER; + break; + case PHJ_SUBPHASE_UNMATCHED: + /* Help scan for unmatched inner tuples. */ + ExecHashUpdate(hashtable); + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + break; + } + continue; + } + else + { + node->hj_JoinState = HJ_NEED_NEW_OUTER; + ExecHashJoinOpenBatch(hashtable, 0, false); + } /* FALL THRU */ case HJ_NEED_NEW_OUTER: + if (HashJoinTableIsShared(hashtable)) + { + Assert(PHJ_PHASE_TO_BATCHNO(BarrierPhase(&hashtable->shared->barrier)) == + hashtable->curbatch); + Assert(PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_PROBING); + } + /* * We don't have an outer tuple, try to get the next one */ @@ -213,6 +334,67 @@ ExecHashJoin(HashJoinState *node) if (TupIsNull(outerTupleSlot)) { /* end of batch, or maybe whole join */ + + /* + * Switch to reading tuples from the next inner batch. We + * do this here because in the shared hash table case we + * want to do this before ExecHashJoinPreloadNextBatch. + */ + if (hashtable->curbatch + 1 < hashtable->nbatch) + ExecHashJoinOpenBatch(hashtable, + hashtable->curbatch + 1, + true); + + if (HashJoinTableIsShared(hashtable)) + { + /* + * Check if we are a leader that can't go further than + * probing the first batch without deadlock risk, + * because there are workers running. + */ + if (ExecHashCheckForEarlyExit(hashtable)) + { + /* + * Other backends will need to handle all future + * batches written by me. We don't detach until + * after we've exported all batches, otherwise + * another participant might try to import them + * too soon. 
+ */ + ExecHashJoinExportAllBatches(hashtable); + BarrierDetach(&hashtable->shared->barrier); + hashtable->detached_early = true; + return NULL; + } + + /* + * We may be able to load some amount of the next + * batch into spare work_mem, before we start waiting + * for other workers to finish probing the current + * batch. + */ + ExecHashJoinPreloadNextBatch(node); + + /* + * We can't start searching for unmatched tuples until + * all participants have finished probing, so we + * synchronize here. + */ + if (BarrierWait(&hashtable->shared->barrier, + WAIT_EVENT_HASHJOIN_PROBING)) + { + /* Serial phase: prepare for unmatched. */ + if (HJ_FILL_INNER(node)) + { + hashtable->chunk = NULL; + hashtable->shared->chunks_unmatched = + hashtable->shared->chunks; + hashtable->shared->chunks = InvalidDsaPointer; + } + } + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_UNMATCHED_BATCH(hashtable->curbatch)); + } if (HJ_FILL_INNER(node)) { /* set up to scan for unmatched inner tuples */ @@ -250,9 +432,9 @@ ExecHashJoin(HashJoinState *node) * Save it in the corresponding outer-batch file. */ Assert(batchno > hashtable->curbatch); - ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot), - hashvalue, - &hashtable->outerBatchFile[batchno]); + ExecHashJoinSaveTuple(hashtable, + ExecFetchSlotMinimalTuple(outerTupleSlot), + hashvalue, batchno, false); /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } @@ -296,6 +478,13 @@ ExecHashJoin(HashJoinState *node) if (joinqual == NIL || ExecQual(joinqual, econtext, false)) { node->hj_MatchedOuter = true; + /* + * Note: it is OK to do this in a shared hash table + * without any kind of memory synchronization, because the + * only transition is 0->1, so ordering doesn't matter if + * several backends do it, and there will be a memory + * barrier before anyone reads it. 
+ */ HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); /* In an antijoin, we never return a matched tuple */ @@ -631,6 +820,88 @@ ExecEndHashJoin(HashJoinState *node) ExecEndNode(innerPlanState(node)); } +void +ExecShutdownHashJoin(HashJoinState *node) +{ + /* + * TODO: Figure out how to handle this! For now, just clear the shared + * hash table so that ExecEndHashJoin won't blow up when it's called after + * the dsa_area has been detached... + */ + if (node->hj_HashTable) + node->hj_HashTable->shared = NULL; +} + +/* + * For shared hash joins, load as much of the next batch as we can as part of + * the probing phase for the current batch. This overlapping means that we do + * something useful with a CPU and the spare memory before we start waiting + * for other workers. + */ +static void +ExecHashJoinPreloadNextBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + + if (HashJoinTableIsShared(hashtable)) + { + Barrier *barrier PG_USED_FOR_ASSERTS_ONLY = &hashtable->shared->barrier; + int curbatch = hashtable->curbatch; + int next_batch = curbatch + 1; + TupleTableSlot *slot; + uint32 hashvalue; + + Assert(BarrierPhase(barrier) == PHJ_PHASE_PROBING_BATCH(curbatch)); + + /* + * TODO: We can't preload batch 1 at the end of probing batch 0, + * because the leader might call ExecHashJoinExportAllBatches() during + * that phase. Batches can't be exported by one backend and imported + * and accessed by another in the same phase. Is there a way to + * reorder things and avoid that problem? + */ + if (next_batch == 1) + return; + + if (next_batch < hashtable->nbatch) + { + for (;;) + { + slot = ExecHashJoinGetSavedTuple(hashtable, + &hashvalue, + hjstate->hj_HashTupleSlot); + if (slot == NULL) + { + /* + * We were able to load the whole batch into memory + * without running out of work_mem. + */ + break; + } + + /* + * Try to preload this tuple into a chunk. It is not actually + * inserted into the hash table yet. 
+ */ + if (!ExecHashTableInsert(hashtable, + hjstate->hj_HashTupleSlot, + hashvalue, + true)) /* preload */ + { + /* + * There is no more work_mem. We'll leave this tuple in + * the slot and tell ExecHashJoinLoadBatch to insert it + * once we've finished probing the current hash table. + */ + hashtable->preloaded_spare_tuple = true; + hashtable->preloaded_spare_tuple_hash = hashvalue; + return; + } + } + } + } +} + /* * ExecHashJoinOuterGetTuple * @@ -680,7 +951,6 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, { /* remember outer relation is not empty for possible rescan */ hjstate->hj_OuterNotEmpty = true; - return slot; } @@ -699,11 +969,10 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, * In outer-join cases, we could get here even though the batch file * is empty. */ - if (file == NULL) + if (!HashJoinTableIsShared(hashtable) && file == NULL) return NULL; - slot = ExecHashJoinGetSavedTuple(hjstate, - file, + slot = ExecHashJoinGetSavedTuple(hashtable, hashvalue, hjstate->hj_OuterTupleSlot); if (!TupIsNull(slot)) @@ -726,22 +995,26 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) HashJoinTable hashtable = hjstate->hj_HashTable; int nbatch; int curbatch; - BufFile *innerFile; - TupleTableSlot *slot; - uint32 hashvalue; nbatch = hashtable->nbatch; curbatch = hashtable->curbatch; + if (HashJoinTableIsShared(hashtable)) + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_UNMATCHED_BATCH(curbatch)); + if (curbatch > 0) { /* * We no longer need the previous outer batch file; close it right * away to free disk space. */ + /* TODO: is this ok for a shared hash table? */ if (hashtable->outerBatchFile[curbatch]) + { BufFileClose(hashtable->outerBatchFile[curbatch]); - hashtable->outerBatchFile[curbatch] = NULL; + hashtable->outerBatchFile[curbatch] = NULL; + } } else /* we just finished the first batch */ { @@ -776,7 +1049,8 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * need to be reassigned. 
*/ curbatch++; - while (curbatch < nbatch && + while (!HashJoinTableIsShared(hashtable) && + curbatch < nbatch && (hashtable->outerBatchFile[curbatch] == NULL || hashtable->innerBatchFile[curbatch] == NULL)) { @@ -792,13 +1066,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) if (hashtable->outerBatchFile[curbatch] && nbatch != hashtable->nbatch_outstart) break; /* must process due to rule 3 */ - /* We can ignore this batch. */ /* Release associated temp files right away. */ + /* TODO review */ if (hashtable->innerBatchFile[curbatch]) BufFileClose(hashtable->innerBatchFile[curbatch]); + hashtable->innerBatchFile[curbatch] = NULL; if (hashtable->outerBatchFile[curbatch]) BufFileClose(hashtable->outerBatchFile[curbatch]); + hashtable->outerBatchFile[curbatch] = NULL; curbatch++; } @@ -812,48 +1088,163 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * Reload the hash table with the new inner batch (which could be empty) */ ExecHashTableReset(hashtable); + ExecHashJoinLoadBatch(hjstate); + + return true; +} - innerFile = hashtable->innerBatchFile[curbatch]; +static void +ExecHashJoinLoadBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + TupleTableSlot *slot; + uint32 hashvalue; + + TRACE_POSTGRESQL_HASH_LOADING_START(); - if (innerFile != NULL) + if (HashJoinTableIsShared(hashtable)) { - if (BufFileSeek(innerFile, 0, 0L, SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind hash-join temporary file: %m"))); + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_LOADING_BATCH(curbatch)); - while ((slot = ExecHashJoinGetSavedTuple(hjstate, - innerFile, - &hashvalue, - hjstate->hj_HashTupleSlot))) - { - /* - * NOTE: some tuples may be sent to future batches. Also, it is - * possible for hashtable->nbatch to be increased here! 
- */ - ExecHashTableInsert(hashtable, slot, hashvalue); - } + /* + * Shrinking may be triggered while loading, if work_mem is exceeded. + * We need to be attached to shrink_barrier so that we can coordinate + * that among participants. + */ + BarrierAttach(&hashtable->shared->shrink_barrier); + } + + /* + * In HJ_NEED_NEW_OUTER, we already selected the current inner batch for + * reading from. If there is a shared hash table, we may have already + * partially loaded the hash table in ExecHashJoinPreloadNextBatch. It + * may have already loaded one tuple that it couldn't insert, so we'll do + * that first. + */ + Assert(hashtable->batch_reader.batchno == curbatch); + Assert(hashtable->batch_reader.inner); + + if (hashtable->preloaded_spare_tuple) + { + bool success; + + Assert(HashJoinTableIsShared(hashtable)); + Assert(!TupIsNull(hjstate->hj_HashTupleSlot)); + success = ExecHashTableInsert(hashtable, hjstate->hj_HashTupleSlot, + hashtable->preloaded_spare_tuple_hash, + false); + Assert(success); + hashtable->preloaded_spare_tuple = false; + } + + /* + * If we preloaded any tuples, we now need to insert them into the + * hashtable. + */ + ExecHashRebucket(hashtable); + + /* Finally, we can read in the rest of the batch. */ + for (;;) + { + slot = ExecHashJoinGetSavedTuple(hashtable, + &hashvalue, + hjstate->hj_HashTupleSlot); + + if (slot == NULL) + break; /* - * after we build the hash table, the inner batch file is no longer - * needed + * NOTE: some tuples may be sent to future batches. Also, it is + * possible for hashtable->nbatch to be increased here! */ - BufFileClose(innerFile); - hashtable->innerBatchFile[curbatch] = NULL; + ExecHashTableInsert(hashtable, slot, hashvalue, false); + } + + if (HashJoinTableIsShared(hashtable)) + { + /* We have finished any potential shrinking. */ + BarrierDetach(&hashtable->shared->shrink_barrier); } + TRACE_POSTGRESQL_HASH_LOADING_DONE(); + /* - * Rewind outer batch file (if present), so that we can start reading it. 
+ * Now that we have finished loading this batch into the hash table, we + * can set our outer batch read head to the start of the current batch, + * and our inner batch read head to the start of the NEXT batch (as + * expected by ExecHashJoinPreloadNextBatch). */ - if (hashtable->outerBatchFile[curbatch] != NULL) + if (HashJoinTableIsShared(hashtable)) { - if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind hash-join temporary file: %m"))); + /* + * Wait until all participants have finished loading their portion of + * the hash table. + */ + if (BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASHJOIN_LOADING)) + { + /* Serial phase: prepare to read this outer and next inner batch */ + ExecHashJoinRewindBatches(hashtable, hashtable->curbatch); + } + + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_PREPARING_BATCH(hashtable->curbatch)); + /* + * Since we have finished loading the current batch into memory, the + * batch files generated by this participant for the next batch are + * now read-only. So it's time to export them for other participants + * to read from if they run out of tuples to read from their own batch + * files. We'll export the current outer batch, so that it can be + * used for probing, and the next inner batch so that it can be used + * for preloading tuples for the next batch when that is finished. 
+ */ + ExecHashJoinExportBatch(hashtable, hashtable->curbatch, false); + if (hashtable->curbatch + 1 < hashtable->nbatch) + ExecHashJoinExportBatch(hashtable, hashtable->curbatch + 1, true); + + BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASHJOIN_PREPARING); + Assert(BarrierPhase(&hashtable->shared->barrier) == + PHJ_PHASE_PROBING_BATCH(hashtable->curbatch)); } + else + ExecHashJoinRewindBatches(hashtable, hashtable->curbatch); - return true; + /* + * The inner batch file is no longer needed by any participant, because + * the hash table has been fully reloaded. + */ + ExecHashJoinCloseBatch(hashtable, hashtable->curbatch, true); + + /* Prepare to read from the current outer batch. */ + ExecHashJoinOpenBatch(hashtable, hashtable->curbatch, false); +} + +/* + * Export a BufFile, copy the descriptor to DSA memory and return the + * dsa_pointer. + */ +static dsa_pointer +make_batch_descriptor(dsa_area *area, BufFile *file) +{ + dsa_pointer pointer; + BufFileDescriptor *source; + BufFileDescriptor *target; + size_t size; + + source = BufFileExport(file); + size = BufFileDescriptorSize(source); + pointer = dsa_allocate(area, size); + if (!DsaPointerIsValid(pointer)) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed on dsa_allocate of size %zu.", size))); + target = dsa_get_address(area, pointer); + memcpy(target, source, size); + pfree(source); + + return pointer; } /* @@ -868,17 +1259,26 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * will get messed up. */ void -ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, - BufFile **fileptr) +ExecHashJoinSaveTuple(HashJoinTable hashtable, + MinimalTuple tuple, uint32 hashvalue, + int batchno, + bool inner) { - BufFile *file = *fileptr; + BufFile *file; size_t written; + if (inner) + file = hashtable->innerBatchFile[batchno]; + else + file = hashtable->outerBatchFile[batchno]; if (file == NULL) { /* First write to this batch file, so open it. 
*/ file = BufFileCreateTemp(false); - *fileptr = file; + if (inner) + hashtable->innerBatchFile[batchno] = file; + else + hashtable->outerBatchFile[batchno] = file; } written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32)); @@ -892,57 +1292,519 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to hash-join temporary file: %m"))); + + TRACE_POSTGRESQL_HASH_SAVE_TUPLE(HashJoinParticipantNumber(), + batchno, + inner); +} + +/* + * Export the inner or outer batch file written by this participant for a + * given batch number, so that other backends can import and read from it if + * they run out of tuples to read from their own files. This must be done + * after this participant has finished writing to the batch, but before any + * other participant might attempt to read from it. + */ +static void +ExecHashJoinExportBatch(HashJoinTable hashtable, int batchno, bool inner) +{ + HashJoinParticipantState *participant; + BufFile *file; + + TRACE_POSTGRESQL_HASHJOIN_EXPORT_BATCH(HashJoinParticipantNumber(), + batchno, + inner); + + Assert(HashJoinTableIsShared(hashtable)); + Assert(batchno < hashtable->nbatch); + + participant = &hashtable->shared->participants[HashJoinParticipantNumber()]; + + /* We will export batches one-by-one. */ + participant->nbatch = -1; + + if (inner) + { + participant->inner_batchno = batchno; + file = hashtable->innerBatchFile[batchno]; + if (file != NULL) + participant->inner_batch_descriptor = + make_batch_descriptor(hashtable->area, file); + else + participant->inner_batch_descriptor = + InvalidDsaPointer; + } + else + { + participant->outer_batchno = batchno; + file = hashtable->outerBatchFile[batchno]; + if (file != NULL) + participant->outer_batch_descriptor = + make_batch_descriptor(hashtable->area, file); + else + participant->outer_batch_descriptor = + InvalidDsaPointer; + } +} + +/* + * Export all future batches. 
This must be called by any backend that exits + * early, to make sure that the batch files it wrote to can be consumed by + * other participants. + */ +static void +ExecHashJoinExportAllBatches(HashJoinTable hashtable) +{ + HashJoinParticipantState *participant; + dsa_pointer *inner_batch_descriptors; + dsa_pointer *outer_batch_descriptors; + Size size; + BufFile *file; + int i; + + /* + * Sanity check that we are in one of the expected phases, in which no + * other participant could be reading the state we are writing. + * + * TODO: See ExecHashJoinPreloadNextBatch where we can't actually preload + * batch 1 because of this. Need to figure something better out. + * + */ + Assert(BarrierPhase(&hashtable->shared->barrier) == PHJ_PHASE_HASHING || + BarrierPhase(&hashtable->shared->barrier) == PHJ_PHASE_PROBING); + + TRACE_POSTGRESQL_HASHJOIN_EXPORT_ALL_BATCHES(HashJoinParticipantNumber(), + hashtable->nbatch); + + /* If we didn't generate any batches there is nothing to do. */ + participant = &hashtable->shared->participants[HashJoinParticipantNumber()]; + if (hashtable->nbatch <= 1) + { + /* No one ever needs to read batch 0. */ + participant->nbatch = 0; + return; + } + + /* Set up space for descriptors for all my batches. 
*/ + participant->nbatch = hashtable->nbatch; + size = sizeof(dsa_pointer) * hashtable->nbatch; + participant->inner_batch_descriptors = dsa_allocate(hashtable->area, size); + participant->outer_batch_descriptors = dsa_allocate(hashtable->area, size); + if (!DsaPointerIsValid(participant->inner_batch_descriptors) || + !DsaPointerIsValid(participant->outer_batch_descriptors)) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed on dsa_allocate of size %zu.", size))); + inner_batch_descriptors = + dsa_get_address(hashtable->area, + participant->inner_batch_descriptors); + outer_batch_descriptors = + dsa_get_address(hashtable->area, + participant->outer_batch_descriptors); + memset(inner_batch_descriptors, 0, size); + memset(outer_batch_descriptors, 0, size); + + /* Now export all batches that were written by this participant. */ + for (i = hashtable->curbatch + 1; i < hashtable->nbatch; ++i) + { + file = hashtable->innerBatchFile[i]; + if (file != NULL) + inner_batch_descriptors[i] = + make_batch_descriptor(hashtable->area, file); + file = hashtable->outerBatchFile[i]; + if (file != NULL) + outer_batch_descriptors[i] = + make_batch_descriptor(hashtable->area, file); + } +} + +/* + * Import a batch that was exported by another participant, so that this + * process can read it. The participant and batch numbers should be already + * set in the reader object that is passed in. + */ +static void +ExecHashJoinImportBatch(HashJoinTable hashtable, HashJoinBatchReader *reader) +{ + dsa_pointer descriptor = InvalidDsaPointer; + HashJoinParticipantState *participant; + + TRACE_POSTGRESQL_HASHJOIN_IMPORT_BATCH(reader->participant_number, + reader->batchno, + reader->inner); + + Assert(reader->participant_number >= 0 && + reader->participant_number < hashtable->shared->planned_participants); + + /* Find the participant referenced by the reader. 
*/ + participant = &hashtable->shared->participants[reader->participant_number]; + + /* Find the descriptor exported by that participant for that batch. */ + if (participant->nbatch != -1) + { + /* It exported all its batches and left. Find the correct one. */ + if (reader->batchno < participant->nbatch) + { + dsa_pointer *descriptors; + + Assert(DsaPointerIsValid(participant->inner_batch_descriptors)); + Assert(DsaPointerIsValid(participant->outer_batch_descriptors)); + descriptors = + dsa_get_address(hashtable->area, + reader->inner + ? participant->inner_batch_descriptors + : participant->outer_batch_descriptors); + if (DsaPointerIsValid(descriptors[reader->batchno])) + descriptor = descriptors[reader->batchno]; + } + } + else + { + /* It must have just exported the exact batch we expect. */ + Assert((reader->inner && + (reader->batchno == participant->inner_batchno)) || + (!reader->inner && + (reader->batchno == participant->outer_batchno))); + + if (reader->inner) + descriptor = participant->inner_batch_descriptor; + else + descriptor = participant->outer_batch_descriptor; + } + + /* Import the BufFile, if we found one. */ + if (DsaPointerIsValid(descriptor)) + { + reader->head.fileno = reader->head.offset = -1; + reader->file = BufFileImport(dsa_get_address(hashtable->area, + descriptor)); + if (reader->inner) + reader->shared = &participant->inner_batch_reader; + else + reader->shared = &participant->outer_batch_reader; + Assert(reader->shared->batchno == reader->batchno); + } + else + { + reader->file = NULL; + reader->shared = NULL; + } +} + +/* + * Select the batch file that ExecHashJoinGetSavedTuple will read from. + */ +void +ExecHashJoinOpenBatch(HashJoinTable hashtable, int batchno, bool inner) +{ + HashJoinBatchReader *batch_reader = &hashtable->batch_reader; + + TRACE_POSTGRESQL_HASHJOIN_OPEN_BATCH(HashJoinParticipantNumber(), + batchno, + inner); + + if (batchno == 0) + batch_reader->file = NULL; + else + batch_reader->file = inner + ? 
hashtable->innerBatchFile[batchno] + : hashtable->outerBatchFile[batchno]; + + if (HashJoinTableIsShared(hashtable)) + { + HashJoinParticipantState *participant; + + /* Initially we will read from the caller's batch file. */ + participant = + &hashtable->shared->participants[HashJoinParticipantNumber()]; + batch_reader->shared = inner + ? &participant->inner_batch_reader + : &participant->outer_batch_reader; + /* Seek to the shared position at next read. */ + batch_reader->head.fileno = -1; + batch_reader->head.offset = -1; + } + else + { + batch_reader->shared = NULL; + /* Seek to start of batch now, if there is one. */ + if (batch_reader->file != NULL) + BufFileSeek(batch_reader->file, 0, 0, SEEK_SET); + } + + batch_reader->participant_number = HashJoinParticipantNumber(); + batch_reader->batchno = batchno; + batch_reader->inner = inner; +} + +/* + * Close a batch, once it is not needed by any participant. This causes batch + * files created by this participant to be deleted. + */ +void +ExecHashJoinCloseBatch(HashJoinTable hashtable, int batchno, bool inner) +{ + HashJoinParticipantState *participant; + HashJoinBatchReader *batch_reader; + BufFile *file; + + /* + * We only need to close the batch owned by THIS participant. That causes + * it to be deleted. Batches opened in this backend but created by other + * participants are closed by ExecHashJoinGetSavedTuple when it reaches + * the end of the file, allowing them to be closed sooner. + */ + batch_reader = &hashtable->batch_reader; + participant = &hashtable->shared->participants[HashJoinParticipantNumber()]; + if (inner) + { + file = hashtable->innerBatchFile[batchno]; + hashtable->innerBatchFile[batchno] = NULL; + } + else + { + file = hashtable->outerBatchFile[batchno]; + hashtable->outerBatchFile[batchno] = NULL; + } + if (file == NULL) + return; + + Assert(batch_reader->file == NULL || file == batch_reader->file); + + BufFileClose(file); + batch_reader->file = NULL; +} + +/* + * Rewind batch readers. 
The outer batch reader is rewound to the start of + * batchno. The inner batch reader is rewound to the start of batchno + 1, in + * anticipation of preloading the next batch. + */ +void +ExecHashJoinRewindBatches(HashJoinTable hashtable, int batchno) +{ + HashJoinBatchReader *batch_reader; + int i; + + batch_reader = &hashtable->batch_reader; + + if (HashJoinTableIsShared(hashtable)) + { + Assert(BarrierPhase(&hashtable->shared->barrier) == PHJ_PHASE_CREATING || + (PHJ_PHASE_TO_SUBPHASE(BarrierPhase(&hashtable->shared->barrier)) == + PHJ_SUBPHASE_PREPARING && + PHJ_PHASE_TO_BATCHNO(BarrierPhase(&hashtable->shared->barrier)) == + batchno)); + + /* Position the shared read heads for each participant's batch. */ + for (i = 0; i < hashtable->shared->planned_participants; ++i) + { + HashJoinSharedBatchReader *reader; + + reader = &hashtable->shared->participants[i].outer_batch_reader; + reader->batchno = batchno; /* for probing this batch */ + reader->head.fileno = 0; + reader->head.offset = 0; + + reader = &hashtable->shared->participants[i].inner_batch_reader; + reader->batchno = batchno + 1; /* for preloading the next batch */ + reader->head.fileno = 0; + reader->head.offset = 0; + } + } } /* * ExecHashJoinGetSavedTuple - * read the next tuple from a batch file. Return NULL if no more. + * read the next tuple from the batch selected with + * ExecHashJoinOpenBatch, including the batch files of + * other participants if the hash table is shared. Return NULL if no + * more. * * On success, *hashvalue is set to the tuple's hash value, and the tuple * itself is stored in the given slot. 
*/ static TupleTableSlot * -ExecHashJoinGetSavedTuple(HashJoinState *hjstate, - BufFile *file, +ExecHashJoinGetSavedTuple(HashJoinTable hashtable, uint32 *hashvalue, TupleTableSlot *tupleSlot) { - uint32 header[2]; - size_t nread; - MinimalTuple tuple; + TupleTableSlot *result = NULL; + HashJoinBatchReader *batch_reader = &hashtable->batch_reader; - /* - * Since both the hash value and the MinimalTuple length word are uint32, - * we can read them both in one BufFileRead() call without any type - * cheating. - */ - nread = BufFileRead(file, (void *) header, sizeof(header)); - if (nread == 0) /* end of file */ + for (;;) { - ExecClearTuple(tupleSlot); - return NULL; - } - if (nread != sizeof(header)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from hash-join temporary file: %m"))); - *hashvalue = header[0]; - tuple = (MinimalTuple) palloc(header[1]); - tuple->t_len = header[1]; - nread = BufFileRead(file, - (void *) ((char *) tuple + sizeof(uint32)), - header[1] - sizeof(uint32)); - if (nread != header[1] - sizeof(uint32)) - ereport(ERROR, - (errcode_for_file_access(), + uint32 header[2]; + size_t nread; + MinimalTuple tuple; + bool can_close = false; + + if (batch_reader->file == NULL) + { + /* + * No file found for the current participant. Try stealing tuples + * from the next participant. + */ + goto next_participant; + } + + if (HashJoinTableIsShared(hashtable)) + { + Assert((batch_reader->inner && + batch_reader->shared == + &hashtable->shared->participants[batch_reader->participant_number].inner_batch_reader) || + (!batch_reader->inner && + batch_reader->shared == + &hashtable->shared->participants[batch_reader->participant_number].outer_batch_reader)); + + LWLockAcquire(&batch_reader->shared->lock, LW_EXCLUSIVE); + Assert(batch_reader->shared->batchno == batch_reader->batchno); + if (batch_reader->shared->error) + { + /* Don't try to read if reading failed in some other backend. 
*/ + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from hash-join temporary file"))); + } + + /* Set the shared error flag, which we'll clear if we succeed. */ + batch_reader->shared->error = true; + + /* + * If another worker has moved the shared read head since we last read, + * we'll need to seek to the new shared position. + */ + if (batch_reader->head.fileno != batch_reader->shared->head.fileno || + batch_reader->head.offset != batch_reader->shared->head.offset) + { + TRACE_POSTGRESQL_HASH_SEEK(HashJoinParticipantNumber(), + batch_reader->participant_number, + batch_reader->batchno, + batch_reader->inner, + batch_reader->shared->head.fileno, + batch_reader->shared->head.offset); + BufFileSeek(batch_reader->file, + batch_reader->shared->head.fileno, + batch_reader->shared->head.offset, + SEEK_SET); + batch_reader->head = batch_reader->shared->head; + } + } + + /* Try to read the size and hash. */ + nread = BufFileRead(batch_reader->file, (void *) header, sizeof(header)); + if (nread > 0) + { + if (nread != sizeof(header)) + { + ereport(ERROR, + (errcode_for_file_access(), errmsg("could not read from hash-join temporary file: %m"))); - return ExecStoreMinimalTuple(tuple, tupleSlot, true); -} + } + *hashvalue = header[0]; + tuple = (MinimalTuple) palloc(header[1]); + tuple->t_len = header[1]; + nread = BufFileRead(batch_reader->file, + (void *) ((char *) tuple + sizeof(uint32)), + header[1] - sizeof(uint32)); + if (nread != header[1] - sizeof(uint32)) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from hash-join temporary file: %m"))); + } + + TRACE_POSTGRESQL_HASH_GET_SAVED_TUPLE(HashJoinParticipantNumber(), + batch_reader->participant_number, + batch_reader->batchno, + batch_reader->inner); + result = ExecStoreMinimalTuple(tuple, tupleSlot, true); + } + + if (HashJoinTableIsShared(hashtable)) + { + if (nread == 0 && + batch_reader->participant_number != + HashJoinParticipantNumber()) + { + /* + * We've 
reached the end of another participant's batch file, + * so close it now. We'll deal with closing THIS + * participant's batch file later, because we don't want the + * files to be deleted just yet. + */ + can_close = true; + } + /* Commit new head position to shared memory and clear error. */ + BufFileTell(batch_reader->file, + &batch_reader->head.fileno, + &batch_reader->head.offset); + batch_reader->shared->head = batch_reader->head; + batch_reader->shared->error = false; + if (nread == 0) + TRACE_POSTGRESQL_HASH_TELL(HashJoinParticipantNumber(), + batch_reader->participant_number, + batch_reader->batchno, + batch_reader->inner, + batch_reader->shared->head.fileno, + batch_reader->shared->head.offset); + LWLockRelease(&batch_reader->shared->lock); + } + + if (can_close) + { + BufFileClose(batch_reader->file); + batch_reader->file = NULL; + } + + if (result != NULL) + return result; + +next_participant: + if (!HashJoinTableIsShared(hashtable)) + { + /* Private hash table, end of batch. */ + ExecClearTuple(tupleSlot); /* TODO:TM also needed for shared, isn't it? */ + return NULL; + } + + /* Try the next participant's batch file. */ + batch_reader->participant_number = + (batch_reader->participant_number + 1) % + hashtable->shared->planned_participants; + if (batch_reader->participant_number == HashJoinParticipantNumber()) + { + /* + * We've made it all the way back to the file we started with, + * which is the one that this backend wrote. So there are no more + * tuples to be had in any participant's batch file. + */ + ExecClearTuple(tupleSlot); + return NULL; + } + /* Import the BufFile from that participant, if it exported one. */ + ExecHashJoinImportBatch(hashtable, batch_reader); + } +} void ExecReScanHashJoin(HashJoinState *node) { + HashState *hashNode = (HashState *) innerPlanState(node); + + /* We can't use HashJoinTableIsShared if the table is NULL.
*/ + if (hashNode->shared_table_data != NULL) + { + elog(ERROR, "TODO: shared ExecReScanHashJoin not yet implemented"); + + /* Coordinate a rewind to the shared hash table creation phase. */ + BarrierWaitSet(&hashNode->shared_table_data->barrier, + PHJ_PHASE_BEGINNING, + WAIT_EVENT_HASHJOIN_REWINDING); + } + /* * In a multi-batch join, we currently have to do rescans the hard way, * primarily because batch temp files may have already been released. But @@ -977,6 +1839,14 @@ ExecReScanHashJoin(HashJoinState *node) /* ExecHashJoin can skip the BUILD_HASHTABLE step */ node->hj_JoinState = HJ_NEED_NEW_OUTER; + + if (HashJoinTableIsShared(node->hj_HashTable)) + { + /* Coordinate a rewind to the shared probing phase. */ + BarrierWaitSet(&hashNode->shared_table_data->barrier, + PHJ_PHASE_PROBING, + WAIT_EVENT_HASHJOIN_REWINDING2); + } } else { @@ -985,6 +1855,14 @@ ExecReScanHashJoin(HashJoinState *node) node->hj_HashTable = NULL; node->hj_JoinState = HJ_BUILD_HASHTABLE; + if (hashNode->shared_table_data != NULL) /* hj_HashTable is NULL here, so test the hash node instead */ + { + /* Coordinate a rewind to the shared hash table creation phase. */ + BarrierWaitSet(&hashNode->shared_table_data->barrier, + PHJ_PHASE_BEGINNING, + WAIT_EVENT_HASHJOIN_REWINDING3); + } + /* * if chgParam of subnode is not null then plan will be re-scanned * by first ExecProcNode.
@@ -1011,3 +1889,97 @@ ExecReScanHashJoin(HashJoinState *node) if (node->js.ps.lefttree->chgParam == NULL) ExecReScan(node->js.ps.lefttree); } + +void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) +{ + size_t size; + + size = offsetof(SharedHashJoinTableData, participants) + + sizeof(HashJoinParticipantState) * (pcxt->nworkers + 1); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +void +ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) +{ + HashState *hashNode; + SharedHashJoinTable shared; + size_t size; + int planned_participants; + int i; + + /* + * Disable shared hash table mode if we failed to create a real DSM + * segment, because that means that we don't have a DSA area to work + * with. + */ + if (pcxt->seg == NULL) + return; + + /* + * Set up the state needed to coordinate access to the shared hash table, + * using the plan node ID as the toc key. + */ + planned_participants = pcxt->nworkers + 1; /* possible workers + leader */ + size = offsetof(SharedHashJoinTableData, participants) + + sizeof(HashJoinParticipantState) * planned_participants; + shared = shm_toc_allocate(pcxt->toc, size); + BarrierInit(&shared->barrier, 0); + BarrierInit(&shared->shrink_barrier, 0); + shared->buckets = InvalidDsaPointer; + shared->chunks = InvalidDsaPointer; + shared->chunks_preloaded = InvalidDsaPointer; + shared->chunks_to_rebucket = InvalidDsaPointer; + shared->chunks_to_shrink = InvalidDsaPointer; + shared->chunks_unmatched = InvalidDsaPointer; + shared->planned_participants = planned_participants; + shared->size = 0; + shared->size_preloaded = 0; + shared->shrinking_enabled = true; + shm_toc_insert(pcxt->toc, state->js.ps.plan->plan_node_id, shared); + + /* Initialize the LWLocks. 
*/ + LWLockInitialize(&shared->chunk_lock, LWTRANCHE_PARALLEL_HASH_JOIN_CHUNK); + for (i = 0; i < planned_participants; ++i) + { + LWLockInitialize(&shared->participants[i].inner_batch_reader.lock, + LWTRANCHE_PARALLEL_HASH_JOIN_INNER_BATCH_READER); + LWLockInitialize(&shared->participants[i].outer_batch_reader.lock, + LWTRANCHE_PARALLEL_HASH_JOIN_OUTER_BATCH_READER); + } + + /* + * Pass the SharedHashJoinTable to the hash node. If the Gather node + * running in the leader backend decides to execute the hash join, it + * hasn't called ExecHashJoinInitializeWorker so it doesn't have + * state->shared_table_data set up. So we must do it here. + */ + hashNode = (HashState *) innerPlanState(state); + hashNode->shared_table_data = shared; +} + +void +ExecHashJoinInitializeWorker(HashJoinState *state, shm_toc *toc) +{ + HashState *hashNode; + + state->hj_sharedHashJoinTable = + shm_toc_lookup(toc, state->js.ps.plan->plan_node_id); + + /* + * Inject SharedHashJoinTable into the hash node. It could instead have + * its own ExecHashInitializeWorker function, but we only want to set its + * 'parallel_aware' flag if we want to tell it to actually build the hash + * table in parallel. Since its parallel_aware flag also controls whether + * its 'InitializeWorker' function gets called, and it also needs access + * to this object for serial shared hash mode, we'll pass it on here + * instead of depending on that. 
+ */ + hashNode = (HashState *) innerPlanState(state); + hashNode->shared_table_data = state->hj_sharedHashJoinTable; + Assert(hashNode->shared_table_data != NULL); + + Assert(HashJoinParticipantNumber() < + hashNode->shared_table_data->planned_participants); +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 439a946..df1d574 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -31,6 +31,8 @@ #include "executor/nodeSeqscan.h" #include "utils/rel.h" +#include + static void InitScanRelation(SeqScanState *node, EState *estate, int eflags); static TupleTableSlot *SeqNext(SeqScanState *node); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 806d0a9..a2beb27 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1993,6 +1993,7 @@ _outHashPath(StringInfo str, const HashPath *node) WRITE_NODE_FIELD(path_hashclauses); WRITE_INT_FIELD(num_batches); + WRITE_ENUM_FIELD(table_type, HashPathTableType); } static void diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index a52eb7e..2856bcd 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -104,6 +104,7 @@ double seq_page_cost = DEFAULT_SEQ_PAGE_COST; double random_page_cost = DEFAULT_RANDOM_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; +double cpu_shared_tuple_cost = DEFAULT_CPU_SHARED_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; double parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST; @@ -2693,16 +2694,19 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, List *hashclauses, Path *outer_path, Path *inner_path, SpecialJoinInfo *sjinfo, - SemiAntiJoinFactors *semifactors) + SemiAntiJoinFactors *semifactors, + HashPathTableType table_type) { Cost startup_cost = 0; Cost run_cost = 0; double 
outer_path_rows = outer_path->rows; double inner_path_rows = inner_path->rows; + double inner_path_rows_total = inner_path_rows; int num_hashclauses = list_length(hashclauses); int numbuckets; int numbatches; int num_skew_mcvs; + size_t space_allowed; /* not used */ /* cost of source data */ startup_cost += outer_path->startup_cost; @@ -2724,8 +2728,43 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows; /* + * If this is a shared hash table, there is an extra charge for inserting + * each tuple into the shared hash table, to cover the overhead of memory + * synchronization that makes the hash table slightly slower to build than + * a private hash table. There is no extra charge for probing the hash + * table for each outer path row, on the basis that read-only access to the + * hash table shouldn't generate any extra memory synchronization. + * + * cpu_shared_tuple_cost acts as a tie-breaker controlling whether we prefer + * HASH_TABLE_PRIVATE or HASH_TABLE_SHARED_SERIAL plans, when the hash + * table fits in work_mem, since the cost is otherwise the same. If it is + * positive, then we'll prefer private hash tables, even though that means + * that we'll be running N copies of the inner plan. Running N copies of + * the inner plan in parallel is not considered more + * expensive than running 1 copy of the inner plan while N-1 participants + * do nothing, despite doing less work in total. + */ + if (table_type != HASHPATH_TABLE_PRIVATE) + startup_cost += cpu_shared_tuple_cost * inner_path_rows; + + /* + * If this is a parallel shared hash table, then the value we have for + * inner_rows refers only to the rows returned by each participant. For + * shared hash table size estimation, we need the total number, so we need + * to undo the division.
+ */ + if (table_type == HASHPATH_TABLE_SHARED_PARALLEL) + inner_path_rows_total *= outer_path->parallel_workers + 1; + + /* * Get hash table size that executor would use for inner relation. * + * Shared hash tables are allowed to be larger to make up for the fact + * that there is only one copy shared by all parallel query participants, + * which may reduce the number of batches. That means that + * HASH_TABLE_SHARED_SERIAL is likely to beat HASH_TABLE_PRIVATE when we + * expect to exceed work_mem. + * * XXX for the moment, always assume that skew optimization will be * performed. As long as SKEW_WORK_MEM_PERCENT is small, it's not worth * trying to determine that for sure. @@ -2733,9 +2772,12 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, * XXX at some point it might be interesting to try to account for skew * optimization in the cost estimate, but for now, we don't. */ - ExecChooseHashTableSize(inner_path_rows, + ExecChooseHashTableSize(inner_path_rows_total, inner_path->pathtarget->width, true, /* useskew */ + table_type != HASHPATH_TABLE_PRIVATE, /* shared */ + outer_path->parallel_workers, + &space_allowed, &numbuckets, &numbatches, &num_skew_mcvs); @@ -2746,12 +2788,19 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, * time. Charge seq_page_cost per page, since the I/O should be nice and * sequential. Writing the inner rel counts as startup cost, all the rest * as run cost. + * + * If the hash table is HASH_TABLE_PRIVATE, then every participant will + * write a copy of every batch file, but this happens in parallel so we + * don't consider that to be more expensive than the + * HASH_TABLE_SHARED_SERIAL case where only one participant does that. It + * is not clear how the costing should be affected by higher disk + * bandwidth usage. 
*/ if (numbatches > 1) { double outerpages = page_size(outer_path_rows, outer_path->pathtarget->width); - double innerpages = page_size(inner_path_rows, + double innerpages = page_size(inner_path_rows_total, inner_path->pathtarget->width); startup_cost += seq_page_cost * innerpages; diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index 7c30ec6..209b9d1 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -492,7 +492,8 @@ try_hashjoin_path(PlannerInfo *root, Path *inner_path, List *hashclauses, JoinType jointype, - JoinPathExtraData *extra) + JoinPathExtraData *extra, + HashPathTableType table_type) { Relids required_outer; JoinCostWorkspace workspace; @@ -517,7 +518,7 @@ try_hashjoin_path(PlannerInfo *root, */ initial_cost_hashjoin(root, &workspace, jointype, hashclauses, outer_path, inner_path, - extra->sjinfo, &extra->semifactors); + extra->sjinfo, &extra->semifactors, table_type); if (add_path_precheck(joinrel, workspace.startup_cost, workspace.total_cost, @@ -534,7 +535,8 @@ try_hashjoin_path(PlannerInfo *root, inner_path, extra->restrictlist, required_outer, - hashclauses)); + hashclauses, + table_type)); } else { @@ -555,7 +557,8 @@ try_partial_hashjoin_path(PlannerInfo *root, Path *inner_path, List *hashclauses, JoinType jointype, - JoinPathExtraData *extra) + JoinPathExtraData *extra, + HashPathTableType table_type) { JoinCostWorkspace workspace; @@ -580,7 +583,8 @@ try_partial_hashjoin_path(PlannerInfo *root, */ initial_cost_hashjoin(root, &workspace, jointype, hashclauses, outer_path, inner_path, - extra->sjinfo, &extra->semifactors); + extra->sjinfo, &extra->semifactors, + table_type); if (!add_partial_path_precheck(joinrel, workspace.total_cost, NIL)) return; @@ -596,7 +600,8 @@ try_partial_hashjoin_path(PlannerInfo *root, inner_path, extra->restrictlist, NULL, - hashclauses)); + hashclauses, + table_type)); } /* @@ -1401,7 +1406,8 @@ hash_inner_and_outer(PlannerInfo 
*root, cheapest_total_inner, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); /* no possibility of cheap startup here */ } else if (jointype == JOIN_UNIQUE_INNER) @@ -1417,7 +1423,8 @@ hash_inner_and_outer(PlannerInfo *root, cheapest_total_inner, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); if (cheapest_startup_outer != NULL && cheapest_startup_outer != cheapest_total_outer) try_hashjoin_path(root, @@ -1426,7 +1433,8 @@ hash_inner_and_outer(PlannerInfo *root, cheapest_total_inner, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); } else { @@ -1447,7 +1455,8 @@ hash_inner_and_outer(PlannerInfo *root, cheapest_total_inner, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); foreach(lc1, outerrel->cheapest_parameterized_paths) { @@ -1481,7 +1490,8 @@ hash_inner_and_outer(PlannerInfo *root, innerpath, hashclauses, jointype, - extra); + extra, + HASHPATH_TABLE_PRIVATE); } } } @@ -1490,23 +1500,32 @@ hash_inner_and_outer(PlannerInfo *root, * If the joinrel is parallel-safe, we may be able to consider a * partial hash join. However, we can't handle JOIN_UNIQUE_OUTER, * because the outer path will be partial, and therefore we won't be - * able to properly guarantee uniqueness. Similarly, we can't handle - * JOIN_FULL and JOIN_RIGHT, because they can produce false null - * extended rows. Also, the resulting path must not be parameterized. + * able to properly guarantee uniqueness. Also, the resulting path + * must not be parameterized. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && - save_jointype != JOIN_FULL && - save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && bms_is_empty(joinrel->lateral_relids)) { Path *cheapest_partial_outer; + Path *cheapest_partial_inner = NULL; Path *cheapest_safe_inner = NULL; cheapest_partial_outer = (Path *) linitial(outerrel->partial_pathlist); + /* Can we use a partial inner plan too? 
*/ + if (innerrel->partial_pathlist != NIL) + cheapest_partial_inner = + (Path *) linitial(innerrel->partial_pathlist); + if (cheapest_partial_inner != NULL) + try_partial_hashjoin_path(root, joinrel, + cheapest_partial_outer, + cheapest_partial_inner, + hashclauses, jointype, extra, + HASHPATH_TABLE_SHARED_PARALLEL); + /* * Normally, given that the joinrel is parallel-safe, the cheapest * total inner path will also be parallel-safe, but if not, we'll @@ -1534,10 +1553,27 @@ hash_inner_and_outer(PlannerInfo *root, } if (cheapest_safe_inner != NULL) + { + /* Try a shared table with only one worker building the table. */ try_partial_hashjoin_path(root, joinrel, cheapest_partial_outer, cheapest_safe_inner, - hashclauses, jointype, extra); + hashclauses, jointype, extra, + HASHPATH_TABLE_SHARED_SERIAL); + /* + * Also try private hash tables, built by each worker, but + * only if it's not a FULL or RIGHT join. Those rely on being + * able to track which hash table entries have been matched, + * but we don't have a way to unify the HEAP_TUPLE_HAS_MATCH + * flags from all the private copies of the hash table. + */ + if (save_jointype != JOIN_FULL && save_jointype != JOIN_RIGHT) + try_partial_hashjoin_path(root, joinrel, + cheapest_partial_outer, + cheapest_safe_inner, + hashclauses, jointype, extra, + HASHPATH_TABLE_PRIVATE); + } } } } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index c7bcd9b..cac4932 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -3938,6 +3938,23 @@ create_hashjoin_plan(PlannerInfo *root, copy_plan_costsize(&hash_plan->plan, inner_plan); hash_plan->plan.startup_cost = hash_plan->plan.total_cost; + /* + * Set the table as sharable if appropriate, with parallel or serial + * building. 
+ */ + switch (best_path->table_type) + { + case HASHPATH_TABLE_SHARED_PARALLEL: + hash_plan->shared_table = true; + hash_plan->plan.parallel_aware = true; + break; + case HASHPATH_TABLE_SHARED_SERIAL: + hash_plan->shared_table = true; + break; + case HASHPATH_TABLE_PRIVATE: + break; + } + join_plan = make_hashjoin(tlist, joinclauses, otherclauses, diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 3b7c56d..a1d7b20 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2096,6 +2096,7 @@ create_mergejoin_path(PlannerInfo *root, * 'required_outer' is the set of required outer rels * 'hashclauses' are the RestrictInfo nodes to use as hash clauses * (this should be a subset of the restrict_clauses list) + * 'table_type' for level of hash table sharing */ HashPath * create_hashjoin_path(PlannerInfo *root, @@ -2108,7 +2109,8 @@ create_hashjoin_path(PlannerInfo *root, Path *inner_path, List *restrict_clauses, Relids required_outer, - List *hashclauses) + List *hashclauses, + HashPathTableType table_type) { HashPath *pathnode = makeNode(HashPath); @@ -2123,9 +2125,13 @@ create_hashjoin_path(PlannerInfo *root, sjinfo, required_outer, &restrict_clauses); - pathnode->jpath.path.parallel_aware = false; + pathnode->jpath.path.parallel_aware = + joinrel->consider_parallel && + (table_type == HASHPATH_TABLE_SHARED_SERIAL || + table_type == HASHPATH_TABLE_SHARED_PARALLEL); pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->table_type = table_type; /* This is a foolish way to estimate parallel_workers, but for now... 
*/ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index f37a0bf..d562fef 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3392,6 +3392,63 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_HASH_CREATING: + event_name = "Hash/Creating"; + break; + case WAIT_EVENT_HASH_HASHING: + event_name = "Hash/Hashing"; + break; + case WAIT_EVENT_HASH_SHRINKING1: + event_name = "Hash/Shrinking1"; + break; + case WAIT_EVENT_HASH_SHRINKING2: + event_name = "Hash/Shrinking2"; + break; + case WAIT_EVENT_HASH_SHRINKING3: + event_name = "Hash/Shrinking3"; + break; + case WAIT_EVENT_HASH_SHRINKING4: + event_name = "Hash/Shrinking4"; + break; + case WAIT_EVENT_HASH_RESIZING: + event_name = "Hash/Resizing"; + break; + case WAIT_EVENT_HASH_REBUCKETING: + event_name = "Hash/Rebucketing"; + break; + case WAIT_EVENT_HASH_BEGINNING: + event_name = "Hash/Beginning"; + break; + case WAIT_EVENT_HASH_DESTROY: + event_name = "Hash/Destroy"; + break; + case WAIT_EVENT_HASH_UNMATCHED: + event_name = "Hash/Unmatched"; + break; + case WAIT_EVENT_HASH_PROMOTING: + event_name = "Hash/Promoting"; + break; + case WAIT_EVENT_HASHJOIN_PROMOTING: + event_name = "HashJoin/Promoting"; + break; + case WAIT_EVENT_HASHJOIN_PREPARING: + event_name = "HashJoin/Preparing"; + break; + case WAIT_EVENT_HASHJOIN_PROBING: + event_name = "HashJoin/Probing"; + break; + case WAIT_EVENT_HASHJOIN_LOADING: + event_name = "HashJoin/Loading"; + break; + case WAIT_EVENT_HASHJOIN_REWINDING: + event_name = "HashJoin/Rewinding"; + break; + case WAIT_EVENT_HASHJOIN_REWINDING2: + event_name = "HashJoin/Rewinding2"; + break; + case WAIT_EVENT_HASHJOIN_REWINDING3: + event_name = "HashJoin/Rewinding3"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/storage/file/buffile.c
b/src/backend/storage/file/buffile.c index 7ebd636..18ffd4e 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -40,8 +40,11 @@ #include "storage/fd.h" #include "storage/buffile.h" #include "storage/buf_internals.h" +#include "utils/probes.h" #include "utils/resowner.h" +extern int ParallelWorkerNumber; + /* * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE. * The reason is that we'd like large temporary BufFiles to be spread across @@ -89,6 +92,24 @@ struct BufFile char buffer[BLCKSZ]; }; +/* + * Serialized representation of a single file managed by a BufFile. + */ +typedef struct BufFileFileDescriptor +{ + char path[MAXPGPATH]; +} BufFileFileDescriptor; + +/* + * Serialized representation of a BufFile, to be created by BufFileExport and + * consumed by BufFileImport. + */ +struct BufFileDescriptor +{ + size_t num_files; + BufFileFileDescriptor files[FLEXIBLE_ARRAY_MEMBER]; +}; + static BufFile *makeBufFile(File firstfile); static void extendBufFile(BufFile *file); static void BufFileLoadBuffer(BufFile *file); @@ -178,6 +199,81 @@ BufFileCreateTemp(bool interXact) return file; } +/* + * Export a BufFile description in a serialized form so that another backend + * can attach to it and read from it. The format is opaque, but it may be + * bitwise copied, and its size may be obtained with BufFileDescriptorSize(). + */ +BufFileDescriptor * +BufFileExport(BufFile *file) +{ + BufFileDescriptor *descriptor; + int i; + + /* Flush output from local buffers. */ + BufFileFlush(file); + + /* Create and fill in a descriptor. 
*/ + descriptor = palloc0(offsetof(BufFileDescriptor, files) + + sizeof(BufFileFileDescriptor) * file->numFiles); + descriptor->num_files = file->numFiles; + for (i = 0; i < descriptor->num_files; ++i) + { + TRACE_POSTGRESQL_BUFFILE_EXPORT_FILE(FilePathName(file->files[i])); + strlcpy(descriptor->files[i].path, FilePathName(file->files[i]), sizeof(descriptor->files[i].path)); /* bounded: path is a fixed MAXPGPATH buffer */ + } + + return descriptor; +} + +/* + * Return the size in bytes of a BufFileDescriptor, so that it can be copied. + */ +size_t +BufFileDescriptorSize(const BufFileDescriptor *descriptor) +{ + return offsetof(BufFileDescriptor, files) + + sizeof(BufFileFileDescriptor) * descriptor->num_files; +} + +/* + * Open a BufFile that was created by another backend and then exported. The + * file must be read-only in all backends, and is still owned by the backend + * that created it. This provides a way for cooperating backends to share + * immutable temporary data such as hash join batches. + */ +BufFile * +BufFileImport(BufFileDescriptor *descriptor) +{ + BufFile *file = (BufFile *) palloc0(sizeof(BufFile)); + int i; + + file->numFiles = descriptor->num_files; + file->files = (File *) palloc0(sizeof(File) * descriptor->num_files); + file->offsets = (off_t *) palloc0(sizeof(off_t) * descriptor->num_files); + file->isTemp = false; + file->isInterXact = true; /* prevent cleanup by this backend */ + file->dirty = false; + file->resowner = CurrentResourceOwner; + file->curFile = 0; + file->curOffset = 0L; + file->pos = 0; + file->nbytes = 0; + + for (i = 0; i < descriptor->num_files; ++i) + { + TRACE_POSTGRESQL_BUFFILE_IMPORT_FILE(descriptor->files[i].path); + file->files[i] = + PathNameOpenFile(descriptor->files[i].path, + O_RDONLY | PG_BINARY, 0600); + if (file->files[i] <= 0) + elog(ERROR, "failed to import \"%s\": %m", + descriptor->files[i].path); + } + + return file; +} + #ifdef NOT_USED /* * Create a BufFile and attach it to an already-opened virtual File.
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 1cf0684..833b059 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -510,6 +510,12 @@ RegisterLWLockTranches(void) "predicate_lock_manager"); LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA, "parallel_query_dsa"); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN_INNER_BATCH_READER, + "hash_join_inner_batches"); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN_OUTER_BATCH_READER, + "hash_join_outer_batches"); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN_CHUNK, + "hash_join_chunk"); /* Register named tranches. */ for (i = 0; i < NamedLWLockTrancheRequests; i++) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 5b23dbf..fdb6d24 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2855,6 +2855,16 @@ static struct config_real ConfigureNamesReal[] = NULL, NULL, NULL }, { + {"cpu_shared_tuple_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "sharing each tuple with other parallel workers."), + NULL + }, + &cpu_shared_tuple_cost, + DEFAULT_CPU_SHARED_TUPLE_COST, -DBL_MAX, DBL_MAX, + NULL, NULL, NULL + }, + { {"cpu_index_tuple_cost", PGC_USERSET, QUERY_TUNING_COST, gettext_noop("Sets the planner's estimate of the cost of " "processing each index entry during an index scan."), diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index 146fce9..3239c3c 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -60,6 +60,40 @@ provider postgresql { probe sort__start(int, bool, int, int, bool); probe sort__done(bool, long); + probe hash__leader__early__exit(); + probe hash__worker__early__exit(); + probe hash__hashing__start(); + probe hash__hashing__done(); + probe hash__loading__start(); + probe hash__loading__done(); + probe hash__increase__buckets(int); + probe 
hash__increase__batches(int); + probe hash__shrink__start(int); + probe hash__shrink__done(); + probe hash__shrink__chunk(); + probe hash__shrink__disabled(); + probe hash__shrink__stats(size_t, size_t, size_t, size_t); + probe hash__rebucket__start(); + probe hash__rebucket__done(int); + probe hash__free__chunk(size_t); + probe hash__allocate__chunk(size_t); + probe hash__save__tuple(int, int, int); + probe hash__get__saved__tuple(int, int, int, int); + probe hash__seek(int, int, int, int, int, size_t); + probe hash__tell(int, int, int, int, int, size_t); + probe hash__insert(int); + probe hash__probe(int, int); + + probe hashjoin__start(); + probe hashjoin__done(); + probe hashjoin__export__all__batches(int, int); + probe hashjoin__export__batch(int, int, bool); + probe hashjoin__import__batch(int, int, bool); + probe hashjoin__open__batch(int, int, bool); + + probe buffile__import__file(const char *); + probe buffile__export__file(const char *); + probe buffer__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool); probe buffer__read__done(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool, bool); probe buffer__flush__start(ForkNumber, BlockNumber, Oid, Oid, Oid); diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index ac84053..2effc77 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -15,7 +15,13 @@ #define HASHJOIN_H #include "nodes/execnodes.h" +#include "port/atomics.h" +#include "storage/barrier.h" #include "storage/buffile.h" +#include "storage/fd.h" +#include "storage/lwlock.h" +#include "storage/spin.h" +#include "utils/dsa.h" /* ---------------------------------------------------------------- * hash-join hash table structures @@ -63,7 +69,12 @@ typedef struct HashJoinTupleData { - struct HashJoinTupleData *next; /* link to next tuple in same bucket */ + /* link to next tuple in same bucket, as a dsa_pointer or a plain pointer */ + union + { + dsa_pointer shared; + struct HashJoinTupleData *private; + } next; uint32
hashvalue; /* tuple's hash code */ /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */ } HashJoinTupleData; @@ -94,7 +105,12 @@ typedef struct HashJoinTupleData typedef struct HashSkewBucket { uint32 hashvalue; /* common hash value */ - HashJoinTuple tuples; /* linked list of inner-relation tuples */ + /* linked list of inner-relation tuples, as a dsa_pointer or a plain pointer */ + union + { + dsa_pointer shared; + HashJoinTuple private; + } tuples; } HashSkewBucket; #define SKEW_BUCKET_OVERHEAD MAXALIGN(sizeof(HashSkewBucket)) @@ -103,8 +119,9 @@ typedef struct HashSkewBucket #define SKEW_MIN_OUTER_FRACTION 0.01 /* - * To reduce palloc overhead, the HashJoinTuples for the current batch are - * packed in 32kB buffers instead of pallocing each tuple individually. + * To reduce palloc/dsa_allocate overhead, the HashJoinTuples for the current + * batch are packed in 32kB buffers (HASH_CHUNK_SIZE) instead of allocating each + * tuple individually. */ typedef struct HashMemoryChunkData { @@ -112,17 +129,137 @@ typedef struct HashMemoryChunkData size_t maxlen; /* size of the buffer holding the tuples */ size_t used; /* number of buffer bytes already used */ - struct HashMemoryChunkData *next; /* pointer to the next chunk (linked - * list) */ + /* pointer to the next chunk (linked list), as a dsa_pointer or a plain pointer */ + union + { + dsa_pointer shared; + struct HashMemoryChunkData *private; + } next; char data[FLEXIBLE_ARRAY_MEMBER]; /* buffer allocated at the end */ } HashMemoryChunkData; typedef struct HashMemoryChunkData *HashMemoryChunk; + + #define HASH_CHUNK_SIZE (32 * 1024L) #define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4) +/* + * Read head position in a shared batch file. + */ +typedef struct HashJoinBatchPosition +{ + int fileno; + off_t offset; +} HashJoinBatchPosition; + +/* + * The state exposed in shared memory by each participant to coordinate + * reading of batch files that it wrote.
+ */ +typedef struct HashJoinSharedBatchReader +{ + int batchno; /* the batch number we are currently reading */ + + LWLock lock; /* protects access to the members below */ + bool error; /* has an IO error occurred? */ + HashJoinBatchPosition head; /* shared read head for current batch */ +} HashJoinSharedBatchReader; + +/* + * The state exposed in shared memory by each participant allowing its batch + * files to be read by other participants. + */ +typedef struct HashJoinParticipantState +{ + /* + * To allow other participants to read from this participant's batch + * files, this participant publishes its batch descriptors (or invalid + * pointers) here. + */ + int inner_batchno; + int outer_batchno; + dsa_pointer inner_batch_descriptor; + dsa_pointer outer_batch_descriptor; + + /* + * In the case of participants that exit early, they must publish all + * their future batches, rather than publishing them one by one above. + * Each of these points to an array of dsa_pointers to BufFileDescriptor objects. + */ + int nbatch; + dsa_pointer inner_batch_descriptors; + dsa_pointer outer_batch_descriptors; + + /* + * The shared state used to coordinate reading from the current batch. We + * need separate objects for the outer and inner side, because in the + * probing phase some participants can be reading from the outer batch, + * while others can be reading from the inner side to preload the next + * batch. + */ + HashJoinSharedBatchReader inner_batch_reader; + HashJoinSharedBatchReader outer_batch_reader; +} HashJoinParticipantState; + +/* + * The state used by each backend to manage reading from batch files written + * by all participants. + */ +typedef struct HashJoinBatchReader +{ + int participant_number; /* which participant's batch are we reading? */ + int batchno; /* which batch are we reading? */ + bool inner; /* inner or outer?
*/ + HashJoinSharedBatchReader *shared; /* holder of the shared read head */ + BufFile *file; /* the file opened in this backend */ + HashJoinBatchPosition head; /* local read head position */ +} HashJoinBatchReader; + +/* + * State for a shared hash join table. Each backend participating in a hash + * join with a shared hash table also has a HashJoinTableData object in + * backend-private memory, which points to this shared state in the DSM + * segment. + */ +typedef struct SharedHashJoinTableData +{ + Barrier barrier; /* synchronization for the whole join */ + Barrier shrink_barrier; /* synchronization to shrink hashtable */ + dsa_pointer buckets; /* primary hash table */ + bool at_least_one_worker; /* did at least one worker join in time? */ + int nbuckets; + int nbuckets_optimal; + int nbatch; + + LWLock chunk_lock; /* protects the following members */ + dsa_pointer chunks; /* chunks loaded for the current batch */ + dsa_pointer chunks_preloaded; /* chunks preloaded for the next batch */ + dsa_pointer chunks_to_rebucket; /* chunks with tuples to insert */ + dsa_pointer chunks_to_shrink; /* chunks needing to be thinned out */ + dsa_pointer chunks_unmatched; /* chunks for unmatched scanning */ + Size tuples_this_batch; /* number of tuples in chunks */ + Size tuples_next_batch; /* number of tuples in chunks_preloaded */ + Size tuples_in_memory; /* shared counter while rebatching */ + Size tuples_written_out; /* shared counter while rebatching */ + Size size; /* size of buckets + chunks */ + Size size_preloaded; /* size of chunks_preloaded */ + bool shrinking_enabled; + + int planned_participants; /* number of planned workers + leader */ + + /* state exposed by each participant for sharing batches */ + HashJoinParticipantState participants[FLEXIBLE_ARRAY_MEMBER]; +} SharedHashJoinTableData; + +typedef union HashJoinBucketHead +{ + dsa_pointer_atomic shared; + HashJoinTuple private; +} HashJoinBucketHead; + typedef struct HashJoinTableData { int nbuckets; /* #
buckets in the in-memory hash table */ @@ -134,7 +271,7 @@ typedef struct HashJoinTableData int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */ /* buckets[i] is head of list of tuples in i'th in-memory bucket */ - struct HashJoinTupleData **buckets; + HashJoinBucketHead *buckets; /* buckets array is per-batch storage, as are all the tuples */ bool keepNulls; /* true to store unmatchable NULL tuples */ @@ -185,7 +322,84 @@ typedef struct HashJoinTableData MemoryContext batchCxt; /* context for this-batch-only storage */ /* used for dense allocation of tuples (into linked chunks) */ - HashMemoryChunk chunks; /* one list for the whole batch */ + HashMemoryChunk chunk; /* current chunk */ + HashMemoryChunk chunk_preload; /* current chunk for next batch */ + HashMemoryChunk chunks_to_rebucket; /* after resizing table */ + HashMemoryChunk chunks_to_shrink; + int chunk_unmatched_pos; /* head when scanning for unmatched tuples */ + + /* State for coordinating shared tables for parallel hash joins. */ + dsa_area *area; + SharedHashJoinTableData *shared; /* the shared state */ + int attached_at_phase; /* the phase this participant joined */ + bool detached_early; /* did we decide to detach early? */ + HashJoinBatchReader batch_reader; /* state for reading batches in */ + bool preloaded_spare_tuple; /* is there an extra preloaded tuple? */ + uint32 preloaded_spare_tuple_hash; /* the tuple's hash value if so */ + dsa_pointer chunk_shared; /* DSA pointer to 'chunk' */ + dsa_pointer chunk_preload_shared; /* DSA pointer to 'chunk_preload' */ + } HashJoinTableData; +/* Check if a HashJoinTable is shared by parallel workers, i.e. its 'shared' pointer is not NULL. */ +#define HashJoinTableIsShared(table) ((table)->shared != NULL) + +/* The phases of a parallel hash join.
*/ +#define PHJ_PHASE_BEGINNING 0 +#define PHJ_PHASE_CREATING 1 +#define PHJ_PHASE_HASHING 2 +#define PHJ_PHASE_RESIZING 3 +#define PHJ_PHASE_REBUCKETING 4 +#define PHJ_PHASE_PROBING 5 /* PHJ_PHASE_PROBING_BATCH(0) */ +#define PHJ_PHASE_UNMATCHED 6 /* PHJ_PHASE_UNMATCHED_BATCH(0) */ + +/* The subphases for batches. */ +#define PHJ_SUBPHASE_PROMOTING 0 +#define PHJ_SUBPHASE_LOADING 1 +#define PHJ_SUBPHASE_PREPARING 2 +#define PHJ_SUBPHASE_PROBING 3 +#define PHJ_SUBPHASE_UNMATCHED 4 + +/* The phases of parallel processing for batch(n); for n = 0 the first three coincide with PHJ_PHASE_HASHING, RESIZING and REBUCKETING. */ +#define PHJ_PHASE_PROMOTING_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 4) +#define PHJ_PHASE_LOADING_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 3) +#define PHJ_PHASE_PREPARING_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 2) +#define PHJ_PHASE_PROBING_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 1) +#define PHJ_PHASE_UNMATCHED_BATCH(n) (PHJ_PHASE_UNMATCHED + (n) * 5 - 0) + +/* Phase number -> sub-phase within a batch (a PHJ_SUBPHASE_* value for phases >= PHJ_PHASE_HASHING). */ +#define PHJ_PHASE_TO_SUBPHASE(p) \ + (((int)(p) - PHJ_PHASE_UNMATCHED + PHJ_SUBPHASE_UNMATCHED) % 5) + +/* Phase number -> batch number. */ +#define PHJ_PHASE_TO_BATCHNO(p) \ + (((int)(p) - PHJ_PHASE_UNMATCHED + PHJ_SUBPHASE_UNMATCHED) / 5) + +/* + * Is a given phase one in which a new hash table array is being assigned by + * one elected backend? That includes initial creation, reallocation during + * resize, and promotion of secondary hash table to primary. Workers that + * show up and attach at an arbitrary time must wait such phases out before + * doing anything with the hash table. + */ +#define PHJ_PHASE_MUTATING_TABLE(p) \ + ((p) == PHJ_PHASE_CREATING || \ + (p) == PHJ_PHASE_RESIZING || \ + (PHJ_PHASE_TO_BATCHNO(p) > 0 && \ + PHJ_PHASE_TO_SUBPHASE(p) == PHJ_SUBPHASE_PROMOTING)) + +/* The phases of ExecHashShrink.
*/ +#define PHJ_SHRINK_PHASE_BEGINNING 0 +#define PHJ_SHRINK_PHASE_CLEARING 1 +#define PHJ_SHRINK_PHASE_WORKING 2 +#define PHJ_SHRINK_PHASE_DECIDING 3 + +/* + * Return the 'participant number' for a process participating in a parallel + * hash join. We give a number < hashtable->shared->planned_participants + * to each potential participant: 0 for the leader, one more than ParallelWorkerNumber for a worker. + */ +#define HashJoinParticipantNumber() \ + (IsParallelWorker() ? ParallelWorkerNumber + 1 : 0) + #endif /* HASHJOIN_H */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index fe5c264..a7a5c6e 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -22,12 +22,12 @@ extern Node *MultiExecHash(HashState *node); extern void ExecEndHash(HashState *node); extern void ExecReScanHash(HashState *node); -extern HashJoinTable ExecHashTableCreate(Hash *node, List *hashOperators, +extern HashJoinTable ExecHashTableCreate(HashState *node, List *hashOperators, bool keepNulls); extern void ExecHashTableDestroy(HashJoinTable hashtable); -extern void ExecHashTableInsert(HashJoinTable hashtable, +extern bool ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, - uint32 hashvalue); + uint32 hashvalue, bool secondary); extern bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, @@ -45,9 +45,14 @@ extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, extern void ExecHashTableReset(HashJoinTable hashtable); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, + bool shared, int parallel_workers, + size_t *spaceAllowed, int *numbuckets, int *numbatches, int *num_skew_mcvs); extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue); +extern void ExecHashUpdate(HashJoinTable hashtable); +extern bool ExecHashCheckForEarlyExit(HashJoinTable hashtable); +extern void ExecHashRebucket(HashJoinTable
hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h index ddc32b1..ef7d935 100644 --- a/src/include/executor/nodeHashjoin.h +++ b/src/include/executor/nodeHashjoin.h @@ -14,15 +14,28 @@ #ifndef NODEHASHJOIN_H #define NODEHASHJOIN_H +#include "access/parallel.h" #include "nodes/execnodes.h" #include "storage/buffile.h" +#include "storage/shm_toc.h" extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags); extern TupleTableSlot *ExecHashJoin(HashJoinState *node); extern void ExecEndHashJoin(HashJoinState *node); +extern void ExecShutdownHashJoin(HashJoinState *node); extern void ExecReScanHashJoin(HashJoinState *node); -extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, - BufFile **fileptr); +extern void ExecHashJoinSaveTuple(HashJoinTable hashtable, + MinimalTuple tuple, uint32 hashvalue, + int batchno, bool inner); +extern void ExecHashJoinRewindBatches(HashJoinTable hashtable, int batchno); +extern void ExecHashJoinOpenBatch(HashJoinTable hashtable, + int batchno, bool inner); +extern void ExecHashJoinCloseBatch(HashJoinTable hashtable, + int batchno, bool inner); + +extern void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeWorker(HashJoinState *state, shm_toc *toc); #endif /* NODEHASHJOIN_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index ce13bf7..deb8497 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -21,6 +21,7 @@ #include "lib/pairingheap.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "utils/dsa.h" #include "utils/hsearch.h" #include "utils/reltrigger.h" #include "utils/sortsupport.h" @@ -1755,6 +1756,7 @@ typedef struct MergeJoinState /* these structs are defined in executor/hashjoin.h: */ typedef struct
HashJoinTupleData *HashJoinTuple; typedef struct HashJoinTableData *HashJoinTable; +typedef struct SharedHashJoinTableData *SharedHashJoinTable; typedef struct HashJoinState { @@ -1776,6 +1778,7 @@ typedef struct HashJoinState int hj_JoinState; bool hj_MatchedOuter; bool hj_OuterNotEmpty; + SharedHashJoinTable hj_sharedHashJoinTable; } HashJoinState; @@ -2006,6 +2009,9 @@ typedef struct HashState HashJoinTable hashtable; /* hash table for the hashjoin */ List *hashkeys; /* list of ExprState nodes */ /* hashkeys is same as parent's hj_InnerHashKeys */ + + /* The following is the same as the parent's (presumably hj_sharedHashJoinTable). */ + SharedHashJoinTable shared_table_data; } HashState; /* ---------------- diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 692a626..6d1460b 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -782,6 +782,7 @@ typedef struct Hash bool skewInherit; /* is outer join rel an inheritance tree? */ Oid skewColType; /* datatype of the outer key column */ int32 skewColTypmod; /* typmod of the outer key column */ + bool shared_table; /* table shared by multiple workers? */ /* all other info is in the parent HashJoin node */ } Hash; diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index e1d31c7..43f9515 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1258,6 +1258,16 @@ typedef struct MergePath bool materialize_inner; /* add Materialize to inner? */ } MergePath; +typedef enum +{ + /* Every worker builds its own private copy of the hash table. */ + HASHPATH_TABLE_PRIVATE, + /* One worker builds a shared hash table, and all workers probe it. */ + HASHPATH_TABLE_SHARED_SERIAL, + /* All workers build a shared hash table, and then probe it. */ + HASHPATH_TABLE_SHARED_PARALLEL +} HashPathTableType; + /* * A hashjoin path has these fields.
* @@ -1272,6 +1282,7 @@ typedef struct HashPath JoinPath jpath; List *path_hashclauses; /* join clauses used for hashing */ int num_batches; /* number of batches expected */ + HashPathTableType table_type; /* private, shared serial or shared parallel */ } HashPath; /* diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 39376ec..220c013 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -24,6 +24,7 @@ #define DEFAULT_SEQ_PAGE_COST 1.0 #define DEFAULT_RANDOM_PAGE_COST 4.0 #define DEFAULT_CPU_TUPLE_COST 0.01 +#define DEFAULT_CPU_SHARED_TUPLE_COST 0.001 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 #define DEFAULT_CPU_OPERATOR_COST 0.0025 #define DEFAULT_PARALLEL_TUPLE_COST 0.1 @@ -48,6 +49,7 @@ typedef enum extern PGDLLIMPORT double seq_page_cost; extern PGDLLIMPORT double random_page_cost; extern PGDLLIMPORT double cpu_tuple_cost; +extern PGDLLIMPORT double cpu_shared_tuple_cost; extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost; extern PGDLLIMPORT double parallel_tuple_cost; @@ -144,7 +146,8 @@ extern void initial_cost_hashjoin(PlannerInfo *root, List *hashclauses, Path *outer_path, Path *inner_path, SpecialJoinInfo *sjinfo, - SemiAntiJoinFactors *semifactors); + SemiAntiJoinFactors *semifactors, + HashPathTableType table_type); extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path, JoinCostWorkspace *workspace, SpecialJoinInfo *sjinfo, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index d16f879..42633c5 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -134,7 +134,8 @@ extern HashPath *create_hashjoin_path(PlannerInfo *root, Path *inner_path, List *restrict_clauses, Relids required_outer, - List *hashclauses); + List *hashclauses, + HashPathTableType table_type); extern ProjectionPath *create_projection_path(PlannerInfo *root, RelOptInfo *rel, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index
5b37894..f54b0a5 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -785,7 +785,26 @@ typedef enum WAIT_EVENT_MQ_SEND, WAIT_EVENT_PARALLEL_FINISH, WAIT_EVENT_SAFE_SNAPSHOT, - WAIT_EVENT_SYNC_REP + WAIT_EVENT_SYNC_REP, + WAIT_EVENT_HASH_CREATING, + WAIT_EVENT_HASH_HASHING, + WAIT_EVENT_HASH_RESIZING, + WAIT_EVENT_HASH_REBUCKETING, + WAIT_EVENT_HASH_BEGINNING, + WAIT_EVENT_HASH_DESTROY, + WAIT_EVENT_HASH_UNMATCHED, + WAIT_EVENT_HASH_PROMOTING, + WAIT_EVENT_HASH_SHRINKING1, + WAIT_EVENT_HASH_SHRINKING2, + WAIT_EVENT_HASH_SHRINKING3, + WAIT_EVENT_HASH_SHRINKING4, + WAIT_EVENT_HASHJOIN_PROMOTING, + WAIT_EVENT_HASHJOIN_PROBING, + WAIT_EVENT_HASHJOIN_LOADING, + WAIT_EVENT_HASHJOIN_PREPARING, + WAIT_EVENT_HASHJOIN_REWINDING, + WAIT_EVENT_HASHJOIN_REWINDING2, /* TODO: rename me */ + WAIT_EVENT_HASHJOIN_REWINDING3 /* TODO: rename me */ } WaitEventIPC; /* ---------- diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index fe00bf0..023eb3f 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -30,12 +30,17 @@ typedef struct BufFile BufFile; +typedef struct BufFileDescriptor BufFileDescriptor; + /* * prototypes for functions in buffile.c */ extern BufFile *BufFileCreateTemp(bool interXact); extern void BufFileClose(BufFile *file); +extern BufFileDescriptor *BufFileExport(BufFile *file); +extern BufFile *BufFileImport(BufFileDescriptor *descriptor); +extern size_t BufFileDescriptorSize(const BufFileDescriptor *descriptor); extern size_t BufFileRead(BufFile *file, void *ptr, size_t size); extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size); extern int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 8bd93c3..dd6d48e 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -211,6 +211,9 @@ typedef enum BuiltinTrancheIds LWTRANCHE_BUFFER_MAPPING, LWTRANCHE_LOCK_MANAGER,
LWTRANCHE_PREDICATE_LOCK_MANAGER, + LWTRANCHE_PARALLEL_HASH_JOIN_INNER_BATCH_READER, + LWTRANCHE_PARALLEL_HASH_JOIN_OUTER_BATCH_READER, + LWTRANCHE_PARALLEL_HASH_JOIN_CHUNK, LWTRANCHE_PARALLEL_QUERY_DSA, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds;