WIP: [[Parallel] Shared] Hash

Started by Thomas Munro, over 9 years ago. 83 messages. pgsql-hackers.
#1Thomas Munro
thomas.munro@gmail.com

Hi hackers,

In PostgreSQL 9.6, hash joins can be parallelised under certain
conditions, but a copy of the hash table is built in every
participating backend. That means that memory and CPU time are
wasted. In many cases, that's OK: if the hash table contents are
small and cheap to compute, then we don't really care, we're just
happy that the probing can be done in parallel. But in cases where
the hash table is large and/or expensive to build, we could do much
better. I am working on that problem.

To recap the situation in 9.6, a hash join can appear below a Gather
node and it looks much the same as a non-parallel hash join except
that it has a partial outer plan:

-> Hash Join
   -> <partial outer plan>
   -> Hash
      -> <non-partial parallel-safe inner plan>

A partial plan is one that has some kind of 'scatter' operation as its
ultimate source of tuples. Currently the only kind of scatter
operation is a Parallel Seq Scan (but see also the Parallel Index Scan
and Parallel Bitmap Scan proposals). The scatter operation enables
parallelism in all the executor nodes above it, as far as the
enclosing 'gather' operation which must appear somewhere above it.
Currently the only kind of gather operation is a Gather node (but see
also the Gather Merge proposal which adds a new one).

The inner plan is built from a non-partial parallel-safe path and will
be run in every worker.

Note that a Hash Join node in 9.6 isn't parallel-aware itself: it's
not doing anything special at execution time to support parallelism.
The planner has determined that correct partial results will be
produced by this plan, but the executor nodes are blissfully unaware
of parallelism.

PROPOSED NEW PLAN VARIANTS

Shortly I will post a patch which introduces two new hash join plan
variants that are parallel-aware:

1. Parallel Hash Join with Shared Hash

-> Parallel Hash Join
   -> <partial outer plan>
   -> Shared Hash
      -> <non-partial parallel-safe inner plan>

In this case, there is only one copy of the hash table and only one
participant loads it. The other participants wait patiently for one
chosen backend to finish building the hash table, and then they all
wake up and probe.

Call the number of participants P, being the number of workers + 1
(for the leader). Compared to a non-shared hash plan, we avoid
wasting CPU and IO resources running P copies of the inner plan in
parallel (something that is not well captured in our costing model for
parallel query today), and we can allow ourselves to use a hash table
P times larger while sticking to the same overall space target of
work_mem * P.
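
For example, with work_mem set to 64MB and two workers plus the leader
(so P = 3), a non-shared plan builds three separate hash tables capped
at 64MB each, while a shared hash plan can build a single hash table of
up to 192MB, which may be enough to avoid batching altogether.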

2. Parallel Hash Join with Parallel Shared Hash

-> Parallel Hash Join
   -> <partial outer plan>
   -> Parallel Shared Hash
      -> <partial inner plan>

In this case, the inner plan is run in parallel by all participants.
We have the advantages of a shared hash table as described above, and
now we can also divide the work of running the inner plan and hashing
the resulting tuples by P participants. Note that Parallel Shared
Hash is acting as a special kind of gather operation that is the
counterpart to the scatter operation contained in the inner plan.

PERFORMANCE

So far I have been unable to measure any performance degradation
compared with unpatched master for hash joins with non-shared hash.
That's good because it means that I didn't slow existing plans down
when I introduced a bunch of conditional branches to existing hash
join code.

Laptop testing shows greater than 2x speedups on several of the TPC-H
queries with single batches, and no slowdowns. I will post test
numbers on big rig hardware in the coming weeks when I have the
batching code in more complete and stable shape.

IMPLEMENTATION

I have taken the approach of extending the existing hash join
algorithm, rather than introducing separate hash join executor nodes
or a fundamentally different algorithm. Here's a short description of
what the patch does:

1. SHARED HASH TABLE

To share data between participants, the patch uses two other patches I
have proposed: DSA areas[1], which provide a higher-level interface to
DSM segments to make programming with processes a little more like
programming with threads, and in particular a per-parallel-query DSA
area[2] that is made available for any executor node that needs some
shared work space.

The patch uses atomic operations to push tuples into the hash table
buckets while building, rehashing and loading, and then the hash table
is immutable during probing (except for match flags used to implement
outer joins). The existing memory chunk design is retained for dense
allocation of tuples, which provides a convenient way to rehash the
table when its size changes.
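
To illustrate the kind of bucket insertion this implies, here is a
minimal sketch of a compare-and-swap push loop. The type and function
names are mine, invented for illustration; only the general technique
is taken from the description above.

#include "port/atomics.h"

/* Illustrative tuple header: 'next' is the offset of the next tuple. */
typedef struct SketchTuple
{
    uint64      next;
    /* ... tuple data would follow ... */
} SketchTuple;

/*
 * Push a tuple onto the front of a shared bucket.  'bucket' holds the
 * offset of the current bucket head within the shared memory area.
 */
static void
sketch_push_tuple(pg_atomic_uint64 *bucket, uint64 tuple_offset,
                  SketchTuple *tuple)
{
    uint64      head = pg_atomic_read_u64(bucket);

    do
    {
        /* link the new tuple in front of the current head */
        tuple->next = head;
        /* on failure, 'head' is refreshed with the latest value */
    } while (!pg_atomic_compare_exchange_u64(bucket, &head, tuple_offset));
}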

2. WORK COORDINATION

To coordinate parallel work, this patch uses two other patches:
barriers[3], to implement a 'barrier' or 'phaser' synchronisation
primitive, and those in turn use the condition variables proposed by
Robert Haas.

Barriers provide a way for participants to break work up into phases
that they unanimously agree to enter together, which is a basic
requirement for parallelising hash joins. It is not safe to insert
into the hash table until exactly one participant has created it; it
is not safe to probe the hash table until all participants have
finished inserting into it; it is not safe to scan it for unmatched
tuples until all participants have finished probing it; it is not safe
to discard it and start loading the next batch until ... you get the
idea. You could also construct appropriate synchronisation using
various other interlocking primitives or flow control systems, but
fundamentally these wait points would exist at some level, and I think
this way is quite clean and simple. YMMV.

If we had exactly W workers and the leader didn't participate, then we
could use a simple pthread- or MPI-style barrier without an
explicit notion of 'phase'. We would simply take the existing hash
join code, add the shared hash table, add barrier waits at various
points and make sure that all participants always hit all of those
points in the same order, and it should All Just Work. But we have a
variable party size and a dual-role leader process, and I want to
highlight the specific problems that causes here because they increase
the patch size significantly:

Problem 1: We don't know how many workers will actually start. We
know how many were planned, but at execution time we may have
exhausted limits and actually get a smaller number. So we can't use
"static" barriers like the classic barriers in POSIX or MPI where the
group size is known up front. We need "dynamic" barriers with attach
and detach operations. As soon as you have varying party size you
need some kind of explicit model of the current phase, so that a new
participant can know what to do when it joins. For that reason, this
patch uses a phase number to track progress through the parallel hash
join. See MultiExecHash and ExecHashJoin which have switch statements
allowing a newly joined participant to synchronise their own state
machine and program counter with the phase.
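
As a rough sketch of what that means in code (assuming the barriers
patch's BarrierAttach reports the current phase on attach; the enum and
job names here are invented for illustration):

typedef enum
{
    PHJ_CREATING,
    PHJ_HASHING,
    PHJ_RESIZING,
    PHJ_REBUCKETING,
    PHJ_PROBING                 /* ... per-batch phases would follow ... */
} SketchPhase;

typedef enum { JOB_BUILD, JOB_WAIT_THEN_PROBE, JOB_PROBE } SketchJob;

static SketchJob
sketch_attach_and_sync(Barrier *barrier)
{
    int         phase = BarrierAttach(barrier);

    switch (phase)
    {
        case PHJ_CREATING:
        case PHJ_HASHING:
            return JOB_BUILD;   /* in time to help build the hash table */
        case PHJ_RESIZING:
        case PHJ_REBUCKETING:
            return JOB_WAIT_THEN_PROBE; /* table built; wait, then probe */
        default:
            return JOB_PROBE;   /* mid-batch: derive batchno from phase */
    }
}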

Problem 2: One participant is not like the others: Gather may or may
not decide to run its subplan directly if the worker processes aren't
producing any tuples (and the proposed Gather Merge is the same). The
problem is that it also needs to consume tuples from the fixed-size
queues of the regular workers. A deadlock could arise if the leader's
plan blocks waiting for other participants while another participant
has filled its output queue and is waiting for the leader to consume.
One way to avoid such deadlocks is to follow the rule that the leader
should never wait for other participants if there is any possibility
that they have emitted tuples. The simplest way to do that would be
to have shared hash plans refuse to run in the leader by returning
NULL to signal the end of this partial tuple stream, but then we'd
lose a CPU compared to non-shared hash plans. The latest point the
leader can exit while respecting that rule is at the end of probing
the first batch. That is the approach taken by the patch currently.
See ExecHashCheckForEarlyExit for logic and discussion. It would be
better to be able to use the leader in later batches too, but as far
as I can see that'd require changes that are out of scope for this
patch. One idea would be an executor protocol change allowing plans
running in the leader to detach and yield, saying 'I have no further
tuples right now, but I'm not finished; try again later', and then
reattach when you call it back. Clearly that sails close to
asynchronous execution territory.

Problem 3: If the leader drops out after the first batch to solve
problem 2, then it may leave behind batch files which must be
processed by other participants. I had originally planned to defer
work on batch file sharing until a later iteration, thinking that it
would be a nice performance improvement to redistribute work from
uneven batch files, but it turns out to be necessary for correct
results because of participants exiting early. I am working on a very
simple batch sharing system to start with... Participants still
generate their own batch files, and then new operations BufFileExport
and BufFileImport are used to grant read-only access to the BufFile to
other participants. Each participant reads its own batch files
entirely and then tries to read from every other participant's batch
files until they are all exhausted, using a shared read head. The
per-tuple locking granularity, extra seeking and needless buffering in
every backend on batch file reads aren't great, and I'm still figuring
out temporary file cleanup/ownership semantics. There may be an
opportunity to make use of 'unified' BufFile concepts from Peter
Geoghegan's work, or create some new reusable shared tuple spilling
infrastructure.
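
In outline, and assuming export/import calls that work roughly as
named (the descriptor array and participant-number variables are
invented for illustration), each participant does something like:

    BufFile    *mine;
    BufFile    *file;
    int         i;

    /* Writer: spill future-batch tuples to my own temporary file. */
    mine = BufFileCreateTemp(false);
    /* ... BufFileWrite(mine, tuple, size) for each spilled tuple ... */
    BufFileExport(mine, &shared->batch_file[my_participant_no]);

    /* Reader: my own file first, then every peer's, until exhausted. */
    for (i = 0; i < nparticipants; i++)
    {
        file = (i == my_participant_no)
            ? mine
            : BufFileImport(&shared->batch_file[i]);
        /* ... read tuples, advancing the shared read head, until EOF ... */
    }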

3. COSTING

For now, I have introduced a GUC called cpu_shared_tuple_cost which
provides a straw-man model of the overhead of exchanging tuples via a
shared hash table, and the extra process coordination required. If
it's zero then a non-shared hash plan (ie multiple copies) has the
same cost as a shared hash plan, even though the non-shared hash plan
wastefully runs P copies of the plan. If cost represents runtime and
we assume perfectly spherical cows running without interference
from each other, that makes some kind of sense, but it doesn't account
for the wasted resources and contention caused by running the same
plan in parallel. I don't know what to do about that yet. If
cpu_shared_tuple_cost is a positive number, as it probably should be
(more on that later), then shared hash tables look more expensive than
non-shared ones, which is technically true (CPU cache sharing etc) but
unhelpful because what you lose there you tend to gain by not running
all those plans in parallel. In other words cpu_shared_tuple_cost
doesn't really model the cost situation at all well, but it's a useful
GUC for development purposes for now as positive and negative numbers
can be used to turn the feature on and off for testing... As for
work_mem, it seems to me that 9.6 already established that work_mem is
a per participant limit, and it would be only fair to let a shared
plan use a total of work_mem * P too. I am still working on work_mem
accounting and reporting. Accounting for the parallelism in parallel
shared hash plans is easy though: their estimated tuple count is
already divided by P in the underlying partial path, and that is a
fairly accurate characterisation of what's going to happen at
execution time: it's often going to go a lot faster, and those plans
are the real goal of this work.
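
For concreteness, the straw-man charge amounts to something like this
in the cost estimate (my paraphrase of the idea, not necessarily the
patch's exact formula):

    /* penalise shared hash plans by a per-tuple coordination cost */
    if (shared_hash)
        startup_cost += cpu_shared_tuple_cost * inner_path_rows;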

STATUS

Obviously this is a work in progress. I am actively working on the following:

* rescan
* batch number increases
* skew buckets
* costing model and policy/accounting for work_mem
* shared batch file reading
* preloading next batch
* debugging and testing
* tidying and refactoring

The basic approach is visible and simple cases are working though, so
I am submitting this WIP work for a round of review in the current
commitfest and hoping to get some feedback and ideas. I will post the
patch in a follow-up email shortly... Thanks for reading!

[1]: /messages/by-id/CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA@mail.gmail.com
[2]: /messages/by-id/CAEepm=0HmRefi1+xDJ99Gj5APHr8Qr05KZtAxrMj8b+ay3o6sA@mail.gmail.com
[3]: /messages/by-id/CAEepm=2_y7oi01OjA_wLvYcWMc9_d=LaoxrY3eiROCZkB_qakA@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com


#2Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#1)
Re: WIP: [[Parallel] Shared] Hash

Thomas Munro <thomas.munro@enterprisedb.com> wrote:

The basic approach is visible and simple cases are working though, so
I am submitting this WIP work for a round of review in the current
commitfest and hoping to get some feedback and ideas. I will post the
patch in a follow-up email shortly...

Aloha,

Please find a WIP patch attached. Everything related to batch reading
is not currently in a working state, which breaks multi-batch joins,
but many single batch cases work correctly. In an earlier version I
had multi-batch joins working, but that was before I started tackling
problems 2 and 3 listed in my earlier message. There is some error
handling and resource cleanup missing, and doubtless some cases not
handled correctly. But I thought it would be good to share this
development snapshot for discussion, so I'm posting this as is, and
will post an updated version when I've straightened out the batching
code some more.

To apply parallel-hash-v1, first apply the following patches, in this order:

condition-variable-v3.patch [1]
remove-useless-barrier-header-v2.patch [2]
barrier-v3.patch [2]
dsa-v4.patch [3]
dsa-area-for-executor-v1.patch [4]

When applying dsa-v4 on top of barrier-v3, it will reject a hunk in
src/backend/storage/ipc/Makefile where they both add their object
file. Simply add dsa.o to OBJS manually.

Then you can apply parallel-hash-v1.patch, which is attached to this message.

[1]: /messages/by-id/CA+Tgmoaj2aPti0yho7FeEf2qt-JgQPRWb0gci_o1Hfr=C56Xng@mail.gmail.com
[2]: /messages/by-id/CAEepm=1wrrzxh=SRCF_Hk4SZQ9BULy1vWsicx0EbgUf0B85vZQ@mail.gmail.com
[3]: /messages/by-id/CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA@mail.gmail.com
[4]: /messages/by-id/CAEepm=0HmRefi1+xDJ99Gj5APHr8Qr05KZtAxrMj8b+ay3o6sA@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-hash-v1.patch (application/octet-stream, +2242 -174)
#3Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#2)
Re: WIP: [[Parallel] Shared] Hash

On Tue, Nov 1, 2016 at 5:33 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Please find a WIP patch attached. Everything related to batch reading
is not currently in a working state, which breaks multi-batch joins,
but many single batch cases work correctly. In an earlier version I
had multi-batch joins working, but that was before I started tackling
problems 2 and 3 listed in my earlier message.

Here is a better version with code to handle multi-batch joins. The
BufFile sharing approach to reading other participants' batch files is
a straw-man (perhaps what we really want would look more like a shared
tuplestore?), but solves the immediate problem I described earlier so
I can focus on other aspects of the problem. There may be some issues
with cleanup though, more on that soon.

Here's a summary of how this patch chops the hash join up into phases.
The 'phase' is an integer that encodes the step we're up to in the
algorithm, including the current batch number, and I represent that
with macros like PHJ_PHASE_HASHING and PHJ_PHASE_PROBING_BATCH(42).
Each phase is either serial, meaning that one participant does
something special, or parallel meaning that all participants do the
same thing. It goes like this:

* PHJ_PHASE_INIT
The initial phase established by the leader before launching workers.

* PHJ_PHASE_CREATING
Serial: One participant creates the hash table.

* PHJ_PHASE_HASHING
Serial or parallel: Depending on plan, one or all participants
execute the inner plan to completion, building the hash table for
batch 0 and possibly writing tuples to batch files on disk for future
batches.

* PHJ_PHASE_RESIZING
Serial: One participant resizes the hash table if necessary.

* PHJ_PHASE_REBUCKETING
Parallel: If the hash table was resized, all participants rehash all
the tuples in it and insert them into the buckets of the new larger
hash table.

* PHJ_PHASE_PROBING_BATCH(0)
Parallel: All participants execute the outer plan to completion. For
each tuple they either probe the hash table if it's for the current
batch, or write it out to a batch file if it's for a future batch.
For each tuple matched in the hash table, they set the matched bit.
When they are finished probing batch 0, they also preload tuples from
inner batch 1 into a secondary hash table until work_mem is exhausted
(note that at this time work_mem is occupied by the primary hash
table: this is just a way to use any remaining work_mem and extract a
little bit more parallelism, since otherwise every participant would
be waiting for all participants to finish probing; instead we wait for
all participants to finish probing AND for spare work_mem to run out).

* PHJ_PHASE_UNMATCHED_BATCH(0)
Parallel: For right/full joins, all participants then scan the hash
table looking for unmatched tuples.

... now we are ready for batch 1 ...

* PHJ_PHASE_PROMOTING_BATCH(1)
Serial: One participant promotes the secondary hash table to become
the new primary hash table.

* PHJ_PHASE_LOADING_BATCH(1)
Parallel: All participants finish loading inner batch 1 into the hash
table (work that was started in the previous probing phase).

* PHJ_PHASE_PREPARING_BATCH(1)
Serial: One participant resets the batch reading heads, so that we
are ready to read from outer batch 1, and inner batch 2.

* PHJ_PHASE_PROBING_BATCH(1)
Parallel: All participants read from outer batch 1 to probe the hash
table, then read from inner batch 2 to preload tuples into the
secondary hash table.

* PHJ_PHASE_UNMATCHED_BATCH(1)
Parallel: For right/full joins, all participants then scan the hash
table looking for unmatched tuples.

... now we are ready for batch 2 ...

Then all participants synchronise a final time to enter phase
PHJ_PHASE_PROMOTING_BATCH(nbatch), which is one past the end and is
the point at which it is safe to clean up. (There may be an
optimisation where I can clean up after the last participant detaches
instead, more on that soon).
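
One way to encode that step-plus-batch progression in a single integer
is sketched below. This is illustrative arithmetic only (batch 0's
skipped steps would need special-casing, and the patch's actual macros
may differ):

#define PHJ_PHASE_INIT               0
#define PHJ_PHASE_CREATING           1
#define PHJ_PHASE_HASHING            2
#define PHJ_PHASE_RESIZING           3
#define PHJ_PHASE_REBUCKETING        4

/* Each batch then occupies a fixed block of five phases. */
#define PHJ_FIRST_BATCH_PHASE        5
#define PHJ_STEPS_PER_BATCH          5
#define PHJ_PHASE_PROMOTING_BATCH(n) \
    (PHJ_FIRST_BATCH_PHASE + (n) * PHJ_STEPS_PER_BATCH + 0)
#define PHJ_PHASE_LOADING_BATCH(n) \
    (PHJ_FIRST_BATCH_PHASE + (n) * PHJ_STEPS_PER_BATCH + 1)
#define PHJ_PHASE_PREPARING_BATCH(n) \
    (PHJ_FIRST_BATCH_PHASE + (n) * PHJ_STEPS_PER_BATCH + 2)
#define PHJ_PHASE_PROBING_BATCH(n) \
    (PHJ_FIRST_BATCH_PHASE + (n) * PHJ_STEPS_PER_BATCH + 3)
#define PHJ_PHASE_UNMATCHED_BATCH(n) \
    (PHJ_FIRST_BATCH_PHASE + (n) * PHJ_STEPS_PER_BATCH + 4)
#define PHJ_PHASE_TO_BATCHNO(p) \
    (((p) - PHJ_FIRST_BATCH_PHASE) / PHJ_STEPS_PER_BATCH)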

Obviously I'm actively working on developing and stabilising all this.
Some of the things I'm working on are: work_mem accounting, batch
increases, rescans and figuring out if the resource management for
those BufFiles is going to work. There are quite a lot of edge cases
some of which I'm still figuring out, but I feel like this approach is
workable. At this stage I want to share what I'm doing to see if
others have feedback, ideas, blood curdling screams of horror, etc. I
will have better patches and a set of test queries soon. Thanks for
reading.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-hash-v2.patch (application/octet-stream, +2360 -231)
#4Haribabu Kommi
kommi.haribabu@gmail.com
In reply to: Thomas Munro (#3)
Re: WIP: [[Parallel] Shared] Hash

On Thu, Nov 3, 2016 at 4:19 PM, Thomas Munro <thomas.munro@enterprisedb.com>
wrote:

Obviously I'm actively working on developing and stabilising all this.
Some of the things I'm working on are: work_mem accounting, batch
increases, rescans and figuring out if the resource management for
those BufFiles is going to work. There are quite a lot of edge cases
some of which I'm still figuring out, but I feel like this approach is
workable. At this stage I want to share what I'm doing to see if
others have feedback, ideas, blood curdling screams of horror, etc. I
will have better patches and a set of test queries soon. Thanks for
reading.

This patch hasn't received any review. It is not applying properly to
HEAD.
Moved to next CF with "waiting on author" status.

Regards,
Hari Babu
Fujitsu Australia

#5Thomas Munro
thomas.munro@gmail.com
In reply to: Haribabu Kommi (#4)
Re: WIP: [[Parallel] Shared] Hash

On Sat, Dec 3, 2016 at 1:38 AM, Haribabu Kommi <kommi.haribabu@gmail.com> wrote:

Moved to next CF with "waiting on author" status.

Unfortunately it's been a bit trickier than I anticipated to get the
interprocess batch file sharing and hash table shrinking working
correctly and I don't yet have a new patch in good enough shape to
post in time for the January CF. More soon.

--
Thomas Munro
http://www.enterprisedb.com


#6Peter Geoghegan
pg@heroku.com
In reply to: Thomas Munro (#5)
Re: WIP: [[Parallel] Shared] Hash

On Sat, Dec 31, 2016 at 2:52 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Unfortunately it's been a bit trickier than I anticipated to get the
interprocess batch file sharing and hash table shrinking working
correctly and I don't yet have a new patch in good enough shape to
post in time for the January CF. More soon.

I noticed a bug in your latest revision:

+   /*
+    * In HJ_NEED_NEW_OUTER, we already selected the current inner batch for
+    * reading from.  If there is a shared hash table, we may have already
+    * partially loaded the hash table in ExecHashJoinPreloadNextBatch.
+    */
+   Assert(hashtable->batch_reader.batchno = curbatch);
+   Assert(hashtable->batch_reader.inner);

Obviously this isn't supposed to be an assignment.
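
Presumably it was meant to read:

    Assert(hashtable->batch_reader.batchno == curbatch);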

--
Peter Geoghegan


#7Thomas Munro
thomas.munro@gmail.com
In reply to: Peter Geoghegan (#6)
Re: WIP: [[Parallel] Shared] Hash

On Mon, Jan 2, 2017 at 3:17 PM, Peter Geoghegan <pg@heroku.com> wrote:

I noticed a bug in your latest revision:

+   /*
+    * In HJ_NEED_NEW_OUTER, we already selected the current inner batch for
+    * reading from.  If there is a shared hash table, we may have already
+    * partially loaded the hash table in ExecHashJoinPreloadNextBatch.
+    */
+   Assert(hashtable->batch_reader.batchno = curbatch);
+   Assert(hashtable->batch_reader.inner);

Obviously this isn't supposed to be an assignment.

Right, thanks! I will post a new rebased version soon with that and
some other nearby problems fixed.

--
Thomas Munro
http://www.enterprisedb.com


#8Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#7)
Re: WIP: [[Parallel] Shared] Hash

On Tue, Jan 3, 2017 at 10:53 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

I will post a new rebased version soon with that and
some other nearby problems fixed.

Here is a new WIP patch. I have plenty of things to tidy up (see note
at end), but the main ideas are now pretty clear and I'd appreciate
some feedback. The main changes since the last patch, other than
debugging, are:

* the number of batches now increases if work_mem would be exceeded;
the work of 'shrinking' the hash table in memory in that case is done
in parallel

* work_mem accounting is done at chunk level, instead of tuples

* interlocking has been rethought

Previously, I had some ideas about using some lock free tricks for
managing chunks of memory, but you may be relieved to hear that I
abandoned those plans. Now, atomic ops are used only for one thing:
pushing tuples into the shared hash table buckets. An LWLock called
'chunk_lock' protects various linked lists of chunks of memory, and
also the shared work_mem accounting. The idea is that backends can
work independently on HASH_CHUNK_SIZE blocks of tuples at a time in
between needing to acquire that lock briefly. Also, there is now a
second barrier, used to coordinate hash table shrinking. This can
happen any number of times during PHJ_PHASE_HASHING and
PHJ_PHASE_LOADING_BATCH(n) phases as required to stay under work_mem,
so it needed to be a separate barrier.
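
In outline, each backend's allocation path looks something like this
(a sketch only; the field and variable names are mine, not necessarily
the patch's):

    /* Start a new HASH_CHUNK_SIZE chunk to fill with tuples. */
    LWLockAcquire(&shared->chunk_lock, LW_EXCLUSIVE);
    shared->size += HASH_CHUNK_SIZE;    /* shared work_mem accounting */
    need_shrink = shared->size > shared->space_allowed;
    chunk = dsa_allocate(area, HASH_CHUNK_SIZE);
    /* ... link the new chunk into the shared chunk list ... */
    LWLockRelease(&shared->chunk_lock);

    if (need_shrink)
    {
        /* ... rendezvous on the shrink barrier, double nbatch and
         * evict tuples belonging to future batches ... */
    }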

The communication in this patch is a bit more complicated than other
nearby parallel query projects I've looked at; probably the worst bit
is the leader deadlock avoidance stuff (see
ExecHashCheckForEarlyExit), and the second worst bit is probably the
switch statements for allowing participants to show up late and get in
sync, which makes that other problem even more annoying; without those
problems and with just the right kind of reusable shared tuplestore,
this would be a vastly simpler patch. Those are not really
fundamental problems of parallel joins using a shared hash tables, but
they're problems I don't have a better solution to right now.

Stepping back a bit, I am aware of the following approaches to hash
join parallelism:

1. Run the inner plan and build a private hash table in each
participant, and then scatter the outer plan arbitrarily across
participants. This is what 9.6 does, and it's a good plan for small
hash tables with fast inner plans, but a terrible plan for expensive
or large inner plans. Communication overhead: zero; CPU overhead:
runs the inner plan in k workers simultaneously; memory overhead:
builds k copies of the hashtable; disk overhead: may need to spill k
copies of all batches to disk if work_mem exceeded; restrictions:
Can't do right/full joins because no shared 'matched' flags.

2. Run a partition-wise hash join[1]. Communication overhead: zero;
CPU overhead: zero; memory overhead: zero; disk overhead: zero;
restrictions: the schema must include compatible partition keys, and
potential parallelism is limited by the number of partitions.

3. Repartition the data on the fly, and then run a partition-wise
hash join. Communication overhead: every tuple on at least one and
possibly both sides must be rerouted to the correct participant; CPU
overhead: zero, once repartitioning is done; memory overhead: none;
disk overhead: may need to spill partitions to disk if work_mem is
exceeded

4. Scatter both the inner and outer plans arbitrarily across
participants (ie uncorrelated partitioning), and build a shared hash
table. Communication overhead: synchronisation of build/probe phases,
but no tuple rerouting; CPU overhead: none; memory overhead: none;
disk overhead: may need to spill batches to disk; restrictions: none
in general, but currently we have to drop the leader after the first
batch of a multi-batch join due to our consumer/producer leader
problem mentioned in earlier messages.

We have 1. This proposal aims to provide 4. It seems we have 2 on
the way (that technique works for all 3 join algorithms without any
changes to the join operators and looks best by any measure, but is
limited by the user's schema, ie takes careful planning on the user's
part instead of potentially helping any join). Other databases
including SQL Server offer 3. I suspect that 4 is probably a better
fit than 3 for Postgres today, because the communication overhead of
shovelling nearly all tuples through extra tuple queues to route them
to the right hash table would surely be very high, though I can see
that it's very attractive to have a reusable tuple repartitioning
operator and then run k disjoint communication-free joins (again,
without code change to the join operator, and to the benefit of all
join operators).

About the shared batch reading code: this patch modifies BufFile so
that a temporary file can be shared read-only with other participants,
and then introduces a mechanism for coordinating shared reads. Each
worker starts out reading all the tuples from the file that it wrote,
before attempting to steal tuples from the files written by other
participants, until there are none left anywhere. In the best case
they all write out and then read back in just their own files with
minimal contention, and contention rises as tuples are less evenly
distributed among participants, but we never quite get the best case
because the leader always leaves behind a bunch of batches for the
others to deal with when it quits early. Maybe I should separate all
the batch reader stuff into another patch so it doesn't clutter the
hash join code up so much? I will start reviewing Parallel Tuplesort
shortly, which includes some related ideas.
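
Roughly sketched (the reader function and variables here are invented
for illustration), each participant's read loop is:

    /* Read my own batch file first, then steal from the others. */
    for (i = 0; i < nparticipants; i++)
    {
        int         p = (my_participant_no + i) % nparticipants;

        /* the shared read head ensures each tuple is consumed once */
        while (shared_batch_read_next(&shared->reader[p], &tuple))
        {
            /* ... load into the hash table or probe, as appropriate ... */
        }
    }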

Some assorted notes on the status: I need to do some thinking about
the file cleanup logic: both explicit deletes at the earliest possible
time, and failure/error paths. Currently the creator of each file is
responsible for cleaning it up, but I guess if the creator aborts
early the file disappears underneath the others' feet, and then I
guess they might raise a confusing error report that races against the
root cause error report; I'm looking into that. Rescans and skew
buckets not finished yet. The new chunk-queue based
ExecScanHashTableForUnmatched isn't tested yet (it replaces an
earlier version that was doing a bucket-by-bucket parallel scan).
There are several places where I haven't changed the private hash
table code to match the shared version because I'm not sure about
that, in particular the idea of chunk-based accounting (which happens
to be convenient for this code, but I also believe it to be more
correct). I'm still trying to decide how to report the hash table
tuple count and size: possibly the grand totals. Generally I need to
do some tidying and provide a suite of queries that hits interesting
cases. I hope to move on these things fairly quickly now that I've
got the hash table resizing and batch sharing stuff working (a puzzle
that kept me very busy for a while) though I'm taking a break for a
bit to do some reviewing.

The test query I've been looking at recently is TPCH Q9. With scale
1GB and work_mem = 64KB, I get a query plan that includes three
different variants of Hash node: Hash (run in every backend, duplicate
hash tables), Shared Hash (run in just one backend, but allowed to use
the sum of work_mem of all the backends, so usually wins by avoiding
batching), and Parallel Shared Hash (run in parallel and using sum of
work_mem). As an anecdatum, I see around 2.5x speedup against master,
using only 2 workers in both cases, though it seems to be bimodal,
either 2x or 2.8x, which I expect has something to do with that leader
exit stuff and I'm looking into that. More on performance soon.

Thanks for reading!

[1]: /messages/by-id/CAFjFpRfQ8GrQvzp3jA2wnLqrHmaXna-urjm_UY9BqXj=EaDTSA@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-hash-v3.patch (application/octet-stream, +3157 -294)
#9Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#8)
Re: WIP: [[Parallel] Shared] Hash

On Sat, Jan 7, 2017 at 9:01 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Tue, Jan 3, 2017 at 10:53 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

I will post a new rebased version soon with that and
some other nearby problems fixed.

Here is a new WIP patch.

I forgot to mention: this applies on top of barrier-v5.patch, over here:

/messages/by-id/CAEepm=3g3EC734kgriWseiJPfUQZeoMWdhAfzOc0ecewAa5uXg@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com


#10Thomas Munro
thomas.munro@gmail.com
In reply to: Thomas Munro (#8)
Re: WIP: [[Parallel] Shared] Hash

On Sat, Jan 7, 2017 at 9:01 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Tue, Jan 3, 2017 at 10:53 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

I will post a new rebased version soon with that and
some other nearby problems fixed.

Here is a new WIP patch.

To make this easier to understand and harmonise the logic used in a
few places, I'm now planning to chop it up into a patch series,
probably something like this:

1. Change existing hash join code to use chunk-based accounting
2. Change existing hash join code to use a new interface for dealing
with batches
3. Add shared hash join support, single batch only
4. Add components for doing shared batch reading (unused)
5. Add multi-batch shared hash join support

--
Thomas Munro
http://www.enterprisedb.com


#11Peter Geoghegan
pg@heroku.com
In reply to: Thomas Munro (#8)
Re: WIP: [[Parallel] Shared] Hash

On Fri, Jan 6, 2017 at 12:01 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Here is a new WIP patch. I have plenty of things to tidy up (see note
at end), but the main ideas are now pretty clear and I'd appreciate
some feedback.

I have some review feedback for your V3. I've chosen to start with the
buffile.c stuff, since of course it might share something with my
parallel tuplesort patch. This isn't comprehensive, but I will have
more comprehensive feedback soon.

I'm not surprised that you've generally chosen to make shared BufFile
management as simple as possible, with no special infrastructure other
than the ability to hold open other backend temp files concurrently
within a worker, and no writing to another worker's temp file, or
shared read pointer. As you put it, everything is immutable. I
couldn't see much opportunity for adding a lot of infrastructure that
wasn't written explicitly as parallel hash join code/infrastructure.
My sense is that that was a good decision. I doubted that you'd ever
want some advanced, generic shared BufFile thing with multiple read
pointers, built-in cache coherency, etc. (Robert seemed to think that
you'd go that way, though.)

Anyway, some more specific observations:

* ISTM that this is the wrong thing for shared BufFiles:

+BufFile *
+BufFileImport(BufFileDescriptor *descriptor)
+{

...

+ file->isInterXact = true; /* prevent cleanup by this backend */

There is only one user of isInterXact = true BufFiles at present,
tuplestore.c. It, in turn, only does so for cases that require
persistent tuple stores. A quick audit of these tuplestore.c callers
show this to just be cursor support code within portalmem.c. Here is
the relevant tuplestore_begin_heap() rule that that code adheres to,
unlike the code I've quoted above:

* interXact: if true, the files used for on-disk storage persist beyond the
* end of the current transaction. NOTE: It's the caller's responsibility to
* create such a tuplestore in a memory context and resource owner that will
* also survive transaction boundaries, and to ensure the tuplestore is closed
* when it's no longer wanted.

I don't think it's right for buffile.c to know anything about file
paths directly -- I'd say that that's a modularity violation.
PathNameOpenFile() is called by very few callers at the moment, all of
them very low level (e.g. md.c), but you're using it within buffile.c
to open a path to the file that you obtain from shared memory
directly. This is buggy because the following code won't be reached in
workers that call your BufFileImport() function:

/* Mark it for deletion at close */
VfdCache[file].fdstate |= FD_TEMPORARY;

/* Register it with the current resource owner */
if (!interXact)
{
VfdCache[file].fdstate |= FD_XACT_TEMPORARY;

ResourceOwnerEnlargeFiles(CurrentResourceOwner);
ResourceOwnerRememberFile(CurrentResourceOwner, file);
VfdCache[file].resowner = CurrentResourceOwner;

/* ensure cleanup happens at eoxact */
have_xact_temporary_files = true;
}

Certainly, you don't want the "Mark it for deletion at close" bit.
Deletion should not happen at eoxact for non-owners-but-sharers
(within FileClose()), but you *do* want CleanupTempFiles() to call
FileClose() for the virtual file descriptors you've opened in the
backend, to do some other cleanup. In general, you want to buy into
resource ownership for workers. As things stand, I think that this
will leak virtual file descriptors. That's really well hidden because
there is a similar CleanupTempFiles() call at proc exit, I think.
(Didn't take the time to make sure that that's what masked problems.
I'm sure that you want minimal divergence with serial cases,
resource-ownership-wise, in any case.)

Instead of all this, I suggest copying some of my changes to fd.c, so
that resource ownership within fd.c differentiates between a vfd that
is owned by the backend in the conventional sense, including having a
need to delete at eoxact, as well as a lesser form of ownership where
deletion should not happen. Maybe you'll end up using my BufFileUnify
interface [1] within workers (instead of just within the leader, as
with parallel tuplesort), and have it handle all of that for you.
Currently, that would mean that there'd be an unused/0 sized "local"
segment for the unified BufFile, but I was thinking of making that not
happen unless and until a new segment is actually needed, so even that
minor wart wouldn't necessarily affect you.

Some assorted notes on the status: I need to do some thinking about
the file cleanup logic: both explicit deletes at the earliest possible
time, and failure/error paths. Currently the creator of each file is
responsible for cleaning it up, but I guess if the creator aborts
early the file disappears underneath the others' feet, and then I
guess they might raise a confusing error report that races against the
root cause error report; I'm looking into that. Rescans and skew
buckets not finished yet.

The rescan code path seems to segfault when the regression tests are
run. There is a NULL pointer dereference here:

@@ -985,6 +1855,14 @@ ExecReScanHashJoin(HashJoinState *node)
node->hj_HashTable = NULL;
node->hj_JoinState = HJ_BUILD_HASHTABLE;

+           if (HashJoinTableIsShared(node->hj_HashTable))
+           {
+               /* Coordinate a rewind to the shared hash table creation phase. */
+               BarrierWaitSet(&hashNode->shared_table_data->barrier,
+                              PHJ_PHASE_BEGINNING,
+                              WAIT_EVENT_HASHJOIN_REWINDING3);
+           }
+

Clearly, HashJoinTableIsShared() should not be called when its
argument (in this case node->hj_HashTable) is NULL.
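
Presumably the test needs to happen before the pointer is cleared,
something like:

    bool        was_shared = node->hj_HashTable != NULL &&
                    HashJoinTableIsShared(node->hj_HashTable);

    node->hj_HashTable = NULL;
    node->hj_JoinState = HJ_BUILD_HASHTABLE;

    if (was_shared)
    {
        /* coordinate the rewind as above */
    }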

In general, I think you should try to set expectations about what
happens when the regression tests run up front, because that's usually
the first thing reviewers do.

Various compiler warnings on my system:

/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHash.c:1376:7:
warning: variable ‘size_before_shrink’ set but not used
[-Wunused-but-set-variable]
Size size_before_shrink = 0;
^
...

/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:
In function ‘ExecHashJoinCloseBatch’:
/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1548:28:
warning: variable ‘participant’ set but not used
[-Wunused-but-set-variable]
HashJoinParticipantState *participant;
^
/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:
In function ‘ExecHashJoinRewindBatches’:
/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1587:23:
warning: variable ‘batch_reader’ set but not used
[-Wunused-but-set-variable]
HashJoinBatchReader *batch_reader;
^

Is this change really needed?:

--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -31,6 +31,8 @@
#include "executor/nodeSeqscan.h"
#include "utils/rel.h"
+#include <unistd.h>
+
static void InitScanRelation(SeqScanState *node, EState *estate, int eflags);
static TupleTableSlot *SeqNext(SeqScanState *node);

That's all I have for now...

[1]: https://wiki.postgresql.org/wiki/Parallel_External_Sort#buffile.c.2C_and_BufFile_unification
--
Peter Geoghegan


#12Robert Haas
robertmhaas@gmail.com
In reply to: Peter Geoghegan (#11)
Re: WIP: [[Parallel] Shared] Hash

On Tue, Jan 10, 2017 at 8:56 PM, Peter Geoghegan <pg@heroku.com> wrote:

Instead of all this, I suggest copying some of my changes to fd.c, so
that resource ownership within fd.c differentiates between a vfd that
is owned by the backend in the conventional sense, including having a
need to delete at eoxact, as well as a lesser form of ownership where
deletion should not happen.

If multiple processes are using the same file via the BufFile
interface, I think that it is absolutely necessary that there should
be a provision to track the "attach count" of the BufFile. Each
process that reaches EOXact decrements the attach count and when it
reaches 0, the process that reduced it to 0 removes the BufFile. I
think anything that's based on the notion that leaders will remove
files and workers won't is going to be fragile and limiting, and I am
going to push hard against any such proposal.
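
In other words, something like this sketch (the names are invented):

    /* Called at EOXact (or detach) by each attached process. */
    static void
    shared_buffile_detach(SharedBufFileState *state)
    {
        /* the last process to detach turns out the lights */
        if (pg_atomic_sub_fetch_u32(&state->refcount, 1) == 0)
            shared_buffile_delete_files(state);
    }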

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#13Peter Geoghegan
pg@heroku.com
In reply to: Robert Haas (#12)
Re: WIP: [[Parallel] Shared] Hash

On Wed, Jan 11, 2017 at 10:57 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Tue, Jan 10, 2017 at 8:56 PM, Peter Geoghegan <pg@heroku.com> wrote:

Instead of all this, I suggest copying some of my changes to fd.c, so
that resource ownership within fd.c differentiates between a vfd that
is owned by the backend in the conventional sense, including having a
need to delete at eoxact, as well as a lesser form of ownership where
deletion should not happen.

If multiple processes are using the same file via the BufFile
interface, I think that it is absolutely necessary that there should
be a provision to track the "attach count" of the BufFile. Each
process that reaches EOXact decrements the attach count and when it
reaches 0, the process that reduced it to 0 removes the BufFile. I
think anything that's based on the notion that leaders will remove
files and workers won't is going to be fragile and limiting, and I am
going to push hard against any such proposal.

Okay. My BufFile unification approach happens to assume that backends
clean up after themselves, but that isn't a rigid assumption (of
course, these are always temp files, so we reason about them as temp
files). It could be based on a refcount fairly easily, such that, as
you say here, deletion of files occurs within workers (that "own" the
files) only as a consequence of their being the last backend with a
reference, that must therefore "turn out the lights" (delete the
file). That seems consistent with what I've done within fd.c, and what
I suggested to Thomas (that he more or less follow that approach).
You'd probably still want to throw an error when workers ended up not
deleting BufFile segments they owned, though, at least for parallel
tuplesort.

This idea is something that's much more limited than the
SharedTemporaryFile() API that you sketched on the parallel sort
thread, because it only concerns resource management, and not how to
make access to the shared file concurrency safe in any special,
standard way. I think that this resource management is something that
should be managed by buffile.c (and the temp file routines within fd.c
that are morally owned by buffile.c, their only caller). It shouldn't
be necessary for a client of this new infrastructure, such as parallel
tuplesort or parallel hash join, to know anything about file paths.
Instead, they should be passing around some kind of minimal
private-to-buffile state in shared memory that coordinates backends
participating in BufFile unification. Private state created by
buffile.c, and passed back to buffile.c. Everything should be
encapsulated within buffile.c, IMV, making parallel implementations as
close as possible to their serial implementations.

--
Peter Geoghegan


#14Robert Haas
robertmhaas@gmail.com
In reply to: Peter Geoghegan (#13)
Re: WIP: [[Parallel] Shared] Hash

On Wed, Jan 11, 2017 at 2:20 PM, Peter Geoghegan <pg@heroku.com> wrote:

You'd probably still want to throw an error when workers ended up not
deleting BufFile segments they owned, though, at least for parallel
tuplesort.

Don't see why.

This idea is something that's much more limited than the
SharedTemporaryFile() API that you sketched on the parallel sort
thread, because it only concerns resource management, and not how to
make access to the shared file concurrency safe in any special,
standard way.

Actually, I only intended that sketch to be about resource management.
Sounds like I didn't explain very well.

Instead, they should be passing around some kind of minimal
private-to-buffile state in shared memory that coordinates backends
participating in BufFile unification. Private state created by
buffile.c, and passed back to buffile.c. Everything should be
encapsulated within buffile.c, IMV, making parallel implementations as
close as possible to their serial implementations.

That seems reasonable although I haven't studied the details carefully as yet.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#15Peter Geoghegan
pg@heroku.com
In reply to: Peter Geoghegan (#13)
Re: WIP: [[Parallel] Shared] Hash

On Wed, Jan 11, 2017 at 11:20 AM, Peter Geoghegan <pg@heroku.com> wrote:

If multiple processes are using the same file via the BufFile
interface, I think that it is absolutely necessary that there should
be a provision to track the "attach count" of the BufFile. Each
process that reaches EOXact decrements the attach count and when it
reaches 0, the process that reduced it to 0 removes the BufFile. I
think anything that's based on the notion that leaders will remove
files and workers won't is going to be fragile and limiting, and I am
going to push hard against any such proposal.

Okay. My BufFile unification approach happens to assume that backends
clean up after themselves, but that isn't a rigid assumption (of
course, these are always temp files, so we reason about them as temp
files).

Also, to be clear, and to avoid confusion: I don't think anyone wants
an approach "that's based on the notion that leaders will remove files
and workers won't". All that has been suggested is that the backend
that creates the file should be responsible for deleting the file, by
definition. And, that any other backend that may have files owned by
another backend must be sure to not try to access them after the owner
deletes them. (Typically, that would be ensured by some barrier
condition, some dependency, inherent to how the parallel operation is
implemented.)

I will implement the reference count thing.
--
Peter Geoghegan


#16Peter Geoghegan
pg@heroku.com
In reply to: Robert Haas (#14)
Re: WIP: [[Parallel] Shared] Hash

On Wed, Jan 11, 2017 at 12:05 PM, Robert Haas <robertmhaas@gmail.com> wrote:

On Wed, Jan 11, 2017 at 2:20 PM, Peter Geoghegan <pg@heroku.com> wrote:

You'd probably still want to throw an error when workers ended up not
deleting BufFile segments they owned, though, at least for parallel
tuplesort.

Don't see why.

Simply because that's not expected as things stand -- why should the
file go away in that context? (Admittedly, that doesn't seem like an
excellent reason now.)

I actually like the idea of a reference count, the more I think about
it, since it doesn't actually have any tension with my original idea
of ownership. If something like a randomAccess parallel tuplesort
leader merge needs to write new segments (which it almost certainly
*won't* anyway, due to my recent V7 changes), then it can still own
those new segments itself, alone, and delete them on its own in the
manner of conventional temp files, because we can still restrict the
shared refcount mechanism to the deletion of "initial" segments. The
refcount == 0 deleter only deletes those initial segments, and not any
same-BufFile segments that might have been added (added to append to
our unified BufFile within leader). I think that parallel hash join
won't use this at all, and, as I said, it's only a theoretical
requirement for parallel tuplesort, which will generally recycle
blocks from worker temp files for its own writes all the time for
randomAccess cases, the only cases that ever write within logtape.c.

So, the only BufFile shared state needed, that must be maintained over
time, is the refcount variable itself. The size of the "initial"
BufFile (from which we derive number of new segments during
unification) is passed, but it doesn't get maintained in shared
memory. BufFile size remains a one way, one time message needed during
unification. I only really need to tweak things in fd.c temp routines
to make all this work.

This is something I like because it makes certain theoretically useful
things easier. Say, for example, we wanted to have tuplesort workers
merge worker final materialized tapes (their final output), in order
to arrange for the leader to have fewer than $NWORKER runs to merge at
the end -- that's made easier by the refcount stuff. (I'm still not
convinced that that's actually going to make CREATE INDEX faster.
Still, it should, on general principle, be easy to write a patch that
makes it happen -- a good overall design should leave things so that
writing that prototype patch is easy.)

This idea is something that's much more limited than the
SharedTemporaryFile() API that you sketched on the parallel sort
thread, because it only concerns resource management, and not how to
make access to the shared file concurrency safe in any special,
standard way.

Actually, I only intended that sketch to be about resource management.
Sounds like I didn't explain very well.

I'm glad to hear that, because I was very puzzled by what you said. I
guess I was thrown off by "shared read pointers". I don't want to get
into the business of flushing out dirty buffers, or making sure that
every backend stays in lockstep about what the total size of the
BufFile needs to be. It's so much simpler to just have clear
"barriers" for each parallel operation, where backends present a large
amount of immutable state to one other backend at the end, and tells
it how big its BufFile is only once. (It's not quite immutable, since
randomAccess recycle of temp files can happen within logtape.c, but
the point is that there should be very little back and forth -- that
needs to be severely restricted.)

--
Peter Geoghegan


#17Thomas Munro
thomas.munro@gmail.com
In reply to: Peter Geoghegan (#11)
Re: WIP: [[Parallel] Shared] Hash

On Wed, Jan 11, 2017 at 2:56 PM, Peter Geoghegan <pg@heroku.com> wrote:

On Fri, Jan 6, 2017 at 12:01 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Here is a new WIP patch. I have plenty of things to tidy up (see note
at end), but the main ideas are now pretty clear and I'd appreciate
some feedback.

I have some review feedback for your V3. I've chosen to start with the
buffile.c stuff, since of course it might share something with my
parallel tuplesort patch. This isn't comprehensive, but I will have
more comprehensive feedback soon.

Thanks!

I'm not surprised that you've generally chosen to make shared BufFile
management as simple as possible, with no special infrastructure other
than the ability to hold open other backend temp files concurrently
within a worker, and no writing to another worker's temp file, or
shared read pointer. As you put it, everything is immutable. I
couldn't see much opportunity for adding a lot of infrastructure that
wasn't written explicitly as parallel hash join code/infrastructure.
My sense is that that was a good decision. I doubted that you'd ever
want some advanced, generic shared BufFile thing with multiple read
pointers, built-in cache coherency, etc. (Robert seemed to think that
you'd go that way, though.)

Right, this is extremely minimalist infrastructure. fd.c is
unchanged. buffile.c only gains the power to export/import read-only
views of BufFiles. There is no 'unification' of BufFiles: each hash
join participant simply reads from the buffile it wrote, and then
imports and reads from its peers' BufFiles, until all are exhausted;
so the 'unification' is happening in caller code which knows about the
set of participants and manages shared read positions. Clearly there
are some ownership/cleanup issues to straighten out, but I think those
problems are fixable (probably involving refcounts).

I'm entirely willing to throw that away and use the unified BufFile
concept, if it can be extended to support multiple readers of the
data, where every participant unifies the set of files. I have so far
assumed that it would be most efficient for each participant to read
from the file that it wrote before trying to read from files written
by other participants. I'm reading your patch now; more soon.

Anyway, some more specific observations:

* ISTM that this is the wrong thing for shared BufFiles:

+BufFile *
+BufFileImport(BufFileDescriptor *descriptor)
+{

...

+ file->isInterXact = true; /* prevent cleanup by this backend */

There is only one user of isInterXact = true BufFiles at present,
tuplestore.c. It, in turn, only does so for cases that require
persistent tuple stores. A quick audit of these tuplestore.c callers
show this to just be cursor support code within portalmem.c. Here is
the relevant tuplestore_begin_heap() rule that that code adheres to,
unlike the code I've quoted above:

* interXact: if true, the files used for on-disk storage persist beyond the
* end of the current transaction. NOTE: It's the caller's responsibility to
* create such a tuplestore in a memory context and resource owner that will
* also survive transaction boundaries, and to ensure the tuplestore is closed
* when it's no longer wanted.

Hmm. Yes, that is an entirely bogus use of isInterXact. I am
thinking about how to fix that with refcounts.

I don't think it's right for buffile.c to know anything about file
paths directly -- I'd say that that's a modularity violation.
PathNameOpenFile() is called by very few callers at the moment, all of
them very low level (e.g. md.c), but you're using it within buffile.c
to open a path to the file that you obtain from shared memory

Hmm. I'm not seeing the modularity violation. buffile.c uses
interfaces already exposed by fd.c to do this: OpenTemporaryFile,
then FilePathName to find the path, then PathNameOpenFile to open from
another process. I see that your approach instead has client code
provide more meta data so that things can be discovered, which may
well be a much better idea.

directly. This is buggy because the following code won't be reached in
workers that call your BufFileImport() function:

    /* Mark it for deletion at close */
    VfdCache[file].fdstate |= FD_TEMPORARY;

    /* Register it with the current resource owner */
    if (!interXact)
    {
        VfdCache[file].fdstate |= FD_XACT_TEMPORARY;

        ResourceOwnerEnlargeFiles(CurrentResourceOwner);
        ResourceOwnerRememberFile(CurrentResourceOwner, file);
        VfdCache[file].resowner = CurrentResourceOwner;

        /* ensure cleanup happens at eoxact */
        have_xact_temporary_files = true;
    }

Right, that is a problem. A refcount mode could fix that; virtual
file descriptors would be closed in every backend using the current
resource owner, and the files would be deleted when the last one turns
out the lights.
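
Concretely, what I have in mind is something like this (a sketch only;
SharedBufFileSet and delete_underlying_files are invented names):

    #include "port/atomics.h"
    #include "storage/dsm.h"

    /* Sketch: shared state with one reference per attached backend. */
    typedef struct SharedBufFileSet
    {
        pg_atomic_uint32 refcount;
        /* ... exported descriptors for the underlying files ... */
    } SharedBufFileSet;

    /* Registered with on_dsm_detach() by every attaching backend. */
    static void
    shared_buffile_detach(dsm_segment *segment, Datum arg)
    {
        SharedBufFileSet *set = (SharedBufFileSet *) DatumGetPointer(arg);

        /*
         * This backend's vfds are closed by the resource owner as
         * usual; only the last backend to detach deletes the
         * underlying files.
         */
        if (pg_atomic_sub_fetch_u32(&set->refcount, 1) == 0)
            delete_underlying_files(set);   /* hypothetical helper */
    }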

Certainly, you don't want the "Mark it for deletion at close" bit.
Deletion should not happen at eoxact for non-owners-but-sharers
(within FileClose()), but you *do* want CleanupTempFiles() to call
FileClose() for the virtual file descriptors you've opened in the
backend, to do some other cleanup. In general, you want to buy into
resource ownership for workers. As things stand, I think that this
will leak virtual file descriptors. That's really well hidden because
there is a similar CleanupTempFiles() call at proc exit, I think.
(I didn't take the time to confirm that that's what masked the problem.
I'm sure that you want minimal divergence with serial cases,
resource-ownership-wise, in any case.)

Instead of all this, I suggest copying some of my changes to fd.c, so
that resource ownership within fd.c differentiates between a vfd that
is owned by the backend in the conventional sense, including having a
need to delete at eoxact, and a lesser form of ownership where
deletion should not happen. Maybe you'll end up using my BufFileUnify
interface [1] within workers (instead of just within the leader, as
with parallel tuplesort), and have it handle all of that for you.
Currently, that would mean that there'd be an unused/0 sized "local"
segment for the unified BufFile, but I was thinking of making that not
happen unless and until a new segment is actually needed, so even that
minor wart wouldn't necessarily affect you.

Ok, I'm studying that code now.

Some assorted notes on the status: I need to do some thinking about
the file cleanup logic: both explicit deletes at the earliest possible
time, and failure/error paths. Currently the creator of each file is
responsible for cleaning it up, but I guess if the creator aborts
early the file disappears underneath the others' feet, and then I
guess they might raise a confusing error report that races against the
root cause error report; I'm looking into that. Rescans and skew
buckets not finished yet.

The rescan code path seems to segfault when the regression tests are
run. There is a NULL pointer dereference here:

@@ -985,6 +1855,14 @@ ExecReScanHashJoin(HashJoinState *node)
node->hj_HashTable = NULL;
node->hj_JoinState = HJ_BUILD_HASHTABLE;

+           if (HashJoinTableIsShared(node->hj_HashTable))
+           {
+               /* Coordinate a rewind to the shared hash table creation phase. */
+               BarrierWaitSet(&hashNode->shared_table_data->barrier,
+                              PHJ_PHASE_BEGINNING,
+                              WAIT_EVENT_HASHJOIN_REWINDING3);
+           }
+

Clearly, HashJoinTableIsShared() should not be called when its
argument (in this case node->hj_HashTable) is NULL.

In general, I think you should set expectations up front about what
happens when the regression tests are run, because that's usually the
first thing reviewers do.

Apologies, poor form. That block can be commented out for now because
rescan support is obviously incomplete, and I didn't mean to post it
that way. Doing so reveals two remaining test failures: "join" and
"rowsecurity" managed to lose a couple of rows. Oops. I will figure
out what I broke and have a fix for that in my next version.

Various compiler warnings on my system:

/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHash.c:1376:7:
warning: variable ‘size_before_shrink’ set but not used
[-Wunused-but-set-variable]
Size size_before_shrink = 0;
^

In this case it was only used in dtrace builds; I will make sure any
such code is compiled out in non-dtrace builds.

/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:
In function ‘ExecHashJoinCloseBatch’:
/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1548:28:
warning: variable ‘participant’ set but not used
[-Wunused-but-set-variable]
HashJoinParticipantState *participant;
^
/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:
In function ‘ExecHashJoinRewindBatches’:
/home/pg/pgbuild/builds/root/../../postgresql/src/backend/executor/nodeHashjoin.c:1587:23:
warning: variable ‘batch_reader’ set but not used
[-Wunused-but-set-variable]
HashJoinBatchReader *batch_reader;
^

Is this change really needed?

--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -31,6 +31,8 @@
#include "executor/nodeSeqscan.h"
#include "utils/rel.h"
+#include <unistd.h>
+
static void InitScanRelation(SeqScanState *node, EState *estate, int eflags);
static TupleTableSlot *SeqNext(SeqScanState *node);

Right, will clean up.

That's all I have for now...

Thanks! I'm away from my computer for a couple of days but will have
a new patch series early next week, and hope to have a better handle
on what's involved in adopting the 'unification' approach here
instead.

--
Thomas Munro
http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#18Rafia Sabih
rafia.sabih@enterprisedb.com
In reply to: Thomas Munro (#17)
Re: WIP: [[Parallel] Shared] Hash

On Thu, Jan 12, 2017 at 9:07 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

[...]

Hi Thomas,
I was analysing the performance of TPC-H queries with your patch and
came across the following: Q9 and Q21 crash, and both crashes produce
the backtrace below in the core dump (I thought it might be helpful):

#0  0x0000000010757da4 in pfree (pointer=0x3fff78d11000) at mcxt.c:1012
#1  0x000000001032c574 in ExecHashIncreaseNumBatches (hashtable=0x1003af6da60) at nodeHash.c:1124
#2  0x000000001032d518 in ExecHashTableInsert (hashtable=0x1003af6da60, slot=0x1003af695c0, hashvalue=2904801109, preload=1 '\001') at nodeHash.c:1700
#3  0x0000000010330fd4 in ExecHashJoinPreloadNextBatch (hjstate=0x1003af39118) at nodeHashjoin.c:886
#4  0x00000000103301fc in ExecHashJoin (node=0x1003af39118) at nodeHashjoin.c:376
#5  0x0000000010308644 in ExecProcNode (node=0x1003af39118) at execProcnode.c:490
#6  0x000000001031f530 in fetch_input_tuple (aggstate=0x1003af38910) at nodeAgg.c:587
#7  0x0000000010322b50 in agg_fill_hash_table (aggstate=0x1003af38910) at nodeAgg.c:2304
#8  0x000000001032239c in ExecAgg (node=0x1003af38910) at nodeAgg.c:1942
#9  0x0000000010308694 in ExecProcNode (node=0x1003af38910) at execProcnode.c:509
#10 0x0000000010302a1c in ExecutePlan (estate=0x1003af37fa0, planstate=0x1003af38910, use_parallel_mode=0 '\000', operation=CMD_SELECT, sendTuples=1 '\001', numberTuples=0, direction=ForwardScanDirection, dest=0x1003af19390) at execMain.c:1587

In case you want to know, I was using TPC-H at scale factor 20. Please
let me know if you want any more information on this.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

#19Peter Geoghegan
pg@heroku.com
In reply to: Thomas Munro (#17)
Re: WIP: [[Parallel] Shared] Hash

On Wed, Jan 11, 2017 at 7:37 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Hmm. Yes, that is an entirely bogus use of isInterXact. I am
thinking about how to fix that with refcounts.

Cool. As I said, the way I'd introduce refcounts would not be very
different from what I've already done -- there'd still be a strong
adherence to the use of resource managers to clean-up, with that
including exactly one particular backend doing the extra step of
deletion. The refcount only changes which backend does that extra step
in corner cases, which is conceptually a very minor change.

I don't think it's right for buffile.c to know anything about file
paths directly -- I'd say that that's a modularity violation.
PathNameOpenFile() is called by very few callers at the moment, all of
them very low level (e.g. md.c), but you're using it within buffile.c
to open a path to the file that you obtain from shared memory

Hmm. I'm not seeing the modularity violation. buffile.c uses
interfaces already exposed by fd.c to do this: OpenTemporaryFile,
then FilePathName to find the path, then PathNameOpenFile to open from
another process. I see that your approach instead has client code
provide more meta data so that things can be discovered, which may
well be a much better idea.

Indeed, my point was that the metadata thing would IMV be better.
buffile.c shouldn't need to know about file paths, etc. Instead, the
caller should pass BufFileImport()/BufFileUnify() simple private state
sufficient for the routine to discover all details itself, based on a
deterministic scheme. In my tuplesort patch, that piece of state is:

 /*
+ * BufFileOp is an identifier for a particular parallel operation involving
+ * temporary files.  Parallel temp file operations must be discoverable across
+ * processes based on these details.
+ *
+ * These fields should be set by BufFileGetIdent() within leader process.
+ * Identifier BufFileOp makes temp files from workers discoverable within
+ * leader.
+ */
+typedef struct BufFileOp
+{
+   /*
+    * leaderPid is leader process PID.
+    *
+    * tempFileIdent is an identifier for a particular temp file (or parallel
+    * temp file op) for the leader.  Needed to distinguish multiple parallel
+    * temp file operations within a given leader process.
+    */
+   int         leaderPid;
+   long        tempFileIdent;
+} BufFileOp;
+
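
To be concrete, the point is that any backend can reconstruct any
participant's segment paths deterministically from those two fields,
along these lines (the naming scheme here is invented for
illustration, not the patch's exact format):

    /*
     * Sketch: derive a discoverable temp file path from a BufFileOp
     * plus worker and segment numbers.
     */
    static void
    make_unified_path(char *path, size_t len, BufFileOp op,
                      int worker, int segment)
    {
        snprintf(path, len, "base/%s/%s%d.%ld.%d.%d",
                 PG_TEMP_FILES_DIR, PG_TEMP_FILE_PREFIX,
                 op.leaderPid, op.tempFileIdent, worker, segment);
    }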

Right, that is a problem. A refcount mode could fix that; virtual
file descriptors would be closed in every backend using the current
resource owner, and the files would be deleted when the last one turns
out the lights.

Yeah. That's basically what the BufFile unification process can
provide you with (or will, once I get around to implementing the
refcount thing, which shouldn't be too hard). As already noted, I'll
also want to make it defer creation of a leader-owned segment, unless
and until that proves necessary, which it never will for hash join.

Perhaps I should make superficial changes to unification in my patch
to suit your work, like rename the field BufFileOp.leaderPid to
BufFileOp.ownerPid, without actually changing any behaviors, except as
noted in the last paragraph. Since you only require that backends be
able to open up some other backend's temp file themselves for a short
while, that gives you everything you need. You'll be doing unification
in backends, and not just within the leader as in the tuplesort patch,
I believe, but that's just fine. All that matters is that you present
all data at once to a consuming backend via unification (since you
treat temp file contents as immutable, this will be true for hash
join, just as it is for tuplesort).

There is a good argument against my making such a tweak, however,
which is that maybe it's clearer to DBAs what's going on if temp file
names have the leader PID in them for all operations. So, maybe
BufFileOp.leaderPid isn't renamed to BufFileOp.ownerPid by me;
instead, you always make it the leader pid, while at the same time
having the leader dole out BufFileOp.tempFileIdent identifiers to each
worker as needed (see how I generate BufFileOps for an idea of what I
mean if it's not immediately clear). That's also an easy change, or at
least will be once the refcount thing is added.

--
Peter Geoghegan

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#20Thomas Munro
thomas.munro@gmail.com
In reply to: Peter Geoghegan (#19)
Re: WIP: [[Parallel] Shared] Hash

On Fri, Jan 13, 2017 at 2:36 PM, Peter Geoghegan <pg@heroku.com> wrote:

[...]
Yeah. That's basically what the BufFile unification process can
provide you with (or will, once I get around to implementing the
refcount thing, which shouldn't be too hard). As already noted, I'll
also want to make it defer creation of a leader-owned segment, unless
and until that proves necessary, which it never will for hash join.

Hi Peter,

I have broken this up into a patch series, harmonised the private vs
shared hash table code paths better and fixed many things including
the problems with rescans and regression tests mentioned upthread.
You'll see that one of the patches is that throwaway BufFile
import/export facility, which I'll replace with your code as
discussed.

The three 'refactor' patches change the existing hash join code to
work in terms of chunks in more places. These may be improvements in
their own right, but mainly they pave the way for parallelism. The
later patches introduce single-batch and then multi-batch shared
tables.

The patches in the attached tarball are:

0001-nail-down-regression-test-row-order-v4.patch:

A couple of regression tests would fail with later refactoring that
changes the order of unmatched rows emitted by hash joins. So first,
let's fix that by adding ORDER BY in those places, without any code
changes.

0002-hj-add-dtrace-probes-v4.patch:

Before making any code changes, let's add some dtrace probes so that
we can measure time spent doing different phases of hash join work
before and after the later changes. The main problem with the probes
as I have them here (and the extra probes inserted by later patches in
the series) is that interesting query plans contain multiple hash
joins so these get all mixed up when you're trying to measure stuff,
so perhaps I should pass executor node IDs into all the probes. More
on this later. (If people don't want dtrace probes in the executor,
I'm happy to omit them and maintain that kind of thing locally for my
own testing purposes...)

0003-hj-refactor-memory-accounting-v4.patch:

Modify the existing hash join code to work in terms of chunks when
estimating and later tracking memory usage. This is probably more
accurate than the current tuple-based approach, because it tries to
take into account the space used by chunk headers and the wasted space
in chunks. In practice the difference is probably small, but it's
arguably more accurate; I did this because I need chunk-based
accounting in the later patches. Also, make HASH_CHUNK_SIZE the actual
size of allocated chunks (ie the header information is included in
that size so we allocate exactly 32KB, not 32KB + a bit, for the
benefit of the dsa allocator which otherwise finishes up allocating
36KB).
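
To illustrate, the chunk header is now counted inside the 32KB target
rather than on top of it. A sketch of the changed allocation
(HASH_CHUNK_HEADER_SIZE is my shorthand here):

    #define HASH_CHUNK_SIZE         (32 * 1024L)   /* total allocation */
    #define HASH_CHUNK_HEADER_SIZE  MAXALIGN(offsetof(HashMemoryChunkData, data))

    /*
     * In dense_alloc(), when starting a new chunk: allocate exactly
     * HASH_CHUNK_SIZE, header included, so that an allocator with
     * coarse size classes (like dsa.c) doesn't round 32KB-plus-header
     * up to 36KB.
     */
    chunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
                                                 HASH_CHUNK_SIZE);
    chunk->maxlen = HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE;
    hashtable->spaceUsed += HASH_CHUNK_SIZE;    /* chunk-grained accounting */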

0004-hj-refactor-batch-increases-v4.patch:

Modify the existing hash join code to detect work_mem exhaustion at
the point where chunks are allocated, instead of checking after every
tuple insertion. This matches the logic used for estimating, and more
importantly allows for some parallelism in later patches.
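
In other words, the memory check moves from per-tuple to per-chunk;
roughly:

    /*
     * Sketch: before grabbing a new chunk, check whether it would blow
     * the budget, and if so double the number of batches.  Previously
     * the equivalent check ran after every tuple insertion.
     */
    if (hashtable->spaceUsed + HASH_CHUNK_SIZE > hashtable->spaceAllowed)
        ExecHashIncreaseNumBatches(hashtable);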

0005-hj-refactor-unmatched-v4.patch:

Modifies the existing hash join code to handle unmatched tuples in
right/full joins chunk-by-chunk. This is probably a cache-friendlier
scan order anyway, but the real goal is to provide a natural grain for
parallelism in a later patch.
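
That is, the unmatched scan becomes a walk of the chunk list rather
than of the bucket array; a sketch using the existing chunk layout
(emit_unmatched() is a placeholder for the real tuple emission):

    HashMemoryChunk chunk;

    for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
    {
        size_t      idx = 0;

        /* Visit tuples in allocation order within each chunk. */
        while (idx < chunk->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);

            if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
                emit_unmatched(hashTuple);  /* null-extended result */

            idx += MAXALIGN(HJTUPLE_OVERHEAD +
                            HJTUPLE_MINTUPLE(hashTuple)->t_len);
        }
    }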

0006-hj-barrier-v4.patch:

The patch from a nearby thread previously presented as a dependency of
this project. It might as well be considered part of this patch
series.

0007-hj-exec-detach-node-v4.patch

By the time ExecEndNode() runs in workers, ExecShutdownNode() has
already run. That's done on purpose because, for example, the hash
table needs to survive longer than the parallel environment to allow
EXPLAIN to peek at it. But it means that the Gather node has thrown
out the shared memory before any parallel-aware node below it gets to
run its Shutdown and End methods. So I invented ExecDetachNode()
which runs before ExecShutdownNode(), giving parallel-aware nodes a
chance to say goodbye before their shared memory vanishes. Better
ideas?
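
The hook itself would just be a tree walk mirroring ExecShutdownNode;
a sketch (ExecDetachHashJoin is a hypothetical per-node callback):

    bool
    ExecDetachNode(PlanState *node)
    {
        if (node == NULL)
            return false;

        /* Recurse first, like ExecShutdownNode. */
        planstate_tree_walker(node, ExecDetachNode, NULL);

        switch (nodeTag(node))
        {
            case T_HashJoinState:
                ExecDetachHashJoin((HashJoinState *) node);
                break;
            default:
                break;
        }
        return false;
    }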

0008-hj-shared-single-batch-v4.patch:

Introduces hash joins with "Shared Hash" and "Parallel Shared Hash"
nodes, for single-batch joins only. If the planner has a partial
inner plan, it'll pick a Parallel Shared Hash plan to divide that over
K participants. Failing that, if the planner has a parallel-safe
inner plan and thinks that it can avoid batching by using work_mem * K
memory (shared by all K participants), it will now use a Shared Hash.
Otherwise it'll typically use a Hash plan as before. Without the
later patches, it will blow through work_mem * K if it turns out to
have underestimated the hash table size, because it lacks
infrastructure for dealing with batches.

The trickiest thing at this point in the series is that participants
(workers and the leader) can show up at any time, so there are three
places that provide synchronisation with a parallel hash join that is
already in progress. Those can be seen in ExecHashTableCreate,
MultiExecHash and ExecHashJoin (HJ_BUILD_HASHTABLE case).
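
For example, a participant arriving at an arbitrary moment attaches to
the barrier and jumps to whatever work the current phase calls for;
schematically (a simplified use of the Barrier API from patch 0006):

    int         phase = BarrierAttach(&shared->barrier);

    switch (phase)
    {
        case PHJ_PHASE_BEGINNING:
        case PHJ_PHASE_BUILDING:
            /* Help build: insert tuples until the build phase ends. */
            break;
        case PHJ_PHASE_PROBING:
            /* Table already built: go straight to probing. */
            break;
        default:
            /* Resizing and batch phases are handled similarly. */
            break;
    }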

0009-hj-shared-buffile-strawman-v4.patch:

Simple code for sharing BufFiles between backends. This is standing
in for Peter G's BufFile sharing facility with refcount-based cleanup.

0010-hj-shared-multi-batch-v4.patch:

Adds support for multi-batch joins with shared hash tables. At this
point, more complications appear: deadlock avoidance with the leader,
batch file sharing and coordinated batch number increases (shrinking
the hash table) while building or loading.

Some thoughts:

* Although this patch series adds a ton of wait points, in the common
case of a single batch inner join there is effectively only one:
participants wait for PHJ_PHASE_BUILDING to end and PHJ_PHASE_PROBING
to begin (resizing the hash table in between if necessary). For a
single batch outer join, there is one more wait point: participants
wait for PHJ_PHASE_PROBING to end so that PHJ_PHASE_UNMATCHED can
begin. The length of the wait for PHJ_PHASE_BUILDING to finish is
limited by the grain of the scattered data being loaded into the hash
table: if the source of parallelism is Parallel Seq Scan, then the
worst case scenario is that you run out of tuples to insert and
twiddle your thumbs while some other participant chews on the final
pageful of tuples. The wait for PHJ_PHASE_UNMATCHED (if applicable)
is similarly limited by the time it takes for the slowest participant
to scan the match bits of one chunk of tuples. All other phases and
associated wait points relate to multi-batch joins: either running out
of work_mem and needing to shrink the hash table, or coordinating the
loading of the various batches; in other words, ugly synchronisation only
enters the picture at the point where hash join starts doing IO
because you don't have enough work_mem.

* I wrestled with rescans for a long time; I think I have it right
now! The key thing to understand is that only the leader runs
ExecHashJoinReScan; new workers will be created for the next scan, so
the leader is able to get the barrier into the right state (attached
and fast-forwarded to PHJ_PHASE_PROBING if reusing the hash table,
detached and in the initial phase PHJ_PHASE_BEGINNING if we need to
recreate it).

* Skew table not supported yet.

* I removed the support for preloading data for the next batch; it
didn't seem to buy anything (it faithfully used up exactly all of your
work_mem for a brief moment, but since probing usually finishes very
close together in all participants anyway, no total execution time
seems to be saved) and added some complexity to the code; might be
worth revisiting but I'm not hopeful.

* The thing where different backends attach at different phases of the
hash join obviously creates a fairly large bug surface; of course we
can review the code and convince ourselves that it is correct, but
what is really needed is a test with 100% coverage that somehow
arranges for a worker to join at phases 0 to 12, and then perhaps also
for the leader to do the same; I have an idea for how to do that with
a debug build, more soon.

* Some of this needs to be more beautiful.

* With the patches up to 0008-hj-shared-single-batch.patch, I find
that typically I can get up to 3x or 4x speedups on queries like TPCH
Q9 that can benefit from a partial inner plan using Parallel Shared
Hash when work_mem is set 'just right', and at least some speedup on
queries without a partial inner plan but where the extra usable memory
available to Shared Hash can avoid the need for batching. (The best
cases I've seen probably combine these factors: avoiding batching and
dividing work up).

* With the full patch series up to 0010-hj-shared-multi-batch.patch,
it produces some terrible plans for some TPCH queries right now, and
I'm investigating that. Up to this point I have been focused on
getting the multi-batch code to work correctly, but will now turn some
attention to planning and efficiency and figure out what's happening
there.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

parallel-shared-hash-v4.tgz (application/x-gzip)
[Messages #21-#83 are collapsed in this archive view: further
discussion of the patch series among Thomas Munro, Peter Geoghegan,
Michael Paquier, Ashutosh Bapat, Rafia Sabih, Andres Freund, Robert
Haas, Tom Lane, David Steele and Oleg Golovanov.]