9.5: Memory-bounded HashAgg

Started by Jeff Davis · over 11 years ago · 39 messages · pgsql-hackers
#1 Jeff Davis
pgsql@j-davis.com

This patch requires the Memory Accounting patch, or something similar
to track memory usage.

The attached patch enables hashagg to spill to disk, which means that
hashagg will contain itself to work_mem even if the planner badly
misestimates the cardinality.

This is a well-known concept; there's even a Berkeley homework
assignment floating around to implement it -- in postgres 7.2, no
less. I didn't take the exact same approach as the homework assignment
suggests, but it's not much different, either. My apologies if some
classes are still using this as a homework assignment, but postgres
needs to eventually have an answer to this problem.

Included is a GUC, "enable_hashagg_disk" (default on), which allows
the planner to choose hashagg even if it doesn't expect the hashtable
to fit in memory. If it's off, and the planner misestimates the
cardinality, hashagg will still use the disk to contain itself to
work_mem.

One situation that might surprise the user is if work_mem is set too
low, and the user is *relying* on a misestimate to pick hashagg. With
this patch, it would end up going to disk, which might be
significantly slower. The solution for the user is to increase
work_mem.

Rough Design:

Change the hash aggregate algorithm to accept a generic "work item",
which consists of an input file as well as some other bookkeeping
information.

Initially prime the algorithm by adding a single work item where the
file is NULL, indicating that it should read from the outer plan.

If the memory is exhausted during execution of a work item, then
continue to allow existing groups to be aggregated, but do not allow new
groups to be created in the hash table. Tuples representing new groups
are saved in an output partition file referenced in the work item that
is currently being executed.

When the work item is done, emit any groups in the hash table, clear the
hash table, and turn each output partition file into a new work item.

Each time through at least some groups are able to stay in the hash
table, so eventually none will need to be saved in output partitions, no
new work items will be created, and the algorithm will terminate. This
is true even if the number of output partitions is always one.
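
To make the control flow concrete, here is a toy, self-contained sketch of
the loop described above (not the patch itself): integer keys, COUNT(*) as
the only aggregate, a tiny fixed group limit standing in for work_mem, and
in-memory arrays standing in for the partition temp files. All the names
here are invented for illustration.

/*
 * Toy sketch of the spill algorithm described above -- not the patch itself.
 * Integer keys, COUNT(*) as the only aggregate, MAX_GROUPS standing in for
 * the work_mem limit, and in-memory arrays standing in for the partition
 * temp files.  All names are invented for illustration.
 */
#include <stdio.h>
#include <stdlib.h>

#define MAX_GROUPS   4          /* "work_mem": max groups held in memory */
#define NPARTITIONS  2          /* output partitions per work item */

static void
run_work_item(const int *input, int ninput)
{
    int     keys[MAX_GROUPS], counts[MAX_GROUPS], ngroups = 0;
    int    *part[NPARTITIONS];
    int     npart[NPARTITIONS] = {0};

    for (int p = 0; p < NPARTITIONS; p++)
        part[p] = malloc(ninput * sizeof(int));

    for (int i = 0; i < ninput; i++)
    {
        int     key = input[i];
        int     g;

        for (g = 0; g < ngroups; g++)   /* hash table lookup (linear here) */
            if (keys[g] == key)
                break;

        if (g < ngroups)
            counts[g]++;                /* existing group: advance it */
        else if (ngroups < MAX_GROUPS)
        {
            keys[ngroups] = key;        /* room left: create a new group */
            counts[ngroups++] = 1;
        }
        else
        {
            int     p = key % NPARTITIONS;  /* memory "exhausted": spill tuple */

            part[p][npart[p]++] = key;
        }
    }

    for (int g = 0; g < ngroups; g++)   /* work item done: emit its groups */
        printf("key=%d count=%d\n", keys[g], counts[g]);

    for (int p = 0; p < NPARTITIONS; p++)
    {
        if (npart[p] > 0)               /* each partition becomes a new work item */
            run_work_item(part[p], npart[p]);
        free(part[p]);
    }
}

int
main(void)
{
    int     input[] = {1, 2, 3, 4, 5, 6, 1, 2, 5, 7, 8, 9, 6, 6};

    run_work_item(input, (int) (sizeof(input) / sizeof(input[0])));
    return 0;
}

With MAX_GROUPS set to 4, each pass finishes at most 4 groups and recurses
on whatever spilled, which is the termination argument above in miniature.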

Open items:
* costing
* EXPLAIN details for disk usage
* choose number of partitions intelligently
* performance testing

Initial tests indicate that it can be competitive with sort+groupagg
when the disk is involved, but more testing is required.

Feedback welcome.

Regards,
Jeff Davis

Attachments:

hashagg-disk-20140810.patch (text/x-patch; charset=UTF-8) +577 -126
#2 Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#1)
Re: 9.5: Memory-bounded HashAgg

Hi,

it's 1AM here, so only a few comments after quickly reading the patch.

On 10.8.2014 23:26, Jeff Davis wrote:

This patch requires the Memory Accounting patch, or something
similar to track memory usage.

I think the patch you sent actually includes the accounting patch. Is
that on purpose, or by accident?

I'd suggest keeping these two patches separate.

Rough Design:

Change the hash aggregate algorithm to accept a generic "work item",
which consists of an input file as well as some other bookkeeping
information.

Initially prime the algorithm by adding a single work item where the
file is NULL, indicating that it should read from the outer plan.

If the memory is exhausted during execution of a work item, then
continue to allow existing groups to be aggregated, but do not allow
new groups to be created in the hash table. Tuples representing new
groups are saved in an output partition file referenced in the work
item that is currently being executed.

When the work item is done, emit any groups in the hash table, clear
the hash table, and turn each output partition file into a new work
item.

Each time through at least some groups are able to stay in the hash
table, so eventually none will need to be saved in output
partitions, no new work items will be created, and the algorithm will
terminate. This is true even if the number of output partitions is
always one.

So once a group gets into memory, it stays there? That's going to work
fine for aggregates with fixed-size state (int4, or generally state that
gets allocated and does not grow), but I'm afraid for aggregates with
growing state (as for example array_agg and similar) that's not really a
solution.

How difficult would it be to dump the current states into a file (and
remove them from the hash table)?

While hacking on the hash join, I envisioned the hash aggregate might
work in a very similar manner, i.e. something like this:

* nbatches=1, nbits=0
* when work_mem gets full => nbatches *= 2, nbits += 1
* get rid of half the groups, using nbits from the hash
=> dump the current states into 'states.batchno' file
=> dump further tuples to 'tuples.batchno' file
* continue until the end, or until work_mem gets full again

This is pretty much what the hashjoin does, except that the join needs
to batch the outer relation too (which hashagg does not need to do).
Otherwise most of the batching logic can be copied.

It also seems to me that the logic of the patch is about this:

* try to lookup the group in the hash table
* found => call the transition function
* not found
* enough space => call transition function
* not enough space => tuple/group goes to a batch

Which pretty much means all tuples need to do the lookup first. The nice
thing about the hash-join approach is that you don't really need to do the
lookup - you just need to peek at the hash to see whether the group belongs
to the current batch (and if not, which batch it should go to).

Of course, that would require the ability to dump the current state of
the group, but for aggregates using basic fixed-length types as state
(int4/int8, ...) that's trivial.

For aggregates using 'internal' to pass pointers that requires some help
from the author - serialization/deserialization functions.

Open items:
* costing

Not sure how this is done for the hash-join, but I guess that might be a
good place for inspiration.

* EXPLAIN details for disk usage
* choose number of partitions intelligently

What is the purpose of HASH_DISK_MAX_PARTITIONS? I mean, when we decide
we need 2048 partitions, why should we use less if we believe it will
get us over work_mem?

* performance testing

Initial tests indicate that it can be competitive with sort+groupagg
when the disk is involved, but more testing is required.

For us, removing the sort is a big deal, because we're working with
>100M rows regularly. It's more complicated though, because the sort is
usually enforced by COUNT(DISTINCT) and that's not going to disappear
because of this patch. But that's solvable with a custom aggregate.

Tomas


#3 Jeff Davis
pgsql@j-davis.com
In reply to: Tomas Vondra (#2)
Re: 9.5: Memory-bounded HashAgg

On Mon, 2014-08-11 at 01:29 +0200, Tomas Vondra wrote:

On 10.8.2014 23:26, Jeff Davis wrote:

This patch requires the Memory Accounting patch, or something
similar to track memory usage.

I think the patch you sent actually includes the accounting patch. Is
that on purpose, or by accident?

Accident, thank you.

So once a group gets into memory, it stays there? That's going to work
fine for aggregates with fixed-size state (int4, or generally state that
gets allocated and does not grow), but I'm afraid for aggregates with
growing state (as for example array_agg and similar) that's not really a
solution.

I agree in theory, but for now I'm just not handling that case at all
because there is other work that needs to be done first. For one thing,
we would need a way to save the transition state, and we don't really
have that. In the case of array_agg, the state is not serialized and
there's no generic way to ask it to serialize itself without finalizing.

I'm open to ideas. Do you think my patch is going generally in the right
direction, and we can address this problem later; or do you think we
need a different approach entirely?

While hacking on the hash join, I envisioned the hash aggregate might
work in a very similar manner, i.e. something like this:

* nbatches=1, nbits=0
* when work_mem gets full => nbatches *= 2, nbits += 1
* get rid of half the groups, using nbits from the hash
=> dump the current states into 'states.batchno' file
=> dump further tuples to 'tuples.batchno' file
* continue until the end, or until work_mem gets full again

It would get a little messy with HashAgg. Hashjoin is dealing entirely
with tuples; HashAgg deals with tuples and groups.

Also, if the transition state is fixed-size (or even nearly so), it
makes no sense to remove groups from the hash table before they are
finished. We'd need to detect that somehow, and it seems almost like two
different algorithms (though maybe not a bad idea to use a different
algorithm for things like array_agg).

Not saying that it can't be done, but (unless you have an idea) it
requires quite a bit more work than what I did here.

It also seems to me that the logic of the patch is about this:

* try to lookup the group in the hash table
* found => call the transition function
* not found
* enough space => call transition function
* not enough space => tuple/group goes to a batch

Which pretty much means all tuples need to do the lookup first. The nice
thing about the hash-join approach is that you don't really need to do the
lookup - you just need to peek at the hash to see whether the group belongs
to the current batch (and if not, which batch it should go to).

That's an interesting point. I suspect that, in practice, hashing the
tuple is more expensive than (or at least not much cheaper than) doing a
failed lookup.

For aggregates using 'internal' to pass pointers that requires some help
from the author - serialization/deserialization functions.

Ah, yes, this is what I was referring to earlier.

* EXPLAIN details for disk usage
* choose number of partitions intelligently

What is the purpose of HASH_DISK_MAX_PARTITIONS? I mean, when we decide
we need 2048 partitions, why should we use less if we believe it will
get us over work_mem?

Because I suspect there are costs in having an extra file around that
I'm not accounting for directly. We are implicitly assuming that the OS
will keep around enough buffers for each BufFile to do sequential writes
when needed. If we create a zillion partitions, then either we end up
with random I/O or we push the memory burden into the OS buffer cache.

We could try to model those costs explicitly to put some downward
pressure on the number of partitions we select, but I just chose to cap
it for now.

For us, removing the sort is a big deal, because we're working with
>100M rows regularly. It's more complicated though, because the sort is
usually enforced by COUNT(DISTINCT) and that's not going to disappear
because of this patch. But that's solvable with a custom aggregate.

I hope this offers you a good alternative.

I'm not sure it will ever beat sort for very high cardinality cases, but
I hope it can beat sort when the group size averages something higher
than one. It will also be safer, so the optimizer can be more aggressive
about choosing HashAgg.

Thank you for taking a look so quickly!

Regards,
Jeff Davis


#4 Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#3)
Re: 9.5: Memory-bounded HashAgg

On 12 August 2014, 7:06, Jeff Davis wrote:

On Mon, 2014-08-11 at 01:29 +0200, Tomas Vondra wrote:

On 10.8.2014 23:26, Jeff Davis wrote:

This patch requires the Memory Accounting patch, or something
similar to track memory usage.

I think the patch you sent actually includes the accounting patch. Is
that on purpose, or by accident?

Accident, thank you.

So once a group gets into memory, it stays there? That's going to work
fine for aggregates with fixed-size state (int4, or generally state that
gets allocated and does not grow), but I'm afraid for aggregates with
growing state (as for example array_agg and similar) that's not really a
solution.

I agree in theory, but for now I'm just not handling that case at all
because there is other work that needs to be done first. For one thing,
we would need a way to save the transition state, and we don't really
have that. In the case of array_agg, the state is not serialized and
there's no generic way to ask it to serialize itself without finalizing.

Yes and no.

It's true we don't have this ability for aggregates passing state using
'internal', and arguably these are the cases that matter (because those
are the states that tend to "bloat" as more values are passed to the
aggregate).

We can do that for states with a known type (because we have
serialize/deserialize methods for them), but we can't really require all
aggregates to use only known types. The 'internal' is there for a reason.

So I think eventually we should support something like this:

CREATE AGGREGATE myaggregate (
    ...
    SERIALIZE_FUNC = 'dump_data',
    DESERIALIZE_FUNC = 'read_data',
    ...
);
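
Just to illustrate what such a pair of functions would have to do for a
state that grows (like the array_agg case discussed above), here's a
standalone sketch in plain C - not the fmgr-level signatures PostgreSQL
would actually use, and all the names (ArrayState, dump_data, read_data)
are made up - flattening an int-array state into a contiguous buffer and
rebuilding it:

/*
 * Standalone illustration only -- not PostgreSQL's actual aggregate API.
 * Shows the kind of work a SERIALIZE_FUNC / DESERIALIZE_FUNC pair has to do
 * for a growing state (an int array, roughly what array_agg accumulates).
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct ArrayState
{
    int     nvalues;
    int    *values;
} ArrayState;

/* "SERIALIZE_FUNC": flatten the state into a self-describing blob */
static char *
dump_data(const ArrayState *state, size_t *len)
{
    char   *buf;

    *len = sizeof(int) + state->nvalues * sizeof(int);
    buf = malloc(*len);
    memcpy(buf, &state->nvalues, sizeof(int));
    memcpy(buf + sizeof(int), state->values, state->nvalues * sizeof(int));
    return buf;
}

/* "DESERIALIZE_FUNC": rebuild the in-memory state from the blob */
static ArrayState *
read_data(const char *buf)
{
    ArrayState *state = malloc(sizeof(ArrayState));

    memcpy(&state->nvalues, buf, sizeof(int));
    state->values = malloc(state->nvalues * sizeof(int));
    memcpy(state->values, buf + sizeof(int), state->nvalues * sizeof(int));
    return state;
}

int
main(void)
{
    int         vals[] = {7, 8, 9};
    ArrayState  in = {3, vals};
    size_t      len;
    char       *blob = dump_data(&in, &len);
    ArrayState *out = read_data(blob);

    printf("restored %d values, last = %d\n", out->nvalues, out->values[2]);
    return 0;
}

The point is just that the state has to be reducible to a flat chunk of
bytes the executor can write to a batch file and later reconstruct; for
'internal' states only the aggregate's author can provide that.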

That being said, we can't require this from all existing aggregates.
There'll always be aggregates not providing this (for example some old
ones).

So even if we have this, we'll have to support the case when it's not
provided - possibly by using the batching algorithm you provided. What
I imagine is this:

hitting work_mem limit -> do we know how to dump the aggregate state?

    yes (known type or serialize/deserialize)
        => use the batching algorithm from hash join

    no (unknown type, ...)
        => use the batching algorithm described in the original message

Now, I'm not trying to make you implement all this - I'm willing to work
on that. Implementing this CREATE AGGREGATE extension is however tightly
coupled with your patch, because that's the only place where it might be
used (that I'm aware of).

I'm open to ideas. Do you think my patch is going generally in the right
direction, and we can address this problem later; or do you think we
need a different approach entirely?

I certainly think having memory-bounded hashagg is a great improvement,
and yes - this patch can get us there. Maybe it won't get us all the way
to the "perfect solution" but so what? We can improve that by further
patches (and I'm certainly willing to spend some time on that).

So thanks a lot for working on this!

While hacking on the hash join, I envisioned the hash aggregate might
work in a very similar manner, i.e. something like this:

* nbatches=1, nbits=0
* when work_mem gets full => nbatches *= 2, nbits += 1
* get rid of half the groups, using nbits from the hash
=> dump the current states into 'states.batchno' file
=> dump further tuples to 'tuples.batchno' file
* continue until the end, or until work_mem gets full again

It would get a little messy with HashAgg. Hashjoin is dealing entirely
with tuples; HashAgg deals with tuples and groups.

I don't see why it should get messy? In the end, you have a chunk of
data and a hash for it.

Also, if the transition state is fixed-size (or even nearly so), it
makes no sense to remove groups from the hash table before they are
finished. We'd need to detect that somehow, and it seems almost like two
different algorithms (though maybe not a bad idea to use a different
algorithm for things like array_agg).

It just means you need to walk through the hash table, look at the
hashes and dump ~50% of the groups to a file. I'm not sure how difficult
that is with dynahash, though (hashjoin uses a custom hash table, which
makes this very simple).

Not saying that it can't be done, but (unless you have an idea) requires
quite a bit more work than what I did here.

It also seems to me that the logic of the patch is about this:

* try to lookup the group in the hash table
* found => call the transition function
* not found
* enough space => call transition function
* not enough space => tuple/group goes to a batch

Which pretty much means all tuples need to do the lookup first. The nice
thing about the hash-join approach is that you don't really need to do the
lookup - you just need to peek at the hash to see whether the group belongs
to the current batch (and if not, which batch it should go to).

That's an interesting point. I suspect that, in practice, hashing the
tuple is more expensive than (or at least not much cheaper than) doing a
failed lookup.

I think you're missing the point, here. You need to compute the hash in
both cases. And then you either can do a lookup or just peek at the first
few bits of the hash to see whether it's in the current batch or not.

Certainly, doing this:

batchno = hash & (nbatches - 1);

if (batchno > curbatch) {
    ... not current batch, dump to file ...
}

is much faster than a lookup. Also, as the hash table grows (beyond L3
cache size, which is a few MBs today), it becomes much slower in my
experience - that's one of the lessons I learnt while hacking on the
hashjoin. And we're dealing with hashagg not fitting into work_mem, so
this seems to be relevant.

For aggregates using 'internal' to pass pointers that requires some help
from the author - serialization/deserialization functions.

Ah, yes, this is what I was referring to earlier.

* EXPLAIN details for disk usage
* choose number of partitions intelligently

What is the purpose of HASH_DISK_MAX_PARTITIONS? I mean, when we decide
we need 2048 partitions, why should we use less if we believe it will
get us over work_mem?

Because I suspect there are costs in having an extra file around that
I'm not accounting for directly. We are implicitly assuming that the OS
will keep around enough buffers for each BufFile to do sequential writes
when needed. If we create a zillion partitions, then either we end up
with random I/O or we push the memory burden into the OS buffer cache.

Assuming I understand it correctly, I think this logic is broken. Are you
saying "We'll try to do memory-bounded hashagg, but not for the really
large datasets because of fear we might cause random I/O"?

While I certainly understand your concerns about generating an excessive
amount of random I/O, I think modern filesystems handle that just fine
(coalescing the writes into mostly sequential writes, etc.). Also, current
hardware is really good at handling this (controllers with write cache,
SSDs, etc.).

Also, if hash-join does not worry about number of batches, why should
hashagg worry about that? I expect the I/O patterns to be very similar.

And if you have many batches, it means you have tiny work_mem, compared
to the amount of data. Either you have unreasonably small work_mem
(better fix that) or a lot of data (better have a lot of RAM and good
storage, or you'll suffer anyway).

In any case, trying to fix this by limiting number of partitions seems
like a bad approach. I think factoring those concerns into a costing
model is more appropriate.

We could try to model those costs explicitly to put some downward
pressure on the number of partitions we select, but I just chose to cap
it for now.

OK, understood. We can't get all the goodies in the first version.

For us, removing the sort is a big deal, because we're working with
>100M rows regularly. It's more complicated though, because the sort is
usually enforced by COUNT(DISTINCT) and that's not going to disappear
because of this patch. But that's solvable with a custom aggregate.

I hope this offers you a good alternative.

I'm not sure it will ever beat sort for very high cardinality cases, but
I hope it can beat sort when the group size averages something higher
than one. It will also be safer, so the optimizer can be more aggressive
about choosing HashAgg.

It's certainly an improvement, although the sort may still end up being
used for one of two reasons:

(a) COUNT(DISTINCT) -> this is solved by a custom aggregate

(b) bad estimate of required memory -> this is common for aggregates
passing 'internal' state (planner uses some quite high defaults)

Tomas


#5 Jeff Davis
pgsql@j-davis.com
In reply to: Tomas Vondra (#4)
Re: 9.5: Memory-bounded HashAgg

On Tue, 2014-08-12 at 14:58 +0200, Tomas Vondra wrote:

CREATE AGGREGATE myaggregate (
    ...
    SERIALIZE_FUNC = 'dump_data',
    DESERIALIZE_FUNC = 'read_data',
    ...
);

Seems reasonable.

I don't see why it should get messy? In the end, you have a chunk of
data and a hash for it.

Perhaps it's fine; I'd have to see the approach.

It just means you need to walk through the hash table, look at the
hashes and dump ~50% of the groups to a file.

If you have fixed-size states, why would you *want* to remove the group?
What is gained?

One thing I like about my simple approach is that it returns a good
number of groups after each pass, and then those are completely finished
(returned to the operator above, even). That's impossible with HashJoin
because the hashing all needs to be done before the probe phase begins.

The weakness of my approach is the array_agg case that you mention,
because this approach doesn't offer a way to dump out transition states.
It seems like that could be added later, but let me know if you see a
problem there.

I think you're missing the point, here. You need to compute the hash in
both cases. And then you either can do a lookup or just peek at the first
few bits of the hash to see whether it's in the current batch or not.

I understood that. The point I was trying to make (which might or might
not be true) was that: (a) this only matters for a failed lookup,
because a successful lookup would just go in the hash table anyway; and
(b) a failed lookup probably doesn't cost much compared to all of the
other things that need to happen along that path.

I should have chosen a better example though. For instance: if the
lookup fails, we need to write the tuple, and writing the tuple is sure
to swamp the cost of a failed hash lookup.

is much faster than a lookup. Also, as the hash table grows (beyond L3
cache size, which is a few MBs today), it becomes much slower in my
experience - that's one of the lessons I learnt while hacking on the
hashjoin. And we're dealing with hashagg not fitting into work_mem, so
this seems to be relevant.

Could be, but this is also the path that goes to disk, so I'm not sure
how significant it is.

Because I suspect there are costs in having an extra file around that
I'm not accounting for directly. We are implicitly assuming that the OS
will keep around enough buffers for each BufFile to do sequential writes
when needed. If we create a zillion partitions, then either we end up
with random I/O or we push the memory burden into the OS buffer cache.

Assuming I understand it correctly, I think this logic is broken. Are you
saying "We'll try to do memory-bounded hashagg, but not for the really
large datasets because of fear we might cause random I/O"?

No, the memory is still bounded even for very high cardinality inputs
(ignoring array_agg case for now). When a partition is processed later,
it also might exhaust work_mem, and need to write out tuples to its own
set of partitions. This allows memory-bounded execution to succeed even
if the number of partitions each iteration is one, though it will result
in repeated I/O for the same tuple.

While I certainly understand your concerns about generating an excessive
amount of random I/O, I think modern filesystems handle that just fine
(coalescing the writes into mostly sequential writes, etc.). Also, current
hardware is really good at handling this (controllers with write cache,
SSDs, etc.).

All of that requires memory. We shouldn't dodge a work_mem limit by
using the kernel's memory, instead.

Also, if hash-join does not worry about number of batches, why should
hashagg worry about that? I expect the I/O patterns to be very similar.

One difference with HashJoin is that, to create a large number of
batches, the inner side must be huge, which is not the expected
operating mode for HashJoin[1]. Regardless, every partition that is
active *does* have a memory cost. HashJoin might ignore that cost, but
that doesn't make it right.

I think the right analogy here is to Sort's poly-phase merge -- it
doesn't merge all of the runs at once; see the comments at the top of
tuplesort.c.

In other words, sometimes it's better to have fewer partitions (for
hashing) or merge fewer runs at once (for sorting). It does more
repeated I/O, but the I/O is more sequential.

In any case, trying to fix this by limiting number of partitions seems
like a bad approach. I think factoring those concerns into a costing
model is more appropriate.

Fair enough. I haven't modeled the cost yet; and I agree that an upper
limit is quite crude.

(a) COUNT(DISTINCT) -> this is solved by a custom aggregate

Is there a reason we can't offer a hash-based strategy for this one? It
would have to be separate hash tables for different aggregates, but it
seems like it could work.

(b) bad estimate of required memory -> this is common for aggregates
passing 'internal' state (planner uses some quite high defaults)

Maybe some planner hooks? Ideas?

Regards,
Jeff Davis


#6 Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#5)
Re: 9.5: Memory-bounded HashAgg

On 13 August 2014, 7:02, Jeff Davis wrote:

On Tue, 2014-08-12 at 14:58 +0200, Tomas Vondra wrote:

CREATE AGGREGATE myaggregate (
    ...
    SERIALIZE_FUNC = 'dump_data',
    DESERIALIZE_FUNC = 'read_data',
    ...
);

Seems reasonable.

I don't see why it should get messy? In the end, you have a chunk of
data and a hash for it.

Perhaps it's fine; I'd have to see the approach.

It just means you need to walk through the hash table, look at the
hashes and dump ~50% of the groups to a file.

If you have fixed-size states, why would you *want* to remove the group?
What is gained?

You're right that for your batching algorithm (based on lookups), that's
not really needed, and keeping everything in memory is a good initial
approach.

My understanding of the batching algorithm (and I may be wrong on this
one) is that once you choose the number of batches, it's pretty much
fixed. Is that the case?

But what will happen in case of significant cardinality underestimate?
I.e. what will happen if you decide to use 16 batches, and then find
out 256 would be more appropriate? I believe you'll end up with batches
16x the size you'd want, most likely exceeding work_mem.

Do I understand that correctly?

But back to the removal of aggregate states from memory (irrespective
of the size) - this is what makes the hashjoin-style batching possible,
because it:

(a) makes the batching decision simple (peeking at hash)
(b) makes it possible to repeatedly increase the number of batches
(c) provides a simple trigger for the increase of batch count

Some of this might be achievable even with keeping the states in memory.
I mean, you can add more batches on the fly, and handle this similarly
to hash join, while reading tuples from the batch (moving the tuples to
the proper batch, if needed).

The problem is that once you have the memory full, there's no trigger
to alert you that you should increase the number of batches again.

One thing I like about my simple approach is that it returns a good
number of groups after each pass, and then those are completely finished
(returned to the operator above, even). That's impossible with HashJoin
because the hashing all needs to be done before the probe phase begins.

The hash-join approach returns ~1/N groups after each pass, so I fail to
see how this is better?

The weakness of my approach is the array_agg case that you mention,
because this approach doesn't offer a way to dump out transition states.
It seems like that could be added later, but let me know if you see a
problem there.

Right. Let's not solve this in the first version of the patch.

I think you're missing the point, here. You need to compute the hash in
both cases. And then you either can do a lookup or just peek at the
first
few bits of the hash to see whether it's in the current batch or not.

I understood that. The point I was trying to make (which might or might
not be true) was that: (a) this only matters for a failed lookup,
because a successful lookup would just go in the hash table anyway; and
(b) a failed lookup probably doesn't cost much compared to all of the
other things that need to happen along that path.

OK. I don't have numbers proving otherwise at hand, and you're probably
right that once the batching kicks in, the other parts are likely more
expensive than this.

I should have chosen a better example though. For instance: if the
lookup fails, we need to write the tuple, and writing the tuple is sure
to swamp the cost of a failed hash lookup.

is much faster than a lookup. Also, as the hash table grows (beyond L3
cache size, which is a few MBs today), it becomes much slower in my
experience - that's one of the lessons I learnt while hacking on the
hashjoin. And we're dealing with hashagg not fitting into work_mem, so
this seems to be relevant.

Could be, but this is also the path that goes to disk, so I'm not sure
how significant it is.

It may or may not go to the disk, actually. The fact that you're doing
batching means it's written to a temporary file, but with large amounts
of RAM it may not get written to disk.

That's because work_mem is only a very soft guarantee - a query may
use multiple work_mem buffers in a perfectly legal way. So users tend
to set this rather conservatively. For example we have >256GB of RAM in
each machine, usually <24 queries running at the same time, and yet we
have only work_mem=800MB. On the few occasions when a hash join is
batched, it usually remains in page cache and never actually gets written
to disk. Or maybe it gets written, but it's still in the page cache, so
the backend never notices that.

It's true there are other costs though - I/O calls, etc. So it's not free.

Because I suspect there are costs in having an extra file around that
I'm not accounting for directly. We are implicitly assuming that the OS
will keep around enough buffers for each BufFile to do sequential writes
when needed. If we create a zillion partitions, then either we end up
with random I/O or we push the memory burden into the OS buffer cache.

Assuming I understand it correctly, I think this logic is broken. Are you
saying "We'll try to do memory-bounded hashagg, but not for the really
large datasets because of fear we might cause random I/O"?

No, the memory is still bounded even for very high cardinality inputs
(ignoring array_agg case for now). When a partition is processed later,
it also might exhaust work_mem, and need to write out tuples to its own
set of partitions. This allows memory-bounded execution to succeed even
if the number of partitions each iteration is one, though it will result
in repeated I/O for the same tuple.

Aha! And the new batches are 'private' to the work item, making it a bit
recursive, right? Is there any reason not to just double the number of
batches globally? I mean, why not just say

nbatches *= 2

which effectively splits each batch into two? Half the groups stay
in the current one, half are moved to a new one.

It makes it almost perfectly sequential, because you're reading
a single batch, keeping half the tuples and writing the other half to
a new batch. If you increase the number of batches a bit more, e.g.

nbatches *= 4

then you're keeping 1/4 and writing into 3 new batches.

That seems like a better solution to me.
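
A minimal sketch of that doubling step (invented names, a plain array
instead of dynahash, and printf standing in for writing the dumped state
to the new batch's temp file):

/*
 * Sketch only: double nbatches and evict the groups that no longer belong
 * to the current batch.  Invented names; a plain array stands in for the
 * hash table and printf stands in for writing state to a temp file.
 */
#include <stdio.h>

typedef struct Group
{
    unsigned int hash;
    long        count;          /* stand-in for the transition state */
} Group;

static int
increase_nbatches(Group *groups, int ngroups, int curbatch, int *nbatches)
{
    int     kept = 0;

    *nbatches *= 2;             /* consider one more bit of the hash */

    for (int i = 0; i < ngroups; i++)
    {
        int     batchno = (int) (groups[i].hash % *nbatches);

        if (batchno == curbatch)
            groups[kept++] = groups[i];         /* still ours: keep in memory */
        else
            printf("dump group hash=%u to batch %d\n",
                   groups[i].hash, batchno);    /* "write" to the new batch */
    }

    return kept;                /* roughly half the groups remain */
}

int
main(void)
{
    Group   groups[] = {{0, 3}, {1, 1}, {2, 5}, {3, 2}, {4, 4}, {5, 7}};
    int     nbatches = 1;
    int     remaining = increase_nbatches(groups, 6, 0, &nbatches);

    printf("%d groups kept in memory, nbatches is now %d\n",
           remaining, nbatches);
    return 0;
}

Roughly half the groups survive in memory, and everything that leaves is
written out sequentially.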

While I certainly understand your concerns about generating an excessive
amount of random I/O, I think modern filesystems handle that just fine
(coalescing the writes into mostly sequential writes, etc.). Also, current
hardware is really good at handling this (controllers with write cache,
SSDs, etc.).

All of that requires memory. We shouldn't dodge a work_mem limit by
using the kernel's memory, instead.

Sure, saving memory at one place just to waste it somewhere else is
a poor solution. But I don't think work_mem is a memory-saving tool.
I see it as a memory-limiting protection.

Also, if hash-join does not worry about number of batches, why should
hashagg worry about that? I expect the I/O patterns to be very similar.

One difference with HashJoin is that, to create a large number of
batches, the inner side must be huge, which is not the expected
operating mode for HashJoin[1]. Regardless, every partition that is
active *does* have a memory cost. HashJoin might ignore that cost, but
that doesn't make it right.

I think the right analogy here is to Sort's poly-phase merge -- it
doesn't merge all of the runs at once; see the comments at the top of
tuplesort.c.

In other words, sometimes it's better to have fewer partitions (for
hashing) or merge fewer runs at once (for sorting). It does more
repeated I/O, but the I/O is more sequential.

OK. I don't have a clear opinion on this yet. I don't think the costs
are that high, but maybe I'm wrong in this.

It also seems to me that using HASH_DISK_MAX_PARTITIONS, and then allowing
each work item to create its own set of additional partitions, effectively
renders HASH_DISK_MAX_PARTITIONS futile.

In any case, trying to fix this by limiting number of partitions seems
like a bad approach. I think factoring those concerns into a costing
model is more appropriate.

Fair enough. I haven't modeled the cost yet; and I agree that an upper
limit is quite crude.

OK, let's keep the HASH_DISK_MAX_PARTITIONS for now and improve this later.

(a) COUNT(DISTINCT) -> this is solved by a custom aggregate

Is there a reason we can't offer a hash-based strategy for this one? It
would have to be separate hash tables for different aggregates, but it
seems like it could work.

I don't know the exact reasoning, but apparently it's how the
current planner works. Whenever it sees COUNT(DISTINCT) it enforces a
sort. I suspect this is because of fear of memory requirements (because
a distinct requires keeping all the items), which might have been
perfectly valid when this was designed.

(b) bad estimate of required memory -> this is common for aggregates
passing 'internal' state (planner uses some quite high defaults)

Maybe some planner hooks? Ideas?

My plan is to add this to the CREATE AGGREGATE somehow - either as a
constant parameter (allowing a custom constant size to be set) or a callback
to a 'sizing' function (estimating the size based on number of items,
average width and ndistinct in the group). In any case, this is
independent of this patch.

I think that for this patch we may either keep the current batching
strategy (and proceed with the TODO items you listed with your first
patch), or we may investigate the alternative (hash-join-like) batching
strategy. I suppose that may be done after the TODO items, but I'm afraid
it may impact some of them (e.g. the costing). It can be done with the
simple aggregates (using fixed-size types for states), but eventually
it will require adding serialize/deserialize support to CREATE AGGREGATE.

Now, I'm very much in favor of the #2 choice (because that's what works best
with the aggregates I need to use), but I'm also a big fan of the
'availability beats unavailable features 100% of the time' principle.

So if you decide to go for #1 now, I'm fine with that. I'm open to do
the next step - either as a follow-up patch, or maybe as an alternative
spin-off of your patch.

regards
Tomas


#7 Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Tomas Vondra (#6)
Re: 9.5: Memory-bounded HashAgg

On 13.8.2014 12:31, Tomas Vondra wrote:

On 13 August 2014, 7:02, Jeff Davis wrote:

On Tue, 2014-08-12 at 14:58 +0200, Tomas Vondra wrote:

(b) bad estimate of required memory -> this is common for aggregates
passing 'internal' state (planner uses some quite high defaults)

Maybe some planner hooks? Ideas?

My plan is to add this to the CREATE AGGREGATE somehow - either as a
constant parameter (allowing to set a custom constant size) or a callback
to a 'sizing' function (estimating the size based on number of items,
average width and ndistinct in the group). In any case, this is
independent of this patch.

FWIW, the constant parameter is already implemented for 9.4. Adding the
function seems possible - the most difficult part seems to be getting
all the necessary info before count_agg_clauses() is called. For example,
dNumGroups is currently evaluated after the call (and tuples/group seems
like useful info for sizing).

While this seems unrelated to the patch discussed here, it's true that:

(a) good estimate of the memory is important for initial estimate of
batch count

(b) dynamic increase of batch count alleviates issues from
underestimating the amount of memory necessary for states

But let's leave this out of scope for the current patch.

regards
Tomas


#8 Jeff Davis
pgsql@j-davis.com
In reply to: Tomas Vondra (#6)
Re: 9.5: Memory-bounded HashAgg

I think the hash-join like approach is reasonable, but I also think
you're going to run into a lot of challenges that make it more complex
for HashAgg. For instance, let's say you have the query:

SELECT x, array_agg(y) FROM foo GROUP BY x;

Say the transition state is an array (for the sake of simplicity), so
the hash table has something like:

1000 => {7, 8, 9}
1001 => {12, 13, 14}

You run out of memory and need to split the hash table, so you scan the
hash table and find that group 1001 needs to be written to disk. So you
serialize the key and array and write them out.

Then the next tuple you get is (1001, 19). What do you do? Create a new
group 1001 => {19} (how do you combine it later with the first one)? Or
try to fetch the existing group 1001 from disk and advance it (horrible
random I/O)?

On Wed, 2014-08-13 at 12:31 +0200, Tomas Vondra wrote:

My understanding of the batching algorithm (and I may be wrong on this
one) is that once you choose the number of batches, it's pretty much
fixed. Is that the case?

It's only fixed for that one "work item" (iteration). A different K can
be selected if memory is exhausted again. But you're right: this is a
little less flexible than HashJoin.

But what will happen in case of significant cardinality underestimate?
I.e. what will happen if you decide to use 16 batches, and then find
out 256 would be more appropriate? I believe you'll end up with batches
16x the size you'd want, most likely exceeding work_mem.

Yes, except that work_mem would never be exceeded. If the partitions are
16X work_mem, then each would be added as another work_item, and
hopefully it would choose better the next time.

One thing I like about my simple approach is that it returns a good
number of groups after each pass, and then those are completely finished
(returned to the operator above, even). That's impossible with HashJoin
because the hashing all needs to be done before the probe phase begins.

The hash-join approach returns ~1/N groups after each pass, so I fail to
see how this is better?

You can't return any tuples until you begin the probe phase, and that
doesn't happen until you've hashed the entire inner side (which involves
splitting and other work). With my patch, it will return some tuples
after the first scan. Perhaps I'm splitting hairs here, but the idea of
finalizing some groups as early as possible seems appealing.

Aha! And the new batches are 'private' to the work item, making it a bit
recursive, right? Is there any reason not to just double the number of
batches globally?

I didn't quite follow this proposal.

It also seems to me that using HASH_DISK_MAX_PARTITIONS, and then allowing
each work item to create its own set of additional partitions, effectively
renders HASH_DISK_MAX_PARTITIONS futile.

It's the number of active partitions that matter, because that's what
causes the random I/O.

Regards,
Jeff Davis


#9 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Jeff Davis (#8)
Re: 9.5: Memory-bounded HashAgg

Jeff Davis <pgsql@j-davis.com> writes:

I think the hash-join like approach is reasonable, but I also think
you're going to run into a lot of challenges that make it more complex
for HashAgg. For instance, let's say you have the query:

SELECT x, array_agg(y) FROM foo GROUP BY x;

Say the transition state is an array (for the sake of simplicity), so
the hash table has something like:

1000 => {7, 8, 9}
1001 => {12, 13, 14}

You run out of memory and need to split the hash table, so you scan the
hash table and find that group 1001 needs to be written to disk. So you
serialize the key and array and write them out.

Then the next tuple you get is (1001, 19). What do you do? Create a new
group 1001 => {19} (how do you combine it later with the first one)? Or
try to fetch the existing group 1001 from disk and advance it (horrible
random I/O)?

If you're following the HashJoin model, then what you do is the same thing
it does: you write the input tuple back out to the pending batch file for
the hash partition that now contains key 1001, whence it will be processed
when you get to that partition. I don't see that there's any special case
here.

The fly in the ointment is how to serialize a partially-computed aggregate
state value to disk, if it's not of a defined SQL type.

regards, tom lane


#10 Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#8)
Re: 9.5: Memory-bounded HashAgg

On 14 August 2014, 9:22, Jeff Davis wrote:

I think the hash-join like approach is reasonable, but I also think
you're going to run into a lot of challenges that make it more complex
for HashAgg. For instance, let's say you have the query:

SELECT x, array_agg(y) FROM foo GROUP BY x;

Say the transition state is an array (for the sake of simplicity), so
the hash table has something like:

1000 => {7, 8, 9}
1001 => {12, 13, 14}

You run out of memory and need to split the hash table, so you scan the
hash table and find that group 1001 needs to be written to disk. So you
serialize the key and array and write them out.

Then the next tuple you get is (1001, 19). What do you do? Create a new
group 1001 => {19} (how do you combine it later with the first one)? Or
try to fetch the existing group 1001 from disk and advance it (horrible
random I/O)?

No, that's not how it works. The batching algorithm works with a hash of
the group. For example let's suppose you do this:

batchno = hash % nbatches;

which essentially keeps the last few bits of the hash. 0 bits for
nbatches=1, 1 bit for nbatches=2, 2 bits for nbatches=4 etc.

So let's say we have 2 batches, and we're working on the first batch.
That means we're using 1 bit:

batchno = hash % 2;

and for the first batch we're keeping only groups with batchno=0. So
only groups with 0 as the last bit are in batchno==0.

When running out of memory, you simply do

nbatches *= 2

and start considering one more bit from the hash. So if you had this
before:

group_a => batchno=0 => {7, 8, 9}
group_b => batchno=0 => {12, 13, 14}
group_c => batchno=0 => {23, 1, 45}
group_d => batchno=0 => {77, 37, 54}

(where batchno is a bit string), after doubling the number of batches
you get something like this:

group_a => batchno=10 => {7, 8, 9}
group_b => batchno=00 => {12, 13, 14}
group_c => batchno=00 => {23, 1, 45}
group_d => batchno=10 => {77, 37, 54}

So you have only two possible batchno values here, depending on the new
most-significant bit - either you got 0 (which means it's still in the
current batch) or 1 (and you need to move it to the temp file of the
new batch).

Then, when you get a new tuple, you get its hash and do a simple check
of the last few bits - effectively computing batchno just like before

batchno = hash % nbatches;

Either it belongs to the current batch (and either it's in the hash
table, or you add it there), or it's not - in that case write it to a
temp file.
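
In a compact sketch (invented names; printf stands in for both the
hash-table lookup/transition and the temp-file write), the per-tuple
decision is just:

/*
 * Sketch of the per-tuple check described above.  Invented names; printf
 * stands in for both the lookup/transition and the temp-file write.
 */
#include <stdio.h>

static void
route_tuple(unsigned int hash, int curbatch, int nbatches)
{
    int     batchno = (int) (hash % (unsigned int) nbatches);

    if (batchno == curbatch)
        printf("hash %u: current batch, lookup group and advance it\n", hash);
    else
        printf("hash %u: not ours, write tuple to batch %d's temp file\n",
               hash, batchno);
}

int
main(void)
{
    unsigned int hashes[] = {26, 43, 60, 77};

    for (int i = 0; i < 4; i++)
        route_tuple(hashes[i], 0, 4);   /* currently processing batch 0 of 4 */
    return 0;
}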

It gets a bit more complex when you increase the number of batches
repeatedly (effectively you need to do the check/move when reading the
batches).

For sure, it's not for free - it may write to quite a few files. Is it
more expensive than what you propose? I'm not sure about that. With
your batching scheme, you'll end up with lower number of large batches,
and you'll need to read and split them, possibly repeatedly. The
batching scheme from hashjoin minimizes this.

IMHO the only way to find out is to run some actual tests.

On Wed, 2014-08-13 at 12:31 +0200, Tomas Vondra wrote:

My understanding of the batching algorithm (and I may be wrong on this
one) is that once you choose the number of batches, it's pretty much
fixed. Is that the case?

It's only fixed for that one "work item" (iteration). A different K can
be selected if memory is exhausted again. But you're right: this is a
little less flexible than HashJoin.

But what will happen in case of significant cardinality underestimate?
I.e. what will happen if you decide to use 16 batches, and then find
out 256 would be more appropriate? I believe you'll end up with batches
16x the size you'd want, most likely exceeding work_mem.

Yes, except that work_mem would never be exceeded. If the partitions are
16X work_mem, then each would be added as another work_item, and
hopefully it would choose better the next time.

Only for aggregates with fixed-length state. For aggregates with growing
state (and no serialize/deserialize), the states may eventually exceed
work_mem.

One thing I like about my simple approach is that it returns a good
number of groups after each pass, and then those are completely finished
(returned to the operator above, even). That's impossible with HashJoin
because the hashing all needs to be done before the probe phase begins.

The hash-join approach returns ~1/N groups after each pass, so I fail to
see how this is better?

You can't return any tuples until you begin the probe phase, and that
doesn't happen until you've hashed the entire inner side (which involves
splitting and other work). With my patch, it will return some tuples
after the first scan. Perhaps I'm splitting hairs here, but the idea of
finalizing some groups as early as possible seems appealing.

I fail to see how this is different from your approach? How can you
output any tuples before processing the whole inner relation?

After the first scan, the hash-join approach is pretty much guaranteed
to output ~1/N tuples.

Aha! And the new batches are 'private' to the work item, making it a bit
recursive, right? Is there any reason not to just double the number of
batches globally?

I didn't quite follow this proposal.

Again, it's about a difference between your batching approach and the
hashjoin-style batching. The hashjoin batching keeps a single level of
batches, and when hitting work_mem just doubles the number of batches.

Your approach is to do multi-level batching, and I was thinking whether
it'd be possible to use the same approach (single level). But in
retrospect it probably does not make much sense, because the multi-level
batching is one of the points of the proposed approach.

It also seems to me that using HASH_DISK_MAX_PARTITIONS, and then allowing
each work item to create its own set of additional partitions, effectively
renders HASH_DISK_MAX_PARTITIONS futile.

It's the number of active partitions that matter, because that's what
causes the random I/O.

OK, point taken. While I share the general concern about random I/O,
I'm not sure this case is particularly problematic.

regards
Tomas


#11 Jeff Davis
pgsql@j-davis.com
In reply to: Tom Lane (#9)
Re: 9.5: Memory-bounded HashAgg

On Thu, 2014-08-14 at 10:06 -0400, Tom Lane wrote:

If you're following the HashJoin model, then what you do is the same thing
it does: you write the input tuple back out to the pending batch file for
the hash partition that now contains key 1001, whence it will be processed
when you get to that partition. I don't see that there's any special case
here.

HashJoin only deals with tuples. With HashAgg, you have to deal with a
mix of tuples and partially-computed aggregate state values. Not
impossible, but it is a little more awkward than HashJoin.

Regards,
Jeff Davis


#12 Atri Sharma
atri.jiit@gmail.com
In reply to: Jeff Davis (#11)
Re: 9.5: Memory-bounded HashAgg

On Thursday, August 14, 2014, Jeff Davis <pgsql@j-davis.com> wrote:

On Thu, 2014-08-14 at 10:06 -0400, Tom Lane wrote:

If you're following the HashJoin model, then what you do is the same thing
it does: you write the input tuple back out to the pending batch file for
the hash partition that now contains key 1001, whence it will be processed
when you get to that partition. I don't see that there's any special case
here.

HashJoin only deals with tuples. With HashAgg, you have to deal with a
mix of tuples and partially-computed aggregate state values. Not
impossible, but it is a little more awkward than HashJoin.

+1

Not to mention future cases if we start maintaining multiple state
values, with regard to grouping sets.

Regards,

Atri

--
Regards,

Atri
*l'apprenant*

#13 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Jeff Davis (#11)
Re: 9.5: Memory-bounded HashAgg

Jeff Davis <pgsql@j-davis.com> writes:

HashJoin only deals with tuples. With HashAgg, you have to deal with a
mix of tuples and partially-computed aggregate state values. Not
impossible, but it is a little more awkward than HashJoin.

Not sure that I follow your point. You're going to have to deal with that
no matter what, no?

I guess in principle you could avoid the need to dump agg state to disk.
What you'd have to do is write out tuples to temp files even when you
think you've processed them entirely, so that if you later realize you
need to split the current batch, you can recompute the states of the
postponed aggregates from scratch (ie from the input tuples) when you get
around to processing the batch they got moved to. This would avoid
confronting the how-to-dump-agg-state problem, but it seems to have little
else to recommend it. Even if splitting a batch is a rare occurrence,
the killer objection here is that even a totally in-memory HashAgg would
have to write all its input to a temp file, on the small chance that it
would exceed work_mem and need to switch to batching.

regards, tom lane


#14 Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Tom Lane (#13)
Re: 9.5: Memory-bounded HashAgg

On 14 August 2014, 18:12, Tom Lane wrote:

Jeff Davis <pgsql@j-davis.com> writes:

HashJoin only deals with tuples. With HashAgg, you have to deal with a
mix of tuples and partially-computed aggregate state values. Not
impossible, but it is a little more awkward than HashJoin.

Not sure that I follow your point. You're going to have to deal with that
no matter what, no?

That is not how the patch works. Once the memory consumption hits work_mem,
it keeps the already existing groups in memory, and only stops creating
new groups. For each tuple, hashagg does a lookup - if the group is
already in memory, it performs the transition, otherwise it writes the
tuple to disk (and does some batching, but that's mostly irrelevant here).

This way it's not necessary to dump the partially-computed states, and for
fixed-size states it actually limits the amount of consumed memory. For
variable-length aggregates (array_agg et al.), not so much.

I guess in principle you could avoid the need to dump agg state to disk.
What you'd have to do is write out tuples to temp files even when you
think you've processed them entirely, so that if you later realize you
need to split the current batch, you can recompute the states of the
postponed aggregates from scratch (ie from the input tuples) when you get
around to processing the batch they got moved to. This would avoid
confronting the how-to-dump-agg-state problem, but it seems to have little
else to recommend it. Even if splitting a batch is a rare occurrence,
the killer objection here is that even a totally in-memory HashAgg would
have to write all its input to a temp file, on the small chance that it
would exceed work_mem and need to switch to batching.

Yeah, I think putting this burden on each hashagg is not a good thing.

What I was thinking about is an automatic fall-back - try to do an
in-memory hash-agg. When you hit the work_mem limit, see how far along we
are (have we scanned 10% or 90% of the tuples?), and decide whether to
restart with batching.

But I think there's no single solution, fixing all the possible cases. I
think the patch proposed here is a solid starting point, that may be
improved and extended by further patches. Eventually, what I think might
work is this combination of approaches:

1) fixed-size states and states with serialize/deserialize methods

=> hashjoin-like batching (i.e. dumping both tuples and states)

2) variable-size states without serialize/deserialize

=> Jeff's approach (keep states in memory, dump tuples)
=> possibly with the rescan fall-back, for quickly growing states

Tomas


#15 Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Atri Sharma (#12)
Re: 9.5: Memory-bounded HashAgg

On 14 August 2014, 18:02, Atri Sharma wrote:

On Thursday, August 14, 2014, Jeff Davis <pgsql@j-davis.com> wrote:

On Thu, 2014-08-14 at 10:06 -0400, Tom Lane wrote:

If you're following the HashJoin model, then what you do is the same thing
it does: you write the input tuple back out to the pending batch file for
the hash partition that now contains key 1001, whence it will be processed
when you get to that partition. I don't see that there's any special case
here.

HashJoin only deals with tuples. With HashAgg, you have to deal with a
mix of tuples and partially-computed aggregate state values. Not
impossible, but it is a little more awkward than HashJoin.

+1

Not to mention future cases if we start maintaining multiple state
values, with regard to grouping sets.

So what would you do for aggregates where the state is growing quickly?
Say, things like median() or array_agg()?

I think that "we can't do that for all aggregates" does not imply "we must
not do that at all."

There will always be aggregates not implementing dumping state for various
reasons, and in those cases the proposed approach is certainly a great
improvement. I like it, and I hope it will get committed.

But maybe for aggregates supporting serialize/deserialize of the state
(including all aggregates using known types, not just fixed-size types) a
hashjoin-like batching would be better? I can name a few custom aggregates
that'd benefit tremendously from this.

Just to be clear - this is certainly non-trivial to implement, and I'm not
trying to force anyone (e.g. Jeff) to implement the ideas I proposed. I'm
ready to spend time reviewing the current patch, implementing the approach
I proposed, and comparing the behaviour.

Kudos to Jeff for working on this.

Tomas


#16 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Tomas Vondra (#14)
Re: 9.5: Memory-bounded HashAgg

"Tomas Vondra" <tv@fuzzy.cz> writes:

On 14 August 2014, 18:12, Tom Lane wrote:

Not sure that I follow your point. You're going to have to deal with that
no matter what, no?

That is not how the patch works. Once the memory consumption hits work_mem,
it keeps the already existing groups in memory, and only stops creating
new groups.

Oh? So if we have aggregates like array_agg whose memory footprint
increases over time, the patch completely fails to avoid bloat?

I might think a patch with such a limitation was useful, if it weren't
for the fact that aggregates of that nature are a large part of the
cases where the planner misestimates the table size in the first place.
Any complication that we add to nodeAgg should be directed towards
dealing with cases that the planner is likely to get wrong.

regards, tom lane


#17Jeff Davis
pgsql@j-davis.com
In reply to: Tomas Vondra (#10)
Re: 9.5: Memory-bounded HashAgg

On Thu, 2014-08-14 at 16:17 +0200, Tomas Vondra wrote:

Either it belongs to the current batch (and either it's in the hash
table, or you add it there), or it's not - in that case write it to a
temp file.

I think the part you left out is that you need two files per batch: one
for the dumped-out partially-computed state values, and one for the
tuples.

In other words, you haven't really discussed the step where you reunite
the tuples with that partially-computed state.
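
To make that concrete, here's a rough way to picture it - a hypothetical
per-batch struct, not taken from the patch - with the two files and the
order in which they'd be replayed:

#include <stdio.h>

/*
 * Hypothetical per-batch bookkeeping for hashjoin-like HashAgg batching;
 * not from the patch, only to make the two-file point concrete.
 */
typedef struct AggBatch
{
    FILE       *state_file;    /* serialized partially-computed agg states */
    FILE       *tuple_file;    /* input tuples that hashed into this batch */
} AggBatch;

/*
 * Processing a batch means reuniting the two:
 *   1. rewind(state_file) and deserialize each state into the hash table;
 *   2. rewind(tuple_file) and advance the matching group for each tuple.
 */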

For sure, it's not for free - it may write to quite a few files. Is it
more expensive than what you propose? I'm not sure about that. With
your batching scheme, you'll end up with a lower number of large batches,
and you'll need to read and split them, possibly repeatedly. The
batching scheme from hashjoin minimizes this.

My approach only has fewer batches if it elects to have fewer batches,
which might happen for two reasons:
1. A cardinality misestimate. This certainly could happen, but we do
have useful numbers to work from (we know the number of tuples and
distincts that we've read so far), so it's far from a blind guess.
2. We're concerned about the random I/O from way too many partitions.

I fail to see how this is different from your approach? How can you
output any tuples before processing the whole inner relation?

Right, the only thing I avoid is scanning the hash table and dumping out
the groups.

This isn't a major distinction, more like "my approach does a little
less work before returning tuples", and I'm not even sure I can defend
that, so I'll retract this point.

Your approach is to do multi-level batching, and I was thinking whether
it'd be possible to use the same approach (single level). But in
retrospect it probably does not make much sense, because the multi-level
batching is one of the points of the proposed approach.

Now that I think about it, many of the points we discussed could
actually work with either approach:
* In my approach, if I need more partitions, I could create more in
much the same way as HashJoin to keep it single-level (as you suggest
above).
* In your approach, if there are too many partitions, you could avoid
random I/O by intentionally putting tuples from multiple partitions in a
single file and moving them while reading.
* If given a way to write out the partially-computed states, I could
evict some groups from the hash table to keep an array_agg() bounded.

Our approaches only differ on one fundamental trade-off that I see:
(A) My approach requires a hash lookup of an already-computed hash for
every incoming tuple, not only the ones going into the hash table.
(B) Your approach requires scanning the hash table and dumping out the
states every time the hash table fills up, which therefore requires a
way to dump out the partial states.

You could probably win the argument by pointing out that (A) is O(N) and
(B) is O(log2(N)). But I suspect that cost (A) is very low.
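
For what it's worth, the per-tuple cost in (A) is essentially one partition
computation on a hash value that was already computed for the lookup -
something like this sketch (illustrative names, not patch code):

#include <stdint.h>

/*
 * Sketch of the per-tuple routing in approach (A): the hash value is
 * already available from the failed hash-table lookup, so picking the
 * spill partition for an overflow tuple is just a mask on that value.
 * Assumes npartitions is a power of two.
 */
static int
spill_partition_for(uint32_t hashvalue, int npartitions)
{
    return (int) (hashvalue & (uint32_t) (npartitions - 1));
}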

Unfortunately, it would take some effort to test your approach because
we'd actually need a way to write out the partially-computed state, and
the algorithm itself seems a little more complex. So I'm not really sure
how to proceed.

Regards,
Jeff Davis


#18Atri Sharma
atri.jiit@gmail.com
In reply to: Tomas Vondra (#15)
Re: 9.5: Memory-bounded HashAgg

On Thu, Aug 14, 2014 at 10:21 PM, Tomas Vondra <tv@fuzzy.cz> wrote:

On 14 Srpen 2014, 18:02, Atri Sharma wrote:

On Thursday, August 14, 2014, Jeff Davis <pgsql@j-davis.com> wrote:

On Thu, 2014-08-14 at 10:06 -0400, Tom Lane wrote:

If you're following the HashJoin model, then what you do is the same thing
it does: you write the input tuple back out to the pending batch file for
the hash partition that now contains key 1001, whence it will be processed
when you get to that partition. I don't see that there's any special case
here.

HashJoin only deals with tuples. With HashAgg, you have to deal with a
mix of tuples and partially-computed aggregate state values. Not
impossible, but it is a little more awkward than HashJoin.

+1

Not to mention future cases if we start maintaining multiple state
values, in regard to grouping sets.

So what would you do for aggregates where the state is growing quickly?
Say, things like median() or array_agg()?

I think that "we can't do that for all aggregates" does not imply "we must
not do that at all."

There will always be aggregates not implementing dumping state for various
reasons, and in those cases the proposed approach is certainly a great
improvement. I like it, and I hope it will get committed.

But maybe for aggregates supporting serialize/deserialize of the state
(including all aggregates using known types, not just fixed-size types) a
hashjoin-like batching would be better? I can name a few custom aggregates
that'd benefit tremendously from this.

Yeah, that could work, but is it worth adding additional paths (assuming this
patch gets committed) for some aggregates? I think we should do further
analysis of the use case.

Just to be clear - this is certainly non-trivial to implement, and I'm not
trying to force anyone (e.g. Jeff) to implement the ideas I proposed. I'm
ready to spend time reviewing the current patch, implementing the approach
I proposed, and comparing the behaviour.

Totally agreed. It would be a different approach, although, as you said, it
could be built on top of the current patch.

Kudos to Jeff for working on this.

Agreed :)

--
Regards,

Atri
*l'apprenant*

#19Jeff Davis
pgsql@j-davis.com
In reply to: Tom Lane (#16)
Re: 9.5: Memory-bounded HashAgg

On Thu, 2014-08-14 at 12:53 -0400, Tom Lane wrote:

Oh? So if we have aggregates like array_agg whose memory footprint
increases over time, the patch completely fails to avoid bloat?

Yes, in its current form.

I might think a patch with such a limitation was useful, if it weren't
for the fact that aggregates of that nature are a large part of the
cases where the planner misestimates the table size in the first place.
Any complication that we add to nodeAgg should be directed towards
dealing with cases that the planner is likely to get wrong.

In my experience, the planner has a lot of difficulty estimating the
cardinality unless it's coming from a base table without any operators
above it (other than maybe a simple predicate). This is probably a lot
more common than array_agg problems, simply because array_agg is
relatively rare compared with GROUP BY in general.

There are also cases where my patch should win against Sort even
when it does go to disk. For instance, high enough cardinality to exceed
work_mem, but also a large enough group size. Sort will have to deal
with all of the tuples before it can group any of them, whereas HashAgg
can group at least some of them along the way.

Consider the skew case where the cardinality is 2M, work_mem fits 1M
groups, and the input consists of the keys 1..1999999 mixed randomly
inside one billion zeros. (Aside: if the input is non-random, you may
not get the skew value before the hash table fills up, in which case
HashAgg is just as bad as Sort.)

That being said, we can hold out for an array_agg fix if desired. As I
pointed out in another email, my proposal is compatible with the idea of
dumping groups out of the hash table, and does take some steps in that
direction.

Regards,
Jeff Davis


#20Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#17)
Re: 9.5: Memory-bounded HashAgg

On 14.8.2014 18:54, Jeff Davis wrote:

On Thu, 2014-08-14 at 16:17 +0200, Tomas Vondra wrote:

Either it belongs to the current batch (and either it's in the hash
table, or you add it there), or it's not - in that case write it to a
temp file.

I think the part you left out is that you need two files per batch: one
for the dumped-out partially-computed state values, and one for the
tuples.

In other words, you haven't really discussed the step where you reunite
the tuples with that partially-computed state.

No, that's not how the serialize/deserialize should work. The aggregate
needs to store the state as-is, so that after deserializing it gets
pretty much the same thing.

For example, for 'median' the state is the list of all the values
received so far, and when serializing it you have to write all the
values out. After deserializing it, you will get the same list of values.

Some aggregates may use complex data structures that need a more
elaborate serialization.
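
As a purely hypothetical example of what serialize/deserialize could look
like for such a "list of values seen so far" state (names made up, error
handling omitted):

#include <stdio.h>
#include <stdlib.h>

/*
 * Hypothetical transition state for a median-like aggregate: simply the
 * values seen so far.  Serializing writes them all out; deserializing
 * reads them back, yielding an equivalent state.
 */
typedef struct MedianState
{
    size_t      nvalues;
    double     *values;
} MedianState;

static void
median_serialize(const MedianState *state, FILE *f)
{
    fwrite(&state->nvalues, sizeof(state->nvalues), 1, f);
    fwrite(state->values, sizeof(double), state->nvalues, f);
}

static MedianState *
median_deserialize(FILE *f)
{
    MedianState *state = malloc(sizeof(MedianState));

    fread(&state->nvalues, sizeof(state->nvalues), 1, f);
    state->values = malloc(state->nvalues * sizeof(double));
    fread(state->values, sizeof(double), state->nvalues, f);
    return state;
}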

For sure, it's not for free - it may write to quite a few files. Is it
more expensive than what you propose? I'm not sure about that. With
your batching scheme, you'll end up with a lower number of large batches,
and you'll need to read and split them, possibly repeatedly. The
batching scheme from hashjoin minimizes this.

My approach only has fewer batches if it elects to have fewer batches,
which might happen for two reasons:
1. A cardinality misestimate. This certainly could happen, but we do
have useful numbers to work from (we know the number of tuples and
distincts that we've read so far), so it's far from a blind guess.
2. We're concerned about the random I/O from way too many partitions.

OK. We can't really do much with the cardinality estimate.

As for the random IO concerns, I did a quick test to see how this
behaves. I used an HP ProLiant DL380 G5 (i.e. a quite old machine, from
2006-09 if I'm not mistaken): 16GB RAM, RAID10 on 6 x 10k SAS drives,
512MB write cache. So quite a lousy machine by today's standards.

I used a simple C program (attached) that creates N files, and writes
into them in a round-robin fashion until a particular file size is
reached. I opted for 64GB total size, 1kB writes.

./iotest filecount filesize writesize

File size is in MB, writesize is in bytes. So for example this writes 64
files, each 1GB, using 512B writes.

./iotest 64 1024 512
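
For reference, a stripped-down sketch of such a round-robin writer (the
attached iotest.c is the version actually used; argument checking, error
handling and the fsync timing are omitted here):

#include <stdio.h>
#include <stdlib.h>

/*
 * Minimal sketch of a round-robin writer; see the attached iotest.c for
 * the program actually used.
 */
int
main(int argc, char **argv)
{
    int         nfiles = atoi(argv[1]);                    /* file count */
    long        filesize = atol(argv[2]) * 1024L * 1024L;  /* MB -> bytes */
    int         writesize = atoi(argv[3]);                 /* bytes per write */
    long        written = 0;
    char       *buffer = calloc(1, writesize);
    FILE      **files = malloc(nfiles * sizeof(FILE *));
    char        name[64];
    int         i;

    for (i = 0; i < nfiles; i++)
    {
        snprintf(name, sizeof(name), "iotest.%d", i);
        files[i] = fopen(name, "w");
    }

    /* cycle over all files until each one reaches the target size */
    while (written < filesize)
    {
        for (i = 0; i < nfiles; i++)
            fwrite(buffer, 1, writesize, files[i]);
        written += writesize;
    }

    for (i = 0; i < nfiles; i++)
        fclose(files[i]);

    return 0;
}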

Measured durations, before and after fsync (in seconds):

files | file size | before fsync | after fsync
---------------------------------------------------------
32 | 2048 | 290.16 | 294.33
64 | 1024 | 264.68 | 267.60
128 | 512 | 278.68 | 283.44
256 | 256 | 332.11 | 338.45
1024 | 64 | 419.91 | 425.48
2048 | 32 | 450.37 | 455.20

So while there is a difference, I don't think it's the 'random I/O wall'
as usually observed on rotational drives. Also, this is a 2.6.32 kernel,
and my suspicion is that with a newer one the behaviour would be better.

I also have an SSD in that machine (Intel S3700), so I did the same test
with these results:

files | file size | before fsync | after fsync
---------------------------------------------------------
32 | 2048 | 445.05 | 464.73
64 | 1024 | 447.32 | 466.56
128 | 512 | 446.63 | 465.90
256 | 256 | 446.64 | 466.19
1024 | 64 | 511.85 | 523.24
2048 | 32 | 579.92 | 590.76

So yes, the number of files matters, but I don't think the effect is strong enough
to draw a clear line on how many batches we allow. Especially
considering how old this machine is (on 3.x kernels, we usually see much
better performance in I/O intensive conditions).

I fail to see how this is different from your approach? How can you
output any tuples before processing the whole inner relation?

Right, the only thing I avoid is scanning the hash table and dumping out
the groups.

This isn't a major distinction, more like "my approach does a little
less work before returning tuples", and I'm not even sure I can defend
that, so I'll retract this point.

Your approach is to do multi-level batching, and I was thinking whether
it'd be possible to use the same approach (single level). But in
retrospect it probably does not make much sense, because the multi-level
batching is one of the points of the proposed approach.

Now that I think about it, many of the points we discussed could
actually work with either approach:
* In my approach, if I need more partitions, I could create more in
much the same way as HashJoin to keep it single-level (as you suggest
above).
* In your approach, if there are too many partitions, you could avoid
random I/O by intentionally putting tuples from multiple partitions in a
single file and moving them while reading.
* If given a way to write out the partially-computed states, I could
evict some groups from the hash table to keep an array_agg() bounded.

Our approaches only differ on one fundamental trade-off that I see:
(A) My approach requires a hash lookup of an already-computed hash for
every incoming tuple, not only the ones going into the hash table.
(B) Your approach requires scanning the hash table and dumping out the
states every time the hash table fills up, which therefore requires a
way to dump out the partial states.

You could probably win the argument by pointing out that (A) is O(N) and
(B) is O(log2(N)). But I suspect that cost (A) is very low.

Unfortunately, it would take some effort to test your approach because
we'd actually need a way to write out the partially-computed state, and
the algorithm itself seems a little more complex. So I'm not really sure
how to proceed.

I plan to work on this a bit over the next week or two. In any case,
it'll be a limited implementation, but hopefully it will be usable for
some initial testing.

regards
Tomas

Attachments:

iotest.c (text/x-csrc; name=iotest.c)
#21Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Tomas Vondra (#20)
#22Robert Haas
robertmhaas@gmail.com
In reply to: Jeff Davis (#19)
#23Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#1)
#24Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Robert Haas (#22)
#25Jeff Davis
pgsql@j-davis.com
In reply to: Robert Haas (#22)
#26Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#25)
#27Robert Haas
robertmhaas@gmail.com
In reply to: Tomas Vondra (#24)
#28Jeff Davis
pgsql@j-davis.com
In reply to: Robert Haas (#27)
#29Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Jeff Davis (#28)
#30Jeff Davis
pgsql@j-davis.com
In reply to: Heikki Linnakangas (#29)
#31Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#30)
#32Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Tomas Vondra (#31)
#33Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Tomas Vondra (#32)
#34Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Robert Haas (#27)
#35Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Tomas Vondra (#34)
#36Robert Haas
robertmhaas@gmail.com
In reply to: Tomas Vondra (#34)
#37Jeff Davis
pgsql@j-davis.com
In reply to: Jeff Davis (#1)
#38Tomas Vondra
tomas.vondra@2ndquadrant.com
In reply to: Jeff Davis (#37)
#39Jeff Davis
pgsql@j-davis.com
In reply to: Jeff Davis (#37)