Parallel Full Hash Join
Hello,
While thinking about looping hash joins (an alternative strategy for
limiting hash join memory usage currently being investigated by
Melanie Plageman in a nearby thread[1]), the topic of parallel query
deadlock hazards came back to haunt me. I wanted to illustrate the
problems I'm aware of with the concrete code where I ran into this
stuff, so here is a new-but-still-broken implementation of $SUBJECT.
This was removed from the original PHJ submission when I got stuck and
ran out of time in the release cycle for 11. Since the original
discussion is buried in long threads and some of it was also a bit
confused, here's a fresh description of the problems as I see them.
Hopefully these thoughts might help Melanie's project move forward,
because it's closely related, but I didn't want to dump another patch
into that other thread. Hence this new thread.
I haven't succeeded in actually observing a deadlock with the attached
patch (though I did last year, very rarely), but I also haven't tried
very hard. The patch seems to produce the right answers and is pretty
scalable, so it's really frustrating not to be able to get it over the
line.
Tuple queue deadlock hazard:
If the leader process is executing the subplan itself and waiting for
all processes to arrive in ExecParallelHashEndProbe() (in this patch)
while another process has filled up its tuple queue and is waiting for
the leader to read some tuples and unblock it, they will deadlock
forever. That can't happen in the committed version of PHJ,
because it never waits for barriers after it has begun emitting
tuples.
Some possible ways to fix this:
1. You could probably make it so that the PHJ_BATCH_SCAN_INNER phase
in this patch (the scan for unmatched tuples) is executed by only one
process, using the "detach-and-see-if-you-were-last" trick. Melanie
proposed that for an equivalent problem in the looping hash join. I
think it probably works, but it gives up a lot of parallelism and thus
won't scale as nicely as the attached patch (a sketch of this shape
appears after option 4 below).
2. You could probably make it so that only the leader process drops
out of executing the inner unmatched scan, and then I think you
wouldn't have this very specific problem at the cost of losing some
(but not all) parallelism (ie the leader), but there might be other
variants of the problem. For example, a Gather Merge leader process
might be blocked waiting for the next tuple from P1, while P2 is
trying to write to a full queue, and P1 waits for P2.
3. You could introduce some kind of overflow for tuple queues, so
that tuple queues can never block because they're full (until you run
out of extra memory buffers or disk and error out). I haven't
seriously looked into this but I'm starting to suspect it's the
industrial strength general solution to the problem and variants of it
that show up in other parallelism projects (Parallel Repartition). As
Robert mentioned last time I talked about this[2], you'd probably only
want to allow spooling (rather than waiting) when the leader is
actually waiting for other processes; I'm not sure how exactly to
control that.
4. <thinking-really-big>Goetz Graefe's writing about parallel sorting
comes close to this topic, which he calls flow control deadlocks. He
mentions the possibility of infinite spooling like (3) as a solution.
He's describing a world where producers and consumers are running
concurrently, and the consumer doesn't just decide to start running
the subplan (what we call "leader participation"), so he doesn't
actually have a problem like Gather deadlock. He describes
planner-enforced rules that allow deadlock free execution even with
fixed-size tuple queue flow control by carefully controlling where
order-forcing operators are allowed to appear, so he doesn't have a
problem like Gather Merge deadlock. I'm not proposing we should
create a whole bunch of producer and consumer processes to run
different plan fragments, but I think you can virtualise the general
idea in an async executor with "streams", and that also solves other
problems when you start working with partitions in a world where it's
not even sure how many workers will show up. I see this as a long
term architectural goal requiring vast amounts of energy to achieve,
hence my new interest in (3) for now.</thinking-really-big>
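To make (1) a little more concrete, here is a tiny standalone sketch of
the "last one to finish is elected" control flow, written with plain
pthreads and C11 atomics rather than the real Barrier API; the names are
made up and it is only an illustration, not PostgreSQL code.
/* election.c: last worker to finish "probing" is elected to do the
 * serial unmatched inner scan; everyone else detaches and moves on,
 * so nobody can ever be waited for while a tuple queue is full.
 * Build: cc -pthread election.c -o election
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#define NWORKERS 4
/* Stand-in for the number of participants attached to a batch barrier. */
static atomic_int attached = NWORKERS;
static void *
probe_batch(void *arg)
{
    long id = (long) arg;
    printf("worker %ld: finished probing the batch\n", id);
    /*
     * "Arrive and detach": whoever brings the count to zero was the last
     * participant and is elected to scan for unmatched inner tuples alone;
     * the others just go looking for another batch.
     */
    if (atomic_fetch_sub(&attached, 1) == 1)
        printf("worker %ld: elected, doing the unmatched inner scan\n", id);
    else
        printf("worker %ld: detached, looking for another batch\n", id);
    return NULL;
}
int
main(void)
{
    pthread_t threads[NWORKERS];
    for (long i = 0; i < NWORKERS; i++)
        pthread_create(&threads[i], NULL, probe_batch, (void *) i);
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(threads[i], NULL);
    return 0;
}
In the real executor the elected worker also has to stay attached and
advance the batch barrier's phase so that the shared hash table is freed
at the right time; the sketch ignores all of that.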
Hypothetical inter-node deadlock hazard:
Right now I think it is the case that whenever any node begins pulling
tuples from a subplan, it continues to do so until either the query
ends early or the subplan runs out of tuples. For example, Append
processes its subplans one at a time until they're done -- it doesn't
jump back and forth. Parallel Append doesn't necessarily run them in
the order that they appear in the plan, but it still runs each one to
completion before picking another one. If we ever had a node that
didn't adhere to that rule, then two Parallel Full Hash Join nodes
could deadlock, if some of the workers were stuck waiting in one
while some were stuck waiting in the other.
If we were happy to decree that that is a rule of the current
PostgreSQL executor, then this hypothetical problem would go away.
For example, consider the old patch I recently rebased[3] to allow
Append over a bunch of FDWs representing remote shards to return
tuples as soon as they're ready, not necessarily sequentially (and I
think several others have worked on similar patches). To be
committable under such a rule that applies globally to the whole
executor, that patch would only be allowed to *start* them in any
order, but once it's started pulling tuples from a given subplan it'd
have to pull them all to completion before considering another node.
(Again, that problem goes away in an async model like (4), which will
also be able to do much more interesting things with FDWs, and it's
the FDW thing that I think generates more interest in async execution
than my rambling about abstract parallel query problems.)
Some other notes on the patch:
Aside from the deadlock problem, there are some minor details to tidy
up (handling of late starters probably not quite right, rescans not
yet considered). There is a fun hard-coded parameter that controls
the parallel step size in terms of cache lines for the unmatched scan;
I found that 8 was a lot faster than 4, but no slower than 128 on my
laptop, so I set it to 8. More thoughts along those micro-optimistic
lines: instead of a match bit in the header, you could tag the pointer
and sometimes avoid having to follow it, and you could prefetch the next
non-matching tuple's cacheline by looking ahead a bit.
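To illustrate the pointer-tagging idea, here is a toy standalone example
(not PostgreSQL code; FakeTuple and the helper names are invented, and it
relies only on malloc's alignment leaving the low-order bit free):
/* tagging.c: carry the "matched" flag in the low bit of a tuple pointer,
 * so an unmatched scan can skip matched entries without dereferencing.
 * Build: cc tagging.c -o tagging
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
typedef struct FakeTuple { int payload; } FakeTuple;
#define MATCH_BIT ((uintptr_t) 1)
static FakeTuple *tag_matched(FakeTuple *tuple)
{
    return (FakeTuple *) ((uintptr_t) tuple | MATCH_BIT);
}
static int is_matched(FakeTuple *ptr)
{
    return ((uintptr_t) ptr & MATCH_BIT) != 0;
}
static FakeTuple *untag(FakeTuple *ptr)
{
    return (FakeTuple *) ((uintptr_t) ptr & ~MATCH_BIT);
}
int main(void)
{
    FakeTuple *bucket[3];
    /* malloc() results are aligned far beyond 2 bytes, so bit 0 is free. */
    for (int i = 0; i < 3; i++)
    {
        bucket[i] = malloc(sizeof(FakeTuple));
        bucket[i]->payload = i;
    }
    bucket[1] = tag_matched(bucket[1]);   /* pretend tuple 1 was matched */
    for (int i = 0; i < 3; i++)
    {
        if (is_matched(bucket[i]))
            continue;                     /* skip without touching the tuple */
        printf("unmatched tuple %d\n", untag(bucket[i])->payload);
    }
    for (int i = 0; i < 3; i++)
        free(untag(bucket[i]));
    return 0;
}
The point is that the scan can reject a matched entry by looking only at
the pointer word it already has, without fetching the tuple's cache line.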
[1]: /messages/by-id/CA+hUKGKWWmf=WELLG=aUGbcugRaSQbtm0tKYiBut-B2rVKX63g@mail.gmail.com
[2]: /messages/by-id/CA+TgmoY4LogYcg1y5JPtto_fL-DBUqvxRiZRndDC70iFiVsVFQ@mail.gmail.com
[3]: /messages/by-id/CA+hUKGLBRyu0rHrDCMC4=Rn3252gogyp1SjOgG8SEKKZv=FwfQ@mail.gmail.com
--
Thomas Munro
https://enterprisedb.com
Attachments:
0001-WIP-Add-support-for-Parallel-Full-Hash-Join.patch
From 549bd36fc39282503d2ab83f7827437ddf6f3e1b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 12 Sep 2019 00:30:49 +1200
Subject: [PATCH] WIP: Add support for Parallel Full Hash Join.
This has an unsolved problem: it's dangerous to run BarrierArriveAndWait()
when you're in a phase that has emitted tuples, because some process P
might be blocked writing to a full tuple queue, while the leader process,
which should be reading it, is also running the subplan and is waiting
for P!
---
src/backend/executor/nodeHash.c | 90 +++++++++++++++++++++++--
src/backend/executor/nodeHashjoin.c | 35 ++++++++--
src/backend/optimizer/path/joinpath.c | 9 +--
src/backend/postmaster/pgstat.c | 3 +
src/backend/storage/ipc/barrier.c | 1 -
src/include/executor/hashjoin.h | 5 +-
src/include/executor/nodeHash.h | 2 +
src/include/nodes/execnodes.h | 2 +
src/include/pgstat.h | 1 +
src/test/regress/expected/join_hash.out | 56 ++++++++++++++-
src/test/regress/sql/join_hash.sql | 23 ++++++-
11 files changed, 204 insertions(+), 23 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 224cbb32ba..c366a523c6 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2050,6 +2050,7 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = 0;
hjstate->hj_CurTuple = NULL;
+ hjstate->hj_AllocatedBucketRange = 0;
}
/*
@@ -2126,6 +2127,87 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
return false;
}
+/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+ for (;;)
+ {
+ /*
+ * hj_CurTuple is the address of the tuple last returned from the
+ * current bucket, or NULL if it's time to start scanning a new
+ * bucket.
+ */
+ if (hashTuple != NULL)
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ else if (hjstate->hj_CurBucketNo < hjstate->hj_AllocatedBucketRange)
+ hashTuple = ExecParallelHashFirstTuple(hashtable,
+ hjstate->hj_CurBucketNo++);
+ else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+ {
+ /*
+ * Allocate a few cachelines' worth of buckets and loop around.
+ * Testing shows that 8 is a good factor.
+ */
+ int step = (PG_CACHE_LINE_SIZE * 8) / sizeof(dsa_pointer_atomic);
+
+ hjstate->hj_CurBucketNo =
+ pg_atomic_fetch_add_u32(&hashtable->batches[hashtable->curbatch].shared->bucket,
+ step);
+ hjstate->hj_AllocatedBucketRange =
+ Min(hjstate->hj_CurBucketNo + step, hashtable->nbuckets);
+ }
+ else
+ break; /* finished all buckets */
+
+ while (hashTuple != NULL)
+ {
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ {
+ TupleTableSlot *inntuple;
+
+ /* insert hashtable's tuple into exec slot */
+ inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false); /* do not pfree */
+ econtext->ecxt_innertuple = inntuple;
+
+ /*
+ * Reset temp memory each time; although this function doesn't
+ * do any qual eval, the caller will, so let's keep it
+ * parallel to ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
+
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
+ }
+
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ }
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
+
/*
* ExecHashTableReset
*
@@ -2937,6 +3019,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
* up the Barrier.
*/
BarrierInit(&shared->batch_barrier, 0);
+ pg_atomic_init_u32(&shared->bucket, 0);
if (i == 0)
{
/* Batch 0 doesn't need to be loaded. */
@@ -3097,13 +3180,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
{
- /*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
- */
- Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
{
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ec37558c12..1c9a40dcff 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,6 +82,7 @@
* PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBING -- all probe
+ * PHJ_BATCH_SCAN_INNER -- scan unmatched inner tuples
* PHJ_BATCH_DONE -- end
*
* Batch 0 is a special case, because it starts out in phase
@@ -97,9 +98,9 @@
* all other backends attached to it are actively executing the node or have
* already arrived. Practically, that means that we never return a tuple
* while attached to a barrier, unless the barrier has reached its final
- * state. In the slightly special case of the per-batch barrier, we return
- * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * state.
+ *
+ * TODO: WIP: That's now not true, because of PHJ_SCAN_INNER.
*
*-------------------------------------------------------------------------
*/
@@ -144,6 +145,7 @@ static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
uint32 *hashvalue,
TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static void ExecParallelHashEndProbe(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
@@ -358,6 +360,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (TupIsNull(outerTupleSlot))
{
/* end of batch, or maybe whole join */
+ if (parallel)
+ ExecParallelHashEndProbe(node);
+
if (HJ_FILL_INNER(node))
{
/* set up to scan for unmatched inner tuples */
@@ -512,7 +517,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -723,6 +729,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
hjstate->hj_CurTuple = NULL;
+ hjstate->hj_AllocatedBucketRange = 0;
hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
(PlanState *) hjstate);
@@ -1060,6 +1067,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
return true;
}
+static void
+ExecParallelHashEndProbe(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ Barrier *batch_barrier =
+ &hashtable->batches[hashtable->curbatch].shared->batch_barrier;
+
+ Assert(BarrierPhase(batch_barrier) == PHJ_BATCH_PROBING);
+ BarrierArriveAndWait(batch_barrier, WAIT_EVENT_HASH_BATCH_PROBING);
+ Assert(BarrierPhase(batch_barrier) == PHJ_BATCH_SCAN_INNER);
+}
+
/*
* Choose a batch to work on, and attach to it. Returns true if successful,
* false if there are no more batches.
@@ -1155,13 +1174,16 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
* All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_DONE can be reached.
+ * BarrierArriveAndDetach() so that the next phase can be
+ * reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
+ case PHJ_BATCH_SCAN_INNER:
+ /* TODO -- not right */
+
case PHJ_BATCH_DONE:
/*
@@ -1335,6 +1357,7 @@ ExecReScanHashJoin(HashJoinState *node)
node->hj_CurBucketNo = 0;
node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
node->hj_CurTuple = NULL;
+ node->hj_AllocatedBucketRange = 0;
node->hj_MatchedOuter = false;
node->hj_FirstOuterTupleSlot = NULL;
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index dc28b56e74..ef613d7696 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1853,8 +1853,6 @@ hash_inner_and_outer(PlannerInfo *root,
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
- save_jointype != JOIN_FULL &&
- save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
@@ -1888,9 +1886,12 @@ hash_inner_and_outer(PlannerInfo *root,
* total inner path will also be parallel-safe, but if not, we'll
* have to search for the cheapest safe, unparameterized inner
* path. If doing JOIN_UNIQUE_INNER, we can't use any alternative
- * inner path.
+ * inner path. If full or right join, we can't use parallelism
+ * at all because no one process has all the match bits.
*/
- if (cheapest_total_inner->parallel_safe)
+ if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+ cheapest_safe_inner = NULL;
+ else if (cheapest_total_inner->parallel_safe)
cheapest_safe_inner = cheapest_total_inner;
else if (save_jointype != JOIN_UNIQUE_INNER)
cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 011076c3e3..d64cb976db 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3774,6 +3774,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_BATCH_LOADING:
event_name = "Hash/Batch/Loading";
break;
+ case WAIT_EVENT_HASH_BATCH_PROBING:
+ event_name = "Hash/Batch/Probing";
+ break;
case WAIT_EVENT_HASH_BUILD_ALLOCATING:
event_name = "Hash/Build/Allocating";
break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 83cbe33107..170d002444 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -221,7 +221,6 @@ BarrierAttach(Barrier *barrier)
++barrier->participants;
phase = barrier->phase;
SpinLockRelease(&barrier->mutex);
-
return phase;
}
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2c94b926d3..f68f4568df 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -160,6 +160,8 @@ typedef struct ParallelHashJoinBatch
size_t old_ntuples; /* number of tuples before repartitioning */
bool space_exhausted;
+ pg_atomic_uint32 bucket; /* bucket allocator for unmatched inner scan */
+
/*
* Variable-sized SharedTuplestore objects follow this struct in memory.
* See the accessor macros below.
@@ -265,7 +267,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
-#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_SCAN_INNER 4
+#define PHJ_BATCH_DONE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index fc80f03aa8..94b0be380a 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -58,6 +58,8 @@ extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econ
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b593d22c48..bdf58cd2ee 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1913,6 +1913,7 @@ typedef struct MergeJoinState
* tuple, or NULL if starting search
* (hj_CurXXX variables are undefined if
* OuterTupleSlot is empty!)
+ * hj_AllocatedBucketRange range allocated for parallel unmatched scan
* hj_OuterTupleSlot tuple slot for outer tuples
* hj_HashTupleSlot tuple slot for inner (hashed) tuples
* hj_NullOuterTupleSlot prepared null tuple for right/full outer joins
@@ -1939,6 +1940,7 @@ typedef struct HashJoinState
uint32 hj_CurHashValue;
int hj_CurBucketNo;
int hj_CurSkewBucketNo;
+ int hj_AllocatedBucketRange;
HashJoinTuple hj_CurTuple;
TupleTableSlot *hj_OuterTupleSlot;
TupleTableSlot *hj_HashTupleSlot;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fe076d823d..ef2fbe39ca 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -827,6 +827,7 @@ typedef enum
WAIT_EVENT_HASH_BATCH_ALLOCATING,
WAIT_EVENT_HASH_BATCH_ELECTING,
WAIT_EVENT_HASH_BATCH_LOADING,
+ WAIT_EVENT_HASH_BATCH_PROBING,
WAIT_EVENT_HASH_BUILD_ALLOCATING,
WAIT_EVENT_HASH_BUILD_ELECTING,
WAIT_EVENT_HASH_BUILD_HASHING_INNER,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id);
20000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
rollback to settings;
-- An full outer join where every record is not matched.
-- non-parallel
@@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
40000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: ((0 - s.id) = r.id)
+ -> Parallel Seq Scan on simple s
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)
--
2.22.0
On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro <thomas.munro@gmail.com>
wrote:
While thinking about looping hash joins (an alternative strategy for
limiting hash join memory usage currently being investigated by
Melanie Plageman in a nearby thread[1]), the topic of parallel query
deadlock hazards came back to haunt me. I wanted to illustrate the
problems I'm aware of with the concrete code where I ran into this
stuff, so here is a new-but-still-broken implementation of $SUBJECT.
This was removed from the original PHJ submission when I got stuck and
ran out of time in the release cycle for 11. Since the original
discussion is buried in long threads and some of it was also a bit
confused, here's a fresh description of the problems as I see them.
Hopefully these thoughts might help Melanie's project move forward,
because it's closely related, but I didn't want to dump another patch
into that other thread. Hence this new thread.
I haven't succeeded in actually observing a deadlock with the attached
patch (though I did last year, very rarely), but I also haven't tried
very hard. The patch seems to produce the right answers and is pretty
scalable, so it's really frustrating not to be able to get it over the
line.
Tuple queue deadlock hazard:
If the leader process is executing the subplan itself and waiting for
all processes to arrive in ExecParallelHashEndProbe() (in this patch)
while another process has filled up its tuple queue and is waiting for
the leader to read some tuples and unblock it, they will deadlock
forever. That can't happen in the committed version of PHJ,
because it never waits for barriers after it has begun emitting
tuples.
Some possible ways to fix this:
1. You could probably make it so that the PHJ_BATCH_SCAN_INNER phase
in this patch (the scan for unmatched tuples) is executed by only one
process, using the "detach-and-see-if-you-were-last" trick. Melanie
proposed that for an equivalent problem in the looping hash join. I
think it probably works, but it gives up a lot of parallelism and thus
won't scale as nicely as the attached patch.
I have attached a patch which implements this
(v1-0001-Parallel-FOJ-ROJ-single-worker-scan-buckets.patch).
For starters, in order to support parallel FOJ and ROJ, I re-enabled
setting the match bit for the tuples in the hashtable which
3e4818e9dd5be294d97c disabled. I did so using the code suggested in [1],
reading the match bit to see if it is already set before setting it.
Then, workers except for the last worker detach after exhausting the
outer side of a batch, leaving one worker to proceed to HJ_FILL_INNER
and do the scan of the hash table and emit unmatched inner tuples.
I have also attached a variant on this patch which I am proposing to
replace it (v1-0001-Parallel-FOJ-ROJ-single-worker-scan-chunks.patch)
which has a new ExecParallelScanHashTableForUnmatched() in which the
single worker doing the unmatched scan scans one HashMemoryChunk at a
time and then frees them as it goes. I thought this might perform better
than the version which uses the buckets because 1) it should do a bit
less pointer chasing and 2) it frees each chunk of the hash table as it
scans it which (maybe) would save a bit of time during
ExecHashTableDetachBatch() when it goes through and frees the hash
table, but my preliminary tests showed a negligible difference between
this and the version using buckets. I will do a bit more testing,
though.
I tried a few other variants of these patches, including one in which
the workers detach from the batch inside of the batch loading and
probing phase machine, ExecParallelHashJoinNewBatch(). This meant that
all workers transition to HJ_FILL_INNER and then HJ_NEED_NEW_BATCH in
order to detach in the batch phase machine. This, however, involved
adding a lot of new variables to distinguish whether or not the
unmatched outer scan was already done, whether or not the current worker
was the worker elected to do the scan, etc. Overall, it is probably
incorrect to use the HJ_NEED_NEW_BATCH state in this way. I had
originally tried this to avoid operating on the batch_barrier in the
main hash join state machine. I've found that the more different places
we add code attaching and detaching to the batch_barrier (and other PHJ
barriers, for that matter), the harder the code is to debug; however,
I think in this case it is required.
2. You could probably make it so that only the leader process drops
out of executing the inner unmatched scan, and then I think you
wouldn't have this very specific problem at the cost of losing some
(but not all) parallelism (ie the leader), but there might be other
variants of the problem. For example, a Gather Merge leader process
might be blocked waiting for the next tuple from P1, while P2 is
trying to write to a full queue, and P1 waits for P2.
3. You could introduce some kind of overflow for tuple queues, so
that tuple queues can never block because they're full (until you run
out of extra memory buffers or disk and error out). I haven't
seriously looked into this but I'm starting to suspect it's the
industrial strength general solution to the problem and variants of it
that show up in other parallelism projects (Parallel Repartition). As
Robert mentioned last time I talked about this[2], you'd probably only
want to allow spooling (rather than waiting) when the leader is
actually waiting for other processes; I'm not sure how exactly to
control that.
4. <thinking-really-big>Goetz Graefe's writing about parallel sorting
comes close to this topic, which he calls flow control deadlocks. He
mentions the possibility of infinite spooling like (3) as a solution.
He's describing a world where producers and consumers are running
concurrently, and the consumer doesn't just decide to start running
the subplan (what we call "leader participation"), so he doesn't
actually have a problem like Gather deadlock. He describes
planner-enforced rules that allow deadlock free execution even with
fixed-size tuple queue flow control by carefully controlling where
order-forcing operators are allowed to appear, so he doesn't have a
problem like Gather Merge deadlock. I'm not proposing we should
create a whole bunch of producer and consumer processes to run
different plan fragments, but I think you can virtualise the general
idea in an async executor with "streams", and that also solves other
problems when you start working with partitions in a world where it's
not even sure how many workers will show up. I see this as a long
term architectural goal requiring vast amounts of energy to achieve,
hence my new interest in (3) for now.</thinking-really-big>
Hypothetical inter-node deadlock hazard:
Right now I think it is the case that whenever any node begins pulling
tuples from a subplan, it continues to do so until either the query
ends early or the subplan runs out of tuples. For example, Append
processes its subplans one at a time until they're done -- it doesn't
jump back and forth. Parallel Append doesn't necessarily run them in
the order that they appear in the plan, but it still runs each one to
completion before picking another one. If we ever had a node that
didn't adhere to that rule, then two Parallel Full Hash Join nodes
could deadlock, if some of the workers were stuck waiting in one
while some were stuck waiting in the other.
If we were happy to decree that that is a rule of the current
PostgreSQL executor, then this hypothetical problem would go away.
For example, consider the old patch I recently rebased[3] to allow
Append over a bunch of FDWs representing remote shards to return
tuples as soon as they're ready, not necessarily sequentially (and I
think several others have worked on similar patches). To be
committable under such a rule that applies globally to the whole
executor, that patch would only be allowed to *start* them in any
order, but once it's started pulling tuples from a given subplan it'd
have to pull them all to completion before considering another node.
(Again, that problem goes away in an async model like (4), which will
also be able to do much more interesting things with FDWs, and it's
the FDW thing that I think generates more interest in async execution
than my rambling about abstract parallel query problems.)
The leader exclusion tactics and the spooling idea don't solve the
execution order deadlock possibility, so this "all except last detach
and last does unmatched inner scan" seems like the best way to solve
both types of deadlock.
There is another option that could maintain some parallelism for the
unmatched inner scan.
This method is exactly like the "all except last detach and last does
unmatched inner scan" method from the perspective of the main hash join
state machine. The difference is in ExecParallelHashJoinNewBatch(). In
the batch_barrier phase machine, workers loop around looking for batches
that are not done.
In this "detach for now" method, all workers except the last one detach
from a batch after exhausting the outer side. They will mark the batch
they were just working on as "provisionally done" (as opposed to
"done"). The last worker advances the batch_barrier from
PHJ_BATCH_PROBING to PHJ_BATCH_SCAN_INNER.
All detached workers then proceed to HJ_NEED_NEW_BATCH and try to find
another batch to work on. If there are no batches that are neither
"done" nor "provisionally done", then the worker will re-attach to
batches that are "provisionally done" and attempt to join in conducting
the unmatched inner scan. Once it finishes its work there, it will
return to HJ_NEED_NEW_BATCH, enter ExecParallelHashJoinNewBatch() and
mark the batch as "done".
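Here is a rough, compilable sketch of the bookkeeping this "detach for
now" method seems to need; the status names and the helper are invented
and only show the two-pass batch selection described above, not anything
from the attached patches.
#include <stdio.h>
typedef enum PhjBatchStatus
{
    PHJ_BATCH_STATUS_NOT_DONE,            /* outer probing still in progress */
    PHJ_BATCH_STATUS_PROVISIONALLY_DONE,  /* probing done, unmatched scan open */
    PHJ_BATCH_STATUS_DONE                 /* nothing left to do */
} PhjBatchStatus;
/*
 * Two-pass batch selection: prefer batches that still need probing; if
 * none remain, re-attach to a provisionally done batch and help with its
 * unmatched inner scan; -1 means every batch is fully done.
 */
static int choose_next_batch(const PhjBatchStatus *status, int nbatches)
{
    for (int i = 0; i < nbatches; i++)
        if (status[i] == PHJ_BATCH_STATUS_NOT_DONE)
            return i;
    for (int i = 0; i < nbatches; i++)
        if (status[i] == PHJ_BATCH_STATUS_PROVISIONALLY_DONE)
            return i;
    return -1;
}
int main(void)
{
    PhjBatchStatus status[] = {
        PHJ_BATCH_STATUS_DONE,
        PHJ_BATCH_STATUS_PROVISIONALLY_DONE,
        PHJ_BATCH_STATUS_DONE
    };
    printf("next batch: %d\n", choose_next_batch(status, 3));  /* prints 1 */
    return 0;
}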
Because the worker detached from the batch, this method solves the tuple
queue flow control deadlock issue--this worker could not be attempting
to emit a tuple while the leader waits at the barrier for it. There is
no waiting at the barrier.
However, it is unclear to me whether or not this method will be at risk
of inter-node deadlock/execution order deadlock. It seems like this is
not more at risk than the existing code is for this issue.
If a worker never returns to the HashJoin after leaving to emit a tuple,
in any of the methods (and in master), the query would not finish
correctly: the workers are attached to the batch_barrier while emitting
tuples, and though they may not wait at this barrier again, the
hashtable is cleaned up by the last participant to detach, which would
not happen if a worker never returns to the batch phase machine. I'm
not sure if this exhibits the problematic behavior detailed above, but,
if it does, it is not unique to this method.
Some other notes on the patch:
Aside from the deadlock problem, there are some minor details to tidy
up (handling of late starters probably not quite right, rescans not
yet considered).
These would not be an issue with only one worker doing the scan but
would have to be handled in a potential new parallel-enabled solution
like I suggested above.
There is a fun hard-coded parameter that controls
the parallel step size in terms of cache lines for the unmatched scan;
I found that 8 was a lot faster than 4, but no slower than 128 on my
laptop, so I set it to 8.
I didn't add this cache line optimization to my chunk scanning method. I
could do so. Do you think it is more relevant, less relevant, or the
same if only one worker is doing the unmatched inner scan?
More thoughts along those micro-optimistic
lines: instead of a match bit in the header, you could tag the pointer
and sometimes avoid having to follow it, and you could prefetch the next
non-matching tuple's cacheline by looking ahead a bit.
I would be happy to try doing this once we get the rest of the patch
ironed out so that seeing how much of a performance difference it makes
is more straightforward.
[1]: /messages/by-id/CA+hUKGKWWmf=WELLG=aUGbcugRaSQbtm0tKYiBut-B2rVKX63g@mail.gmail.com
[2]: /messages/by-id/CA+TgmoY4LogYcg1y5JPtto_fL-DBUqvxRiZRndDC70iFiVsVFQ@mail.gmail.com
[3]: /messages/by-id/CA+hUKGLBRyu0rHrDCMC4=Rn3252gogyp1SjOgG8SEKKZv=FwfQ@mail.gmail.com
[1]: /messages/by-id/0F44E799048C4849BAE4B91012DB910462E9897A@SHSMSX103.ccr.corp.intel.com
-- Melanie
Attachments:
v1-0001-Parallel-FOJ-ROJ-single-worker-scan-chunks.patch
From e91ec07029df3422184e316d47e2b1e4b74835cc Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Thu, 17 Sep 2020 14:10:28 -0700
Subject: [PATCH v1] Support Parallel FOJ and ROJ
To support parallel FOJ and ROJ,
- re-enable setting match bit for tuples in the hash table
- a single worker preps for unmatched inner tuple scan in
HJ_NEED_NEW_OUTER and transitions to HJ_FILL_INNER to avoid deadlock.
ExecParallelScanHashTableForUnmatched() is no longer safe for
multiple workers. A single worker will scan each HashMemoryChunk
in the hash table, freeing it after finishing with it.
---
src/backend/executor/nodeHash.c | 116 ++++++++++++++++++++++--
src/backend/executor/nodeHashjoin.c | 60 ++++++------
src/backend/optimizer/path/joinpath.c | 9 +-
src/backend/postmaster/pgstat.c | 3 +
src/backend/storage/ipc/barrier.c | 23 ++++-
src/include/executor/hashjoin.h | 8 +-
src/include/executor/nodeHash.h | 3 +
src/include/nodes/execnodes.h | 2 +
src/include/pgstat.h | 1 +
src/include/storage/barrier.h | 1 +
src/test/regress/expected/join_hash.out | 56 +++++++++++-
src/test/regress/sql/join_hash.sql | 23 ++++-
12 files changed, 261 insertions(+), 44 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..e6487cc35b 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2056,6 +2056,52 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = 0;
hjstate->hj_CurTuple = NULL;
+ hjstate->hj_AllocatedBucketRange = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ * set up for a series of ExecScanHashTableForUnmatched calls
+ * return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+ bool last = false;
+ hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurSkewBucketNo = 0;
+ hjstate->hj_CurTuple = NULL;
+ hjstate->hj_AllocatedBucketRange = 0;
+ if (curbatch < 0)
+ return false;
+ last = BarrierDetachOrElect(&batch->batch_barrier);
+ if (!last)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+ /*
+ * Track the largest batch we've been attached to. Though each
+ * backend might see a different subset of batches, explain.c will
+ * scan the results from all backends to find the largest value.
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak,
+ batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+ hashtable->curbatch = -1;
+ }
+ else
+ {
+ batch_accessor->shared_chunk = batch->chunks;
+ batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+ batch_accessor->current_chunk_idx = 0;
+ }
+ return last;
}
/*
@@ -2132,6 +2178,65 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
return false;
}
+/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ dsa_pointer next;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+ while (accessor->current_chunk)
+ {
+ while (accessor->current_chunk_idx < accessor->current_chunk->used)
+ {
+ HashJoinTuple hashTuple = (HashJoinTuple) (
+ HASH_CHUNK_DATA(accessor->current_chunk) + accessor->current_chunk_idx
+ );
+ accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
+
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
+
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), hjstate->hj_HashTupleSlot,false);
+
+ /*
+ * Reset temp memory each time; although this function doesn't
+ * do any qual eval, the caller will, so let's keep it parallel to
+ * ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
+
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
+ }
+
+ next = accessor->current_chunk->next.shared;
+ dsa_free(hashtable->area, accessor->shared_chunk);
+ accessor->shared_chunk = next;
+ accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+ accessor->current_chunk_idx = 0;
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ accessor->shared->chunks = InvalidDsaPointer;
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
/*
* ExecHashTableReset
*
@@ -2971,6 +3076,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
* up the Barrier.
*/
BarrierInit(&shared->batch_barrier, 0);
+ pg_atomic_init_u32(&shared->bucket, 0);
if (i == 0)
{
/* Batch 0 doesn't need to be loaded. */
@@ -3131,13 +3237,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
{
- /*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
- */
- Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
{
@@ -3271,6 +3370,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
hashtable->current_chunk = NULL;
hashtable->current_chunk_shared = InvalidDsaPointer;
hashtable->batches[batchno].at_least_one_chunk = false;
+ hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+ hashtable->batches[batchno].current_chunk = NULL;
+ hashtable->batches[batchno].current_chunk_idx = 0;
}
/*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5532b91a71..23e652d130 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,9 @@
* PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBING -- all probe
- * PHJ_BATCH_DONE -- end
+
+ * PHJ_BATCH_DONE -- queries not requiring inner fill done
+ * PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
*
* Batch 0 is a special case, because it starts out in phase
* PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -238,6 +240,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* from the outer plan node. If we succeed, we have to stash
* it away for later consumption by ExecHashJoinOuterGetTuple.
*/
+ //volatile int mybp = 0; while (mybp == 0) {};
if (HJ_FILL_INNER(node))
{
/* no chance to not build the hash table */
@@ -360,9 +363,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
/* end of batch, or maybe whole join */
if (HJ_FILL_INNER(node))
{
- /* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ /* set up to scan for unmatched inner tuples */
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +468,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node),
+ * but we'll avoid the branch and just set it always.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +532,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -742,6 +744,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
hjstate->hj_CurTuple = NULL;
+ hjstate->hj_AllocatedBucketRange = 0;
hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
(PlanState *) hjstate);
@@ -1173,15 +1176,17 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_DONE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one worker must advance the phase
+ * so that the final phase is reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
-
case PHJ_BATCH_DONE:
+ /* Fall through. */
+
+ case PHJ_BATCH_FILL_INNER_DONE:
/*
* Already done. Detach and go around again (if any
@@ -1360,6 +1365,7 @@ ExecReScanHashJoin(HashJoinState *node)
node->hj_CurBucketNo = 0;
node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
node->hj_CurTuple = NULL;
+ node->hj_AllocatedBucketRange = 0;
node->hj_MatchedOuter = false;
node->hj_FirstOuterTupleSlot = NULL;
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index db54a6ba2e..d30ea4d86d 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1852,8 +1852,6 @@ hash_inner_and_outer(PlannerInfo *root,
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
- save_jointype != JOIN_FULL &&
- save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
@@ -1887,9 +1885,12 @@ hash_inner_and_outer(PlannerInfo *root,
* total inner path will also be parallel-safe, but if not, we'll
* have to search for the cheapest safe, unparameterized inner
* path. If doing JOIN_UNIQUE_INNER, we can't use any alternative
- * inner path.
+ * inner path. If full or right join, we can't use parallelism
+ * at all because no one process has all the match bits.
*/
- if (cheapest_total_inner->parallel_safe)
+ if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+ cheapest_safe_inner = NULL;
+ else if (cheapest_total_inner->parallel_safe)
cheapest_safe_inner = cheapest_total_inner;
else if (save_jointype != JOIN_UNIQUE_INNER)
cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e6be2b7836..f6f242d806 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3782,6 +3782,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_BATCH_LOAD:
event_name = "HashBatchLoad";
break;
+ case WAIT_EVENT_HASH_BATCH_PROBE:
+ event_name = "HashBatchProbe";
+ break;
case WAIT_EVENT_HASH_BUILD_ALLOCATE:
event_name = "HashBuildAllocate";
break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 3e200e02cc..5ebd18314f 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -204,6 +204,28 @@ BarrierArriveAndDetach(Barrier *barrier)
{
return BarrierDetachImpl(barrier, true);
}
+/*
+ * Upon arriving at the barrier, if this worker is not the last worker attached,
+ * detach from the barrier and return false. If this worker is the last worker,
+ * remain attached and advance the phase of the barrier, return true to indicate
+ * you are the last or "elected" worker who is still attached to the barrier.
+ * Another name I considered was BarrierUniqueify or BarrierSoloAssign
+ */
+bool
+BarrierDetachOrElect(Barrier *barrier)
+{
+ SpinLockAcquire(&barrier->mutex);
+ if (barrier->participants > 1)
+ {
+ --barrier->participants;
+ SpinLockRelease(&barrier->mutex);
+ return false;
+ }
+ Assert(barrier->participants == 1);
+ ++barrier->phase;
+ SpinLockRelease(&barrier->mutex);
+ return true;
+}
/*
* Attach to a barrier. All waiting participants will now wait for this
@@ -221,7 +243,6 @@ BarrierAttach(Barrier *barrier)
++barrier->participants;
phase = barrier->phase;
SpinLockRelease(&barrier->mutex);
-
return phase;
}
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index eb5daba36b..8cf36cb6c5 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -160,6 +160,8 @@ typedef struct ParallelHashJoinBatch
size_t old_ntuples; /* number of tuples before repartitioning */
bool space_exhausted;
+ pg_atomic_uint32 bucket; /* bucket allocator for unmatched inner scan */
+
/*
* Variable-sized SharedTuplestore objects follow this struct in memory.
* See the accessor macros below.
@@ -205,6 +207,9 @@ typedef struct ParallelHashJoinBatchAccessor
bool at_least_one_chunk; /* has this backend allocated a chunk? */
bool done; /* flag to remember that a batch is done */
+ dsa_pointer shared_chunk; /* current chunk in hashtable for scanning for unmatched inner tuples serially */
+ HashMemoryChunk current_chunk;
+ size_t current_chunk_idx;
SharedTuplestoreAccessor *inner_tuples;
SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;
@@ -265,7 +270,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
-#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_FILL_INNER_DONE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 2db4e2f672..a642736d54 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a5ab1aed14..a6b2446958 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1921,6 +1921,7 @@ typedef struct MergeJoinState
* tuple, or NULL if starting search
* (hj_CurXXX variables are undefined if
* OuterTupleSlot is empty!)
+ * hj_AllocatedBucketRange range allocated for parallel unmatched scan
* hj_OuterTupleSlot tuple slot for outer tuples
* hj_HashTupleSlot tuple slot for inner (hashed) tuples
* hj_NullOuterTupleSlot prepared null tuple for right/full outer joins
@@ -1947,6 +1948,7 @@ typedef struct HashJoinState
uint32 hj_CurHashValue;
int hj_CurBucketNo;
int hj_CurSkewBucketNo;
+ int hj_AllocatedBucketRange;
HashJoinTuple hj_CurTuple;
TupleTableSlot *hj_OuterTupleSlot;
TupleTableSlot *hj_HashTupleSlot;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac46b4..62d5f1d16b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -856,6 +856,7 @@ typedef enum
WAIT_EVENT_HASH_BATCH_ALLOCATE,
WAIT_EVENT_HASH_BATCH_ELECT,
WAIT_EVENT_HASH_BATCH_LOAD,
+ WAIT_EVENT_HASH_BATCH_PROBE,
WAIT_EVENT_HASH_BUILD_ALLOCATE,
WAIT_EVENT_HASH_BUILD_ELECT,
WAIT_EVENT_HASH_BUILD_HASH_INNER,
diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h
index d71927cc2f..8b563fa7e0 100644
--- a/src/include/storage/barrier.h
+++ b/src/include/storage/barrier.h
@@ -37,6 +37,7 @@ typedef struct Barrier
extern void BarrierInit(Barrier *barrier, int num_workers);
extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info);
extern bool BarrierArriveAndDetach(Barrier *barrier);
+extern bool BarrierDetachOrElect(Barrier *barrier);
extern int BarrierAttach(Barrier *barrier);
extern bool BarrierDetach(Barrier *barrier);
extern int BarrierPhase(Barrier *barrier);
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id);
20000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
rollback to settings;
-- An full outer join where every record is not matched.
-- non-parallel
@@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
40000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: ((0 - s.id) = r.id)
+ -> Parallel Seq Scan on simple s
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)
--
2.20.1
Attachment: v1-0001-Parallel-FOJ-ROJ-single-worker-scan-buckets.patch
From 02c63bd0fcdceb6cdf3026b706fe9aa6e54d3e26 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Thu, 17 Sep 2020 14:10:28 -0700
Subject: [PATCH v1] Support parallel FOJ and ROJ
To support parallel FOJ and ROJ,
- re-enable setting match bit for tuples in the hash table
- a single worker preps for unmatched inner tuple scan in
HJ_NEED_NEW_OUTER and transitions to HJ_FILL_INNER to avoid deadlock.
ExecParallelScanHashTableForUnmatched() is still safe for
multiple workers, though only a single worker will run it
---
src/backend/executor/nodeHash.c | 128 ++++++++++++++++++++++--
src/backend/executor/nodeHashjoin.c | 55 +++++-----
src/backend/optimizer/path/joinpath.c | 9 +-
src/backend/postmaster/pgstat.c | 3 +
src/backend/storage/ipc/barrier.c | 23 ++++-
src/include/executor/hashjoin.h | 5 +-
src/include/executor/nodeHash.h | 3 +
src/include/nodes/execnodes.h | 2 +
src/include/pgstat.h | 1 +
src/include/storage/barrier.h | 1 +
src/test/regress/expected/join_hash.out | 56 ++++++++++-
src/test/regress/sql/join_hash.sql | 23 ++++-
12 files changed, 267 insertions(+), 42 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..e049c8103d 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2056,6 +2056,45 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = 0;
hjstate->hj_CurTuple = NULL;
+ hjstate->hj_AllocatedBucketRange = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ * set up for a series of ExecScanHashTableForUnmatched calls
+ * return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+ bool last = false;
+ hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurSkewBucketNo = 0;
+ hjstate->hj_CurTuple = NULL;
+ hjstate->hj_AllocatedBucketRange = 0;
+ if (curbatch < 0)
+ return false;
+ last = BarrierDetachOrElect(&batch->batch_barrier);
+ if (!last)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+ /*
+ * Track the largest batch we've been attached to. Though each
+ * backend might see a different subset of batches, explain.c will
+ * scan the results from all backends to find the largest value.
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak,
+ batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+ hashtable->curbatch = -1;
+ }
+ return last;
}
/*
@@ -2132,6 +2171,87 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
return false;
}
+/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+ for (;;)
+ {
+ /*
+ * hj_CurTuple is the address of the tuple last returned from the
+ * current bucket, or NULL if it's time to start scanning a new
+ * bucket.
+ */
+ if (hashTuple != NULL)
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ else if (hjstate->hj_CurBucketNo < hjstate->hj_AllocatedBucketRange)
+ hashTuple = ExecParallelHashFirstTuple(hashtable,
+ hjstate->hj_CurBucketNo++);
+ else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+ {
+ /*
+ * Allocate a few cachelines' worth of buckets and loop around.
+ * Testing shows that 8 is a good factor.
+ */
+ int step = (PG_CACHE_LINE_SIZE * 8) / sizeof(dsa_pointer_atomic);
+
+ hjstate->hj_CurBucketNo =
+ pg_atomic_fetch_add_u32(&hashtable->batches[hashtable->curbatch].shared->bucket,
+ step);
+ hjstate->hj_AllocatedBucketRange =
+ Min(hjstate->hj_CurBucketNo + step, hashtable->nbuckets);
+ }
+ else
+ break; /* finished all buckets */
+
+ while (hashTuple != NULL)
+ {
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ {
+ TupleTableSlot *inntuple;
+
+ /* insert hashtable's tuple into exec slot */
+ inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false); /* do not pfree */
+ econtext->ecxt_innertuple = inntuple;
+
+ /*
+ * Reset temp memory each time; although this function doesn't
+ * do any qual eval, the caller will, so let's keep it
+ * parallel to ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
+
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
+ }
+
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ }
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
+
/*
* ExecHashTableReset
*
@@ -2971,6 +3091,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
* up the Barrier.
*/
BarrierInit(&shared->batch_barrier, 0);
+ pg_atomic_init_u32(&shared->bucket, 0);
if (i == 0)
{
/* Batch 0 doesn't need to be loaded. */
@@ -3131,13 +3252,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
{
- /*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
- */
- Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
{
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5532b91a71..d34e87d575 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,6 +82,8 @@
* PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBING -- all probe
+ * PHJ_BATCH_SCAN_INNER -- scan unmatched inner tuples
+
* PHJ_BATCH_DONE -- end
*
* Batch 0 is a special case, because it starts out in phase
@@ -360,9 +362,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
/* end of batch, or maybe whole join */
if (HJ_FILL_INNER(node))
{
- /* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ /* set up to scan for unmatched inner tuples */
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +467,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node),
+ * but we'll avoid the branch and just set it always.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +531,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -742,6 +743,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
hjstate->hj_CurTuple = NULL;
+ hjstate->hj_AllocatedBucketRange = 0;
hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
(PlanState *) hjstate);
@@ -1173,13 +1175,15 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_DONE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one worker must advance the phase
+ * so that the final phase is reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
+ case PHJ_BATCH_SCAN_INNER:
+ /* Fall through. */
case PHJ_BATCH_DONE:
@@ -1360,6 +1364,7 @@ ExecReScanHashJoin(HashJoinState *node)
node->hj_CurBucketNo = 0;
node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
node->hj_CurTuple = NULL;
+ node->hj_AllocatedBucketRange = 0;
node->hj_MatchedOuter = false;
node->hj_FirstOuterTupleSlot = NULL;
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index db54a6ba2e..d30ea4d86d 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1852,8 +1852,6 @@ hash_inner_and_outer(PlannerInfo *root,
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
- save_jointype != JOIN_FULL &&
- save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
@@ -1887,9 +1885,12 @@ hash_inner_and_outer(PlannerInfo *root,
* total inner path will also be parallel-safe, but if not, we'll
* have to search for the cheapest safe, unparameterized inner
* path. If doing JOIN_UNIQUE_INNER, we can't use any alternative
- * inner path.
+ * inner path. If full or right join, we can't use parallelism
+ * at all because no one process has all the match bits.
*/
- if (cheapest_total_inner->parallel_safe)
+ if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+ cheapest_safe_inner = NULL;
+ else if (cheapest_total_inner->parallel_safe)
cheapest_safe_inner = cheapest_total_inner;
else if (save_jointype != JOIN_UNIQUE_INNER)
cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e6be2b7836..f6f242d806 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3782,6 +3782,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_BATCH_LOAD:
event_name = "HashBatchLoad";
break;
+ case WAIT_EVENT_HASH_BATCH_PROBE:
+ event_name = "HashBatchProbe";
+ break;
case WAIT_EVENT_HASH_BUILD_ALLOCATE:
event_name = "HashBuildAllocate";
break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 3e200e02cc..5ebd18314f 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -204,6 +204,28 @@ BarrierArriveAndDetach(Barrier *barrier)
{
return BarrierDetachImpl(barrier, true);
}
+/*
+ * Upon arriving at the barrier, if this worker is not the last worker attached,
+ * detach from the barrier and return false. If this worker is the last worker,
+ * remain attached and advance the phase of the barrier, return true to indicate
+ * you are the last or "elected" worker who is still attached to the barrier.
+ * Another name I considered was BarrierUniqueify or BarrierSoloAssign
+ */
+bool
+BarrierDetachOrElect(Barrier *barrier)
+{
+ SpinLockAcquire(&barrier->mutex);
+ if (barrier->participants > 1)
+ {
+ --barrier->participants;
+ SpinLockRelease(&barrier->mutex);
+ return false;
+ }
+ Assert(barrier->participants == 1);
+ ++barrier->phase;
+ SpinLockRelease(&barrier->mutex);
+ return true;
+}
/*
* Attach to a barrier. All waiting participants will now wait for this
@@ -221,7 +243,6 @@ BarrierAttach(Barrier *barrier)
++barrier->participants;
phase = barrier->phase;
SpinLockRelease(&barrier->mutex);
-
return phase;
}
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index eb5daba36b..6299c9ce9f 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -160,6 +160,8 @@ typedef struct ParallelHashJoinBatch
size_t old_ntuples; /* number of tuples before repartitioning */
bool space_exhausted;
+ pg_atomic_uint32 bucket; /* bucket allocator for unmatched inner scan */
+
/*
* Variable-sized SharedTuplestore objects follow this struct in memory.
* See the accessor macros below.
@@ -265,7 +267,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
-#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_SCAN_INNER 4
+#define PHJ_BATCH_DONE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 2db4e2f672..a642736d54 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a5ab1aed14..a6b2446958 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1921,6 +1921,7 @@ typedef struct MergeJoinState
* tuple, or NULL if starting search
* (hj_CurXXX variables are undefined if
* OuterTupleSlot is empty!)
+ * hj_AllocatedBucketRange range allocated for parallel unmatched scan
* hj_OuterTupleSlot tuple slot for outer tuples
* hj_HashTupleSlot tuple slot for inner (hashed) tuples
* hj_NullOuterTupleSlot prepared null tuple for right/full outer joins
@@ -1947,6 +1948,7 @@ typedef struct HashJoinState
uint32 hj_CurHashValue;
int hj_CurBucketNo;
int hj_CurSkewBucketNo;
+ int hj_AllocatedBucketRange;
HashJoinTuple hj_CurTuple;
TupleTableSlot *hj_OuterTupleSlot;
TupleTableSlot *hj_HashTupleSlot;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac46b4..62d5f1d16b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -856,6 +856,7 @@ typedef enum
WAIT_EVENT_HASH_BATCH_ALLOCATE,
WAIT_EVENT_HASH_BATCH_ELECT,
WAIT_EVENT_HASH_BATCH_LOAD,
+ WAIT_EVENT_HASH_BATCH_PROBE,
WAIT_EVENT_HASH_BUILD_ALLOCATE,
WAIT_EVENT_HASH_BUILD_ELECT,
WAIT_EVENT_HASH_BUILD_HASH_INNER,
diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h
index d71927cc2f..8b563fa7e0 100644
--- a/src/include/storage/barrier.h
+++ b/src/include/storage/barrier.h
@@ -37,6 +37,7 @@ typedef struct Barrier
extern void BarrierInit(Barrier *barrier, int num_workers);
extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info);
extern bool BarrierArriveAndDetach(Barrier *barrier);
+extern bool BarrierDetachOrElect(Barrier *barrier);
extern int BarrierAttach(Barrier *barrier);
extern bool BarrierDetach(Barrier *barrier);
extern int BarrierPhase(Barrier *barrier);
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id);
20000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
rollback to settings;
-- An full outer join where every record is not matched.
-- non-parallel
@@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
40000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: ((0 - s.id) = r.id)
+ -> Parallel Seq Scan on simple s
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)
--
2.20.1
On Tue, Sep 22, 2020 at 8:49 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro <thomas.munro@gmail.com> wrote:
1. You could probably make it so that the PHJ_BATCH_SCAN_INNER phase
in this patch (the scan for unmatched tuples) is executed by only one
process, using the "detach-and-see-if-you-were-last" trick. Melanie
proposed that for an equivalent problem in the looping hash join. I
think it probably works, but it gives up a lot of parallelism and thus
won't scale as nicely as the attached patch.
I have attached a patch which implements this
(v1-0001-Parallel-FOJ-ROJ-single-worker-scan-buckets.patch).
Hi Melanie,
Thanks for working on this! I have a feeling this is going to be much
easier to land than the mighty hash loop patch. And it's good to get
one of our blocking design questions nailed down for both patches.
I took it for a very quick spin and saw simple cases working nicely,
but TPC-DS queries 51 and 97 (which contain full joins) couldn't be
convinced to use it. Hmm.
For starters, in order to support parallel FOJ and ROJ, I re-enabled
setting the match bit for the tuples in the hashtable which
3e4818e9dd5be294d97c disabled. I did so using the code suggested in [1],
reading the match bit to see if it is already set before setting it.
Cool. I'm quite keen to add a "fill_inner" parameter for
ExecHashJoinImpl() and have an N-dimensional lookup table of
ExecHashJoin variants, so that this and much other related branching
can be constant-folded out of existence by the compiler in common
cases, which is why I think this is all fine, but that's for another
day...
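(To illustrate the constant-folding trick being alluded to here, a tiny standalone C example, not PostgreSQL code: a single always-inline implementation takes boolean flags, and each thin wrapper passes constants, so the compiler drops the untaken branches in each wrapper. The names are made up for the illustration.)

    /* toy model of the ExecHashJoinImpl-style specialization */
    #include <stdio.h>

    static inline __attribute__((always_inline)) int
    join_impl(int x, int parallel, int fill_inner)
    {
        int r = x;

        if (parallel)       /* constant in every caller, so folded away */
            r += 10;
        if (fill_inner)     /* likewise */
            r += 100;
        return r;
    }

    /* one thin wrapper per combination the planner can pick */
    static int join_serial(int x)        { return join_impl(x, 0, 0); }
    static int join_parallel_fill(int x) { return join_impl(x, 1, 1); }

    int main(void)
    {
        printf("%d %d\n", join_serial(1), join_parallel_fill(1));
        return 0;
    }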
Then, workers except for the last worker detach after exhausting the
outer side of a batch, leaving one worker to proceed to HJ_FILL_INNER
and do the scan of the hash table and emit unmatched inner tuples.
+1
Doing better is pretty complicated within our current execution model,
and I think this is a good compromise for now.
Costing for uneven distribution is tricky; depending on your plan
shape, specifically whether there is something else to do afterwards
to pick up the slack, it might or might not affect the total run time
of the query. It seems like there's not much we can do about that.
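(To spell out the control flow being described, here is a rough paraphrase of the decision point at the end of probing, based on the v1 patch above; a sketch, not the exact patch code:)

    /* end of probing for this batch, and the join must fill inner */
    if (ExecParallelPrepHashTableForUnmatched(node))
    {
        /*
         * We were the last worker still attached to the batch barrier, so
         * we stay attached and run the unmatched inner scan alone.
         */
        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
    }
    else
    {
        /*
         * We detached without waiting; the elected worker will emit the
         * unmatched inner tuples while we go look for another batch.
         */
        node->hj_JoinState = HJ_NEED_NEW_BATCH;
    }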
I have also attached a variant on this patch which I am proposing to
replace it (v1-0001-Parallel-FOJ-ROJ-single-worker-scan-chunks.patch)
which has a new ExecParallelScanHashTableForUnmatched() in which the
single worker doing the unmatched scan scans one HashMemoryChunk at a
time and then frees them as it goes. I thought this might perform better
than the version which uses the buckets because 1) it should do a bit
less pointer chasing and 2) it frees each chunk of the hash table as it
scans it which (maybe) would save a bit of time during
ExecHashTableDetachBatch() when it goes through and frees the hash
table, but, my preliminary tests showed a negligible difference between
this and the version using buckets. I will do a bit more testing,
though.
+1
I agree that it's the better of those two options.
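(For readers who haven't opened the patch, the chunk-at-a-time scan has roughly this shape; a simplified sketch of the loop in the v2 patch below, with the slot-handling details reduced to a return:)

    while (hashtable->current_chunk != NULL)
    {
        HashMemoryChunk chunk = hashtable->current_chunk;

        while (hashtable->current_chunk_idx < chunk->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple)
                (HASH_CHUNK_DATA(chunk) + hashtable->current_chunk_idx);
            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);

            /* step over this tuple before deciding whether to emit it */
            hashtable->current_chunk_idx +=
                MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

            if (!HeapTupleHeaderHasMatch(tuple))
                return tuple;       /* caller null-extends and emits it */
        }

        /* done with this chunk; the parallel variant also frees it here */
        hashtable->current_chunk = chunk->next.unshared;
        hashtable->current_chunk_idx = 0;
        CHECK_FOR_INTERRUPTS();
    }
    return NULL;                    /* no more unmatched inner tuples */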
[stuff about deadlocks]
The leader exclusion tactics and the spooling idea don't solve the
execution order deadlock possibility, so this "all except last detach
and last does unmatched inner scan" seems like the best way to solve
both types of deadlock.
Agreed (at least as long as our threads of query execution are made
out of C call stacks and OS processes that block).
Some other notes on the patch:
Aside from the deadlock problem, there are some minor details to tidy
up (handling of late starters probably not quite right, rescans not
yet considered).
These would not be an issue with only one worker doing the scan but
would have to be handled in a potential new parallel-enabled solution
like I suggested above.
Makes sense. Not sure why I thought anything special was needed for rescans.
There is a fun hard-coded parameter that controls
the parallel step size in terms of cache lines for the unmatched scan;
I found that 8 was a lot faster than 4, but no slower than 128 on my
laptop, so I set it to 8.
I didn't add this cache line optimization to my chunk scanning method. I
could do so. Do you think it is more relevant, less relevant, or the
same if only one worker is doing the unmatched inner scan?
Yeah it's irrelevant for a single process, and even more irrelevant if
we go with your chunk-based version.
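(For concreteness, the step-size computation in the v1 bucket-based version works out like this, assuming the usual 128-byte PG_CACHE_LINE_SIZE and an 8-byte dsa_pointer_atomic; illustrative numbers, not measured here:)

    /* factor of 8 => claim 8 cache lines' worth of bucket headers at a time */
    int step = (PG_CACHE_LINE_SIZE * 8) / sizeof(dsa_pointer_atomic);
                                            /* (128 * 8) / 8 = 128 buckets */

    hjstate->hj_CurBucketNo =
        pg_atomic_fetch_add_u32(&hashtable->batches[hashtable->curbatch].shared->bucket,
                                step);
    hjstate->hj_AllocatedBucketRange =
        Min(hjstate->hj_CurBucketNo + step, hashtable->nbuckets);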
More thoughts along those micro-optimistic
lines: instead of match bit in the header, you could tag the pointer
and sometimes avoid having to follow it, and you could prefetch next
non-matching tuple's cacheline by looking ahead a bit.
I would be happy to try doing this once we get the rest of the patch
ironed out so that seeing how much of a performance difference it makes
is more straightforward.
Ignore that, I have no idea if the maintenance overhead for such an
every-tuple-in-this-chain-is-matched tag bit would be worth it, it was
just an idle thought. I think your chunk-scan plan seems sensible for
now.
From a quick peek:
+/*
+ * Upon arriving at the barrier, if this worker is not the last worker attached,
+ * detach from the barrier and return false. If this worker is the last worker,
+ * remain attached and advance the phase of the barrier, return true to indicate
+ * you are the last or "elected" worker who is still attached to the barrier.
+ * Another name I considered was BarrierUniqueify or BarrierSoloAssign
+ */
+bool
+BarrierDetachOrElect(Barrier *barrier)
I tried to find some existing naming in writing about
barriers/phasers, but nothing is jumping out at me. I think a lot of
this stuff comes from super computing where I guess "make all of the
threads give up except one" isn't a primitive they'd be too excited
about :-)
BarrierArriveAndElectOrDetach()... gah, no.
+ last = BarrierDetachOrElect(&batch->batch_barrier);
It'd be nice to add some assertions after that, in the 'last' path,
that there's only one participant and that the phase is as expected,
just to make it even clearer to the reader, and a comment in the other
path that we are no longer attached.
+ hjstate->hj_AllocatedBucketRange = 0;
...
+ pg_atomic_uint32 bucket; /* bucket allocator for unmatched inner scan */
...
+ //volatile int mybp = 0; while (mybp == 0)
Some leftover fragments of the bucket-scan version and debugging stuff.
On Mon, Sep 21, 2020 at 8:34 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Tue, Sep 22, 2020 at 8:49 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro <thomas.munro@gmail.com> wrote:
I took it for a very quick spin and saw simple cases working nicely,
but TPC-DS queries 51 and 97 (which contain full joins) couldn't be
convinced to use it. Hmm.
Thanks for taking a look, Thomas!
Both query 51 and query 97 have full outer joins of two CTEs, each of
which is an aggregate query.
During planning when constructing the joinrel and choosing paths, in
hash_inner_and_outer(), we don't consider parallel hash
join paths because the outerrel and innerrel do not have
partial_pathlists.
This code
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
gates the code to generate partial paths for hash join.
My understanding of this is that if the inner and outerrel don't have
partial paths, then they can't be executed in parallel, so the join
could not be executed in parallel.
For the two TPC-DS queries, even if they use parallel aggs, the finalize
agg will have to be done by a single worker, so I don't think they could
be joined with a parallel hash join.
I added some logging inside the "if" statement and ran join_hash.sql in
regress to see what nodes were typically in the pathlist and partial
pathlist. All of them had basically just sequential scans as the outer
and inner rel paths. The regress examples are definitely meant to be
minimal, so this probably wasn't the best place to look for examples of
more complex rels that can be joined with a parallel hash join.
Some other notes on the patch:
From a quick peek:
+/*
+ * Upon arriving at the barrier, if this worker is not the last worker attached,
+ * detach from the barrier and return false. If this worker is the last worker,
+ * remain attached and advance the phase of the barrier, return true to indicate
+ * you are the last or "elected" worker who is still attached to the barrier.
+ * Another name I considered was BarrierUniqueify or BarrierSoloAssign
+ */
+bool
+BarrierDetachOrElect(Barrier *barrier)
I tried to find some existing naming in writing about
barriers/phasers, but nothing is jumping out at me. I think a lot of
this stuff comes from super computing where I guess "make all of the
threads give up except one" isn't a primitive they'd be too excited
about :-)
BarrierArriveAndElectOrDetach()... gah, no.
You're right that Arrive should be in there.
So, I went with BarrierArriveAndDetachExceptLast()
It's specific, if not clever.
+ last = BarrierDetachOrElect(&batch->batch_barrier);
It'd be nice to add some assertions after that, in the 'last' path,
that there's only one participant and that the phase is as expected,
just to make it even clearer to the reader, and a comment in the other
path that we are no longer attached.
Assert and comment added to the single worker path.
The other path is just back to HJ_NEED_NEW_BATCH and workers will detach
there as before, so I'm not sure where we could add the comment about
the other workers detaching.
+ hjstate->hj_AllocatedBucketRange = 0;
...
+ pg_atomic_uint32 bucket; /* bucket allocator for unmatched inner scan */
...
+ //volatile int mybp = 0; while (mybp == 0)
Some leftover fragments of the bucket-scan version and debugging stuff.
cleaned up (and rebased).
I also changed ExecScanHashTableForUnmatched() to scan HashMemoryChunks
in the hashtable instead of using the buckets to align parallel and
serial hash join code.
Originally, I had that code freeing the chunks of the hashtable after
finishing scanning them; however, I noticed this query from regress
failing:
select * from
(values (1, array[10,20]), (2, array[20,30])) as v1(v1x,v1ys)
left join (values (1, 10), (2, 20)) as v2(v2x,v2y) on v2x = v1x
left join unnest(v1ys) as u1(u1y) on u1y = v2y;
It fails because the hash join gets rescanned and, since there is only one
batch, ExecReScanHashJoin reuses the same hashtable.
QUERY PLAN
-------------------------------------------------------------
Nested Loop Left Join
-> Values Scan on "*VALUES*"
-> Hash Right Join
Hash Cond: (u1.u1y = "*VALUES*_1".column2)
Filter: ("*VALUES*_1".column1 = "*VALUES*".column1)
-> Function Scan on unnest u1
-> Hash
-> Values Scan on "*VALUES*_1"
I was freeing the hashtable as I scanned each chunk, which clearly
doesn't work for a single-batch hash join that gets rescanned.
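(For context, the relevant branch of ExecReScanHashJoin() looks roughly like this; a condensed sketch from memory, so treat the details as approximate. The point is that a single-batch hash table survives the rescan with only its match flags reset, so its chunks must still be intact:)

    if (node->hj_HashTable != NULL)
    {
        if (node->hj_HashTable->nbatch == 1 &&
            node->js.ps.righttree->chgParam == NULL)
        {
            /* reuse the existing hash table; just clear the match flags */
            if (HJ_FILL_INNER(node))
                ExecHashTableResetMatchFlags(node->hj_HashTable);
            node->hj_JoinState = HJ_NEED_NEW_OUTER;
        }
        else
        {
            /* otherwise destroy it and rebuild from scratch */
            ExecHashTableDestroy(node->hj_HashTable);
            node->hj_HashTable = NULL;
            node->hj_JoinState = HJ_BUILD_HASHTABLE;
        }
    }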
I don't see anything specific to parallel hash join in ExecReScanHashJoin(),
so it seems like the same rules apply to parallel hash join. So, I will
have to remove the logic that frees the hash table after scanning each
chunk from the parallel function as well.
In addition, I still need to go through the patch with a fine tooth comb
(refine the comments and variable names and such) but just wanted to
check that these changes were in line with what you were thinking first.
Regards,
Melanie (Microsoft)
Attachments:
v2-0001-Support-Parallel-FOJ-and-ROJ.patch
From 6d34cbee84b06aa27d6c73426f29ef0d50dadb3a Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 29 Sep 2020 13:47:57 -0700
Subject: [PATCH v2] Support Parallel FOJ and ROJ
To support parallel FOJ and ROJ,
- re-enable setting match bit for tuples in the hash table
- a single worker preps for unmatched inner tuple scan in
HJ_NEED_NEW_OUTER and transitions to HJ_FILL_INNER to avoid deadlock.
ExecParallelScanHashTableForUnmatched() is no longer executed by
multiple workers. A single worker will scan each HashMemoryChunk
in the hash table, freeing it after finishing with it.
- To align parallel and serial hash join, change
ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the
unmatched tuple scan instead of using the buckets
---
src/backend/executor/nodeHash.c | 195 ++++++++++++++++++------
src/backend/executor/nodeHashjoin.c | 61 ++++----
src/backend/optimizer/path/joinpath.c | 14 +-
src/backend/postmaster/pgstat.c | 3 +
src/backend/storage/ipc/barrier.c | 22 ++-
src/include/executor/hashjoin.h | 13 +-
src/include/executor/nodeHash.h | 3 +
src/include/pgstat.h | 1 +
src/include/storage/barrier.h | 1 +
src/test/regress/expected/join_hash.out | 56 ++++++-
src/test/regress/sql/join_hash.sql | 23 ++-
11 files changed, 302 insertions(+), 90 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..42cb8514d3 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
hashtable->chunks = NULL;
hashtable->current_chunk = NULL;
+ hashtable->current_chunk_idx = 0;
hashtable->parallel_state = state->parallel_state;
hashtable->area = state->ps.state->es_query_dsa;
hashtable->batches = NULL;
@@ -2053,9 +2054,56 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
* hj_CurTuple: last tuple returned, or NULL to start next bucket
*----------
*/
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+
+ hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurSkewBucketNo = 0;
+ hjstate->hj_CurTuple = NULL;
+ hashtable->current_chunk = hashtable->chunks;
+ hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ * set up for a series of ExecScanHashTableForUnmatched calls
+ * return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+ bool last = false;
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = 0;
hjstate->hj_CurTuple = NULL;
+ if (curbatch < 0)
+ return false;
+ last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+ if (!last)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+ /*
+ * Track the largest batch we've been attached to. Though each
+ * backend might see a different subset of batches, explain.c will
+ * scan the results from all backends to find the largest value.
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak,batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+ hashtable->curbatch = -1;
+ }
+ else
+ {
+ batch_accessor->shared_chunk = batch->chunks;
+ batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+ batch_accessor->current_chunk_idx = 0;
+ }
+ return last;
}
/*
@@ -2069,63 +2117,118 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
- HashJoinTable hashtable = hjstate->hj_HashTable;
- HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+ HashMemoryChunk next;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
- for (;;)
+ while (hashtable->current_chunk)
{
- /*
- * hj_CurTuple is the address of the tuple last returned from the
- * current bucket, or NULL if it's time to start scanning a new
- * bucket.
- */
- if (hashTuple != NULL)
- hashTuple = hashTuple->next.unshared;
- else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+ while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
{
- hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
- hjstate->hj_CurBucketNo++;
- }
- else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
- {
- int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+ HashJoinTuple hashTuple = (HashJoinTuple) (
+ HASH_CHUNK_DATA(hashtable->current_chunk) +
+ hashtable->current_chunk_idx
+ );
+ MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+ int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+ /* next tuple in this chunk */
+ hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
- hashTuple = hashtable->skewBucket[j]->tuples;
- hjstate->hj_CurSkewBucketNo++;
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple =
+ ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false);
+
+ /*
+ * Reset temp memory each time; although this function doesn't
+ * do any qual eval, the caller will, so let's keep it
+ * parallel to ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
+
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
}
- else
- break; /* finished all buckets */
- while (hashTuple != NULL)
+ next = hashtable->current_chunk->next.unshared;
+ hashtable->current_chunk = next;
+ hashtable->current_chunk_idx = 0;
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ dsa_pointer next;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+ /*
+ * Only one worker should execute this function.
+ * Since tuples have already been emitted, it is hazardous for workers
+ * to wait at the batch_barrier again. Instead, all workers except the last
+ * will detach and the last will conduct this unmatched inner tuple scan.
+ */
+ Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1);
+ while (accessor->current_chunk)
+ {
+ while (accessor->current_chunk_idx < accessor->current_chunk->used)
{
- if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
- {
- TupleTableSlot *inntuple;
+ HashJoinTuple hashTuple = (HashJoinTuple) (
+ HASH_CHUNK_DATA(accessor->current_chunk) + accessor->current_chunk_idx
+ );
+ accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
- /* insert hashtable's tuple into exec slot */
- inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
- hjstate->hj_HashTupleSlot,
- false); /* do not pfree */
- econtext->ecxt_innertuple = inntuple;
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
- /*
- * Reset temp memory each time; although this function doesn't
- * do any qual eval, the caller will, so let's keep it
- * parallel to ExecScanHashBucket.
- */
- ResetExprContext(econtext);
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple = ExecStoreMinimalTuple(
+ HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,false);
- hjstate->hj_CurTuple = hashTuple;
- return true;
- }
+ /*
+ * Reset temp memory each time; although this function doesn't
+ * do any qual eval, the caller will, so let's keep it parallel to
+ * ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
- hashTuple = hashTuple->next.unshared;
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
}
- /* allow this loop to be cancellable */
+ next = accessor->current_chunk->next.shared;
+ dsa_free(hashtable->area, accessor->shared_chunk);
+ accessor->shared_chunk = next;
+ accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+ accessor->current_chunk_idx = 0;
+
CHECK_FOR_INTERRUPTS();
}
+ accessor->shared->chunks = InvalidDsaPointer;
/*
* no more unmatched tuples
*/
@@ -3131,13 +3234,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
{
- /*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
- */
- Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
{
@@ -3271,6 +3367,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
hashtable->current_chunk = NULL;
hashtable->current_chunk_shared = InvalidDsaPointer;
hashtable->batches[batchno].at_least_one_chunk = false;
+ hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+ hashtable->batches[batchno].current_chunk = NULL;
+ hashtable->batches[batchno].current_chunk_idx = 0;
}
/*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5532b91a71..7a2b5275bb 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,9 @@
* PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBING -- all probe
- * PHJ_BATCH_DONE -- end
+
+ * PHJ_BATCH_DONE -- queries not requiring inner fill done
+ * PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
*
* Batch 0 is a special case, because it starts out in phase
* PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -99,7 +101,9 @@
* while attached to a barrier, unless the barrier has reached its final
* state. In the slightly special case of the per-batch barrier, we return
* tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE or
+ * PHJ_BATCH_DONE_INNER, depending on whether or not the join requires
+ * a scan for unmatched inner tuples, without waiting.
*
*-------------------------------------------------------------------------
*/
@@ -360,9 +364,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
/* end of batch, or maybe whole join */
if (HJ_FILL_INNER(node))
{
- /* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ /* set up to scan for unmatched inner tuples */
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +469,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node),
+ * but we'll avoid the branch and just set it always.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +533,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1173,15 +1176,17 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_DONE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one worker must advance the phase
+ * so that the final phase is reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
-
case PHJ_BATCH_DONE:
+ /* Fall through. */
+
+ case PHJ_BATCH_FILL_INNER_DONE:
/*
* Already done. Detach and go around again (if any
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index db54a6ba2e..cbc8c2ad83 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1845,15 +1845,9 @@ hash_inner_and_outer(PlannerInfo *root,
* able to properly guarantee uniqueness. Similarly, we can't handle
* JOIN_FULL and JOIN_RIGHT, because they can produce false null
* extended rows. Also, the resulting path must not be parameterized.
- * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
- * Hash, since in that case we're back to a single hash table with a
- * single set of match bits for each batch, but that will require
- * figuring out a deadlock-free way to wait for the probe to finish.
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
- save_jointype != JOIN_FULL &&
- save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
@@ -1887,9 +1881,13 @@ hash_inner_and_outer(PlannerInfo *root,
* total inner path will also be parallel-safe, but if not, we'll
* have to search for the cheapest safe, unparameterized inner
* path. If doing JOIN_UNIQUE_INNER, we can't use any alternative
- * inner path.
+ * inner path. If full or right join, we can't use parallelism
+ * (building the hash table in each backend) because no one process
+ * has all the match bits.
*/
- if (cheapest_total_inner->parallel_safe)
+ if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+ cheapest_safe_inner = NULL;
+ else if (cheapest_total_inner->parallel_safe)
cheapest_safe_inner = cheapest_total_inner;
else if (save_jointype != JOIN_UNIQUE_INNER)
cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e6be2b7836..f6f242d806 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3782,6 +3782,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_BATCH_LOAD:
event_name = "HashBatchLoad";
break;
+ case WAIT_EVENT_HASH_BATCH_PROBE:
+ event_name = "HashBatchProbe";
+ break;
case WAIT_EVENT_HASH_BUILD_ALLOCATE:
event_name = "HashBuildAllocate";
break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 3e200e02cc..2e7b0687ef 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -204,6 +204,27 @@ BarrierArriveAndDetach(Barrier *barrier)
{
return BarrierDetachImpl(barrier, true);
}
+/*
+ * Upon arriving at the barrier, if this worker is not the last worker attached,
+ * detach from the barrier and return false. If this worker is the last worker,
+ * remain attached and advance the phase of the barrier, return true to indicate
+ * you are the last or "elected" worker who is still attached to the barrier.
+ */
+bool
+BarrierArriveAndDetachExceptLast(Barrier *barrier)
+{
+ SpinLockAcquire(&barrier->mutex);
+ if (barrier->participants > 1)
+ {
+ --barrier->participants;
+ SpinLockRelease(&barrier->mutex);
+ return false;
+ }
+ Assert(barrier->participants == 1);
+ ++barrier->phase;
+ SpinLockRelease(&barrier->mutex);
+ return true;
+}
/*
* Attach to a barrier. All waiting participants will now wait for this
@@ -221,7 +242,6 @@ BarrierAttach(Barrier *barrier)
++barrier->participants;
phase = barrier->phase;
SpinLockRelease(&barrier->mutex);
-
return phase;
}
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index eb5daba36b..2af7228ef0 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,14 @@ typedef struct ParallelHashJoinBatchAccessor
bool at_least_one_chunk; /* has this backend allocated a chunk? */
bool done; /* flag to remember that a batch is done */
+ /*
+ * While doing the unmatched inner scan, the assigned worker may emit
+ * tuples. Thus, we must keep track of where it was in the hashtable
+ * so it can return to the correct offset within the correct chunk.
+ */
+ dsa_pointer shared_chunk;
+ HashMemoryChunk current_chunk;
+ size_t current_chunk_idx;
SharedTuplestoreAccessor *inner_tuples;
SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;
@@ -265,7 +273,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
-#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_FILL_INNER_DONE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
@@ -351,6 +360,8 @@ typedef struct HashJoinTableData
/* used for dense allocation of tuples (into linked chunks) */
HashMemoryChunk chunks; /* one list for the whole batch */
+ size_t current_chunk_idx; /* index of tuple within current chunk for serial unmatched inner scan */
+
/* Shared and private state for Parallel Hash. */
HashMemoryChunk current_chunk; /* this backend's current chunk */
dsa_area *area; /* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 2db4e2f672..a642736d54 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac46b4..62d5f1d16b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -856,6 +856,7 @@ typedef enum
WAIT_EVENT_HASH_BATCH_ALLOCATE,
WAIT_EVENT_HASH_BATCH_ELECT,
WAIT_EVENT_HASH_BATCH_LOAD,
+ WAIT_EVENT_HASH_BATCH_PROBE,
WAIT_EVENT_HASH_BUILD_ALLOCATE,
WAIT_EVENT_HASH_BUILD_ELECT,
WAIT_EVENT_HASH_BUILD_HASH_INNER,
diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h
index d71927cc2f..e0de24378b 100644
--- a/src/include/storage/barrier.h
+++ b/src/include/storage/barrier.h
@@ -37,6 +37,7 @@ typedef struct Barrier
extern void BarrierInit(Barrier *barrier, int num_workers);
extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info);
extern bool BarrierArriveAndDetach(Barrier *barrier);
+extern bool BarrierArriveAndDetachExceptLast(Barrier *barrier);
extern int BarrierAttach(Barrier *barrier);
extern bool BarrierDetach(Barrier *barrier);
extern int BarrierPhase(Barrier *barrier);
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id);
20000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
rollback to settings;
-- An full outer join where every record is not matched.
-- non-parallel
@@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
40000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: ((0 - s.id) = r.id)
+ -> Parallel Seq Scan on simple s
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)
--
2.25.1
I've attached a patch with the corrections I mentioned upthread.
I've gone ahead and run pgindent, though I can't say that I'm very
happy with the result.
I'm still not quite happy with the name
BarrierArriveAndDetachExceptLast(). It's so literal. As you said, there
probably isn't a nice name for this concept, since it is a function with
the purpose of terminating parallelism.
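For anyone skimming the thread, the call site boils down to roughly the
following (a simplified sketch that folds ExecParallelPrepHashTableForUnmatched()
into its caller; the attached patch has the real code and the cleanup paths):

    if (BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
    {
        /* Last attached worker: stay and emit the unmatched inner tuples. */
        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
    }
    else
    {
        /* Everyone else gives up on this batch and looks for another one. */
        node->hj_JoinState = HJ_NEED_NEW_BATCH;
    }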
Regards,
Melanie (Microsoft)
Attachments:
v3-0001-Support-Parallel-FOJ-and-ROJ.patch (text/x-patch)
From 0b13b62a8aac071393ac65b19dc1ec86daa967f2 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 4 Nov 2020 14:25:33 -0800
Subject: [PATCH v3] Support Parallel FOJ and ROJ
Previously, parallel full and right outer join were not supported due to
a potential deadlock hazard posed by allowing workers to wait on a
barrier after barrier participants have started emitting tuples. More
details on the deadlock hazard can be found in the thread [1].
For now, sidestep the problem by terminating parallelism for the
unmatched inner tuple scan. The last worker to arrive at the barrier
preps for the unmatched inner tuple scan in HJ_NEED_NEW_OUTER and
transitions to HJ_FILL_INNER, scanning the hash table and emitting
unmatched inner tuples.
To align parallel and serial hash join, change
ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the
unmatched tuple scan instead of accessing tuples through the hash table
buckets.
[1] https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
src/backend/executor/nodeHash.c | 190 ++++++++++++++++++------
src/backend/executor/nodeHashjoin.c | 61 ++++----
src/backend/optimizer/path/joinpath.c | 14 +-
src/backend/postmaster/pgstat.c | 3 +
src/backend/storage/ipc/barrier.c | 21 +++
src/include/executor/hashjoin.h | 15 +-
src/include/executor/nodeHash.h | 3 +
src/include/pgstat.h | 1 +
src/include/storage/barrier.h | 1 +
src/test/regress/expected/join_hash.out | 56 ++++++-
src/test/regress/sql/join_hash.sql | 23 ++-
11 files changed, 300 insertions(+), 88 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..36cc752163 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
hashtable->chunks = NULL;
hashtable->current_chunk = NULL;
+ hashtable->current_chunk_idx = 0;
hashtable->parallel_state = state->parallel_state;
hashtable->area = state->ps.state->es_query_dsa;
hashtable->batches = NULL;
@@ -2053,9 +2054,58 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
* hj_CurTuple: last tuple returned, or NULL to start next bucket
*----------
*/
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = 0;
hjstate->hj_CurTuple = NULL;
+ hashtable->current_chunk = hashtable->chunks;
+ hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ * set up for a series of ExecScanHashTableForUnmatched calls
+ * return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+ bool last = false;
+
+ hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurSkewBucketNo = 0;
+ hjstate->hj_CurTuple = NULL;
+ if (curbatch < 0)
+ return false;
+ last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+ if (!last)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+ /*
+ * Track the largest batch we've been attached to. Though each
+ * backend might see a different subset of batches, explain.c will
+ * scan the results from all backends to find the largest value.
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+ hashtable->curbatch = -1;
+ }
+ else
+ {
+ batch_accessor->shared_chunk = batch->chunks;
+ batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+ batch_accessor->current_chunk_idx = 0;
+ }
+ return last;
}
/*
@@ -2069,60 +2119,110 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
+ HashMemoryChunk next;
HashJoinTable hashtable = hjstate->hj_HashTable;
- HashJoinTuple hashTuple = hjstate->hj_CurTuple;
- for (;;)
+ while (hashtable->current_chunk)
{
- /*
- * hj_CurTuple is the address of the tuple last returned from the
- * current bucket, or NULL if it's time to start scanning a new
- * bucket.
- */
- if (hashTuple != NULL)
- hashTuple = hashTuple->next.unshared;
- else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+ while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
{
- hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
- hjstate->hj_CurBucketNo++;
- }
- else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
- {
- int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+ HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(hashtable->current_chunk) +
+ hashtable->current_chunk_idx);
+ MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+ int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+ /* next tuple in this chunk */
+ hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
- hashTuple = hashtable->skewBucket[j]->tuples;
- hjstate->hj_CurSkewBucketNo++;
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple =
+ ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false);
+
+ /*
+ * Reset temp memory each time; although this function doesn't do
+ * any qual eval, the caller will, so let's keep it parallel to
+ * ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
+
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
}
- else
- break; /* finished all buckets */
- while (hashTuple != NULL)
+ next = hashtable->current_chunk->next.unshared;
+ hashtable->current_chunk = next;
+ hashtable->current_chunk_idx = 0;
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+ /*
+ * Only one worker should execute this function. Since tuples have already
+ * been emitted, it is hazardous for workers to wait at the batch_barrier
+ * again. Instead, all workers except the last will detach and the last
+ * will conduct this unmatched inner tuple scan.
+ */
+ Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1);
+ while (accessor->current_chunk)
+ {
+ while (accessor->current_chunk_idx < accessor->current_chunk->used)
{
- if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
- {
- TupleTableSlot *inntuple;
+ HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(accessor->current_chunk) +
+ accessor->current_chunk_idx);
- /* insert hashtable's tuple into exec slot */
- inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
- hjstate->hj_HashTupleSlot,
- false); /* do not pfree */
- econtext->ecxt_innertuple = inntuple;
+ accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD +
+ HJTUPLE_MINTUPLE(hashTuple)->t_len);
- /*
- * Reset temp memory each time; although this function doesn't
- * do any qual eval, the caller will, so let's keep it
- * parallel to ExecScanHashBucket.
- */
- ResetExprContext(econtext);
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
- hjstate->hj_CurTuple = hashTuple;
- return true;
- }
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot, false);
+
+ /*
+ * Reset temp memory each time; although this function doesn't do
+ * any qual eval, the caller will, so let's keep it parallel to
+ * ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
- hashTuple = hashTuple->next.unshared;
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
}
- /* allow this loop to be cancellable */
+ accessor->shared_chunk = accessor->current_chunk->next.shared;
+ accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+ accessor->current_chunk_idx = 0;
+
CHECK_FOR_INTERRUPTS();
}
@@ -3131,13 +3231,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
{
- /*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
- */
- Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
{
@@ -3271,6 +3364,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
hashtable->current_chunk = NULL;
hashtable->current_chunk_shared = InvalidDsaPointer;
hashtable->batches[batchno].at_least_one_chunk = false;
+ hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+ hashtable->batches[batchno].current_chunk = NULL;
+ hashtable->batches[batchno].current_chunk_idx = 0;
}
/*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5532b91a71..791c97e17d 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,9 @@
* PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBING -- all probe
- * PHJ_BATCH_DONE -- end
+
+ * PHJ_BATCH_DONE -- queries not requiring inner fill done
+ * PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
*
* Batch 0 is a special case, because it starts out in phase
* PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -99,7 +101,9 @@
* while attached to a barrier, unless the barrier has reached its final
* state. In the slightly special case of the per-batch barrier, we return
* tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE or
+ * PHJ_BATCH_DONE_INNER, depending on whether or not the join requires
+ * a scan for unmatched inner tuples, without waiting.
*
*-------------------------------------------------------------------------
*/
@@ -360,9 +364,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
/* end of batch, or maybe whole join */
if (HJ_FILL_INNER(node))
{
- /* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ /* set up to scan for unmatched inner tuples */
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +469,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node), but
+ * we'll avoid the branch and just set it always.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +533,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1173,15 +1176,17 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_DONE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one worker must advance the phase so
+ * that the final phase is reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
-
case PHJ_BATCH_DONE:
+ /* Fall through. */
+
+ case PHJ_BATCH_FILL_INNER_DONE:
/*
* Already done. Detach and go around again (if any
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 4a35903b29..8f495cc6c2 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1845,15 +1845,9 @@ hash_inner_and_outer(PlannerInfo *root,
* able to properly guarantee uniqueness. Similarly, we can't handle
* JOIN_FULL and JOIN_RIGHT, because they can produce false null
* extended rows. Also, the resulting path must not be parameterized.
- * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
- * Hash, since in that case we're back to a single hash table with a
- * single set of match bits for each batch, but that will require
- * figuring out a deadlock-free way to wait for the probe to finish.
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
- save_jointype != JOIN_FULL &&
- save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
@@ -1887,9 +1881,13 @@ hash_inner_and_outer(PlannerInfo *root,
* total inner path will also be parallel-safe, but if not, we'll
* have to search for the cheapest safe, unparameterized inner
* path. If doing JOIN_UNIQUE_INNER, we can't use any alternative
- * inner path.
+ * inner path. If full or right join, we can't use parallelism
+ * (building the hash table in each backend) because no one
+ * process has all the match bits.
*/
- if (cheapest_total_inner->parallel_safe)
+ if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+ cheapest_safe_inner = NULL;
+ else if (cheapest_total_inner->parallel_safe)
cheapest_safe_inner = cheapest_total_inner;
else if (save_jointype != JOIN_UNIQUE_INNER)
cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f1dca2f25b..1d7af3dd88 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3928,6 +3928,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_BATCH_LOAD:
event_name = "HashBatchLoad";
break;
+ case WAIT_EVENT_HASH_BATCH_PROBE:
+ event_name = "HashBatchProbe";
+ break;
case WAIT_EVENT_HASH_BUILD_ALLOCATE:
event_name = "HashBuildAllocate";
break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 3e200e02cc..de901752c0 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -205,6 +205,27 @@ BarrierArriveAndDetach(Barrier *barrier)
return BarrierDetachImpl(barrier, true);
}
+/*
+ * Upon arriving at the barrier, if the caller is not the last worker attached,
+ * detach from the barrier and return false. If this worker is the last worker,
+ * remain attached and advance the phase of the barrier, returning true.
+ */
+bool
+BarrierArriveAndDetachExceptLast(Barrier *barrier)
+{
+ SpinLockAcquire(&barrier->mutex);
+ if (barrier->participants > 1)
+ {
+ --barrier->participants;
+ SpinLockRelease(&barrier->mutex);
+ return false;
+ }
+ Assert(barrier->participants == 1);
+ ++barrier->phase;
+ SpinLockRelease(&barrier->mutex);
+ return true;
+}
+
/*
* Attach to a barrier. All waiting participants will now wait for this
* participant to call BarrierArriveAndWait(), BarrierDetach() or
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index eb5daba36b..24ba311141 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,15 @@ typedef struct ParallelHashJoinBatchAccessor
bool at_least_one_chunk; /* has this backend allocated a chunk? */
bool done; /* flag to remember that a batch is done */
+
+ /*
+ * While doing the unmatched inner scan, the assigned worker may emit
+ * tuples. Thus, we must keep track of where it was in the hashtable so it
+ * can return to the correct offset within the correct chunk.
+ */
+ dsa_pointer shared_chunk;
+ HashMemoryChunk current_chunk;
+ size_t current_chunk_idx;
SharedTuplestoreAccessor *inner_tuples;
SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;
@@ -265,7 +274,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
-#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_FILL_INNER_DONE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
@@ -351,6 +361,9 @@ typedef struct HashJoinTableData
/* used for dense allocation of tuples (into linked chunks) */
HashMemoryChunk chunks; /* one list for the whole batch */
+ /* index of tuple within current chunk for serial unmatched inner scan */
+ size_t current_chunk_idx;
+
/* Shared and private state for Parallel Hash. */
HashMemoryChunk current_chunk; /* this backend's current chunk */
dsa_area *area; /* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 2db4e2f672..a642736d54 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 257e515bfe..d9a0834e08 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -927,6 +927,7 @@ typedef enum
WAIT_EVENT_HASH_BATCH_ALLOCATE,
WAIT_EVENT_HASH_BATCH_ELECT,
WAIT_EVENT_HASH_BATCH_LOAD,
+ WAIT_EVENT_HASH_BATCH_PROBE,
WAIT_EVENT_HASH_BUILD_ALLOCATE,
WAIT_EVENT_HASH_BUILD_ELECT,
WAIT_EVENT_HASH_BUILD_HASH_INNER,
diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h
index d71927cc2f..e0de24378b 100644
--- a/src/include/storage/barrier.h
+++ b/src/include/storage/barrier.h
@@ -37,6 +37,7 @@ typedef struct Barrier
extern void BarrierInit(Barrier *barrier, int num_workers);
extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info);
extern bool BarrierArriveAndDetach(Barrier *barrier);
+extern bool BarrierArriveAndDetachExceptLast(Barrier *barrier);
extern int BarrierAttach(Barrier *barrier);
extern bool BarrierDetach(Barrier *barrier);
extern int BarrierPhase(Barrier *barrier);
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id);
20000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
rollback to settings;
-- An full outer join where every record is not matched.
-- non-parallel
@@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
40000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: ((0 - s.id) = r.id)
+ -> Parallel Seq Scan on simple s
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)
--
2.25.1
Hi Melanie,
On Thu, Nov 5, 2020 at 7:34 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
I've attached a patch with the corrections I mentioned upthread.
I've gone ahead and run pgindent, though I can't say that I'm very
happy with the result.
I'm still not quite happy with the name
BarrierArriveAndDetachExceptLast(). It's so literal. As you said, there
probably isn't a nice name for this concept, since it is a function with
the purpose of terminating parallelism.
You sent in your patch, v3-0001-Support-Parallel-FOJ-and-ROJ.patch, to
pgsql-hackers on Nov 5, but you did not post it to the next
CommitFest[1]. If this was intentional, then you need to take no
action. However, if you want your patch to be reviewed as part of the
upcoming CommitFest, then you need to add it yourself before
2021-01-01 AOE[2]. Also, rebasing to the current HEAD may be required,
as almost two months have passed since this patch was submitted. Thanks
for your contributions.
[1]: https://commitfest.postgresql.org/31/
[2]: https://en.wikipedia.org/wiki/Anywhere_on_Earth
Regards,
--
Masahiko Sawada
EnterpriseDB: https://www.enterprisedb.com/
On Mon, Dec 28, 2020 at 9:49 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, Nov 5, 2020 at 7:34 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
I've attached a patch with the corrections I mentioned upthread.
I've gone ahead and run pgindent, though I can't say that I'm very
happy with the result.
I'm still not quite happy with the name
BarrierArriveAndDetachExceptLast(). It's so literal. As you said, there
probably isn't a nice name for this concept, since it is a function with
the purpose of terminating parallelism.
You sent in your patch, v3-0001-Support-Parallel-FOJ-and-ROJ.patch, to
pgsql-hackers on Nov 5, but you did not post it to the next
CommitFest[1]. If this was intentional, then you need to take no
action. However, if you want your patch to be reviewed as part of the
upcoming CommitFest, then you need to add it yourself before
2021-01-01 AOE[2]. Also, rebasing to the current HEAD may be required,
as almost two months have passed since this patch was submitted. Thanks
for your contributions.
Thanks for this reminder, Sawada-san. I had some feedback I meant to
post in November but didn't get around to:
+bool
+BarrierArriveAndDetachExceptLast(Barrier *barrier)
I committed this part (7888b099). I've attached a rebase of the rest
of Melanie's v3 patch.
+ WAIT_EVENT_HASH_BATCH_PROBE,
That new wait event isn't needed (we can't and don't wait).
* PHJ_BATCH_PROBING -- all probe
- * PHJ_BATCH_DONE -- end
+
+ * PHJ_BATCH_DONE -- queries not requiring inner fill done
+ * PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
Would it be better/tidier to keep _DONE as the final phase? That is,
to switch around these two final phases. Or does that make it too
hard to coordinate the detach-and-cleanup logic?
+/*
+ * ExecPrepHashTableForUnmatched
+ * set up for a series of ExecScanHashTableForUnmatched calls
+ * return true if this worker is elected to do the
unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
Comment name doesn't match function name.
Attachments:
v4-0001-Parallel-Hash-Full-Right-Join.patch (text/x-patch)
From 9199bfcfa84acbcfeb9a8d3c21962096c7ff645c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 5 Nov 2020 16:20:24 +1300
Subject: [PATCH v4 1/2] Parallel Hash Full/Right Join.
Previously we allowed PHJ only for inner and left outer joins.
Share the hash table match bits between backends, so we can also
handle full and right joins. In order to do that without introducing
any deadlock risks, for now we drop down to a single process for the
unmatched scan at the end of each batch. Other processes detach and
look for another batch to help with. If there aren't any more batches,
they'll finish the hash join early, making work distribution suboptimal.
Improving that might require bigger executor changes.
Also switch the unmatched tuple scan to work in memory-chunk order,
rather than bucket order. This prepares for potential later
improvements that would use chunks as parallel grain, and seems to be a
little cache-friendlier than the bucket-scan scheme in the
meantime.
Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
src/backend/executor/nodeHash.c | 190 ++++++++++++++++++------
src/backend/executor/nodeHashjoin.c | 61 ++++----
src/backend/optimizer/path/joinpath.c | 14 +-
src/include/executor/hashjoin.h | 15 +-
src/include/executor/nodeHash.h | 3 +
src/test/regress/expected/join_hash.out | 56 ++++++-
src/test/regress/sql/join_hash.sql | 23 ++-
7 files changed, 274 insertions(+), 88 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..36cc752163 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
hashtable->chunks = NULL;
hashtable->current_chunk = NULL;
+ hashtable->current_chunk_idx = 0;
hashtable->parallel_state = state->parallel_state;
hashtable->area = state->ps.state->es_query_dsa;
hashtable->batches = NULL;
@@ -2053,9 +2054,58 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
* hj_CurTuple: last tuple returned, or NULL to start next bucket
*----------
*/
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = 0;
hjstate->hj_CurTuple = NULL;
+ hashtable->current_chunk = hashtable->chunks;
+ hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ * set up for a series of ExecScanHashTableForUnmatched calls
+ * return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+ bool last = false;
+
+ hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurSkewBucketNo = 0;
+ hjstate->hj_CurTuple = NULL;
+ if (curbatch < 0)
+ return false;
+ last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+ if (!last)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+ /*
+ * Track the largest batch we've been attached to. Though each
+ * backend might see a different subset of batches, explain.c will
+ * scan the results from all backends to find the largest value.
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+ hashtable->curbatch = -1;
+ }
+ else
+ {
+ batch_accessor->shared_chunk = batch->chunks;
+ batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+ batch_accessor->current_chunk_idx = 0;
+ }
+ return last;
}
/*
@@ -2069,60 +2119,110 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
+ HashMemoryChunk next;
HashJoinTable hashtable = hjstate->hj_HashTable;
- HashJoinTuple hashTuple = hjstate->hj_CurTuple;
- for (;;)
+ while (hashtable->current_chunk)
{
- /*
- * hj_CurTuple is the address of the tuple last returned from the
- * current bucket, or NULL if it's time to start scanning a new
- * bucket.
- */
- if (hashTuple != NULL)
- hashTuple = hashTuple->next.unshared;
- else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+ while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
{
- hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
- hjstate->hj_CurBucketNo++;
- }
- else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
- {
- int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+ HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(hashtable->current_chunk) +
+ hashtable->current_chunk_idx);
+ MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+ int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+ /* next tuple in this chunk */
+ hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
- hashTuple = hashtable->skewBucket[j]->tuples;
- hjstate->hj_CurSkewBucketNo++;
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple =
+ ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false);
+
+ /*
+ * Reset temp memory each time; although this function doesn't do
+ * any qual eval, the caller will, so let's keep it parallel to
+ * ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
+
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
}
- else
- break; /* finished all buckets */
- while (hashTuple != NULL)
+ next = hashtable->current_chunk->next.unshared;
+ hashtable->current_chunk = next;
+ hashtable->current_chunk_idx = 0;
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+ /*
+ * Only one worker should execute this function. Since tuples have already
+ * been emitted, it is hazardous for workers to wait at the batch_barrier
+ * again. Instead, all workers except the last will detach and the last
+ * will conduct this unmatched inner tuple scan.
+ */
+ Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1);
+ while (accessor->current_chunk)
+ {
+ while (accessor->current_chunk_idx < accessor->current_chunk->used)
{
- if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
- {
- TupleTableSlot *inntuple;
+ HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(accessor->current_chunk) +
+ accessor->current_chunk_idx);
- /* insert hashtable's tuple into exec slot */
- inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
- hjstate->hj_HashTupleSlot,
- false); /* do not pfree */
- econtext->ecxt_innertuple = inntuple;
+ accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD +
+ HJTUPLE_MINTUPLE(hashTuple)->t_len);
- /*
- * Reset temp memory each time; although this function doesn't
- * do any qual eval, the caller will, so let's keep it
- * parallel to ExecScanHashBucket.
- */
- ResetExprContext(econtext);
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
- hjstate->hj_CurTuple = hashTuple;
- return true;
- }
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot, false);
+
+ /*
+ * Reset temp memory each time; although this function doesn't do
+ * any qual eval, the caller will, so let's keep it parallel to
+ * ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
- hashTuple = hashTuple->next.unshared;
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
}
- /* allow this loop to be cancellable */
+ accessor->shared_chunk = accessor->current_chunk->next.shared;
+ accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+ accessor->current_chunk_idx = 0;
+
CHECK_FOR_INTERRUPTS();
}
@@ -3131,13 +3231,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
{
- /*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
- */
- Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
{
@@ -3271,6 +3364,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
hashtable->current_chunk = NULL;
hashtable->current_chunk_shared = InvalidDsaPointer;
hashtable->batches[batchno].at_least_one_chunk = false;
+ hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+ hashtable->batches[batchno].current_chunk = NULL;
+ hashtable->batches[batchno].current_chunk_idx = 0;
}
/*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5532b91a71..791c97e17d 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,9 @@
* PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBING -- all probe
- * PHJ_BATCH_DONE -- end
+
+ * PHJ_BATCH_DONE -- queries not requiring inner fill done
+ * PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
*
* Batch 0 is a special case, because it starts out in phase
* PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -99,7 +101,9 @@
* while attached to a barrier, unless the barrier has reached its final
* state. In the slightly special case of the per-batch barrier, we return
* tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE or
+ * PHJ_BATCH_DONE_INNER, depending on whether or not the join requires
+ * a scan for unmatched inner tuples, without waiting.
*
*-------------------------------------------------------------------------
*/
@@ -360,9 +364,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
/* end of batch, or maybe whole join */
if (HJ_FILL_INNER(node))
{
- /* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ /* set up to scan for unmatched inner tuples */
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +469,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node), but
+ * we'll avoid the branch and just set it always.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +533,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1173,15 +1176,17 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_DONE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one worker must advance the phase so
+ * that the final phase is reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
-
case PHJ_BATCH_DONE:
+ /* Fall through. */
+
+ case PHJ_BATCH_FILL_INNER_DONE:
/*
* Already done. Detach and go around again (if any
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 4a35903b29..8f495cc6c2 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1845,15 +1845,9 @@ hash_inner_and_outer(PlannerInfo *root,
* able to properly guarantee uniqueness. Similarly, we can't handle
* JOIN_FULL and JOIN_RIGHT, because they can produce false null
* extended rows. Also, the resulting path must not be parameterized.
- * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
- * Hash, since in that case we're back to a single hash table with a
- * single set of match bits for each batch, but that will require
- * figuring out a deadlock-free way to wait for the probe to finish.
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
- save_jointype != JOIN_FULL &&
- save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
@@ -1887,9 +1881,13 @@ hash_inner_and_outer(PlannerInfo *root,
* total inner path will also be parallel-safe, but if not, we'll
* have to search for the cheapest safe, unparameterized inner
* path. If doing JOIN_UNIQUE_INNER, we can't use any alternative
- * inner path.
+ * inner path. If full or right join, we can't use parallelism
+ * (building the hash table in each backend) because no one
+ * process has all the match bits.
*/
- if (cheapest_total_inner->parallel_safe)
+ if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+ cheapest_safe_inner = NULL;
+ else if (cheapest_total_inner->parallel_safe)
cheapest_safe_inner = cheapest_total_inner;
else if (save_jointype != JOIN_UNIQUE_INNER)
cheapest_safe_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index eb5daba36b..24ba311141 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,15 @@ typedef struct ParallelHashJoinBatchAccessor
bool at_least_one_chunk; /* has this backend allocated a chunk? */
bool done; /* flag to remember that a batch is done */
+
+ /*
+ * While doing the unmatched inner scan, the assigned worker may emit
+ * tuples. Thus, we must keep track of where it was in the hashtable so it
+ * can return to the correct offset within the correct chunk.
+ */
+ dsa_pointer shared_chunk;
+ HashMemoryChunk current_chunk;
+ size_t current_chunk_idx;
SharedTuplestoreAccessor *inner_tuples;
SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;
@@ -265,7 +274,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
-#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_FILL_INNER_DONE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
@@ -351,6 +361,9 @@ typedef struct HashJoinTableData
/* used for dense allocation of tuples (into linked chunks) */
HashMemoryChunk chunks; /* one list for the whole batch */
+ /* index of tuple within current chunk for serial unmatched inner scan */
+ size_t current_chunk_idx;
+
/* Shared and private state for Parallel Hash. */
HashMemoryChunk current_chunk; /* this backend's current chunk */
dsa_area *area; /* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 2db4e2f672..a642736d54 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id);
20000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
rollback to settings;
-- An full outer join where every record is not matched.
-- non-parallel
@@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
40000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: ((0 - s.id) = r.id)
+ -> Parallel Seq Scan on simple s
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)
--
2.20.1
On Tue, Dec 29, 2020 at 03:28:12PM +1300, Thomas Munro wrote:
I had some feedback I meant to post in November but didn't get around to:

  * PHJ_BATCH_PROBING             -- all probe
- * PHJ_BATCH_DONE                -- end
+
+ * PHJ_BATCH_DONE                -- queries not requiring inner fill done
+ * PHJ_BATCH_FILL_INNER_DONE     -- inner fill completed, all queries done

Would it be better/tidier to keep _DONE as the final phase? That is,
to switch around these two final phases. Or does that make it too
hard to coordinate the detach-and-cleanup logic?
I updated this to use your suggestion. My rationale for having
PHJ_BATCH_DONE and then PHJ_BATCH_FILL_INNER_DONE was that, for a worker
attaching to the batch for the first time, it might be confusing that the
batch would be in a fill-inner phase (not a DONE phase) and yet that
worker still just detaches and moves on. It didn't seem intuitive.
Anyway, I think that is all sort of confusing and unnecessary. I changed
it to PHJ_BATCH_FILLING_INNER -- now, when a worker that has never been
attached to this batch before attaches, it finds the batch in the
PHJ_BATCH_FILLING_INNER phase, which it cannot help with, so it simply
detaches and moves on.
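
For reference, a simplified sketch of how that ends up looking for a
late attacher in ExecParallelHashJoinNewBatch (not the exact patch hunk;
the earlier phases are elided and only the decision at attach time is
shown):

    switch (BarrierAttach(&batch->batch_barrier))
    {
        case PHJ_BATCH_ELECTING:
        case PHJ_BATCH_ALLOCATING:
        case PHJ_BATCH_LOADING:
        case PHJ_BATCH_PROBING:
            /* ... this worker can still help; join in as appropriate ... */
            break;

        case PHJ_BATCH_FILLING_INNER:
            /* Reserved for the one elected worker; fall through. */
        case PHJ_BATCH_DONE:
            /* Nothing left to help with: detach and try another batch. */
            BarrierDetach(&batch->batch_barrier);
            break;
    }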
+/*
+ * ExecPrepHashTableForUnmatched
+ *     set up for a series of ExecScanHashTableForUnmatched calls
+ *     return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)

Comment name doesn't match function name.
Updated -- and a few other comment updates too.
I just attached the diff.
Attachments:
v4-0002-Update-comments-and-phase-naming.patch (text/x-diff)
From 213c36f9e125f52eb6731005d5dcdadccccca73a Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Thu, 11 Feb 2021 16:31:37 -0500
Subject: [PATCH v4 2/2] Update comments and phase naming
---
src/backend/executor/nodeHash.c | 19 +++++++++++++------
src/backend/executor/nodeHashjoin.c | 4 ++--
src/include/executor/hashjoin.h | 4 ++--
3 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index dd8d12203a..6305688efd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2047,11 +2047,10 @@ void
ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
{
/*----------
- * During this scan we use the HashJoinState fields as follows:
+ * During this scan we use the HashJoinTable fields as follows:
*
- * hj_CurBucketNo: next regular bucket to scan
- * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
- * hj_CurTuple: last tuple returned, or NULL to start next bucket
+ * current_chunk: current HashMemoryChunk to scan
+ * current_chunk_idx: index in current HashMemoryChunk
*----------
*/
HashJoinTable hashtable = hjstate->hj_HashTable;
@@ -2064,13 +2063,21 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
}
/*
- * ExecPrepHashTableForUnmatched
- * set up for a series of ExecScanHashTableForUnmatched calls
+ * ExecParallelPrepHashTableForUnmatched
+ * set up for a series of ExecParallelScanHashTableForUnmatched calls
* return true if this worker is elected to do the unmatched inner scan
*/
bool
ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
{
+ /*----------
+ * During this scan we use the ParallelHashJoinBatchAccessor fields for the
+ * current batch as follows:
+ *
+ * current_chunk: current HashMemoryChunk to scan
+ * current_chunk_idx: index in current HashMemoryChunk
+ *----------
+ */
HashJoinTable hashtable = hjstate->hj_HashTable;
int curbatch = hashtable->curbatch;
ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 37b49369aa..40c483cd0c 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1183,10 +1183,10 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
- case PHJ_BATCH_DONE:
+ case PHJ_BATCH_FILLING_INNER:
/* Fall through. */
- case PHJ_BATCH_FILL_INNER_DONE:
+ case PHJ_BATCH_DONE:
/*
* Already done. Detach and go around again (if any
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 634a212142..66fea4ac58 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -274,8 +274,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
-#define PHJ_BATCH_DONE 4
-#define PHJ_BATCH_FILL_INNER_DONE 5
+#define PHJ_BATCH_FILLING_INNER 4
+#define PHJ_BATCH_DONE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
--
2.25.1
On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
I just attached the diff.
Squashed into one patch for the cfbot to chew on, with a few minor
adjustments to a few comments.
Attachments:
v5-0001-Parallel-Hash-Full-Right-Outer-Join.patch (text/x-patch)
From 87c74af25940b0fc85186b0defe6e21ea2324c28 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 4 Nov 2020 14:25:33 -0800
Subject: [PATCH v5] Parallel Hash {Full,Right} Outer Join.
Previously, parallel full and right outer joins were not supported due
to a potential deadlock hazard posed by allowing workers to wait on a
barrier after barrier participants have started emitting tuples. More
details on the deadlock hazard can be found in the referenced
discussion.
For now, sidestep the problem by terminating parallelism for the
unmatched inner tuple scan. The last process to arrive at the barrier
prepares for the unmatched inner tuple scan in HJ_NEED_NEW_OUTER and
transitions to HJ_FILL_INNER, scanning the hash table and emitting
unmatched inner tuples.
To align parallel and serial hash join, change
ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the
unmatched tuple scan instead of accessing tuples through the hash table
buckets.
Author: Melanie Plageman <melanieplageman@gmail.com>
Author: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
src/backend/executor/nodeHash.c | 205 ++++++++++++++++++------
src/backend/executor/nodeHashjoin.c | 58 +++----
src/backend/optimizer/path/joinpath.c | 14 +-
src/include/executor/hashjoin.h | 15 +-
src/include/executor/nodeHash.h | 3 +
src/test/regress/expected/join_hash.out | 56 ++++++-
src/test/regress/sql/join_hash.sql | 23 ++-
7 files changed, 283 insertions(+), 91 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index c5f2d1d22b..6305688efd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
hashtable->chunks = NULL;
hashtable->current_chunk = NULL;
+ hashtable->current_chunk_idx = 0;
hashtable->parallel_state = state->parallel_state;
hashtable->area = state->ps.state->es_query_dsa;
hashtable->batches = NULL;
@@ -2046,16 +2047,72 @@ void
ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
{
/*----------
- * During this scan we use the HashJoinState fields as follows:
+ * During this scan we use the HashJoinTable fields as follows:
*
- * hj_CurBucketNo: next regular bucket to scan
- * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
- * hj_CurTuple: last tuple returned, or NULL to start next bucket
+ * current_chunk: current HashMemoryChunk to scan
+ * current_chunk_idx: index in current HashMemoryChunk
*----------
*/
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurSkewBucketNo = 0;
hjstate->hj_CurTuple = NULL;
+ hashtable->current_chunk = hashtable->chunks;
+ hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecParallelPrepHashTableForUnmatched
+ * set up for a series of ExecParallelScanHashTableForUnmatched calls
+ * return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+ /*----------
+ * During this scan we use the ParallelHashJoinBatchAccessor fields for the
+ * current batch as follows:
+ *
+ * current_chunk: current HashMemoryChunk to scan
+ * current_chunk_idx: index in current HashMemoryChunk
+ *----------
+ */
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+ bool last = false;
+
+ hjstate->hj_CurBucketNo = 0;
+ hjstate->hj_CurSkewBucketNo = 0;
+ hjstate->hj_CurTuple = NULL;
+ if (curbatch < 0)
+ return false;
+ last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+ if (!last)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+ /*
+ * Track the largest batch we've been attached to. Though each
+ * backend might see a different subset of batches, explain.c will
+ * scan the results from all backends to find the largest value.
+ */
+ hashtable->spacePeak =
+ Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+ hashtable->curbatch = -1;
+ }
+ else
+ {
+ batch_accessor->shared_chunk = batch->chunks;
+ batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+ batch_accessor->current_chunk_idx = 0;
+ }
+ return last;
}
/*
@@ -2069,60 +2126,110 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
+ HashMemoryChunk next;
HashJoinTable hashtable = hjstate->hj_HashTable;
- HashJoinTuple hashTuple = hjstate->hj_CurTuple;
- for (;;)
+ while (hashtable->current_chunk)
{
- /*
- * hj_CurTuple is the address of the tuple last returned from the
- * current bucket, or NULL if it's time to start scanning a new
- * bucket.
- */
- if (hashTuple != NULL)
- hashTuple = hashTuple->next.unshared;
- else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
- {
- hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
- hjstate->hj_CurBucketNo++;
- }
- else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
+ while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
{
- int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+ HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(hashtable->current_chunk) +
+ hashtable->current_chunk_idx);
+ MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+ int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+ /* next tuple in this chunk */
+ hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
+
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple =
+ ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false);
+
+ /*
+ * Reset temp memory each time; although this function doesn't do
+ * any qual eval, the caller will, so let's keep it parallel to
+ * ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
- hashTuple = hashtable->skewBucket[j]->tuples;
- hjstate->hj_CurSkewBucketNo++;
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
}
- else
- break; /* finished all buckets */
- while (hashTuple != NULL)
+ next = hashtable->current_chunk->next.unshared;
+ hashtable->current_chunk = next;
+ hashtable->current_chunk_idx = 0;
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * no more unmatched tuples
+ */
+ return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ * scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+ /*
+ * Only one worker should execute this function. Since tuples have already
+ * been emitted, it is hazardous for workers to wait at the batch_barrier
+ * again. Instead, all workers except the last will detach and the last
+ * will conduct this unmatched inner tuple scan.
+ */
+ Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1);
+ while (accessor->current_chunk)
+ {
+ while (accessor->current_chunk_idx < accessor->current_chunk->used)
{
- if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
- {
- TupleTableSlot *inntuple;
+ HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(accessor->current_chunk) +
+ accessor->current_chunk_idx);
- /* insert hashtable's tuple into exec slot */
- inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
- hjstate->hj_HashTupleSlot,
- false); /* do not pfree */
- econtext->ecxt_innertuple = inntuple;
+ accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD +
+ HJTUPLE_MINTUPLE(hashTuple)->t_len);
- /*
- * Reset temp memory each time; although this function doesn't
- * do any qual eval, the caller will, so let's keep it
- * parallel to ExecScanHashBucket.
- */
- ResetExprContext(econtext);
+ if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+ continue;
- hjstate->hj_CurTuple = hashTuple;
- return true;
- }
+ /* insert hashtable's tuple into exec slot */
+ econtext->ecxt_innertuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot, false);
+
+ /*
+ * Reset temp memory each time; although this function doesn't do
+ * any qual eval, the caller will, so let's keep it parallel to
+ * ExecScanHashBucket.
+ */
+ ResetExprContext(econtext);
- hashTuple = hashTuple->next.unshared;
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
}
- /* allow this loop to be cancellable */
+ accessor->shared_chunk = accessor->current_chunk->next.shared;
+ accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+ accessor->current_chunk_idx = 0;
+
CHECK_FOR_INTERRUPTS();
}
@@ -3131,13 +3238,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
{
- /*
- * Technically we shouldn't access the barrier because we're no
- * longer attached, but since there is no way it's moving after
- * this point it seems safe to make the following assertion.
- */
- Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
/* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks))
{
@@ -3271,6 +3371,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
hashtable->current_chunk = NULL;
hashtable->current_chunk_shared = InvalidDsaPointer;
hashtable->batches[batchno].at_least_one_chunk = false;
+ hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+ hashtable->batches[batchno].current_chunk = NULL;
+ hashtable->batches[batchno].current_chunk_idx = 0;
}
/*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 510bdd39ad..dc526b49bd 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,8 @@
* PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBING -- all probe
- * PHJ_BATCH_DONE -- end
+ * PHJ_BATCH_FILLING_INNER -- full/right outer scan
+ * PHJ_BATCH_DONE -- done
*
* Batch 0 is a special case, because it starts out in phase
* PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -99,7 +100,9 @@
* while attached to a barrier, unless the barrier has reached its final
* state. In the slightly special case of the per-batch barrier, we return
* tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * BarrierArriveAndDetach() or BarrierArriveAndDetachExceptLast() to
+ * advance it past PHJ_BATCH_PROBING, depending on whether or not the
+ * join requires a scan for unmatched inner tuples, without waiting.
*
*-------------------------------------------------------------------------
*/
@@ -360,9 +363,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
/* end of batch, or maybe whole join */
if (HJ_FILL_INNER(node))
{
- /* set up to scan for unmatched inner tuples */
- ExecPrepHashTableForUnmatched(node);
- node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ if (parallel)
+ {
+ if (ExecParallelPrepHashTableForUnmatched(node))
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ else
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+ }
+ else
+ {
+ /* set up to scan for unmatched inner tuples */
+ ExecPrepHashTableForUnmatched(node);
+ node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+ }
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +468,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
node->hj_MatchedOuter = true;
- if (parallel)
- {
- /*
- * Full/right outer joins are currently not supported
- * for parallel joins, so we don't need to set the
- * match bit. Experiments show that it's worth
- * avoiding the shared memory traffic on large
- * systems.
- */
- Assert(!HJ_FILL_INNER(node));
- }
- else
- {
- /*
- * This is really only needed if HJ_FILL_INNER(node),
- * but we'll avoid the branch and just set it always.
- */
+
+ /*
+ * This is really only needed if HJ_FILL_INNER(node), but
+ * we'll avoid the branch and just set it always.
+ */
+ if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
- }
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +532,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
- if (!ExecScanHashTableForUnmatched(node, econtext))
+ if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+ : ExecScanHashTableForUnmatched(node, econtext)))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1173,13 +1175,15 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* hash table stays alive until everyone's finished
* probing it, but no participant is allowed to wait at
* this barrier again (or else a deadlock could occur).
- * All attached participants must eventually call
- * BarrierArriveAndDetach() so that the final phase
- * PHJ_BATCH_DONE can be reached.
+ * All attached participants must eventually detach from
+ * the barrier and one worker must advance the phase so
+ * that the final phase is reached.
*/
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true;
+ case PHJ_BATCH_FILLING_INNER:
+ /* Fall through. */
case PHJ_BATCH_DONE:
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 57ce97fd53..dd9e26dcd3 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1845,15 +1845,9 @@ hash_inner_and_outer(PlannerInfo *root,
* able to properly guarantee uniqueness. Similarly, we can't handle
* JOIN_FULL and JOIN_RIGHT, because they can produce false null
* extended rows. Also, the resulting path must not be parameterized.
- * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
- * Hash, since in that case we're back to a single hash table with a
- * single set of match bits for each batch, but that will require
- * figuring out a deadlock-free way to wait for the probe to finish.
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
- save_jointype != JOIN_FULL &&
- save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
@@ -1887,9 +1881,13 @@ hash_inner_and_outer(PlannerInfo *root,
* total inner path will also be parallel-safe, but if not, we'll
* have to search for the cheapest safe, unparameterized inner
* path. If doing JOIN_UNIQUE_INNER, we can't use any alternative
- * inner path.
+ * inner path. If full or right join, we can't use parallelism
+ * (building the hash table in each backend) because no one
+ * process has all the match bits.
*/
- if (cheapest_total_inner->parallel_safe)
+ if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+ cheapest_safe_inner = NULL;
+ else if (cheapest_total_inner->parallel_safe)
cheapest_safe_inner = cheapest_total_inner;
else if (save_jointype != JOIN_UNIQUE_INNER)
cheapest_safe_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index d74034f64f..66fea4ac58 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,15 @@ typedef struct ParallelHashJoinBatchAccessor
bool at_least_one_chunk; /* has this backend allocated a chunk? */
bool done; /* flag to remember that a batch is done */
+
+ /*
+ * While doing the unmatched inner scan, the assigned worker may emit
+ * tuples. Thus, we must keep track of where it was in the hashtable so it
+ * can return to the correct offset within the correct chunk.
+ */
+ dsa_pointer shared_chunk;
+ HashMemoryChunk current_chunk;
+ size_t current_chunk_idx;
SharedTuplestoreAccessor *inner_tuples;
SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;
@@ -265,7 +274,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
-#define PHJ_BATCH_DONE 4
+#define PHJ_BATCH_FILLING_INNER 4
+#define PHJ_BATCH_DONE 5
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
@@ -351,6 +361,9 @@ typedef struct HashJoinTableData
/* used for dense allocation of tuples (into linked chunks) */
HashMemoryChunk chunks; /* one list for the whole batch */
+ /* index of tuple within current chunk for serial unmatched inner scan */
+ size_t current_chunk_idx;
+
/* Shared and private state for Parallel Hash. */
HashMemoryChunk current_chunk; /* this backend's current chunk */
dsa_area *area; /* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 3fbe02e80d..7460dfff64 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+ ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id);
20000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
rollback to settings;
-- An full outer join where every record is not matched.
-- non-parallel
@@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
(1 row)
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
40000
(1 row)
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Full Join
+ Hash Cond: ((0 - s.id) = r.id)
+ -> Parallel Seq Scan on simple s
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
select count(*) from simple r full outer join simple s using (id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
savepoint settings;
+set enable_parallel_hash = off;
set local max_parallel_workers_per_gather = 2;
explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)
--
2.30.1
On Tue, Mar 2, 2021 at 11:27 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
<melanieplageman@gmail.com> wrote:
I just attached the diff.
Squashed into one patch for the cfbot to chew on, with a few minor
adjustments to a few comments.
I did some more minor tidying of comments and naming. It's been on my
to-do list to update some phase names after commit 3048898e, and while
doing that I couldn't resist the opportunity to change DONE to FREE,
which somehow hurts my brain less, and makes much more obvious sense
after the bugfix in CF #3031 that splits DONE into two separate
phases. It also pairs obviously with ALLOCATE. I include a copy of
that bugfix here too as 0001, because I'll likely commit that first, so
I rebased the stack of patches that way. 0002 includes the renaming I
propose (master only). Then 0003 is Melanie's patch, using the name
SCAN for the new match bit scan phase. I've attached an updated
version of my "phase diagram" finger painting, to show how it looks
with these three patches. "scan*" is new.
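
To make the renaming concrete, the per-batch phase constants would end up
along these lines (only SCAN, FREE and the pairing with ALLOCATE are
spelled out above; the exact names of the other phases here are
illustrative):

    /* The phases for probing each batch, for the batch_barrier. */
    #define PHJ_BATCH_ELECT      0
    #define PHJ_BATCH_ALLOCATE   1
    #define PHJ_BATCH_LOAD       2
    #define PHJ_BATCH_PROBE      3
    #define PHJ_BATCH_SCAN       4   /* new: unmatched inner (match bit) scan */
    #define PHJ_BATCH_FREE       5   /* formerly PHJ_BATCH_DONE */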
Attachments:
phj-phases-with-full-scan.png (image/png)