Parallel Hash take II

Started by Thomas Munro · over 8 years ago · 70 messages · pgsql-hackers
#1Thomas Munro
thomas.munro@gmail.com

Hi hackers,

Here is a new version of my parallel-aware hash join patchset. I've
dropped 'shared' from the feature name and EXPLAIN output since that's
now implied by the word "Parallel" (that only made sense in earlier
versions that had Shared Hash and Parallel Shared Hash, but a Shared
Hash with just one participant building it didn't turn out to be very
useful so I dropped it a few versions ago). I figured for this new
round I should create a new thread, but took the liberty of copying
the CC list from the previous one[1].

The main changes are:

1. Implemented the skew optimisation for parallel-aware mode. The
general approach is the same as the regular hash table: insert with a
CAS loop. The details of memory budget management are different
though. It grants chunks of budget to participants as needed even
though allocation is still per-tuple, and it has to deal with
concurrent bucket removal. I removed one level of indirection from
the skew hash table: in this version hashtable->skewBucket is an array
of HashSkewBucket instead of pointers to HashSkewBuckets allocated
separately. That makes the hash table array twice as big but avoids
one pointer hop when probing an active bucket; that refactoring was
not strictly necessary but made the changes to support parallel build
simpler.

2. Simplified costing. There is now just one control knob
"parallel_synchronization_cost", which I charge for each time the
participants will wait for each other at a barrier, to be set high
enough to dissuade the planner from using Parallel Hash for tiny hash
tables that would be faster in a parallel-oblivious hash join.
Earlier ideas about modelling the cost of shared memory access didn't
work out.

Status: I think there are probably some thinkos in the new skew
stuff. I think I need some new ideas about how to refactor things so
that there isn't quite so much "if-shared-then-this-else-that". I
think I should build some kind of test mode to control barriers so
that I can test the permutations of participant arrival phase
exhaustively. I need to propose an empirically derived default for
the GUC. There are several other details I would like to tidy up and
improve. That said, I wanted to post what I have as a checkpoint now
that I have the major remaining piece (skew optimisation) more-or-less
working and the costing at a place that I think makes sense.

I attach some queries to exercise various interesting cases. I would
like to get something like these into fast-running regression test
format.

Note that this patch requires the shared record typmod patch[2] in
theory, since shared hash table tuples might reference blessed record
types, but there is no API dependency so you can use this patch set
without applying that one. If anyone knows how to actually provoke a
parallel hash join that puts RECORD types into the hash table, I'd be
very interested to hear about it, but certainly for TPC and similar
testing that other patch set is not necessary.

Of the TPC-H queries, I find that Q3, Q5, Q7, Q8, Q9, Q10, Q12, Q14,
Q16, Q18, Q20 and Q21 make use of Parallel Hash nodes (I tested with
neqjoinsel-fix-v3.patch[3] also applied, which avoids some but not all
craziness in Q21). For examples that also include a
parallel-oblivious Hash see Q8 and Q10: in those queries you can see
the planner deciding that it's not worth paying
parallel_synchronization_cost = 10 to load the 25 row "nation" table.
I'll report on performance separately.

[1]: /messages/by-id/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
[2]: /messages/by-id/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com
[3]: /messages/by-id/CAEepm=3=NHHko3oOzpik+ggLy17AO+px3rGYrg3x_x05+Br9-A@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-hash-v16.patchset.tgz (application/x-gzip)
hj-test-queries.sql (application/octet-stream)
hj-skew.sql (application/octet-stream)
hj-skew-unmatched.sql (application/octet-stream)
hj-skew-overflow.sql (application/octet-stream)
#2Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#1)
Re: Parallel Hash take II

Hi,

On 2017-07-26 20:12:56 +1200, Thomas Munro wrote:

Here is a new version of my parallel-aware hash join patchset.

Yay!

Working on reviewing this. Will send separate emails for individual
patch reviews.

2. Simplified costing. There is now just one control knob
"parallel_synchronization_cost", which I charge for each time the
participants will wait for each other at a barrier, to be set high
enough to dissuade the planner from using Parallel Hash for tiny hash
tables that would be faster in a parallel-oblivious hash join.
Earlier ideas about modelling the cost of shared memory access didn't
work out.

Hm. You say, "didn't work out" - could you expand a bit on that? I'm
quite doubtful that just accounting for barriers will be good enough.

I'll report on performance separately.

Looking forward to that ;)

Greetings,

Andres Freund

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#2)
Re: Parallel Hash take II

On Tue, Aug 1, 2017 at 9:28 AM, Andres Freund <andres@anarazel.de> wrote:

On 2017-07-26 20:12:56 +1200, Thomas Munro wrote:

2. Simplified costing. There is now just one control knob
"parallel_synchronization_cost", which I charge for each time the
participants will wait for each other at a barrier, to be set high
enough to dissuade the planner from using Parallel Hash for tiny hash
tables that would be faster in a parallel-oblivious hash join.
Earlier ideas about modelling the cost of shared memory access didn't
work out.

Hm. You say, "didn't work out" - could you expand a bit on that? I'm
quite doubtful that just accounting for barriers will be good enough.

The earlier approach and some variants I played with were based on the
idea that we should try to estimate the cost of using shared memory.
But there's no precedent for costing the cache hierarchy beyond disk
vs memory, and it depends so much on your hardware (NUMA vs UMA) and
the data distribution. I have no doubt that variations in memory
access costs are important (for example, it was data distribution that
determined whether big-cache-oblivious-shared-hash-table or
MonetDB-style cache-aware approach won in that paper I've mentioned
here before[1]), but it seems like a hard problem and I didn't feel
like it was necessary. Or do you have a different idea here?

Another point is that in the earlier versions I was trying to teach
the planner how to choose among Hash, Shared Hash and Parallel Shared
Hash. The difference in costing between Hash and Shared Hash (one
worker builds, all workers probe) was important and sensitive, because
the only difference between them would be the cost of memory sharing.
When I dropped Shared Hash from the patch set, it no longer seemed
necessary to try to deal with such subtle costing, because Hash and
Parallel Hash (now without the word 'Shared') already have wildly
different costs: the latter is divided over N CPUs. So I felt I could
get away with a much blunter instrument: just something to avoid
parallel build overheads for tiny tables like TPC-H "nation".

I still wanted something that makes intuitive sense and that could be
set using experimental evidence though. Parallel_synchronization_cost
is an estimate of how long the average backend will have to wait for
the last backend to complete the phase and arrive at each barrier.
The most interesting case is the build phase: how long will the
last backend make us wait before probing can begin? Well, that
depends on the parallel grain. Currently, the ultimate source of all
parallelism in our executor is Parallel Seq Scan and Parallel Index
Scan, and they hand out a page at a time. Of course, any number of
nodes may sit between the hash join and the scan, and one of them
might include a function that sleeps for 100 years in one backend or
performs a join that generates wildly different numbers of tuples in
each backend. I don't know what to do about that, other than to
assume we have perfectly spherical cows and reason on the basis of an
expected parallel grain reaching us from the scans.

One thing to note about parallel_synchronization_cost is that the cost
units, where 1 is traditionally the cost of scanning a page, actually
make *some* kind of sense here, though it's a bit tenuous: the last
worker to complete is the one that scans the final pages, while the
others see the scan finished. What's really wanted here is not simply
page scanning cost but rather a percentage of the total cost that
represents how much extra work the lanterne rouge of backends has to
do.

Two relevant projects here are:

1. David Rowley proposes changing the seq scan grain[2], perhaps
adaptively. I suppose as this number increases the time at which two
workers finish can vary more greatly.
2. The parallel-append project introduces a completely different type
of granularity based on unrelated and separately costed subplans
rather than pages. Perhaps there are things that could be done here
to model the fact that some workers might finish a long time before
others, but I don't know.

Perhaps what parallel hash really needs is not a user-controlled
parallel_synchronization_cost, but some number produced by the planner
to describe the expected distribution of tuple counts over workers.
Armed with something like that and the cost per tuple you might be
able to estimate how long we expect hash join barriers to make you
wait without introducing any new GUCs at all. I thought about some of
these things a bit but it seemed like a big research project of its
own and I was persuaded in an off-list discussion by Robert to try to
find the simplest thing that would avoid parallel-aware hash for
little tables that are already built very cheaply.

[1]: http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.225.3495
[2]: /messages/by-id/CAKJS1f-XhfQ2-=85wgYo5b3WtEs=ys=2Rsq=NuvnmaV4ZsM1XQ@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com


#4Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#1)
Re: Parallel Hash take II

From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Wed 26 Jul 2017 19:58:20 NZST
Subject: [PATCH] Add support for parallel-aware hash joins.

Hi,

WRT the main patch:

- Echoing concerns from other threads (Robert: ping): I'm doubtful that
it makes sense to size the number of parallel workers solely based on
the parallel scan node's size. I don't think it's this patch's job to
change that, but to me it seriously amplifies that - I'd bet there's a
lot of cases with nontrivial joins where the benefit from parallelism
on the join level is bigger than on the scan level itself. And the
number of rows in the upper nodes might also be bigger than on the
scan node level, making it more important to have higher number of
nodes.

- If I understand the code in initial_cost_hashjoin() correctly, we
count the synchronization overhead once, independent of the number of
workers. But on the other hand we calculate the throughput by
dividing by the number of workers. Do you think that's right?

- I haven't really grokked the deadlock issue you address. Could you
expand the comments on that? Possibly somewhere central referenced by
the various parts.

- maybe I'm overly paranoid, but it might not be bad to add some extra
checks for ExecReScanHashJoin ensuring that it doesn't get called when
workers are still doing something.

- seems like you're dereffing tuple unnecessarily here:

+	/*
+	 * If we detached a chain of tuples, transfer them to the main hash table
+	 * or batch storage.
+	 */
+	if (regainable_space > 0)
+	{
+		HashJoinTuple tuple;
+
+		tuple = (HashJoinTuple)
+			dsa_get_address(hashtable->area, detached_chain_shared);
+		ExecHashTransferSkewTuples(hashtable, detached_chain,
+								   detached_chain_shared);
+
+		/* Remove from the total space used. */
+		LWLockAcquire(&hashtable->shared->chunk_lock, LW_EXCLUSIVE);
+		Assert(hashtable->shared->size >= regainable_space);
+		hashtable->shared->size -= regainable_space;
+		LWLockRelease(&hashtable->shared->chunk_lock);
+
+		/*
+		 * If the bucket we removed is the same as the bucket the caller just
+		 * overflowed, then we can forget about the overflowing part of the
+		 * tuple.  It's been moved out of the skew hash table.  Otherwise, the
+		 * caller will call again; eventually we'll either succeed in
+		 * allocating space for the overflow or reach this case.
+		 */
+		if (bucket_to_remove == bucketno)
+		{
+			hashtable->spaceUsedSkew = 0;
+			hashtable->spaceAllowedSkew = 0;
+		}
+	}
- The names here could probably improved some:
+		case WAIT_EVENT_HASH_SHRINKING1:
+			event_name = "Hash/Shrinking1";
+			break;
+		case WAIT_EVENT_HASH_SHRINKING2:
+			event_name = "Hash/Shrinking2";
+			break;
+		case WAIT_EVENT_HASH_SHRINKING3:
+			event_name = "Hash/Shrinking3";
+			break;
+		case WAIT_EVENT_HASH_SHRINKING4:
+			event_name = "Hash/Shrinking4";

- why are we restricting the rows_total bit to parallel-aware?

+	/*
+	 * If parallel-aware, the executor will also need an estimate of the total
+	 * number of rows expected from all participants so that it can size the
+	 * shared hash table.
+	 */
+	if (best_path->jpath.path.parallel_aware)
+	{
+		hash_plan->plan.parallel_aware = true;
+		hash_plan->rows_total = best_path->inner_rows_total;
+	}
+

- seems we need a few more test - I don't think the existing tests are
properly going to exercise the skew stuff, multiple batches, etc?
This is nontrivial code, I'd really like to see a high test coverage
of the new code.

- might not hurt to reindent before the final submission

- Unsurprisingly, please implement the FIXME ;)

Regards,

Andres


#5Robert Haas
robertmhaas@gmail.com
In reply to: Andres Freund (#4)
Re: Parallel Hash take II

On Mon, Jul 31, 2017 at 9:11 PM, Andres Freund <andres@anarazel.de> wrote:

- Echoing concerns from other threads (Robert: ping): I'm doubtful that
it makes sense to size the number of parallel workers solely based on
the parallel scan node's size. I don't think it's this patch's job to
change that, but to me it seriously amplifies that - I'd bet there's a
lot of cases with nontrivial joins where the benefit from parallelism
on the join level is bigger than on the scan level itself. And the
number of rows in the upper nodes might also be bigger than on the
scan node level, making it more important to have higher number of
nodes.

Well, I feel like a broken record here but ... yeah, I agree we need
to improve that. It's probably generally true that the more parallel
operators we add, the more potential benefit there is in doing
something about that problem. But, like you say, not in this patch.

/messages/by-id/CA+TgmoYL-SQZ2gRL2DpenAzOBd5+SW30QB=A4CseWtOgejz4aQ@mail.gmail.com

I think we could improve things significantly by generating multiple
partial paths with different number of parallel workers, instead of
just picking a number of workers based on the table size and going
with it. For that to work, though, you'd need something built into
the costing to discourage picking paths with too many workers. And
you'd need to be OK with planning taking a lot longer when parallelism
is involved, because you'd be carrying around more paths for longer.
There are other problems to solve, too.

I still think, though, that it's highly worthwhile to get at least a
few more parallel operators - and this one in particular - done before
we attack that problem in earnest. Even with a dumb calculation of
the number of workers, this helps a lot.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#6Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#4)
Re: Parallel Hash take II

On Tue, Aug 1, 2017 at 1:11 PM, Andres Freund <andres@anarazel.de> wrote:

WRT the main patch:

Thanks for the review. I will respond soon, but for now I just wanted
to post a rebased version (no changes) because v16 no longer applies.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-hash-v17.patchset.tgz (application/x-gzip)
#7Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#6)
Re: Parallel Hash take II

On Wed, Aug 2, 2017 at 10:06 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Tue, Aug 1, 2017 at 1:11 PM, Andres Freund <andres@anarazel.de> wrote:

WRT the main patch:

Thanks for the review. I will respond soon, but for now I just wanted
to post a rebased version (no changes) because v16 no longer applies.

Rebased with better commit messages. Sorry for the changed patch
names, I switched to using git-format properly... (I'll be posting a
new version with some bigger changes to the 0010 patch and some
answers to good questions you've asked soon.)

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-hash-v18.patchset.tgz (application/x-gzip)
#8Thomas Munro
thomas.munro@gmail.com
In reply to: Andres Freund (#4)
Re: Parallel Hash take II

Here's a new rebased and debugged patch set.

On Tue, Aug 1, 2017 at 1:11 PM, Andres Freund <andres@anarazel.de> wrote:

- Echoing concerns from other threads (Robert: ping): I'm doubtful that
it makes sense to size the number of parallel workers solely based on
the parallel scan node's size. I don't think it's this patch's job to
change that, but to me it seriously amplifies that - I'd bet there's a
lot of cases with nontrivial joins where the benefit from parallelism
on the join level is bigger than on the scan level itself. And the
number of rows in the upper nodes might also be bigger than on the
scan node level, making it more important to have higher number of
nodes.

Agreed that this is bogus. The number of workers is really determined
by the outer path (the probe side), except that if the inner path (the
build side) is not big enough to warrant parallel workers at all then
parallelism is inhibited on that side. That prevents small tables
from being loaded by Parallel Hash. That is something we want, but
it's probably not doing it for the right reasons with the right
threshold -- about which more below.

- If I understand the code in initial_cost_hashjoin() correctly, we
count the synchronization overhead once, independent of the number of
workers. But on the other hand we calculate the throughput by
dividing by the number of workers. Do you think that's right?

It's how long you think the average participant will have to wait for
the last participant to arrive, and I think that's mainly determined
by the parallel grain, not the number of workers. If you're a worker
that has reached the end of a scan, the best case is that every other
worker has already reached the end too and the worst case is that
another worker read the last granule (currently page) just before you
hit the end, so you'll have to wait for it to process a granule's
worth of work.

To show this I used dtrace to measure the number of microseconds spent
waiting at the barrier before probing while running a 5 million row
self-join 100 times, and got the following histograms:

1 worker:

value ------------- Distribution ------------- count
< 0 | 0
0 |@@@@@@@@@@@@@@@@@@@@@@ 110
20 | 1
40 |@ 5
60 |@@@@@ 24
80 |@@@ 14
100 |@ 5
120 |@@@ 16
140 |@@@ 17
160 |@@ 8
180 | 0

2 workers:

value ------------- Distribution ------------- count
< 0 | 0
0 |@@@@@@@@@@@@@@ 107
20 | 1
40 |@@@ 21
60 |@@@ 25
80 |@@ 16
100 |@@ 14
120 |@@@@@ 38
140 |@@@@@@@ 51
160 |@@@ 20
180 | 3
200 | 1
220 | 3
240 | 0

3 workers:

value ------------- Distribution ------------- count
< 0 | 0
0 |@@@@@@@@@@@ 113
20 |@@ 15
40 |@@@ 29
60 |@@@@ 35
80 |@@@@ 37
100 |@@@@@@ 56
120 |@@@@@ 51
140 |@@@ 31
160 |@@ 21
180 |@ 6

4 workers:

value ------------- Distribution ------------- count
< 0 | 0
0 |@@@@@@@@@@ 121
20 | 4
40 |@@@ 39
60 |@@ 29
80 |@@ 24
100 |@@@@@@@ 88
120 |@@@@@@@ 82
140 |@@@@@ 58
160 |@@ 26
180 |@ 15
200 |@ 9
220 | 4
240 | 1
260 | 0

I didn't know what to expect above my machine's core count of 4, but
this is for 8:

value ------------- Distribution ------------- count
< 0 | 0
0 |@@@@@ 116
20 | 2
40 |@@ 36
60 |@@@ 69
80 |@@@@ 95
100 |@@@@@ 113
120 |@@@ 74
140 |@@@ 71
160 |@@ 44
180 |@@ 36
200 |@ 30
220 |@ 14
240 |@ 18
260 | 8
280 | 3
300 | 4

It's true that the fraction of waits that go into the 0-20us bucket
(because the last to arrive at a barrier doesn't have to wait at all)
decreases as you add more workers, but above 1 worker the main story
is the bell curve (?) we see clustered around 100-120us, and it
doesn't seem to be moving.

If we call the fraction of samples outside the 0-20us bucket
"wait_probability" and call their average wait time
"expected_wait_cost", then one way to estimate this is something like:

wait_probability * expected_wait_cost
= (1 - 1 / participants) * (tuples_per_grain * cost_per_tuple * 0.5)

I don't think we can do that today, because we don't have access to
tuples_per_grain from the subplan. That would in theory come
ultimately from the scan, adjusted as appropriate by selectivity
estimates. The grain could in future be more than one page at a time
as proposed by David Rowley and others, or "it's complicated" for a
Parallel Append. But I'm not sure if that's correct, doable or worth
doing, hence my attempt to provide a single knob to model this for
now.

I did some experiments to find a value of
parallel_synchronization_cost that avoids Parallel Hash when it won't
pay off, like this:

* a "big" table with 1 million rows to be the outer relation
* a "small" table with a range of sizes from 5k to 100k rows to hash
* both tables have a unique integer key "a" and a 60 byte text column "b"
* query (a): SELECT COUNT(*) FROM big JOIN small USING (a)
* query (b): ... WHERE length(small.b) * 2 - length(small.b) = length(small.b)
* work_mem set high enough that we never have multiple batches
* one warmup run and then the median of 3 measurements
* all default except min_parallel_table_scan_size = 0
* 4 core developer machine
* -O2, no asserts

Just to be clear: the following numbers aren't supposed to be
impressive, and they are way shorter than the queries the Parallel
Hash feature is really intended to help with. That's because we're
searching for the threshold below which Parallel Hash *doesn't* help,
and that involves running queries where there isn't much to hash. The
times are for the complete query (ie include probing too, not just the
hash table build), and show "parallel-oblivious-hash-join-time ->
parallel-aware-hash-join-time" for queries "a" and "b" on patched
master. I also compared with unpatched master to confirm that the
parallel-oblivious times on the left of the arrows match unpatched
master's, modulo a bit of noise.

1 worker:

5,000 rows hashed: (a) 157ms -> 166ms, (b) 166ms -> 183ms
7,500 rows hashed: (a) 162ms -> 174ms, (b) 176ms -> 182ms
10,000 rows hashed: (a) 161ms -> 170ms, (b) 181ms -> 210ms
12,500 rows hashed: (a) 169ms -> 175ms, (b) 194ms -> 188ms
15,000 rows hashed: (a) 175ms -> 181ms, (b) 199ms -> 195ms
17,500 rows hashed: (a) 173ms -> 175ms, (b) 201ms -> 202ms
20,000 rows hashed: (a) 179ms -> 179ms, (b) 210ms -> 195ms <== a & b threshold
30,000 rows hashed: (a) 196ms -> 192ms, (b) 244ms -> 218ms
40,000 rows hashed: (a) 201ms -> 197ms, (b) 265ms -> 228ms
50,000 rows hashed: (a) 217ms -> 251ms, (b) 294ms -> 249ms
60,000 rows hashed: (a) 228ms -> 222ms, (b) 324ms -> 268ms
70,000 rows hashed: (a) 230ms -> 214ms, (b) 338ms -> 275ms
80,000 rows hashed: (a) 243ms -> 229ms, (b) 366ms -> 291ms
90,000 rows hashed: (a) 256ms -> 239ms, (b) 391ms -> 311ms
100,000 rows hashed: (a) 266ms -> 248ms, (b) 420ms -> 326ms

2 workers:

5,000 rows hashed: (a) 110ms -> 115ms, (b) 118ms -> 127ms
7,500 rows hashed: (a) 115ms -> 128ms, (b) 131ms -> 128ms
10,000 rows hashed: (a) 114ms -> 116ms, (b) 135ms -> 148ms
12,500 rows hashed: (a) 126ms -> 126ms, (b) 145ms -> 131ms
15,000 rows hashed: (a) 134ms -> 142ms, (b) 151ms -> 134ms
17,500 rows hashed: (a) 125ms -> 122ms, (b) 153ms -> 147ms <== a & b threshold
20,000 rows hashed: (a) 126ms -> 124ms, (b) 160ms -> 136ms
30,000 rows hashed: (a) 144ms -> 132ms, (b) 191ms -> 152ms
40,000 rows hashed: (a) 165ms -> 151ms, (b) 213ms -> 158ms
50,000 rows hashed: (a) 161ms -> 143ms, (b) 240ms -> 171ms
60,000 rows hashed: (a) 171ms -> 150ms, (b) 266ms -> 186ms
70,000 rows hashed: (a) 176ms -> 151ms, (b) 283ms -> 190ms
80,000 rows hashed: (a) 181ms -> 156ms, (b) 315ms -> 204ms
90,000 rows hashed: (a) 189ms -> 164ms, (b) 338ms -> 214ms
100,000 rows hashed: (a) 207ms -> 177ms, (b) 362ms -> 232ms

3 workers:

5,000 rows hashed: (a) 90ms -> 103ms, (b) 107ms -> 118ms
7,500 rows hashed: (a) 106ms -> 104ms, (b) 115ms -> 118ms
10,000 rows hashed: (a) 100ms -> 95ms, (b) 121ms -> 110ms <== b threshold
12,500 rows hashed: (a) 103ms -> 120ms, (b) 134ms -> 113ms
15,000 rows hashed: (a) 134ms -> 110ms, (b) 142ms -> 116ms <== a threshold
17,500 rows hashed: (a) 110ms -> 104ms, (b) 146ms -> 123ms
20,000 rows hashed: (a) 107ms -> 103ms, (b) 151ms -> 120ms
30,000 rows hashed: (a) 124ms -> 110ms, (b) 183ms -> 135ms
40,000 rows hashed: (a) 125ms -> 108ms, (b) 209ms -> 137ms
50,000 rows hashed: (a) 133ms -> 115ms, (b) 238ms -> 150ms
60,000 rows hashed: (a) 143ms -> 119ms, (b) 266ms -> 159ms
70,000 rows hashed: (a) 146ms -> 120ms, (b) 288ms -> 165ms
80,000 rows hashed: (a) 150ms -> 129ms, (b) 316ms -> 176ms
90,000 rows hashed: (a) 159ms -> 126ms, (b) 343ms -> 187ms
100,000 rows hashed: (a) 176ms -> 136ms, (b) 370ms -> 195ms

4 workers:

5,000 rows hashed: (a) 93ms -> 103ms, (b) 109ms -> 117ms
7,500 rows hashed: (a) 106ms -> 102ms, (b) 121ms -> 115ms <== b threshold
10,000 rows hashed: (a) 99ms -> 100ms, (b) 126ms -> 113ms
12,500 rows hashed: (a) 107ms -> 102ms, (b) 137ms -> 117ms <== a threshold
15,000 rows hashed: (a) 111ms -> 107ms, (b) 145ms -> 115ms
17,500 rows hashed: (a) 110ms -> 10ms, (b) 151ms -> 118ms
20,000 rows hashed: (a) 108ms -> 103ms, (b) 160ms -> 120ms
30,000 rows hashed: (a) 120ms -> 108ms, (b) 196ms -> 127ms
40,000 rows hashed: (a) 129ms -> 109ms, (b) 225ms -> 134ms
50,000 rows hashed: (a) 140ms -> 121ms, (b) 262ms -> 148ms
60,000 rows hashed: (a) 152ms -> 123ms, (b) 294ms -> 154ms
70,000 rows hashed: (a) 157ms -> 122ms, (b) 322ms -> 165ms
80,000 rows hashed: (a) 154ms -> 138ms, (b) 372ms -> 201ms
90,000 rows hashed: (a) 186ms -> 122ms, (b) 408ms -> 180ms
100,000 rows hashed: (a) 170ms -> 124ms, (b) 421ms -> 186ms

I found that a good value of parallel_synchronization_cost that
enables Parallel Hash somewhere around those thresholds is 250 for
these test queries, so I have set that as the default in the new patch
set.

All of this might be considered moot, because I still needed to frob
min_parallel_table_scan_size to get a Parallel Hash below 90,000 rows
anyway due to the policies in compute_parallel_worker(). So really
there is no danger of tables like the TPC-H "nation" and "region"
tables being loaded by Parallel Hash even if you set
parallel_synchronization_cost to 0, and probably no reason to worry too
much about its default value for now. It could probably be argued
that we shouldn't have the GUC at all, but at least it provides a
handy way to enable and disable Parallel Hash!

One hidden factor here is that it takes a while for workers to start
up and the leader can scan thousands of rows before they arrive. This
effect will presumably be exaggerated on systems with slow
fork/equivalent (Windows, some commercial Unices IIRC), and minimised
by someone writing a patch to reuse parallel workers. I haven't tried
to investigate that effect because it doesn't seem very interesting or
likely to persist, but it may contribute to the experimental thresholds
I observed.

- I haven't really grokked the deadlock issue you address. Could you
expand the comments on that? Possibly somewhere central referenced by
the various parts.

The central place is leader_gate.c. What's wrong with the explanation in there?

Let me restate the problem here, and the three solutions I considered:

Problem: The leader must never be allowed to wait for other
participants that have emitted tuples (it doesn't matter whether that
waiting takes the form of latches, condition variables, barriers,
shm_queues or anything else). Any participant that has emitted tuples
might currently be blocked waiting for the leader to drain the tuple
queue, so a deadlock could be created.

Concrete example: In this case, once we get past PHJ_PHASE_PROBING we
have to allow only the leader or the workers to continue. Otherwise
some worker might be trying to finish probing by emitting tuples,
while the leader might be in BarrierWait() waiting for everyone to
finish probing. This problems affects only outer joins (they have
wait to start PHJ_PHASE_UNMATCHED after probing) and multibatch joins
(they wait to be able to load the next batch).

Solution 1: LeaderGate is a simple mechanism for reaching consensus
on whether the leader or a set of workers will be allowed to run after
a certain point, in this case the end of probing. Concretely this
means that either the leader or any workers will drop out early at
that point, leaving nothing left to do. This is made slightly more
complicated by the fact that we don't know up front if there are any
workers yet.

Solution 2: Teach tuple queues to spill to disk instead of blocking
when full. I think this behaviour should probably only be activated
while the leader is running the plan rather than draining tuple
queues; the current block-when-full behaviour would still be
appropriate if the leader is simply unable to drain queues fast
enough. Then the deadlock risk would go away.

Solution 3: An asynchronous executor model where you don't actually
wait synchronously at barriers -- instead you detach and go and do
something else, but come back and reattach when there is progress to
be made. I have some ideas about that but they are dependent on the
async execution project reaching a fairly advanced state first.

When I wrote it, I figured that leader_gate.c was cheap and would do
for now, but I have to admit that it's quite confusing and it sucks
that later batches lose a core. I'm now thinking that 2 may be a
better idea. My first thought is that Gather needs a way to advertise
that it's busy while running the plan, shm_mq needs a slightly
different all-or-nothing nowait mode, and TupleQueue needs to write to
a shared tuplestore or other temp file-backed mechanism when
appropriate. Thoughts?

- maybe I'm overly paranoid, but it might not be bad to add some extra
checks for ExecReScanHashJoin ensuring that it doesn't get called when
workers are still doing something.

Check out ExecReScanGather(): it shuts down and waits for all workers
to complete, which makes the assumptions in ExecReScanHashJoin() true.
If a node below Gather but above Hash Join could initiate a rescan
then the assumptions would not hold. I am not sure what it would mean
though and we don't generate any such plans today to my knowledge. It
doesn't seem to make sense for the inner side of Nested Loop to be
partial. Have I missed something here?

It looks like some details may have changed here due to 41b0dd98 and
nearby commits, and I may need to implement at least ReInitializeDSM.

I also need a regression test to hit the rescan but I'm not sure how
to write one currently. In an earlier version of this patch set I
could do it by setting shared_tuple_cost (a GUC I no longer have) to a
negative number, which essentially turned our optimiser into a
pessimiser capable of producing a nested loop that rescans a gather
node, forking workers for every row...

- seems like you're dereffing tuple unnecessarily here:

+               tuple = (HashJoinTuple)
+                       dsa_get_address(hashtable->area, detached_chain_shared);
+               ExecHashTransferSkewTuples(hashtable, detached_chain,

Yes, here lurked a bug, fixed.

- The names here could probably improved some:
+               case WAIT_EVENT_HASH_SHRINKING1:
+                       event_name = "Hash/Shrinking1";
+                       break;
+               case WAIT_EVENT_HASH_SHRINKING2:
+                       event_name = "Hash/Shrinking2";
+                       break;
+               case WAIT_EVENT_HASH_SHRINKING3:
+                       event_name = "Hash/Shrinking3";
+                       break;
+               case WAIT_EVENT_HASH_SHRINKING4:
+                       event_name = "Hash/Shrinking4";

Fixed.

- why are we restricting rows_total bit to parallel aware?

+       /*
+        * If parallel-aware, the executor will also need an estimate of the total
+        * number of rows expected from all participants so that it can size the
+        * shared hash table.
+        */
+       if (best_path->jpath.path.parallel_aware)
+       {
+               hash_plan->plan.parallel_aware = true;
+               hash_plan->rows_total = best_path->inner_rows_total;
+       }
+

I could set it unconditionally and then skip this bit that receives the number:

rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;

Do you think it would be better to push plan_rows_total into Plan instead?

- seems we need a few more test - I don't think the existing tests are
properly going to exercise the skew stuff, multiple batches, etc?
This is nontrivial code, I'd really like to see a high test coverage
of the new code.

I've added some regression tests in a patch to apply before making any
changes. You would have to change the "explain (costs off)" to
"explain analyze" to verify the claims I put in comments about the
number of batches and peak memory usage in the work_mem management
tests. I chose to put them into join.sql, and then a later patch adds
parallel-aware versions. (An alternative would be to put them into
select.sql and select_parallel.sql, but it seemed better to keep the
non-parallel, parallel with parallel-oblivious join and parallel-aware
cases next to each other.)

While testing I found a timing bug that could produce incorrect query
results via the empty hash table optimisation: hashtable->totalTuples
was incorrectly 0. Fixed (see code in the "finish:" case in
MultiExecHash()).

Last week I finally figured out a way to test different startup
timings, considering the complexity created by the "flash mob" problem
I described when I first proposed dynamic barriers[1]. If you build
with -DBARRIER_DEBUG in the attached patch set you get a new GUC
"barrier_attach_phases" which you can set like this:

SET barrier_attach_phases = 'HashJoin.barrier:2,7,0';

That list of numbers tells it which phase each participant should
simulate attaching at. In that example the leader will attach at
phase 2 (PHJ_PHASE_BUILDING), worker 0 will attach at 7
(PHJ_PHASE_RESETTING_BATCH(1)) and worker 1 will attach at 0
(PHJ_PHASE_BEGINNING). Note that *someone* has to start at 0 or bad
things will happen.

Using this technique I can now use simple scripts to test every case
in the switch statements that appear in three places in the patch.
See attached file parallel-hash-attach-phases.sql.
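For concreteness, parsing a setting like 'HashJoin.barrier:2,7,0' into a barrier name plus a per-participant phase list could be sketched as below. The function and parameter names are mine, not from the patch, and the real GUC hook would do stricter validation.

```c
#include <string.h>
#include <stdlib.h>

/*
 * Split "name:p0,p1,..." into a barrier name and an array of attach
 * phases, one per participant.  Returns the number of phases parsed,
 * or -1 if the setting is malformed or the name doesn't fit.
 */
static int
parse_attach_phases(const char *setting, char *name, size_t namelen,
                    int *phases, int max_phases)
{
    const char *colon = strchr(setting, ':');
    int n = 0;

    if (colon == NULL || (size_t) (colon - setting) >= namelen)
        return -1;
    memcpy(name, setting, colon - setting);
    name[colon - setting] = '\0';

    for (const char *p = colon + 1; *p != '\0' && n < max_phases;)
    {
        char *end;

        phases[n++] = (int) strtol(p, &end, 10);
        p = (*end == ',') ? end + 1 : end;
    }
    return n;
}
```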

I'm not sure whether, and if so how, to package any such tests for the
regression suite, since they require a special debug build.

Ideally I would also like to find a way to tell Gather not to run the
plan in the leader (a bit like single_copy mode, except allowing
multiple workers to run the plan, and raising an error if no
workers could be launched).

- might not hurt to reindent before the final submission

Will do.

- Unsurprisingly, please implement the FIXME ;)

This must refer to a note about cleaning up skew buckets after they're
not needed, which I've now done.

Some other things:

Previously I failed to initialise the atomics in the shared skew hash
table correctly, and also I used memset to overwrite atomics when
loading a new batch. This worked on modern systems but would of
course fail when using emulated atomics. Fixed in the attached.

In the process I discovered that initialising and clearing large hash
tables this way is quite a lot slower than memset on my machine under
simple test conditions. I think it might be worth experimenting with
array-oriented atomic operations that have a specialisation for 0
that just uses memset if it can (something like
pg_atomic_init_u64_array(base, stride, n, 0)). I also think it may be
interesting to parallelise the initialisation and reset of the hash
table, since I've seen cases where I have 7 backends waiting on a
barrier while one initialises a couple of GB of memory for several
seconds. Those are just small optimisations though and I'm not
planning to investigate them until after the basic patch is in
committable form.
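The shape of that specialisation might be something like the following sketch, where plain uint64_t stands in for pg_atomic_uint64 and the function name mirrors the hypothetical pg_atomic_init_u64_array mentioned above. Under emulated atomics the zero fast path would of course be unsafe (that is exactly the bug fixed above), so a real version would need to fall back to per-element init in that build.

```c
#include <stdint.h>
#include <stddef.h>
#include <string.h>

/*
 * Initialise n 64-bit slots laid out at the given stride to 'value',
 * using one bulk memset when the target value is zero -- the common
 * hash-table-reset case -- instead of n individual stores.  Zeroing
 * any padding between slots is assumed harmless at init time.
 */
static void
init_u64_array(void *base, size_t stride, size_t n, uint64_t value)
{
    if (value == 0)
    {
        memset(base, 0, stride * n);
        return;
    }
    for (size_t i = 0; i < n; i++)
    {
        uint64_t *slot = (uint64_t *) ((char *) base + i * stride);

        *slot = value;
    }
}
```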

[1]: /messages/by-id/CAEepm=3yJ65sQZUAhfF3S7UfEv83X_rnH5a4-JXmqxGQRQ+7qQ@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-hash-v19.patchset.tgz (application/x-gzip)
parallel-hash-attach-phases.sql (application/octet-stream)
#9Robert Haas
robertmhaas@gmail.com
In reply to: Thomas Munro (#8)
Re: Parallel Hash take II

On Thu, Aug 31, 2017 at 8:53 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Solution 2: Teach tuple queues to spill to disk instead of blocking
when full. I think this behaviour should probably only be activated
while the leader is running the plan rather than draining tuple
queues; the current block-when-full behaviour would still be
appropriate if the leader is simply unable to drain queues fast
enough. Then the deadlock risk would go away.

When I wrote it, I figured that leader_gate.c was cheap and would do
for now, but I have to admit that it's quite confusing and it sucks
that later batches lose a core. I'm now thinking that 2 may be a
better idea. My first thought is that Gather needs a way to advertise
that it's busy while running the plan, shm_mq needs a slightly
different all-or-nothing nowait mode, and TupleQueue needs to write to
a shared tuplestore or other temp file-backed mechanism when
appropriate. Thoughts?

The problem with solution 2 is that it might lead to either (a)
unbounded amounts of stuff getting spooled to files or (b) small spool
files being repeatedly created and deleted depending on how the leader
is spending its time. If you could spill only when the leader is
actually waiting for the worker, that might be OK.

Check out ExecReScanGather(): it shuts down and waits for all workers
to complete, which makes the assumptions in ExecReScanHashJoin() true.
If a node below Gather but above Hash Join could initiate a rescan
then the assumptions would not hold. I am not sure what it would mean
though and we don't generate any such plans today to my knowledge. It
doesn't seem to make sense for the inner side of Nested Loop to be
partial. Have I missed something here?

I bet this could happen, although recent commits have demonstrated
that my knowledge of how PostgreSQL handles rescans is less than
compendious. Suppose there's a Nested Loop below the Gather and above
the Hash Join, implementing a join condition that can't give rise to a
parameterized path, like a.x + b.x = 0.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#10Thomas Munro
thomas.munro@gmail.com
In reply to: Robert Haas (#9)
Re: Parallel Hash take II

On Sat, Sep 2, 2017 at 5:13 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Thu, Aug 31, 2017 at 8:53 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Check out ExecReScanGather(): it shuts down and waits for all workers
to complete, which makes the assumptions in ExecReScanHashJoin() true.
If a node below Gather but above Hash Join could initiate a rescan
then the assumptions would not hold. I am not sure what it would mean
though and we don't generate any such plans today to my knowledge. It
doesn't seem to make sense for the inner side of Nested Loop to be
partial. Have I missed something here?

I bet this could happen, although recent commits have demonstrated
that my knowledge of how PostgreSQL handles rescans is less than
compendious. Suppose there's a Nested Loop below the Gather and above
the Hash Join, implementing a join condition that can't give rise to a
parameterized path, like a.x + b.x = 0.

Hmm. I still don't see how that could produce a rescan of a partial
path without an intervening Gather, and I would really like to get to
the bottom of this.

At the risk of mansplaining the code that you wrote and turning out to
be wrong: A Nested Loop can't ever have a partial path on the inner
side. Under certain circumstances it can have a partial path on the
outer side, because its own results are partial, but for each outer
row it needs to do a total (non-partial) scan of the inner side so
that it can reliably find or not find matches. Therefore we'll never
rescan partial paths directly, we'll only ever rescan partial paths
indirectly via a Gather node that will synchronise the rescan of
all children to produce a non-partial result.

There may be more reasons to rescan that I'm not thinking of. But the
whole idea of a rescan seems to make sense only for non-partial paths.
What would it even mean for a worker process to decide to rescan (say)
a Seq Scan without any kind of consensus?

Thought experiment: I suppose we could consider replacing Gather's
clunky shut-down-and-relaunch-workers synchronisation technique with a
new protocol where the Gather node sends a 'rescan!' message to each
worker and then discards their tuples until it receives 'OK, rescan
starts here', and then each parallel-aware node type supplies its own
rescan synchronisation logic as appropriate. For example, Seq Scan
would somehow need to elect one participant to run
heap_parallelscan_reinitialize and others would wait until it has
done. This might not be worth the effort, but thinking about this
problem helped me see that rescan of a partial plan without a Gather
node to coordinate doesn't make any sense.

Am I wrong?

--
Thomas Munro
http://www.enterprisedb.com


#11Robert Haas
robertmhaas@gmail.com
In reply to: Thomas Munro (#10)
Re: Parallel Hash take II

On Fri, Sep 1, 2017 at 6:32 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Sat, Sep 2, 2017 at 5:13 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Thu, Aug 31, 2017 at 8:53 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Check out ExecReScanGather(): it shuts down and waits for all workers
to complete, which makes the assumptions in ExecReScanHashJoin() true.
If a node below Gather but above Hash Join could initiate a rescan
then the assumptions would not hold. I am not sure what it would mean
though and we don't generate any such plans today to my knowledge. It
doesn't seem to make sense for the inner side of Nested Loop to be
partial. Have I missed something here?

I bet this could happen, although recent commits have demonstrated
that my knowledge of how PostgreSQL handles rescans is less than
compendious. Suppose there's a Nested Loop below the Gather and above
the Hash Join, implementing a join condition that can't give rise to a
parameterized path, like a.x + b.x = 0.

Hmm. I still don't see how that could produce a rescan of a partial
path without an intervening Gather, and I would really like to get to
the bottom of this.

I'm thinking about something like this:

Gather
-> Nested Loop
-> Parallel Seq Scan
-> Hash Join
-> Seq Scan
-> Parallel Hash
-> Parallel Seq Scan

The hash join has to be rescanned for every iteration of the nested loop.

Maybe I'm confused.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#12Thomas Munro
thomas.munro@gmail.com
In reply to: Robert Haas (#11)
Re: Parallel Hash take II

On Sat, Sep 2, 2017 at 10:45 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Fri, Sep 1, 2017 at 6:32 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Sat, Sep 2, 2017 at 5:13 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Thu, Aug 31, 2017 at 8:53 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Check out ExecReScanGather(): it shuts down and waits for all workers
to complete, which makes the assumptions in ExecReScanHashJoin() true.
If a node below Gather but above Hash Join could initiate a rescan
then the assumptions would not hold. I am not sure what it would mean
though and we don't generate any such plans today to my knowledge. It
doesn't seem to make sense for the inner side of Nested Loop to be
partial. Have I missed something here?

I bet this could happen, although recent commits have demonstrated
that my knowledge of how PostgreSQL handles rescans is less than
compendious. Suppose there's a Nested Loop below the Gather and above
the Hash Join, implementing a join condition that can't give rise to a
parameterized path, like a.x + b.x = 0.

Hmm. I still don't see how that could produce a rescan of a partial
path without an intervening Gather, and I would really like to get to
the bottom of this.

I'm thinking about something like this:

Gather
-> Nested Loop
-> Parallel Seq Scan
-> Hash Join
-> Seq Scan
-> Parallel Hash
-> Parallel Seq Scan

The hash join has to be rescanned for every iteration of the nested loop.

I think you mean:

Gather
-> Nested Loop
-> Parallel Seq Scan
-> Parallel Hash Join
-> Parallel Seq Scan
-> Parallel Hash
-> Parallel Seq Scan

... but we can't make plans like that and they would produce nonsense
output. The Nested Loop's inner plan is partial, but
consider_parallel_nestloop only makes plans with parallel-safe but
non-partial ("complete") inner paths.

/*
 * consider_parallel_nestloop
 *    Try to build partial paths for a joinrel by joining a partial path
 *    for the outer relation to a complete path for the inner relation.
 *

--
Thomas Munro
http://www.enterprisedb.com


#13Robert Haas
robertmhaas@gmail.com
In reply to: Thomas Munro (#12)
Re: Parallel Hash take II

On Fri, Sep 1, 2017 at 7:42 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

I'm thinking about something like this:

Gather
-> Nested Loop
-> Parallel Seq Scan
-> Hash Join
-> Seq Scan
-> Parallel Hash
-> Parallel Seq Scan

The hash join has to be rescanned for every iteration of the nested loop.

I think you mean:

Gather
-> Nested Loop
-> Parallel Seq Scan
-> Parallel Hash Join
-> Parallel Seq Scan
-> Parallel Hash
-> Parallel Seq Scan

I don't, though, because that's nonsense. Maybe what I wrote is also
nonsense, but it is at least different nonsense.

Let's try it again with some table names:

Gather
-> Nested Loop
-> Parallel Seq Scan on a
-> (Parallel?) Hash Join
-> Seq Scan on b (NOT A PARALLEL SEQ SCAN)
-> Parallel Hash
-> Parallel Seq Scan on c

I argue that this is a potentially valid plan. b, of course, has to
be scanned in its entirety by every worker every time through, which
is why it's not a Parallel Seq Scan, but that requirement does not
apply to c. If we take all the rows in c and stick them into a
DSM-based hash table, we can reuse them every time the hash join is
rescanned and, AFAICS, that should work just fine, and it's probably a
win over letting each worker build a separate copy of the hash table
on c, too.

Of course, there's the "small" problem that I have no idea what to do
if the b-c join is (or becomes) multi-batch. When I was thinking
about this before, I was imagining that this case might Just Work with
your patch provided that you could generate a plan shaped like this,
but now I see that that's not actually true, because of multiple
batches.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#14Prabhat Sahu
prabhat.sahu@enterprisedb.com
In reply to: Thomas Munro (#8)
Re: Parallel Hash take II

On Thu, Aug 31, 2017 at 6:23 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Here's a new rebased and debugged patch set.

Hi Thomas,

I have applied the recent patch (v19) and started testing this feature,
and I got a crash with the below test case.

With default settings in the "postgresql.conf" file:

create table tab1 (a int, b text);
create table tab2 (a int, b text);
insert into tab1 (select x, x||'_b' from generate_series(1,200000) x);
insert into tab2 (select x%20000, x%20000||'_b' from
generate_series(1,200000) x);
ANALYZE;
select * from tab1 t1, tab2 t2, tab1 t3 where t1.a = t2.a and t2.b = t3.b
order by 1;

WARNING: terminating connection because of crash of another server process
DETAIL: The postmaster has commanded this server process to roll back the
current transaction and exit, because another server process exited
abnormally and possibly corrupted shared memory.
HINT: In a moment you should be able to reconnect to the database and
repeat your command.
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
!>

Kindly check if you can reproduce this at your end.

*Thanks & Regards,*

*Prabhat Kumar Sahu*
Mob: 7758988455
Skype ID: prabhat.sahu1984

www.enterprisedb.com

#15Prabhat Sahu
prabhat.sahu@enterprisedb.com
In reply to: Prabhat Sahu (#14)
Re: Parallel Hash take II

Hi Thomas,

With lower "shared_buffers" and "work_mem" settings as below, the query
crashes, but I am able to see the explain plan.

shared_buffers = 1MB
work_mem = 1MB
max_parallel_workers_per_gather = 4
max_parallel_workers = 8
enable_mergejoin = off
enable_nestloop = off
enable_hashjoin = on
force_parallel_mode = on
seq_page_cost = 0.1
random_page_cost = 0.1
effective_cache_size = 128MB
parallel_tuple_cost = 0
parallel_setup_cost = 0
parallel_synchronization_cost = 0

CREATE TABLE t1 (a int, b text);
INSERT INTO t1 (SELECT x%20000, x%20000||'_b' FROM
generate_series(1,200000) x);
ANALYZE;

postgres=# explain select * from t1, t1 t2 where t1.a = t2.a;
                                      QUERY PLAN
-----------------------------------------------------------------------------------------
 Gather  (cost=2852.86..16362.74 rows=2069147 width=22)
   Workers Planned: 1
   ->  Parallel Hash Join  (cost=2852.86..16362.74 rows=1217145 width=22)
         Hash Cond: (t1.a = t2.a)
         ->  Parallel Seq Scan on t1  (cost=0.00..1284.57 rows=117647 width=11)
         ->  Parallel Hash  (cost=1284.57..1284.57 rows=117647 width=11)
               ->  Parallel Seq Scan on t1 t2  (cost=0.00..1284.57 rows=117647 width=11)
(7 rows)

postgres=# select * from t1, t1 t2 where t1.a = t2.a;
WARNING: terminating connection because of crash of another server process
DETAIL: The postmaster has commanded this server process to roll back the
current transaction and exit, because another server process exited
abnormally and possibly corrupted shared memory.
HINT: In a moment you should be able to reconnect to the database and
repeat your command.
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
!>

-- After assigning more "shared_buffers" (10MB) and "work_mem" (10MB), the
query executes successfully.

Kindly check if you can reproduce this at your end.

*Thanks & Regards,*

*Prabhat Kumar Sahu*
Mob: 7758988455
Skype ID: prabhat.sahu1984

www.enterprisedb.com


#16Thomas Munro
thomas.munro@gmail.com
In reply to: Prabhat Sahu (#15)
Re: Parallel Hash take II

On Thu, Sep 14, 2017 at 12:51 AM, Prabhat Sahu
<prabhat.sahu@enterprisedb.com> wrote:

Setting with lower "shared_buffers" and "work_mem" as below, query getting crash but able to see explain plan.

Thanks Prabhat. A small thinko in the batch reset code means that it
sometimes thinks the shared skew hash table is present and tries to
probe it after batch 1. I have a fix for that and I will post a new
patch set just as soon as I have a good regression test figured out.

--
Thomas Munro
http://www.enterprisedb.com


#17Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#16)
Re: Parallel Hash take II

On Thu, Sep 14, 2017 at 11:57 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Thu, Sep 14, 2017 at 12:51 AM, Prabhat Sahu
<prabhat.sahu@enterprisedb.com> wrote:

Setting with lower "shared_buffers" and "work_mem" as below, query getting crash but able to see explain plan.

Thanks Prabhat. A small thinko in the batch reset code means that it
sometimes thinks the shared skew hash table is present and tries to
probe it after batch 1. I have a fix for that and I will post a new
patch set just as soon as I have a good regression test figured out.

Fixed in the attached version, by adding a missing
"hashtable->shared->num_skew_buckets = 0;" to ExecHashFreeSkewTable().
I did some incidental tidying of the regression tests, but didn't
manage to find a version of your example small enough to put in a
regression test. I also discovered some other things:

1. Multi-batch Parallel Hash Join could occasionally produce a
resowner warning about a leaked temporary File associated with
SharedTupleStore objects. Fixed by making sure we call routines that
close all file handles in ExecHashTableDetach().

2. Since last time I tested, a lot fewer TPCH queries choose a
Parallel Hash plan. Not sure why yet. Possibly because Gather Merge
and other things got better. Will investigate.

3. Gather Merge and Parallel Hash Join may have a deadlock problem.
Gather Merge needs to block waiting for tuples, but workers wait
for all participants (including the leader) to reach barriers. TPCH
Q18 (with a certain set of indexes and settings, YMMV) has Gather
Merge over Sort over Parallel Hash Join, and although it usually runs
successfully I have observed one deadlock. Ouch. This seems to be a
more fundamental problem than the blocked TupleQueue scenario. Not
sure what to do about that.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-hash-v20.patchset.tgz (application/x-gzip)
#18Robert Haas
robertmhaas@gmail.com
In reply to: Thomas Munro (#17)
Re: Parallel Hash take II

On Thu, Sep 14, 2017 at 10:01 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

3. Gather Merge and Parallel Hash Join may have a deadlock problem.
Gather Merge needs to block waiting for tuples, but workers wait
for all participants (including the leader) to reach barriers. TPCH
Q18 (with a certain set of indexes and settings, YMMV) has Gather
Merge over Sort over Parallel Hash Join, and although it usually runs
successfully I have observed one deadlock. Ouch. This seems to be a
more fundamental problem than the blocked TupleQueue scenario. Not
sure what to do about that.

Thomas and I spent about an hour and a half brainstorming about this
just now. Parallel query doesn't really have a documented deadlock
avoidance strategy, yet all committed and proposed patches other than
this one manage to avoid deadlock. This one has had a number of
problems crop up in this area, so it struck me that it might be
violating a rule which every other patch was following. I struggled
for a bit and finally managed to articulate what I think the
deadlock-avoidance rule is that is generally followed by other
committed and proposed patches:

<rule>
Once you enter a state in which other participants might wait for you,
you must exit that state before doing anything that might wait for
another participant.
</rule>

From this, it's easy to see that the waits-for graph can't contain any
cycles: if every parallel query node obeys the above rule, then a
given node can have in-arcs or out-arcs, but not both. I also believe
it to be the case that every existing node follows this rule. For
instance, Gather and Gather Merge wait for workers, but they aren't at
that point doing anything that can make the workers wait for them.
Parallel Bitmap Heap Scan waits for the leader to finish building the
bitmap, but that leader never waits for anyone else while building the
bitmap. Parallel Index(-Only) Scan waits for the process advancing
the scan to reach the next page, but that process never waits for any
other while so doing. Other types of parallel nodes -- including the
proposed Parallel Append node, which is an interesting case because
like Parallel Hash it appears in the "middle" of the parallel portion
of the plan tree rather than the root like Gather or the leaves like a
parallel scan -- don't wait at all, except for short
spinlock-protected or LWLock-protected critical sections during which
they surely don't go into any sort of long-term wait (which would be
unacceptable for other reasons anyway).

Parallel hash violates this rule only in the case of a multi-batch
hash join, and for only one reason: to avoid blowing out work_mem.
Since, consistent with resource management decisions elsewhere, each
participant is entitled to an amount of memory equal to work_mem, the
shared hash table can and does use up to (participants * work_mem),
which means that we must wait for everybody to be done with the hash
table for batch N before building the hash table for batch N+1. More
properly, if the hash table for the current batch happens to be
smaller than the absolute maximum amount of memory we can use, we can
build the hash table for the next batch up to the point where all the
memory is used, but must then pause and wait for the old hash table to
go away before continuing. But that means that the process for which
we are waiting violated the rule mentioned above: by not being done
with the memory, it's making other processes wait, and by returning a
tuple, it's allowing other parts of the executor to do arbitrary
computations which can themselves wait. So, kaboom.

One simple and stupid way to avoid this deadlock is to reduce the
memory budget for the shared hash table to work_mem and remove the
barriers that prevent more than one such hash table from existing at a
time. In the worst case, we still use (participants * work_mem),
frequently we'll use less, but there are no longer any waits for
processes that might not even be running the parallel hash node
(ignoring for the moment the problem of right and full parallel hash
joins, which might need more thought). So no deadlock.

We can do better. First, as long as nbatches == 1, we can use a hash
table of up to size (participants * work_mem); if we have to switch to
multiple batches, then just increase the number of batches enough that
the current memory usage drops below work_mem. Second, following an
idea originally by Ashutosh Bapat whose relevance to this issue Thomas
Munro realized during our discussion, we can make all the batches
small enough to fit in work_mem (rather than participants * work_mem
as the current patch does) and spread them across the workers (in the
style of Parallel Append, including potentially deploying multiple
workers against the same batch if there are fewer batches than
workers). Then, single-batch parallel hash joins use the maximum
allowable memory always, and multi-batch parallel hash joins use the
maximum allowable memory after the first batch. Not perfect, but not
bad, and definitely better than deadlocking. Further refinements
might be possible.
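The "increase the number of batches enough that the current memory usage drops below work_mem" step is simple arithmetic, sketched here under the usual assumption that tuples split roughly evenly each time nbatch doubles (names are illustrative, not from the patch):

```c
#include <stddef.h>

/*
 * Double nbatch until the estimated per-batch memory for the current
 * hash table falls within the budget, assuming each doubling halves
 * the memory retained in the current batch.
 */
static int
batches_needed(size_t current_mem, size_t budget, int nbatch)
{
    while (current_mem > budget)
    {
        nbatch *= 2;
        current_mem /= 2;
    }
    return nbatch;
}
```

So a 4MB hash table with a 1MB budget would go from one batch to four, after which each batch is expected to fit under work_mem.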

If we don't adopt some approach along these lines, then I think we've
got to articulate some alternative deadlock-avoidance rule and make
sure every parallel query facility follows it. I welcome ideas on
that front, but I don't think the rule mentioned above is a bad one,
and I'd obviously like to minimize the amount of rework that we need
to do. Assuming we do settle on the above rule, it clearly needs to
be documented someplace -- not sure of the place. I think that it
doesn't belong in README.parallel because it's an executor-specific
rule, not necessarily a general rule to which other users of
parallelism must adhere; they can choose their own strategies.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#19Peter Geoghegan
pg@bowt.ie
In reply to: Robert Haas (#18)
Re: Parallel Hash take II

On Mon, Sep 18, 2017 at 1:06 PM, Robert Haas <robertmhaas@gmail.com> wrote:

> If we don't adopt some approach along these lines, then I think we've
> got to articulate some alternative deadlock-avoidance rule and make
> sure every parallel query facility follows it. I welcome ideas on
> that front, but I don't think the rule mentioned above is a bad one,
> and I'd obviously like to minimize the amount of rework that we need
> to do. Assuming we do settle on the above rule, it clearly needs to
> be documented someplace -- not sure of the place. I think that it
> doesn't belong in README.parallel because it's an executor-specific
> rule, not necessarily a general rule to which other users of
> parallelism must adhere; they can choose their own strategies.

+1

Graefe's "Query Evaluation Techniques for Large Databases" has several
pages on deadlock avoidance strategies. It was written almost 25 years
ago, but still has some good insights IMV (you'll recall that Graefe
is the author of the Volcano paper; this reference paper seems like
his follow-up). Apparently, deadlock avoidance strategy becomes
important for parallel sort with partitioning. You may be able to get
some ideas from there. And even if you don't, his handling of the
topic is very deliberate and high level, which suggests that ours
should be, too.

--
Peter Geoghegan


#20Thomas Munro
thomas.munro@gmail.com
In reply to: Peter Geoghegan (#19)
Re: Parallel Hash take II

On Thu, Sep 21, 2017 at 5:49 AM, Peter Geoghegan <pg@bowt.ie> wrote:

> Graefe's "Query Evaluation Techniques for Large Databases" has several
> pages on deadlock avoidance strategies. It was written almost 25 years
> ago, but still has some good insights IMV (you'll recall that Graefe
> is the author of the Volcano paper; this reference paper seems like
> his follow-up). Apparently, deadlock avoidance strategy becomes
> important for parallel sort with partitioning. You may be able to get
> some ideas from there. And even if you don't, his handling of the
> topic is very deliberate and high level, which suggests that ours
> should be, too.

Very interesting and certainly relevant (the parts I've read so far),
though we don't have multiple consumers. Multiplexing one thread so
that it is both a consumer and a producer is an extra twist, however.

--
Thomas Munro
http://www.enterprisedb.com


#21 Rushabh Lathia <rushabh.lathia@gmail.com>, in reply to Thomas Munro (#20)
#22 Thomas Munro <thomas.munro@gmail.com>, in reply to Rushabh Lathia (#21)
#23 Prabhat Sahu <prabhat.sahu@enterprisedb.com>, in reply to Thomas Munro (#22)
#24 Thomas Munro <thomas.munro@gmail.com>, in reply to Prabhat Sahu (#23)
#25 Thomas Munro <thomas.munro@gmail.com>, in reply to Thomas Munro (#24)
#26 Rushabh Lathia <rushabh.lathia@gmail.com>, in reply to Thomas Munro (#25)
#27 Thomas Munro <thomas.munro@gmail.com>, in reply to Rushabh Lathia (#26)
#28 Thomas Munro <thomas.munro@gmail.com>, in reply to Andres Freund (#2)
#29 Thomas Munro <thomas.munro@gmail.com>, in reply to Thomas Munro (#27)
#30 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#29)
#31 Peter Geoghegan <pg@bowt.ie>, in reply to Andres Freund (#30)
#32 Robert Haas <robertmhaas@gmail.com>, in reply to Andres Freund (#30)
#33 Thomas Munro <thomas.munro@gmail.com>, in reply to Robert Haas (#32)
#34 Thomas Munro <thomas.munro@gmail.com>, in reply to Peter Geoghegan (#31)
#35 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#29)
#36 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#29)
#37 Thomas Munro <thomas.munro@gmail.com>, in reply to Andres Freund (#36)
#38 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#37)
#39 Robert Haas <robertmhaas@gmail.com>, in reply to Andres Freund (#38)
#40 Andres Freund <andres@anarazel.de>, in reply to Robert Haas (#39)
#41 Peter Geoghegan <pg@bowt.ie>, in reply to Andres Freund (#40)
#42 Andres Freund <andres@anarazel.de>, in reply to Peter Geoghegan (#41)
#43 Robert Haas <robertmhaas@gmail.com>, in reply to Andres Freund (#40)
#44 Thomas Munro <thomas.munro@gmail.com>, in reply to Andres Freund (#40)
#45 Thomas Munro <thomas.munro@gmail.com>, in reply to Peter Geoghegan (#41)
#46 Thomas Munro <thomas.munro@gmail.com>, in reply to Robert Haas (#43)
#47 Andres Freund <andres@anarazel.de>, in reply to Robert Haas (#43)
#48 Robert Haas <robertmhaas@gmail.com>, in reply to Andres Freund (#47)
#49 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#37)
#50 (sender not shown), in reply to Andres Freund (#49)
#51 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#37)
#52 Thomas Munro <thomas.munro@gmail.com>, in reply to Andres Freund (#51)
#53 Thomas Munro <thomas.munro@gmail.com>, in reply to Thomas Munro (#52)
#54 Thomas Munro <thomas.munro@gmail.com>, in reply to Thomas Munro (#53)
#55 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#54)
#56 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#53)
#57 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#53)
#58 Thomas Munro <thomas.munro@gmail.com>, in reply to Andres Freund (#56)
#59 Thomas Munro <thomas.munro@gmail.com>, in reply to Thomas Munro (#58)
#60 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#58)
#61 Thomas Munro <thomas.munro@gmail.com>, in reply to Thomas Munro (#59)
#62 Thomas Munro <thomas.munro@gmail.com>, in reply to Thomas Munro (#61)
#63 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#61)
#64 Thomas Munro <thomas.munro@gmail.com>, in reply to Andres Freund (#63)
#65 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#64)
#66 Thomas Munro <thomas.munro@gmail.com>, in reply to Andres Freund (#65)
#67 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#64)
#68 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#66)
#69 Thomas Munro <thomas.munro@gmail.com>, in reply to Andres Freund (#67)
#70 Andres Freund <andres@anarazel.de>, in reply to Thomas Munro (#69)