From 02c63bd0fcdceb6cdf3026b706fe9aa6e54d3e26 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Thu, 17 Sep 2020 14:10:28 -0700 Subject: [PATCH v1] Support parallel FOJ and ROJ To support parallel full outer join (FOJ) and right outer join (ROJ), - re-enable setting match bit for tuples in the hash table - a single worker preps for unmatched inner tuple scan in HJ_NEED_NEW_OUTER and transitions to HJ_FILL_INNER_TUPLES to avoid deadlock. ExecParallelScanHashTableForUnmatched() is still safe for multiple workers, though only a single worker will run it --- src/backend/executor/nodeHash.c | 128 ++++++++++++++++++++++-- src/backend/executor/nodeHashjoin.c | 55 +++++----- src/backend/optimizer/path/joinpath.c | 9 +- src/backend/postmaster/pgstat.c | 3 + src/backend/storage/ipc/barrier.c | 23 ++++- src/include/executor/hashjoin.h | 5 +- src/include/executor/nodeHash.h | 3 + src/include/nodes/execnodes.h | 2 + src/include/pgstat.h | 1 + src/include/storage/barrier.h | 1 + src/test/regress/expected/join_hash.out | 56 ++++++++++- src/test/regress/sql/join_hash.sql | 23 ++++- 12 files changed, 267 insertions(+), 42 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index ea69eeb2a1..e049c8103d 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2056,6 +2056,45 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate) hjstate->hj_CurBucketNo = 0; hjstate->hj_CurSkewBucketNo = 0; hjstate->hj_CurTuple = NULL; + hjstate->hj_AllocatedBucketRange = 0; +} + +/* + * ExecParallelPrepHashTableForUnmatched + * set up for a series of ExecParallelScanHashTableForUnmatched calls + * return true if this worker is elected to do the unmatched inner scan + */ +bool +ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + bool last = false; + hjstate->hj_CurBucketNo = 0; + hjstate->hj_CurSkewBucketNo = 0; +
hjstate->hj_CurTuple = NULL; + hjstate->hj_AllocatedBucketRange = 0; + if (curbatch < 0) + return false; + last = BarrierDetachOrElect(&batch->batch_barrier); + if (!last) + { + hashtable->batches[hashtable->curbatch].done = true; + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + /* + * Track the largest batch we've been attached to. Though each + * backend might see a different subset of batches, explain.c will + * scan the results from all backends to find the largest value. + */ + hashtable->spacePeak = + Max(hashtable->spacePeak, + batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); + hashtable->curbatch = -1; + } + return last; } /* @@ -2132,6 +2171,87 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return false; } +/* + * ExecParallelScanHashTableForUnmatched + * scan the hash table for unmatched inner tuples, in parallel + * + * On success, the inner tuple is stored into hjstate->hj_CurTuple and + * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot + * for the latter. + */ +bool +ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + HashJoinTuple hashTuple = hjstate->hj_CurTuple; + + for (;;) + { + /* + * hj_CurTuple is the address of the tuple last returned from the + * current bucket, or NULL if it's time to start scanning a new + * bucket. + */ + if (hashTuple != NULL) + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + else if (hjstate->hj_CurBucketNo < hjstate->hj_AllocatedBucketRange) + hashTuple = ExecParallelHashFirstTuple(hashtable, + hjstate->hj_CurBucketNo++); + else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + { + /* + * Allocate a few cachelines' worth of buckets and loop around. + * Testing shows that 8 is a good factor. 
+ */ + int step = (PG_CACHE_LINE_SIZE * 8) / sizeof(dsa_pointer_atomic); + + hjstate->hj_CurBucketNo = + pg_atomic_fetch_add_u32(&hashtable->batches[hashtable->curbatch].shared->bucket, + step); + hjstate->hj_AllocatedBucketRange = + Min(hjstate->hj_CurBucketNo + step, hashtable->nbuckets); + } + else + break; /* finished all buckets */ + + while (hashTuple != NULL) + { + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) + { + TupleTableSlot *inntuple; + + /* insert hashtable's tuple into exec slot */ + inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); /* do not pfree */ + econtext->ecxt_innertuple = inntuple; + + /* + * Reset temp memory each time; although this function doesn't + * do any qual eval, the caller will, so let's keep it + * parallel to ExecScanHashBucket. + */ + ResetExprContext(econtext); + + hjstate->hj_CurTuple = hashTuple; + return true; + } + + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + } + + /* allow this loop to be cancellable */ + CHECK_FOR_INTERRUPTS(); + } + + /* + * no more unmatched tuples + */ + return false; +} + + /* * ExecHashTableReset * @@ -2971,6 +3091,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) * up the Barrier. */ BarrierInit(&shared->batch_barrier, 0); + pg_atomic_init_u32(&shared->bucket, 0); if (i == 0) { /* Batch 0 doesn't need to be loaded. */ @@ -3131,13 +3252,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) /* Detach from the batch we were last working on. */ if (BarrierArriveAndDetach(&batch->batch_barrier)) { - /* - * Technically we shouldn't access the barrier because we're no - * longer attached, but since there is no way it's moving after - * this point it seems safe to make the following assertion. - */ - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); - /* Free shared chunks and buckets. 
*/ while (DsaPointerIsValid(batch->chunks)) { diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 5532b91a71..d34e87d575 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -82,6 +82,8 @@ * PHJ_BATCH_ALLOCATING -- one allocates buckets * PHJ_BATCH_LOADING -- all load the hash table from disk * PHJ_BATCH_PROBING -- all probe + * PHJ_BATCH_SCAN_INNER -- scan unmatched inner tuples + * PHJ_BATCH_DONE -- end * * Batch 0 is a special case, because it starts out in phase @@ -360,9 +362,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) /* end of batch, or maybe whole join */ if (HJ_FILL_INNER(node)) { - /* set up to scan for unmatched inner tuples */ - ExecPrepHashTableForUnmatched(node); - node->hj_JoinState = HJ_FILL_INNER_TUPLES; + if (parallel) + { + if (ExecParallelPrepHashTableForUnmatched(node)) + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + else + node->hj_JoinState = HJ_NEED_NEW_BATCH; + } + else + { + /* set up to scan for unmatched inner tuples */ + ExecPrepHashTableForUnmatched(node); + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + } } else node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -455,25 +467,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) { node->hj_MatchedOuter = true; - if (parallel) - { - /* - * Full/right outer joins are currently not supported - * for parallel joins, so we don't need to set the - * match bit. Experiments show that it's worth - * avoiding the shared memory traffic on large - * systems. - */ - Assert(!HJ_FILL_INNER(node)); - } - else - { - /* - * This is really only needed if HJ_FILL_INNER(node), - * but we'll avoid the branch and just set it always. - */ + + /* + * This is really only needed if HJ_FILL_INNER(node), + * but we'll avoid the branch and just set it always. 
+ */ + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple))) HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); - } /* In an antijoin, we never return a matched tuple */ if (node->js.jointype == JOIN_ANTI) @@ -531,7 +531,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) * so any unmatched inner tuples in the hashtable have to be * emitted before we continue to the next batch. */ - if (!ExecScanHashTableForUnmatched(node, econtext)) + if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext) + : ExecScanHashTableForUnmatched(node, econtext))) { /* no more unmatched tuples */ node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -742,6 +743,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate->hj_CurBucketNo = 0; hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO; hjstate->hj_CurTuple = NULL; + hjstate->hj_AllocatedBucketRange = 0; hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys, (PlanState *) hjstate); @@ -1173,13 +1175,15 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * hash table stays alive until everyone's finished * probing it, but no participant is allowed to wait at * this barrier again (or else a deadlock could occur). - * All attached participants must eventually call - * BarrierArriveAndDetach() so that the final phase - * PHJ_BATCH_DONE can be reached. + * All attached participants must eventually detach from + * the barrier and one worker must advance the phase + * so that the final phase is reached. */ ExecParallelHashTableSetCurrentBatch(hashtable, batchno); sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); return true; + case PHJ_BATCH_SCAN_INNER: + /* Fall through. 
*/ case PHJ_BATCH_DONE: @@ -1360,6 +1364,7 @@ ExecReScanHashJoin(HashJoinState *node) node->hj_CurBucketNo = 0; node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO; node->hj_CurTuple = NULL; + node->hj_AllocatedBucketRange = 0; node->hj_MatchedOuter = false; node->hj_FirstOuterTupleSlot = NULL; diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index db54a6ba2e..d30ea4d86d 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -1852,8 +1852,6 @@ hash_inner_and_outer(PlannerInfo *root, */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && - save_jointype != JOIN_FULL && - save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && bms_is_empty(joinrel->lateral_relids)) { @@ -1887,9 +1885,12 @@ hash_inner_and_outer(PlannerInfo *root, * total inner path will also be parallel-safe, but if not, we'll * have to search for the cheapest safe, unparameterized inner * path. If doing JOIN_UNIQUE_INNER, we can't use any alternative - * inner path. + * inner path. If full or right join, we can't use parallelism + * at all because no one process has all the match bits. 
*/ - if (cheapest_total_inner->parallel_safe) + if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT) + cheapest_safe_inner = NULL; + else if (cheapest_total_inner->parallel_safe) cheapest_safe_inner = cheapest_total_inner; else if (save_jointype != JOIN_UNIQUE_INNER) cheapest_safe_inner = diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index e6be2b7836..f6f242d806 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3782,6 +3782,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_HASH_BATCH_LOAD: event_name = "HashBatchLoad"; break; + case WAIT_EVENT_HASH_BATCH_PROBE: + event_name = "HashBatchProbe"; + break; case WAIT_EVENT_HASH_BUILD_ALLOCATE: event_name = "HashBuildAllocate"; break; diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c index 3e200e02cc..5ebd18314f 100644 --- a/src/backend/storage/ipc/barrier.c +++ b/src/backend/storage/ipc/barrier.c @@ -204,6 +204,28 @@ BarrierArriveAndDetach(Barrier *barrier) { return BarrierDetachImpl(barrier, true); } +/* + * Upon arriving at the barrier, if this worker is not the last worker attached, + * detach from the barrier and return false. If this worker is the last worker, + * remain attached and advance the phase of the barrier, return true to indicate + * you are the last or "elected" worker who is still attached to the barrier. + * Another name I considered was BarrierUniqueify or BarrierSoloAssign + */ +bool +BarrierDetachOrElect(Barrier *barrier) +{ + SpinLockAcquire(&barrier->mutex); + if (barrier->participants > 1) + { + --barrier->participants; + SpinLockRelease(&barrier->mutex); + return false; + } + Assert(barrier->participants == 1); + ++barrier->phase; + SpinLockRelease(&barrier->mutex); + return true; +} /* * Attach to a barrier. 
All waiting participants will now wait for this @@ -221,7 +243,6 @@ BarrierAttach(Barrier *barrier) ++barrier->participants; phase = barrier->phase; SpinLockRelease(&barrier->mutex); - return phase; } diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index eb5daba36b..6299c9ce9f 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -160,6 +160,8 @@ typedef struct ParallelHashJoinBatch size_t old_ntuples; /* number of tuples before repartitioning */ bool space_exhausted; + pg_atomic_uint32 bucket; /* bucket allocator for unmatched inner scan */ + /* * Variable-sized SharedTuplestore objects follow this struct in memory. * See the accessor macros below. @@ -265,7 +267,8 @@ typedef struct ParallelHashJoinState #define PHJ_BATCH_ALLOCATING 1 #define PHJ_BATCH_LOADING 2 #define PHJ_BATCH_PROBING 3 -#define PHJ_BATCH_DONE 4 +#define PHJ_BATCH_SCAN_INNER 4 +#define PHJ_BATCH_DONE 5 /* The phases of batch growth while hashing, for grow_batches_barrier. 
*/ #define PHJ_GROW_BATCHES_ELECTING 0 diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 2db4e2f672..a642736d54 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); +extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate); extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext); +extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext); extern void ExecHashTableReset(HashJoinTable hashtable); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index a5ab1aed14..a6b2446958 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1921,6 +1921,7 @@ typedef struct MergeJoinState * tuple, or NULL if starting search * (hj_CurXXX variables are undefined if * OuterTupleSlot is empty!) 
+ * hj_AllocatedBucketRange range allocated for parallel unmatched scan * hj_OuterTupleSlot tuple slot for outer tuples * hj_HashTupleSlot tuple slot for inner (hashed) tuples * hj_NullOuterTupleSlot prepared null tuple for right/full outer joins @@ -1947,6 +1948,7 @@ typedef struct HashJoinState uint32 hj_CurHashValue; int hj_CurBucketNo; int hj_CurSkewBucketNo; + int hj_AllocatedBucketRange; HashJoinTuple hj_CurTuple; TupleTableSlot *hj_OuterTupleSlot; TupleTableSlot *hj_HashTupleSlot; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 0dfbac46b4..62d5f1d16b 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -856,6 +856,7 @@ typedef enum WAIT_EVENT_HASH_BATCH_ALLOCATE, WAIT_EVENT_HASH_BATCH_ELECT, WAIT_EVENT_HASH_BATCH_LOAD, + WAIT_EVENT_HASH_BATCH_PROBE, WAIT_EVENT_HASH_BUILD_ALLOCATE, WAIT_EVENT_HASH_BUILD_ELECT, WAIT_EVENT_HASH_BUILD_HASH_INNER, diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h index d71927cc2f..8b563fa7e0 100644 --- a/src/include/storage/barrier.h +++ b/src/include/storage/barrier.h @@ -37,6 +37,7 @@ typedef struct Barrier extern void BarrierInit(Barrier *barrier, int num_workers); extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info); extern bool BarrierArriveAndDetach(Barrier *barrier); +extern bool BarrierDetachOrElect(Barrier *barrier); extern int BarrierAttach(Barrier *barrier); extern bool BarrierDetach(Barrier *barrier); extern int BarrierPhase(Barrier *barrier); diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index 3a91c144a2..4ca0e01756 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set 
enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); @@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id); 20000 (1 row) +rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Full Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on simple s +(9 rows) + +select count(*) from simple r full outer join simple s using (id); + count +------- + 20000 +(1 row) + rollback to settings; -- An full outer join where every record is not matched. 
-- non-parallel @@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); @@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); 40000 (1 row) +rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Full Join + Hash Cond: ((0 - s.id) = r.id) + -> Parallel Seq Scan on simple s + -> Parallel Hash + -> Parallel Seq Scan on simple r +(9 rows) + +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + count +------- + 40000 +(1 row) + rollback to settings; -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 68c1a8c7b6..504b3611ca 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -418,7 +418,16 @@ explain (costs off) select count(*) from simple r full outer join simple s using (id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local 
max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); +select count(*) from simple r full outer join simple s using (id); +rollback to settings; + +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -436,14 +445,24 @@ explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +rollback to settings; + + -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into -- the hash table) -- 2.20.1