From 6d34cbee84b06aa27d6c73426f29ef0d50dadb3a Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 29 Sep 2020 13:47:57 -0700
Subject: [PATCH v2] Support Parallel FOJ and ROJ

To support parallel FOJ and ROJ,
- re-enable setting match bit for tuples in the hash table
- a single worker preps for unmatched inner tuple scan in
  HJ_NEED_NEW_OUTER and transitions to HJ_FILL_INNER to avoid deadlock.
	ExecParallelScanHashTableForUnmatched() is no longer executed by
	multiple workers. A single worker will scan each HashMemoryChunk
	in the hash table, freeing it after finishing with it.

- To align parallel and serial hash join, change
  ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the
  unmatched tuple scan instead of using the buckets
---
 src/backend/executor/nodeHash.c         | 195 ++++++++++++++++++------
 src/backend/executor/nodeHashjoin.c     |  61 ++++----
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/backend/postmaster/pgstat.c         |   3 +
 src/backend/storage/ipc/barrier.c       |  22 ++-
 src/include/executor/hashjoin.h         |  13 +-
 src/include/executor/nodeHash.h         |   3 +
 src/include/pgstat.h                    |   1 +
 src/include/storage/barrier.h           |   1 +
 src/test/regress/expected/join_hash.out |  56 ++++++-
 src/test/regress/sql/join_hash.sql      |  23 ++-
 11 files changed, 302 insertions(+), 90 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..42cb8514d3 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2053,9 +2054,56 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
 	 *----------
 	 */
+    HashJoinTable hashtable = hjstate->hj_HashTable;
+
+    hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurSkewBucketNo = 0;
+	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ *		set up for a series of ExecScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *batch_accessor  = &hashtable->batches[curbatch];
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+	bool last = false;
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	if (curbatch < 0)
+		return false;
+	last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+	if (!last)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak,batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+	}
+	else
+	{
+		batch_accessor->shared_chunk = batch->chunks;
+		batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+		batch_accessor->current_chunk_idx = 0;
+	}
+	return last;
 }
 
 /*
@@ -2069,63 +2117,118 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
-	HashJoinTable hashtable = hjstate->hj_HashTable;
-	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+	HashMemoryChunk next;
+	HashJoinTable   hashtable = hjstate->hj_HashTable;
 
-	for (;;)
+	while (hashtable->current_chunk)
 	{
-		/*
-		 * hj_CurTuple is the address of the tuple last returned from the
-		 * current bucket, or NULL if it's time to start scanning a new
-		 * bucket.
-		 */
-		if (hashTuple != NULL)
-			hashTuple = hashTuple->next.unshared;
-		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+		while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
 		{
-			hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
-			hjstate->hj_CurBucketNo++;
-		}
-		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
-		{
-			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+			HashJoinTuple hashTuple     = (HashJoinTuple) (
+				HASH_CHUNK_DATA(hashtable->current_chunk) +
+					hashtable->current_chunk_idx
+			);
+			MinimalTuple  tuple         = HJTUPLE_MINTUPLE(hashTuple);
+			int           hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			/* next tuple in this chunk */
+			hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
 
-			hashTuple = hashtable->skewBucket[j]->tuples;
-			hjstate->hj_CurSkewBucketNo++;
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+				                      hjstate->hj_HashTupleSlot,
+				                      false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't
+			 * do any qual eval, the caller will, so let's keep it
+			 * parallel to ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
+
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
-		else
-			break;				/* finished all buckets */
 
-		while (hashTuple != NULL)
+		next = hashtable->current_chunk->next.unshared;
+		hashtable->current_chunk     = next;
+		hashtable->current_chunk_idx = 0;
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	dsa_pointer next;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+	/*
+	 * Only one worker should execute this function.
+	 * Since tuples have already been emitted, it is hazardous for workers
+	 * to wait at the batch_barrier again. Instead, all workers except the last
+	 * will detach and the last will conduct this unmatched inner tuple scan.
+	 */
+	Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1);
+	while (accessor->current_chunk)
+	{
+		while (accessor->current_chunk_idx < accessor->current_chunk->used)
 		{
-			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
-			{
-				TupleTableSlot *inntuple;
+			HashJoinTuple hashTuple = (HashJoinTuple) (
+				HASH_CHUNK_DATA(accessor->current_chunk) + accessor->current_chunk_idx
+			);
+			accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
 
-				/* insert hashtable's tuple into exec slot */
-				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
-												 hjstate->hj_HashTupleSlot,
-												 false);	/* do not pfree */
-				econtext->ecxt_innertuple = inntuple;
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
 
-				/*
-				 * Reset temp memory each time; although this function doesn't
-				 * do any qual eval, the caller will, so let's keep it
-				 * parallel to ExecScanHashBucket.
-				 */
-				ResetExprContext(econtext);
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple = ExecStoreMinimalTuple(
+				HJTUPLE_MINTUPLE(hashTuple),
+				hjstate->hj_HashTupleSlot,false);
 
-				hjstate->hj_CurTuple = hashTuple;
-				return true;
-			}
+			/*
+			 * Reset temp memory each time; although this function doesn't
+			 * do any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashTuple->next.unshared;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
 
-		/* allow this loop to be cancellable */
+		next = accessor->current_chunk->next.shared;
+		dsa_free(hashtable->area, accessor->shared_chunk);
+		accessor->shared_chunk = next;
+		accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+		accessor->current_chunk_idx = 0;
+
 		CHECK_FOR_INTERRUPTS();
 	}
 
+	accessor->shared->chunks = InvalidDsaPointer;
 	/*
 	 * no more unmatched tuples
 	 */
@@ -3131,13 +3234,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		/* Detach from the batch we were last working on. */
 		if (BarrierArriveAndDetach(&batch->batch_barrier))
 		{
-			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
-			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))
 			{
@@ -3271,6 +3367,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
 	hashtable->current_chunk = NULL;
 	hashtable->current_chunk_shared = InvalidDsaPointer;
 	hashtable->batches[batchno].at_least_one_chunk = false;
+	hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+	hashtable->batches[batchno].current_chunk = NULL;
+	hashtable->batches[batchno].current_chunk_idx = 0;
 }
 
 /*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5532b91a71..7a2b5275bb 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,9 @@
  *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
  *  PHJ_BATCH_LOADING        -- all load the hash table from disk
  *  PHJ_BATCH_PROBING        -- all probe
- *  PHJ_BATCH_DONE           -- end
+
+ *  PHJ_BATCH_DONE           -- queries not requiring inner fill done
+ *  PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
  *
  * Batch 0 is a special case, because it starts out in phase
  * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -99,7 +101,9 @@
  * while attached to a barrier, unless the barrier has reached its final
  * state.  In the slightly special case of the per-batch barrier, we return
  * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE or
+ * PHJ_BATCH_DONE_INNER, depending on whether or not the join requires
+ * a scan for unmatched inner tuples, without waiting.
  *
  *-------------------------------------------------------------------------
  */
@@ -360,9 +364,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					/* end of batch, or maybe whole join */
 					if (HJ_FILL_INNER(node))
 					{
-						/* set up to scan for unmatched inner tuples */
-						ExecPrepHashTableForUnmatched(node);
-						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						if (parallel)
+						{
+							if (ExecParallelPrepHashTableForUnmatched(node))
+								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+							else
+								node->hj_JoinState = HJ_NEED_NEW_BATCH;
+						}
+						else
+						{
+							/* set up to scan for unmatched inner tuples */
+							ExecPrepHashTableForUnmatched(node);
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						}
 					}
 					else
 						node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +469,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * This is really only needed if HJ_FILL_INNER(node),
+					 * but we'll avoid the branch and just set it always.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +533,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+							   : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1173,15 +1176,17 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * hash table stays alive until everyone's finished
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
-					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_DONE can be reached.
+					 * All attached participants must eventually detach from
+					 * the barrier and one worker must advance the phase
+					 * so that the final phase is reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 					return true;
-
 				case PHJ_BATCH_DONE:
+					/* Fall through. */
+
+				case PHJ_BATCH_FILL_INNER_DONE:
 
 					/*
 					 * Already done.  Detach and go around again (if any
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index db54a6ba2e..cbc8c2ad83 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1845,15 +1845,9 @@ hash_inner_and_outer(PlannerInfo *root,
 		 * able to properly guarantee uniqueness.  Similarly, we can't handle
 		 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
 		 * extended rows.  Also, the resulting path must not be parameterized.
-		 * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
-		 * Hash, since in that case we're back to a single hash table with a
-		 * single set of match bits for each batch, but that will require
-		 * figuring out a deadlock-free way to wait for the probe to finish.
 		 */
 		if (joinrel->consider_parallel &&
 			save_jointype != JOIN_UNIQUE_OUTER &&
-			save_jointype != JOIN_FULL &&
-			save_jointype != JOIN_RIGHT &&
 			outerrel->partial_pathlist != NIL &&
 			bms_is_empty(joinrel->lateral_relids))
 		{
@@ -1887,9 +1881,13 @@ hash_inner_and_outer(PlannerInfo *root,
 			 * total inner path will also be parallel-safe, but if not, we'll
 			 * have to search for the cheapest safe, unparameterized inner
 			 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-			 * inner path.
+			 * inner path.  If full or right join, we can't use parallelism
+			 * (building the hash table in each backend) because no one process
+			 * has all the match bits.
 			 */
-			if (cheapest_total_inner->parallel_safe)
+			if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+				cheapest_safe_inner = NULL;
+			else if (cheapest_total_inner->parallel_safe)
 				cheapest_safe_inner = cheapest_total_inner;
 			else if (save_jointype != JOIN_UNIQUE_INNER)
 				cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e6be2b7836..f6f242d806 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3782,6 +3782,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_HASH_BATCH_LOAD:
 			event_name = "HashBatchLoad";
 			break;
+		case WAIT_EVENT_HASH_BATCH_PROBE:
+			event_name = "HashBatchProbe";
+			break;
 		case WAIT_EVENT_HASH_BUILD_ALLOCATE:
 			event_name = "HashBuildAllocate";
 			break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 3e200e02cc..2e7b0687ef 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -204,6 +204,27 @@ BarrierArriveAndDetach(Barrier *barrier)
 {
 	return BarrierDetachImpl(barrier, true);
 }
+/*
+ * Upon arriving at the barrier, if this worker is not the last worker attached,
+ * detach from the barrier and return false. If this worker is the last worker,
+ * remain attached and advance the phase of the barrier, return true to indicate
+ * you are the last or "elected" worker who is still attached to the barrier.
+ */
+bool
+BarrierArriveAndDetachExceptLast(Barrier *barrier)
+{
+	SpinLockAcquire(&barrier->mutex);
+	if (barrier->participants > 1)
+	{
+		--barrier->participants;
+		SpinLockRelease(&barrier->mutex);
+		return false;
+	}
+	Assert(barrier->participants == 1);
+	++barrier->phase;
+	SpinLockRelease(&barrier->mutex);
+	return true;
+}
 
 /*
  * Attach to a barrier.  All waiting participants will now wait for this
@@ -221,7 +242,6 @@ BarrierAttach(Barrier *barrier)
 	++barrier->participants;
 	phase = barrier->phase;
 	SpinLockRelease(&barrier->mutex);
-
 	return phase;
 }
 
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index eb5daba36b..2af7228ef0 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,14 @@ typedef struct ParallelHashJoinBatchAccessor
 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
 
 	bool		done;			/* flag to remember that a batch is done */
+	/*
+	 * While doing the unmatched inner scan, the assigned worker may emit
+	 * tuples. Thus, we must keep track of where it was in the hashtable
+	 * so it can return to the correct offset within the correct chunk.
+	 */
+	dsa_pointer shared_chunk;
+	HashMemoryChunk current_chunk;
+	size_t current_chunk_idx;
 	SharedTuplestoreAccessor *inner_tuples;
 	SharedTuplestoreAccessor *outer_tuples;
 } ParallelHashJoinBatchAccessor;
@@ -265,7 +273,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATING			1
 #define PHJ_BATCH_LOADING				2
 #define PHJ_BATCH_PROBING				3
-#define PHJ_BATCH_DONE					4
+#define PHJ_BATCH_DONE			        4
+#define PHJ_BATCH_FILL_INNER_DONE		5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECTING		0
@@ -351,6 +360,8 @@ typedef struct HashJoinTableData
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
 
+	size_t current_chunk_idx; /* index of tuple within current chunk for serial unmatched inner scan */
+
 	/* Shared and private state for Parallel Hash. */
 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
 	dsa_area   *area;			/* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 2db4e2f672..a642736d54 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 										  ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+												  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac46b4..62d5f1d16b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -856,6 +856,7 @@ typedef enum
 	WAIT_EVENT_HASH_BATCH_ALLOCATE,
 	WAIT_EVENT_HASH_BATCH_ELECT,
 	WAIT_EVENT_HASH_BATCH_LOAD,
+	WAIT_EVENT_HASH_BATCH_PROBE,
 	WAIT_EVENT_HASH_BUILD_ALLOCATE,
 	WAIT_EVENT_HASH_BUILD_ELECT,
 	WAIT_EVENT_HASH_BUILD_HASH_INNER,
diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h
index d71927cc2f..e0de24378b 100644
--- a/src/include/storage/barrier.h
+++ b/src/include/storage/barrier.h
@@ -37,6 +37,7 @@ typedef struct Barrier
 extern void BarrierInit(Barrier *barrier, int num_workers);
 extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info);
 extern bool BarrierArriveAndDetach(Barrier *barrier);
+extern bool BarrierArriveAndDetachExceptLast(Barrier *barrier);
 extern int	BarrierAttach(Barrier *barrier);
 extern bool BarrierDetach(Barrier *barrier);
 extern int	BarrierPhase(Barrier *barrier);
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select  count(*) from simple r full outer join simple s using (id);
  20000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
 rollback to settings;
 -- An full outer join where every record is not matched.
 -- non-parallel
@@ -812,8 +838,9 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
-- 
2.25.1

