Adjusting hash join memory limit to handle batch explosion
Hi,
I've been once again reminded of the batch explosion issue in hashjoin,
due to how it enforces the memory limit. This resurfaces every now and
then, when a user gets strange OOM errors - see these threads from
~2019 for examples, and even some patches: [1][2][3]
Let me restart the discussion, resubmit some of the older patches, and
present a plan for what to do about this ...
Just to remind the basic details, a brief summary - the hashjoin does
not account for the spill files when enforcing the memory limit. The
hash table gets full, so it decides to double the number of batches,
which cuts the hash table size in half. But with enough batches the
doubling can actually make the situation much worse - the new batch
files simply use more memory than the split saves.
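To quantify that: each batch needs two BufFiles (inner + outer), each
with a BLCKSZ buffer. So with the default 8kB blocks and, say,
work_mem = 4MB, at 256 batches the file buffers alone use
2 * 256 * 8kB = 4MB - the entire limit - and every further doubling
adds more buffer memory than halving the hash table can possibly save.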
This can happen for various reasons. A simple example is that we
underestimate the size of the input relation, so the hash table needs
to be built on many more tuples than expected. This is bad, but usually
not disastrous.
It's much worse when there's a batch that is not "divisible", i.e.
adding more batches does not split it roughly in half. This can happen
due to hash collisions (in the bits that determine the batch), or due
to duplicate values that didn't make it into the MCV list (and thus the
skew optimization does not kick in). For example, a single key with
millions of duplicates hashes into one batch, and no amount of
splitting can shrink it.
This is fairly rare, but when it happens it can easily lead to a batch
explosion, i.e. a rapidly increasing number of batches. We add batches,
but the problematic batch does not split, so we promptly hit the limit
again, triggering another increase. It often stops only when we exhaust
the 32-bit hash space, ending with hundreds of thousands of batches.
Attached is a SQL script that reproduces something like this. It builds
a table with values whose hashes have 0s in the upper bits, and the
hash join then spirals into a batch explosion.
Note: The script is a bit dumb and needs a lot of temp space (~50GB)
when generating the values with duplicate hashes.
In 2019 I shared a bunch of patches [4] improving this, but then I got
distracted and the discussion stalled over proposals to fix this by
introducing a special hash join "mode" to address these issues [5]; we
never got past a prototype and there are a lot of outstanding
questions.
So I decided to revisit the three patches from 2019. Attached are
rebased and cleaned up versions. A couple comments on each one:
1) v20241231-adjust-limit-0001-Account-for-batch-files-in-ha.patch
I believe this is the way to go, for now. The basic idea is to keep the
overall behavior, but "relax" the memory limit as the number of batches
increases, so as to minimize the total memory use.
This may seem a bit weird, but once the number of batches grows large
enough there's no way not to violate the limit - the file buffers alone
exceed it. The current code simply ignores this and allocates arbitrary
amounts of memory.
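To make the break-even math concrete, here's the core of the
adjustment, condensed from the attached patch (see
ExecHashAdjustMemoryLimit there for the full version):

    /*
     * Doubling nbatch stops being a win once
     *
     *     (spaceUsed / 2) < 2 * nbatch * BLCKSZ
     *
     * so relax the limit to the break-even point, with 25% slack so
     * we don't hit it again immediately.
     */
    newSpaceAllowed = 1.25 * (4 * hashtable->nbatch * sizeof(PGAlignedBlock));

    /* but also account for the buckets, and the current batch files */
    newSpaceAllowed += hashtable->nbuckets_optimal * sizeof(HashJoinTuple);
    newSpaceAllowed += 2 * hashtable->nbatch * sizeof(PGAlignedBlock);

    /* the limit never goes down */
    hashtable->spaceAllowed = Max(hashtable->spaceAllowed, newSpaceAllowed);

With 8kB blocks that means e.g. at 1024 batches the limit gets relaxed
to roughly 1.25 * 4 * 1024 * 8kB = 40MB (plus buckets), which is close
to the minimum total memory achievable at that point anyway.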
2) v20241231-single-spill-0001-Hash-join-with-a-single-spill.patch
The basic idea is that we keep only a small "slice" of batches in
memory, and data for later batches are spilled into a single file. This
means that even if the number of batches increases, the memory use does
not change. Which means the memory limit is enforced very strictly.
The problem is this performs *terribly* because it shuffles data many
times, always just to the next slice. So if we happen to have 128
batches in memory and the number explodes to ~128k batches, we end up
reading/writing each tuple ~500x.
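(That's easy to see: with 128 in-memory batches and ~128k batches total
there are ~1024 slices, and a tuple belonging to slice k gets re-spilled
once for every slice boundary before it - on average ~512 times.)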
3) v20241231-multi-spill-0001-Hash-join-with-a-multiple-spil.patch
This is an improvement over the "single spill" approach: we keep one
spill file per slice, which greatly reduces the amount of temporary
file traffic. The trouble is we can no longer enforce the memory limit
as strictly, because the number of files does grow with the number of
batches, although not 1:1. With a slice of 128 batches we get only 1
file per 128 batches, which is a nice reduction.
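(For example, at 128k batches with 128-batch slices that's ~1024
overflow files per side, i.e. 2 * 1024 * 8kB = 16MB of file buffers,
instead of 2 * 128k * 8kB = 2GB with per-batch files.)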
This means that ultimately it's either (1) or (3), and the more I've
been looking into this the more I prefer (1), for a couple reasons:
* It's much simpler (it doesn't really change anything in the basic
behavior, and doesn't introduce any new files or anything like that).
* There doesn't seem to be a major difference in total memory
consumption between the two approaches. Both allow allocating more
memory.
* It actually helps with the "indivisible batch" case - it relaxes the
limit, so there's a chance the batch eventually fits and we stop adding
more and more batches. With spill files that's not the case - we still
keep the original limit, and we end up with the batch explosion anyway
(we just handle it much more efficiently).
Unless there are some objections, my plan is to get (1) cleaned up and
try to get it in for 18, possibly in the January CF. It's not a
particularly complex patch, and it already passes check-world (it only
affected three plans in join_hash, and those make sense I think).
One thing I'm not sure about yet is whether this needs to tweak the
hashjoin costing to also consider the files when deciding how many
batches to use. Maybe it should?
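One purely hypothetical option (not part of the attached patches) would
be to cap nbatch in ExecChooseHashTableSize the same way the spill
patches compute nbatch_inmemory, so the planner never picks a batch
count whose file buffers dominate the limit - a rough sketch, assuming
the existing hash_table_bytes local in that function:

    /*
     * Hypothetical sketch: stop doubling nbatch once the BufFile
     * buffers alone (2 per batch, BLCKSZ each) would exceed half of
     * the allowed hash table size.
     */
    while (nbatch > 1 &&
           2 * nbatch * sizeof(PGAlignedBlock) > hash_table_bytes / 2)
        nbatch /= 2;

But that has planning implications I haven't thought through.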
regards
[1] /messages/by-id/20190504003414.bulcbnge3rhwhcsh@development
[2] /messages/by-id/20230228190643.1e368315@karst
[3] /messages/by-id/bc138e9f-c89e-9147-5395-61d51a757b3b@gusw.net
[4] /messages/by-id/20190428141901.5dsbge2ka3rxmpk6@development
[5] /messages/by-id/CAAKRu_YsWm7gc_b2nBGWFPE6wuhdOLfc1LBZ786DUzaCPUDXCA@mail.gmail.com
--
Tomas Vondra
Attachments:
v20241231-adjust-limit-0001-Account-for-batch-files-in-ha.patch (text/x-patch)
From edcaac0705193acea33c4423bf0d59128219a46f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Tue, 31 Dec 2024 17:09:40 +0100
Subject: [PATCH v20241231-adjust-limit] Account for batch files in hash join
spilling
Hash joins try to limit the amount of memory used by the Hash node by
only keeping a single batch in memory and spilling future batches to
disk. Unfortunately, the implementation does not account for the files
used for spilling, which can lead to issues with many batches. Each file
keeps a BLCKSZ buffer in memory, and we need 2*nbatches of those files,
so with many batches this may use substantial amounts of memory.
The hash join code however assumes adding batches is virtually free (in
terms of memory needed), ignoring this issue. It increases the number of
batches, possibly keeping the current batch within the limit, but ends
up using much more memory for the files.
This can be particularly painful with adversarial data sets, with a
batch that can't be split. This may happen due to hash collisions
(overlaps in the part used to calculate the "batch"), or a value with
many duplicates that however didn't make it to the MCV list (and thus
the skew table can't help). In these cases the hash join can get into a
cycle of increasing the number of batches, often reaching 256k or 512k
batches before exhausting the available hash space (32 bits).
If this happens, there's not much point in enforcing the original memory
limit. That's simply not feasible - especially in the case of a single
batch exceeding the allowed space.
Instead, the best we can do is relaxing the limit, and focusing on using
as little total memory as possible. By allowing the batches to be
larger, we reduce the number of batch files. The adjustment formula is
based on the observation that doubling the number of batches doubles the
amount of memory needed for the files, while cutting the batch size in
half. This defines the "break even" point for the next batch increase.
---
src/backend/executor/nodeHash.c | 153 +++++++++++++++++++++---
src/test/regress/expected/join_hash.out | 4 +-
src/test/regress/sql/join_hash.sql | 4 +-
3 files changed, 142 insertions(+), 19 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..680d2897738 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -54,6 +54,9 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
uint32 hashvalue,
int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
+static bool ExecHashExceededMemoryLimit(HashJoinTable hashtable);
+static void ExecHashAdjustMemoryLimit(HashJoinTable hashtable);
static void *dense_alloc(HashJoinTable hashtable, Size size);
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
@@ -199,10 +202,8 @@ MultiExecPrivateHash(HashState *node)
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
ExecHashIncreaseNumBuckets(hashtable);
- /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
- hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+ /* update info about peak memory usage */
+ ExecHashUpdateSpacePeak(hashtable);
hashtable->partialTuples = hashtable->totalTuples;
}
@@ -1036,6 +1037,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
hashtable, nfreed, ninmemory, hashtable->spaceUsed);
#endif
+ /* adjust the memory limit for the new nbatches etc. */
+ ExecHashAdjustMemoryLimit(hashtable);
+
/*
* If we dumped out either all or none of the tuples in the table, disable
* further expansion of nbatch. This situation implies that we have
@@ -1673,11 +1677,12 @@ ExecHashTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
- if (hashtable->spaceUsed +
- hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
- > hashtable->spaceAllowed)
+
+ /* update info about peak memory usage */
+ ExecHashUpdateSpacePeak(hashtable);
+
+ /* Should we add more batches, to enforce memory limit? */
+ if (ExecHashExceededMemoryLimit(hashtable))
ExecHashIncreaseNumBatches(hashtable);
}
else
@@ -1843,6 +1848,120 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
}
}
+/*
+ * ExecHashUpdateSpacePeak
+ * Update information about peak memory usage.
+ *
+ * This considers tuples added to the hash table, buckets of the hash table
+ * itself, and also the buffered batch files on both the inner and outer side.
+ * Each file has a BLCKSZ buffer, so with enough batches this may actually
+ * represent most of the memory used by the hash join node.
+ */
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+ Size spaceUsed = hashtable->spaceUsed;
+
+ /* buckets of the hash table */
+ spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+ /* buffered batch files (inner + outer), each has a BLCKSZ buffer */
+ spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+ /* if we exceeded the current peak, remember the new one */
+ if (spaceUsed > hashtable->spacePeak)
+ hashtable->spacePeak = spaceUsed;
+}
+
+/*
+ * ExecHashExceededMemoryLimit
+ * Check if the amount of memory used exceeds spaceAllowed.
+ *
+ * Check if the total amount of space used by the hash join exceeds the
+ * current value of spaceAllowed, and we should try to increase the number
+ * of batches.
+ *
+ * We need to consider both the data added to the hash and the hashtable
+ * itself (i.e. buckets), but also the files used for future batches.
+ * Each batch needs a file for inner/outer side, so we need (2*nbatch)
+ * files in total, and each BufFile has a BLCKSZ buffer. If we ignored
+ * the files and simply doubled the number of batches, we could easily
+ * increase the total amount of memory because while we expect to cut the
+ * batch size in half, doubling the number of batches also doubles the
+ * amount of memory allocated by BufFile.
+ *
+ * That means doubling the number of batches is pointless when
+ *
+ * (spaceUsed / 2) < 2 * (nbatches * sizeof(BufFile))
+ *
+ * because it would result in allocating more memory than it saves.
+ *
+ * This is a temporary decision - we can't stop adding batches entirely,
+ * just until the hash table grows enough to make it a win again.
+ */
+static bool
+ExecHashExceededMemoryLimit(HashJoinTable hashtable)
+{
+ return (hashtable->spaceUsed +
+ hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+ hashtable->nbatch * sizeof(PGAlignedBlock) * 2
+ > hashtable->spaceAllowed);
+}
+
+/*
+ * ExecHashAdjustMemoryLimit
+ * Adjust the memory limit after increasing the number of batches.
+ *
+ * We can't keep the same spaceAllowed value, because as we keep adding
+ * batches we're guaranteed to exceed the older values simply thanks to
+ * the BufFile allocations.
+ *
+ * Instead, we consider the "break even" threshold for the current number
+ * of batches, add a bit of slack (so that we don't get into a cycle of
+ * incrementing number of batches), and calculate the new limit from that.
+ *
+ * For well estimated cases this should do nothing, as the batches are
+ * expected to account only for a small fraction of work_mem. But if we
+ * significantly underestimate the number of batches, or if one batch
+ * happens to be very large, this will relax the limit a bit.
+ *
+ * This means we won't enforce the work_mem limit strictly - but without
+ * adjusting the limit that wouldn't be the case either, we'd just use
+ * a lot of memory for the BufFiles without accounting for that. This
+ * way we do our best to minimize the amount of memory used.
+ */
+static void
+ExecHashAdjustMemoryLimit(HashJoinTable hashtable)
+{
+ Size newSpaceAllowed;
+
+ /*
+ * The next time increasing the number of batches "breaks even" is when
+ *
+ * (spaceUsed / 2) == (2 * nbatches * BLCKSZ)
+ *
+ * which means
+ *
+ * spaceUsed == (4 * nbatches * BLCKSZ)
+ *
+ * However, this is a "break even" threshold, when we shrink the hash
+ * table just enough to compensate the new batches, and we'd hit the
+ * new threshold almost immediately again. In practice we want to free
+ * more memory to allow new data before having to increase the number
+ * of batches again. So we allow 25% more space.
+ */
+ newSpaceAllowed
+ = 1.25 * (4 * hashtable->nbatch * sizeof(PGAlignedBlock));
+
+ /* but also account for the buckets, and the current batch files */
+ newSpaceAllowed += hashtable->nbuckets_optimal * sizeof(HashJoinTuple);
+ newSpaceAllowed += (2 * hashtable->nbatch * sizeof(PGAlignedBlock));
+
+ /* shouldn't go down, but use Max() to make sure */
+ hashtable->spaceAllowed = Max(hashtable->spaceAllowed,
+ newSpaceAllowed);
+}
+
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
@@ -2349,8 +2468,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak memory usage */
+ ExecHashUpdateSpacePeak(hashtable);
/*
* Create a skew bucket for each MCV hash value.
@@ -2399,8 +2519,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak memory usage */
+ ExecHashUpdateSpacePeak(hashtable);
}
free_attstatsslot(&sslot);
@@ -2489,8 +2610,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak memory usage */
+ ExecHashUpdateSpacePeak(hashtable);
+
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 4fc34a0e72a..8d54822eb8c 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -198,7 +198,7 @@ rollback to settings;
-- non-parallel
savepoint settings;
set local max_parallel_workers_per_gather = 0;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
set local hash_mem_multiplier = 1.0;
explain (costs off)
select count(*) from simple r join simple s using (id);
@@ -232,7 +232,7 @@ rollback to settings;
-- parallel with parallel-oblivious hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
set local hash_mem_multiplier = 1.0;
set local enable_parallel_hash = off;
explain (costs off)
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 6b0688ab0a6..ca8758900aa 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -145,7 +145,7 @@ rollback to settings;
-- non-parallel
savepoint settings;
set local max_parallel_workers_per_gather = 0;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
set local hash_mem_multiplier = 1.0;
explain (costs off)
select count(*) from simple r join simple s using (id);
@@ -160,7 +160,7 @@ rollback to settings;
-- parallel with parallel-oblivious hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
set local hash_mem_multiplier = 1.0;
set local enable_parallel_hash = off;
explain (costs off)
--
2.47.1
v20241231-multi-spill-0001-Hash-join-with-a-multiple-spil.patch (text/x-patch)
From 030fca4e56132dd516b8aba23ed36c47311b33f5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Tue, 31 Dec 2024 17:08:17 +0100
Subject: [PATCH v20241231-multi-spill] Hash join with multiple spill files
We can't use an arbitrary number of batches in the hash join, because
that can use substantial amounts of memory not covered by the memory
limit. Instead, decide how many batches we can keep in memory, and open
files only for this "slice" of batches. For future batches we keep one
spill file per slice.
Then use these spill files after "switching" to the first batch in each
of those non-in-memory slices.
NB: Compared to the "single-spill" patch, this is less strict, as we
need one spill file per slice, and the number of slices may grow during
execution. It's better than the current behavior, because even with
modest work_mem values we can fit 32-128 batches into memory, thus
reducing the number of files to 1/32 - 1/128 of the current count. But with many
batches (as with batch explosion) this can still use substantial
amounts of memory (certainly more than work_mem).
Ultimately, it behaves similarly to the "adjustment" patch, but with
more complexity. And it can't handle the "oversized batch" case really well,
because the limit is not adjusted.
---
src/backend/commands/explain.c | 6 +-
src/backend/executor/nodeHash.c | 256 +++++++++++++++++++++++---
src/backend/executor/nodeHashjoin.c | 75 +++++---
src/backend/optimizer/path/costsize.c | 2 +
src/include/executor/hashjoin.h | 4 +
src/include/executor/nodeHash.h | 8 +
src/include/nodes/execnodes.h | 1 +
7 files changed, 300 insertions(+), 52 deletions(-)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index a201ed30824..aa3fbeda3dd 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3466,19 +3466,23 @@ show_hash_info(HashState *hashstate, ExplainState *es)
hinstrument.nbatch, es);
ExplainPropertyInteger("Original Hash Batches", NULL,
hinstrument.nbatch_original, es);
+ ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+ hinstrument.nbatch_inmemory, es);
ExplainPropertyUInteger("Peak Memory Usage", "kB",
spacePeakKb, es);
}
else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+ hinstrument.nbatch_inmemory != hinstrument.nbatch ||
hinstrument.nbuckets_original != hinstrument.nbuckets)
{
ExplainIndentText(es);
appendStringInfo(es->str,
- "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: " UINT64_FORMAT "kB\n",
+ "Buckets: %d (originally %d) Batches: %d (originally %d, in-memory %d) Memory Usage: " UINT64_FORMAT "kB\n",
hinstrument.nbuckets,
hinstrument.nbuckets_original,
hinstrument.nbatch,
hinstrument.nbatch_original,
+ hinstrument.nbatch_inmemory,
spacePeakKb);
}
else
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..e9d482577ac 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
/* ----------------------------------------------------------------
* ExecHash
@@ -199,10 +200,8 @@ MultiExecPrivateHash(HashState *node)
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
ExecHashIncreaseNumBuckets(hashtable);
- /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
- hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
hashtable->partialTuples = hashtable->totalTuples;
}
@@ -451,6 +450,7 @@ ExecHashTableCreate(HashState *state)
size_t space_allowed;
int nbuckets;
int nbatch;
+ int nbatch_inmemory;
double rows;
int num_skew_mcvs;
int log2_nbuckets;
@@ -477,7 +477,8 @@ ExecHashTableCreate(HashState *state)
state->parallel_state != NULL ?
state->parallel_state->nparticipants - 1 : 0,
&space_allowed,
- &nbuckets, &nbatch, &num_skew_mcvs);
+ &nbuckets, &nbatch, &nbatch_inmemory,
+ &num_skew_mcvs);
/* nbuckets must be a power of 2 */
log2_nbuckets = my_log2(nbuckets);
@@ -503,6 +504,7 @@ ExecHashTableCreate(HashState *state)
hashtable->nSkewBuckets = 0;
hashtable->skewBucketNums = NULL;
hashtable->nbatch = nbatch;
+ hashtable->nbatch_inmemory = nbatch_inmemory;
hashtable->curbatch = 0;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
@@ -512,6 +514,8 @@ ExecHashTableCreate(HashState *state)
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
+ hashtable->innerOverflowFiles = NULL;
+ hashtable->outerOverflowFiles = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
hashtable->spaceAllowed = space_allowed;
@@ -552,6 +556,7 @@ ExecHashTableCreate(HashState *state)
if (nbatch > 1 && hashtable->parallel_state == NULL)
{
MemoryContext oldctx;
+ int cnt = Min(nbatch, nbatch_inmemory);
/*
* allocate and initialize the file arrays in hashCxt (not needed for
@@ -559,8 +564,19 @@ ExecHashTableCreate(HashState *state)
*/
oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
- hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
- hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+ hashtable->innerBatchFile = palloc0_array(BufFile *, cnt);
+ hashtable->outerBatchFile = palloc0_array(BufFile *, cnt);
+
+ /* also allocate files for overflow batches */
+ if (nbatch > nbatch_inmemory)
+ {
+ int nslices = (nbatch / nbatch_inmemory);
+
+ Assert(nslices % 2 == 0);
+
+ hashtable->innerOverflowFiles = palloc0_array(BufFile *, nslices + 1);
+ hashtable->outerOverflowFiles = palloc0_array(BufFile *, nslices + 1);
+ }
MemoryContextSwitchTo(oldctx);
@@ -661,6 +677,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs)
{
int tupsize;
@@ -669,6 +686,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
size_t bucket_bytes;
size_t max_pointers;
int nbatch = 1;
+ int nbatch_inmemory = 1;
int nbuckets;
double dbuckets;
@@ -811,6 +829,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
space_allowed,
numbuckets,
numbatches,
+ numbatches_inmemory,
num_skew_mcvs);
return;
}
@@ -848,11 +867,24 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
nbatch = pg_nextpower2_32(Max(2, minbatch));
}
+ /*
+ * See how many batches we can fit into memory (driven mostly by size
+ * of BufFile, with PGAlignedBlock being the largest part of that).
+ * We need one BufFile for inner and outer side, so we count it twice
+ * for each batch, and we stop once we exceed (work_mem/2).
+ */
+ while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+ <= (work_mem * 1024L / 2))
+ nbatch_inmemory *= 2;
+
+ // nbatch_inmemory = nbatch;
+
Assert(nbuckets > 0);
Assert(nbatch > 0);
*numbuckets = nbuckets;
*numbatches = nbatch;
+ *numbatches_inmemory = nbatch_inmemory;
}
@@ -874,13 +906,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
*/
if (hashtable->innerBatchFile != NULL)
{
- for (i = 1; i < hashtable->nbatch; i++)
+ int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+ for (i = 1; i < n; i++)
{
if (hashtable->innerBatchFile[i])
BufFileClose(hashtable->innerBatchFile[i]);
if (hashtable->outerBatchFile[i])
BufFileClose(hashtable->outerBatchFile[i]);
}
+
+ /* number of batch slices */
+ n = (hashtable->nbatch / hashtable->nbatch_inmemory) + 1;
+
+ for (i = 1; i < n; i++)
+ {
+ if (hashtable->innerOverflowFiles[i])
+ BufFileClose(hashtable->innerOverflowFiles[i]);
+
+ if (hashtable->outerOverflowFiles[i])
+ BufFileClose(hashtable->outerOverflowFiles[i]);
+ }
}
/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -923,11 +969,14 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
if (hashtable->innerBatchFile == NULL)
{
+ /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+ int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
/* we had no file arrays before */
- hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
- hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+ hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch_tmp);
+ hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch_tmp);
MemoryContextSwitchTo(oldcxt);
@@ -936,9 +985,35 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+ int oldnbatch_tmp = Min(oldnbatch, hashtable->nbatch_inmemory);
+
/* enlarge arrays and zero out added entries */
- hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch);
- hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
+ hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp);
+ hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp);
+
+ if (nbatch > hashtable->nbatch_inmemory)
+ {
+ int nslices = (nbatch / hashtable->nbatch_inmemory);
+ int oldnslices = (oldnbatch / hashtable->nbatch_inmemory);
+
+ Assert(nslices > 1);
+ Assert(nslices % 2 == 0);
+ Assert((oldnslices == 1) || (oldnslices % 2 == 0));
+ Assert(oldnslices <= nslices);
+
+ if (hashtable->innerOverflowFiles == NULL)
+ {
+ hashtable->innerOverflowFiles = palloc0_array(BufFile *, nslices + 1);
+ hashtable->outerOverflowFiles = palloc0_array(BufFile *, nslices + 1);
+ }
+ else
+ {
+ hashtable->innerOverflowFiles = repalloc0_array(hashtable->innerOverflowFiles, BufFile *, oldnslices + 1, nslices + 1);
+ hashtable->outerOverflowFiles = repalloc0_array(hashtable->outerOverflowFiles, BufFile *, oldnslices + 1, nslices + 1);
+ }
+ }
+
}
hashtable->nbatch = nbatch;
@@ -1008,11 +1083,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* dump it out */
Assert(batchno > curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+ hashtable->innerBatchFile,
+ hashtable->innerOverflowFiles);
+
ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
hashTuple->hashvalue,
- &hashtable->innerBatchFile[batchno],
+ batchFile,
hashtable);
hashtable->spaceUsed -= hashTupleSize;
@@ -1673,22 +1755,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
+ /* Consider increasing number of batches if we filled work_mem. */
if (hashtable->spaceUsed +
- hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+ hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+ Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */
> hashtable->spaceAllowed)
ExecHashIncreaseNumBatches(hashtable);
}
else
{
+ BufFile **batchFile;
+
/*
* put the tuple into a temp file for later batches
*/
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+ hashtable->innerBatchFile,
+ hashtable->innerOverflowFiles);
+
ExecHashJoinSaveTuple(tuple,
hashvalue,
- &hashtable->innerBatchFile[batchno],
+ batchFile,
hashtable);
}
@@ -1843,6 +1936,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
}
}
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+ int slice,
+ curslice;
+
+ if (hashtable->nbatch < hashtable->nbatch_inmemory)
+ return batchno;
+
+ slice = batchno / hashtable->nbatch_inmemory;
+ curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+ /* slices can't go backwards */
+ Assert(slice >= curslice);
+
+ /* overflow slice */
+ if (slice > curslice)
+ return -1;
+
+ /* current slice, compute index in the current array */
+ return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+ BufFile **batchFiles, BufFile **overflowFiles)
+{
+ int idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+ /* get the right overflow file */
+ if (idx == -1)
+ {
+ int slice = (batchno / hashtable->nbatch_inmemory);
+
+ return &overflowFiles[slice];
+ }
+
+ /* batch file in the current slice */
+ return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+ int slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+ memset(hashtable->innerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+ hashtable->innerOverflowFiles[slice] = NULL;
+
+ memset(hashtable->outerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+ hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+ int batchidx;
+
+ hashtable->curbatch++;
+
+ /* see if we skipped to the next batch slice */
+ batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+ /* Can't be -1, current batch is in the current slice by definition. */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+ /*
+ * If we skipped to the next slice of batches, reset the array of files
+ * and use the overflow file as the first batch.
+ */
+ if (batchidx == 0)
+ ExecHashSwitchToNextBatchSlice(hashtable);
+
+ return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+ Size spaceUsed = hashtable->spaceUsed;
+
+ /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+ spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+ /* Account for memory used for batch files (inner + outer) */
+ spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ /* Account for slice files (inner + outer) */
+ spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ if (spaceUsed > hashtable->spacePeak)
+ hashtable->spacePeak = spaceUsed;
+}
+
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
@@ -2349,8 +2544,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
/*
* Create a skew bucket for each MCV hash value.
@@ -2399,8 +2595,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
}
free_attstatsslot(&sslot);
@@ -2489,8 +2686,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
@@ -2569,10 +2768,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* Put the tuple into a temp file for later batches */
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+ hashtable->innerBatchFile,
+ hashtable->innerOverflowFiles);
+
ExecHashJoinSaveTuple(tuple, hashvalue,
- &hashtable->innerBatchFile[batchno],
+ batchFile,
hashtable);
pfree(hashTuple);
hashtable->spaceUsed -= tupleSize;
@@ -2750,6 +2956,8 @@ ExecHashAccumInstrumentation(HashInstrumentation *instrument,
hashtable->nbatch);
instrument->nbatch_original = Max(instrument->nbatch_original,
hashtable->nbatch_original);
+ instrument->nbatch_inmemory = Min(hashtable->nbatch,
+ hashtable->nbatch_inmemory);
instrument->space_peak = Max(instrument->space_peak,
hashtable->spacePeak);
}
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ea0045bc0f3..580b5da93bd 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -481,10 +481,15 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (batchno != hashtable->curbatch &&
node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
{
+ BufFile **batchFile;
bool shouldFree;
MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
&shouldFree);
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+ hashtable->outerBatchFile,
+ hashtable->outerOverflowFiles);
+
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
@@ -492,7 +497,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(mintuple, hashvalue,
- &hashtable->outerBatchFile[batchno],
+ batchFile,
hashtable);
if (shouldFree)
@@ -1030,17 +1035,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
}
else if (curbatch < hashtable->nbatch)
{
- BufFile *file = hashtable->outerBatchFile[curbatch];
+ BufFile **file = ExecHashGetBatchFile(hashtable, curbatch,
+ hashtable->outerBatchFile,
+ hashtable->outerOverflowFiles);
/*
* In outer-join cases, we could get here even though the batch file
* is empty.
*/
- if (file == NULL)
+ if (*file == NULL)
return NULL;
slot = ExecHashJoinGetSavedTuple(hjstate,
- file,
+ *file,
hashvalue,
hjstate->hj_OuterTupleSlot);
if (!TupIsNull(slot))
@@ -1135,9 +1142,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
BufFile *innerFile;
TupleTableSlot *slot;
uint32 hashvalue;
+ int batchidx;
+ int curbatch_old;
nbatch = hashtable->nbatch;
curbatch = hashtable->curbatch;
+ curbatch_old = curbatch;
+
+ /* index of the old batch */
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+ /* has to be in the current slice of batches */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
if (curbatch > 0)
{
@@ -1145,9 +1161,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* We no longer need the previous outer batch file; close it right
* away to free disk space.
*/
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
}
else /* we just finished the first batch */
{
@@ -1182,45 +1198,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* scan, we have to rescan outer batches in case they contain tuples that
* need to be reassigned.
*/
- curbatch++;
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
while (curbatch < nbatch &&
- (hashtable->outerBatchFile[curbatch] == NULL ||
- hashtable->innerBatchFile[curbatch] == NULL))
+ (hashtable->outerBatchFile[batchidx] == NULL ||
+ hashtable->innerBatchFile[batchidx] == NULL))
{
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
HJ_FILL_OUTER(hjstate))
break; /* must process due to rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
HJ_FILL_INNER(hjstate))
break; /* must process due to rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_original)
break; /* must process due to rule 2 */
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_outstart)
break; /* must process due to rule 3 */
/* We can ignore this batch. */
/* Release associated temp files right away. */
- if (hashtable->innerBatchFile[curbatch])
- BufFileClose(hashtable->innerBatchFile[curbatch]);
- hashtable->innerBatchFile[curbatch] = NULL;
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
- curbatch++;
+ if (hashtable->innerBatchFile[batchidx])
+ BufFileClose(hashtable->innerBatchFile[batchidx]);
+ hashtable->innerBatchFile[batchidx] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
+
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
}
if (curbatch >= nbatch)
+ {
+ hashtable->curbatch = curbatch_old;
return false; /* no more batches */
-
- hashtable->curbatch = curbatch;
+ }
/*
* Reload the hash table with the new inner batch (which could be empty)
*/
ExecHashTableReset(hashtable);
- innerFile = hashtable->innerBatchFile[curbatch];
+ innerFile = hashtable->innerBatchFile[batchidx];
if (innerFile != NULL)
{
@@ -1246,15 +1267,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* needed
*/
BufFileClose(innerFile);
- hashtable->innerBatchFile[curbatch] = NULL;
+ hashtable->innerBatchFile[batchidx] = NULL;
}
/*
* Rewind outer batch file (if present), so that we can start reading it.
*/
- if (hashtable->outerBatchFile[curbatch] != NULL)
+ if (hashtable->outerBatchFile[batchidx] != NULL)
{
- if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
+ if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0, SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join temporary file")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c36687aa4df..6f40600d10b 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -4173,6 +4173,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
int num_hashclauses = list_length(hashclauses);
int numbuckets;
int numbatches;
+ int numbatches_inmemory;
int num_skew_mcvs;
size_t space_allowed; /* unused */
@@ -4227,6 +4228,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
&space_allowed,
&numbuckets,
&numbatches,
+ &numbatches_inmemory,
&num_skew_mcvs);
/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2d8ed8688cd..76c983dbd4d 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -320,6 +320,7 @@ typedef struct HashJoinTableData
int *skewBucketNums; /* array indexes of active skew buckets */
int nbatch; /* number of batches */
+ int nbatch_inmemory; /* max number of in-memory batches */
int curbatch; /* current batch #; 0 during 1st pass */
int nbatch_original; /* nbatch when we started inner scan */
@@ -331,6 +332,9 @@ typedef struct HashJoinTableData
double partialTuples; /* # tuples obtained from inner plan by me */
double skewTuples; /* # tuples inserted into skew tuples */
+ BufFile **innerOverflowFiles; /* temp file for inner overflow batches */
+ BufFile **outerOverflowFiles; /* temp file for outer overflow batches */
+
/*
* These arrays are allocated for the life of the hash join, but only if
* nbatch > 1. A file is opened only when we first write a tuple into it
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index e4eb7bc6359..ef1d2bec15a 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
#include "access/parallel.h"
#include "nodes/execnodes.h"
+#include "storage/buffile.h"
struct SharedHashJoinBatch;
@@ -46,6 +47,12 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
uint32 hashvalue,
int *bucketno,
int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+ BufFile **batchFiles,
+ BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -62,6 +69,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs);
extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1590b643920..eb8eaea89ab 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2755,6 +2755,7 @@ typedef struct HashInstrumentation
int nbuckets_original; /* planned number of buckets */
int nbatch; /* number of batches at end of execution */
int nbatch_original; /* planned number of batches */
+ int nbatch_inmemory; /* number of batches kept in memory */
Size space_peak; /* peak memory usage in bytes */
} HashInstrumentation;
--
2.47.1
v20241231-single-spill-0001-Hash-join-with-a-single-spill.patch (text/x-patch)
From 55142837eaf439652fd91e165f3d4c3607ea9987 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Tue, 31 Dec 2024 17:34:59 +0100
Subject: [PATCH v20241231-single-spill] Hash join with a single spill file
We can't use an arbitrary number of batches in the hash join, because
that can use substantial amounts of memory not covered by the memory
limit. Instead, decide how many batches we can keep in memory, open
files only for those batches, and use one "overflow" file for all
future batches.
Then use this spill file after "switching" to the first batch that is
not in memory.
NB: This allows enforcing the memory limit very strictly, but it's very
inefficient and causes a lot of temporary file traffic.
---
src/backend/commands/explain.c | 6 +-
src/backend/executor/nodeHash.c | 206 +++++++++++++++++++++++---
src/backend/executor/nodeHashjoin.c | 76 ++++++----
src/backend/optimizer/path/costsize.c | 2 +
src/include/executor/hashjoin.h | 4 +
src/include/executor/nodeHash.h | 7 +
src/include/nodes/execnodes.h | 1 +
7 files changed, 250 insertions(+), 52 deletions(-)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index a201ed30824..aa3fbeda3dd 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3466,19 +3466,23 @@ show_hash_info(HashState *hashstate, ExplainState *es)
hinstrument.nbatch, es);
ExplainPropertyInteger("Original Hash Batches", NULL,
hinstrument.nbatch_original, es);
+ ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+ hinstrument.nbatch_inmemory, es);
ExplainPropertyUInteger("Peak Memory Usage", "kB",
spacePeakKb, es);
}
else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+ hinstrument.nbatch_inmemory != hinstrument.nbatch ||
hinstrument.nbuckets_original != hinstrument.nbuckets)
{
ExplainIndentText(es);
appendStringInfo(es->str,
- "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: " UINT64_FORMAT "kB\n",
+ "Buckets: %d (originally %d) Batches: %d (originally %d, in-memory %d) Memory Usage: " UINT64_FORMAT "kB\n",
hinstrument.nbuckets,
hinstrument.nbuckets_original,
hinstrument.nbatch,
hinstrument.nbatch_original,
+ hinstrument.nbatch_inmemory,
spacePeakKb);
}
else
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..58062a8b256 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
/* ----------------------------------------------------------------
* ExecHash
@@ -199,10 +200,8 @@ MultiExecPrivateHash(HashState *node)
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
ExecHashIncreaseNumBuckets(hashtable);
- /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
- hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
hashtable->partialTuples = hashtable->totalTuples;
}
@@ -451,6 +450,7 @@ ExecHashTableCreate(HashState *state)
size_t space_allowed;
int nbuckets;
int nbatch;
+ int nbatch_inmemory;
double rows;
int num_skew_mcvs;
int log2_nbuckets;
@@ -477,7 +477,8 @@ ExecHashTableCreate(HashState *state)
state->parallel_state != NULL ?
state->parallel_state->nparticipants - 1 : 0,
&space_allowed,
- &nbuckets, &nbatch, &num_skew_mcvs);
+ &nbuckets, &nbatch, &nbatch_inmemory,
+ &num_skew_mcvs);
/* nbuckets must be a power of 2 */
log2_nbuckets = my_log2(nbuckets);
@@ -503,6 +504,7 @@ ExecHashTableCreate(HashState *state)
hashtable->nSkewBuckets = 0;
hashtable->skewBucketNums = NULL;
hashtable->nbatch = nbatch;
+ hashtable->nbatch_inmemory = nbatch_inmemory;
hashtable->curbatch = 0;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
@@ -512,6 +514,8 @@ ExecHashTableCreate(HashState *state)
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
+ hashtable->innerOverflowFile = NULL;
+ hashtable->outerOverflowFile = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
hashtable->spaceAllowed = space_allowed;
@@ -553,14 +557,16 @@ ExecHashTableCreate(HashState *state)
{
MemoryContext oldctx;
+ int cnt = Min(nbatch, nbatch_inmemory);
+
/*
* allocate and initialize the file arrays in hashCxt (not needed for
* parallel case which uses shared tuplestores instead of raw files)
*/
oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
- hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
- hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+ hashtable->innerBatchFile = palloc0_array(BufFile *, cnt);
+ hashtable->outerBatchFile = palloc0_array(BufFile *, cnt);
MemoryContextSwitchTo(oldctx);
@@ -661,6 +667,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs)
{
int tupsize;
@@ -669,6 +676,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
size_t bucket_bytes;
size_t max_pointers;
int nbatch = 1;
+ int nbatch_inmemory = 1;
int nbuckets;
double dbuckets;
@@ -811,6 +819,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
space_allowed,
numbuckets,
numbatches,
+ numbatches_inmemory,
num_skew_mcvs);
return;
}
@@ -848,11 +857,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
nbatch = pg_nextpower2_32(Max(2, minbatch));
}
+ /*
+ * See how many batches we can fit into memory (driven mostly by size
+ * of BufFile, with PGAlignedBlock being the largest part of that).
+ * We need one BufFile for inner and outer side, so we count it twice
+ * for each batch, and we stop once we exceed (work_mem/2).
+ */
+ while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+ <= (work_mem * 1024L / 2))
+ nbatch_inmemory *= 2;
+
Assert(nbuckets > 0);
Assert(nbatch > 0);
*numbuckets = nbuckets;
*numbatches = nbatch;
+ *numbatches_inmemory = nbatch_inmemory;
}
@@ -874,13 +894,21 @@ ExecHashTableDestroy(HashJoinTable hashtable)
*/
if (hashtable->innerBatchFile != NULL)
{
- for (i = 1; i < hashtable->nbatch; i++)
+ int nbatch = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+ for (i = 1; i < nbatch; i++)
{
if (hashtable->innerBatchFile[i])
BufFileClose(hashtable->innerBatchFile[i]);
if (hashtable->outerBatchFile[i])
BufFileClose(hashtable->outerBatchFile[i]);
}
+
+ if (hashtable->innerOverflowFile)
+ BufFileClose(hashtable->innerOverflowFile);
+
+ if (hashtable->outerOverflowFile)
+ BufFileClose(hashtable->outerOverflowFile);
}
/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -923,11 +951,13 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
if (hashtable->innerBatchFile == NULL)
{
+ int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
/* we had no file arrays before */
- hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
- hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+ hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch_tmp);
+ hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch_tmp);
MemoryContextSwitchTo(oldcxt);
@@ -936,9 +966,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+ int oldnbatch_tmp = Min(oldnbatch, hashtable->nbatch_inmemory);
+
/* enlarge arrays and zero out added entries */
- hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch);
- hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
+ hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp);
+ hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp);
}
hashtable->nbatch = nbatch;
@@ -1008,11 +1041,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* dump it out */
Assert(batchno > curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+ hashtable->innerBatchFile,
+ &hashtable->innerOverflowFile);
+
ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
hashTuple->hashvalue,
- &hashtable->innerBatchFile[batchno],
+ batchFile,
hashtable);
hashtable->spaceUsed -= hashTupleSize;
@@ -1673,22 +1713,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
+ /* Consider increasing number of batches if we filled work_mem. */
if (hashtable->spaceUsed +
- hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+ hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+ Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */
> hashtable->spaceAllowed)
ExecHashIncreaseNumBatches(hashtable);
}
else
{
+ BufFile **batchFile;
+
/*
* put the tuple into a temp file for later batches
*/
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+ hashtable->innerBatchFile,
+ &hashtable->innerOverflowFile);
+
ExecHashJoinSaveTuple(tuple,
hashvalue,
- &hashtable->innerBatchFile[batchno],
+ batchFile,
hashtable);
}
@@ -1843,6 +1894,100 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
}
}
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+ int slice,
+ curslice;
+
+ if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+ return batchno;
+
+ slice = batchno / hashtable->nbatch_inmemory;
+ curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+ /* slices can't go backwards */
+ Assert(slice >= curslice);
+
+ /* overflow slice */
+ if (slice > curslice)
+ return -1;
+
+ /* current slice, compute index in the current array */
+ return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+ BufFile **batchFiles, BufFile **overflowFile)
+{
+ int idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+ if (idx == -1)
+ return overflowFile;
+
+ return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+ memset(hashtable->innerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->innerBatchFile[0] = hashtable->innerOverflowFile;
+ hashtable->innerOverflowFile = NULL;
+
+ memset(hashtable->outerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->outerBatchFile[0] = hashtable->outerOverflowFile;
+ hashtable->outerOverflowFile = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+ int batchidx;
+
+ hashtable->curbatch++;
+
+ /* see if we skipped to the next batch slice */
+ batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+ /* Can't be -1, current batch is in the current slice by definition. */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+ /*
+ * If we skipped to the next slice of batches, reset the array of files
+ * and use the overflow file as the first batch.
+ */
+ if (batchidx == 0)
+ ExecHashSwitchToNextBatchSlice(hashtable);
+
+ return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+ Size spaceUsed = hashtable->spaceUsed;
+
+ /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+ spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+ /* Account for memory used for batch files (inner + outer) */
+ spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ /* Account for slice files (inner + outer) */
+ spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ if (spaceUsed > hashtable->spacePeak)
+ hashtable->spacePeak = spaceUsed;
+}
+
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
@@ -2349,8 +2494,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
/*
* Create a skew bucket for each MCV hash value.
@@ -2399,8 +2545,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
}
free_attstatsslot(&sslot);
@@ -2489,8 +2636,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
@@ -2569,10 +2718,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* Put the tuple into a temp file for later batches */
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+ hashtable->innerBatchFile,
+ &hashtable->innerOverflowFile);
+
ExecHashJoinSaveTuple(tuple, hashvalue,
- &hashtable->innerBatchFile[batchno],
+ batchFile,
hashtable);
pfree(hashTuple);
hashtable->spaceUsed -= tupleSize;
@@ -2750,6 +2906,8 @@ ExecHashAccumInstrumentation(HashInstrumentation *instrument,
hashtable->nbatch);
instrument->nbatch_original = Max(instrument->nbatch_original,
hashtable->nbatch_original);
+ instrument->nbatch_inmemory = Min(hashtable->nbatch,
+ hashtable->nbatch_inmemory);
instrument->space_peak = Max(instrument->space_peak,
hashtable->spacePeak);
}
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ea0045bc0f3..2f5d94653e8 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -481,6 +481,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (batchno != hashtable->curbatch &&
node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
{
+ BufFile **batchFile;
bool shouldFree;
MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
&shouldFree);
@@ -491,8 +492,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
*/
Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+ hashtable->outerBatchFile,
+ &hashtable->outerOverflowFile);
+
ExecHashJoinSaveTuple(mintuple, hashvalue,
- &hashtable->outerBatchFile[batchno],
+ batchFile,
hashtable);
if (shouldFree)
@@ -1030,17 +1036,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
}
else if (curbatch < hashtable->nbatch)
{
- BufFile *file = hashtable->outerBatchFile[curbatch];
+ BufFile **file = ExecHashGetBatchFile(hashtable, curbatch,
+ hashtable->outerBatchFile,
+ &hashtable->outerOverflowFile);
/*
* In outer-join cases, we could get here even though the batch file
* is empty.
*/
- if (file == NULL)
+ if (*file == NULL)
return NULL;
slot = ExecHashJoinGetSavedTuple(hjstate,
- file,
+ *file,
hashvalue,
hjstate->hj_OuterTupleSlot);
if (!TupIsNull(slot))
@@ -1135,9 +1143,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
BufFile *innerFile;
TupleTableSlot *slot;
uint32 hashvalue;
+ int batchidx;
+ int curbatch_old;
nbatch = hashtable->nbatch;
curbatch = hashtable->curbatch;
+ curbatch_old = curbatch;
+
+ /* index of the old batch */
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+ /* has to be in the current slice of batches */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
if (curbatch > 0)
{
@@ -1145,9 +1162,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* We no longer need the previous outer batch file; close it right
* away to free disk space.
*/
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
}
else /* we just finished the first batch */
{
@@ -1182,45 +1199,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* scan, we have to rescan outer batches in case they contain tuples that
* need to be reassigned.
*/
- curbatch++;
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
while (curbatch < nbatch &&
- (hashtable->outerBatchFile[curbatch] == NULL ||
- hashtable->innerBatchFile[curbatch] == NULL))
+ (hashtable->outerBatchFile[batchidx] == NULL ||
+ hashtable->innerBatchFile[batchidx] == NULL))
{
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
HJ_FILL_OUTER(hjstate))
break; /* must process due to rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
HJ_FILL_INNER(hjstate))
break; /* must process due to rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_original)
break; /* must process due to rule 2 */
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_outstart)
break; /* must process due to rule 3 */
/* We can ignore this batch. */
/* Release associated temp files right away. */
- if (hashtable->innerBatchFile[curbatch])
- BufFileClose(hashtable->innerBatchFile[curbatch]);
- hashtable->innerBatchFile[curbatch] = NULL;
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
- curbatch++;
+ if (hashtable->innerBatchFile[batchidx])
+ BufFileClose(hashtable->innerBatchFile[batchidx]);
+ hashtable->innerBatchFile[batchidx] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
+
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
}
if (curbatch >= nbatch)
+ {
+ hashtable->curbatch = curbatch_old;
return false; /* no more batches */
-
- hashtable->curbatch = curbatch;
+ }
/*
* Reload the hash table with the new inner batch (which could be empty)
*/
ExecHashTableReset(hashtable);
- innerFile = hashtable->innerBatchFile[curbatch];
+ innerFile = hashtable->innerBatchFile[batchidx];
if (innerFile != NULL)
{
@@ -1246,15 +1268,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* needed
*/
BufFileClose(innerFile);
- hashtable->innerBatchFile[curbatch] = NULL;
+ hashtable->innerBatchFile[batchidx] = NULL;
}
/*
* Rewind outer batch file (if present), so that we can start reading it.
*/
- if (hashtable->outerBatchFile[curbatch] != NULL)
+ if (hashtable->outerBatchFile[batchidx] != NULL)
{
- if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
+ if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0, SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join temporary file")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c36687aa4df..6f40600d10b 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -4173,6 +4173,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
int num_hashclauses = list_length(hashclauses);
int numbuckets;
int numbatches;
+ int numbatches_inmemory;
int num_skew_mcvs;
size_t space_allowed; /* unused */
@@ -4227,6 +4228,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
&space_allowed,
&numbuckets,
&numbatches,
+ &numbatches_inmemory,
&num_skew_mcvs);
/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2d8ed8688cd..c612ae98117 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -320,6 +320,7 @@ typedef struct HashJoinTableData
int *skewBucketNums; /* array indexes of active skew buckets */
int nbatch; /* number of batches */
+ int nbatch_inmemory; /* max number of in-memory batches */
int curbatch; /* current batch #; 0 during 1st pass */
int nbatch_original; /* nbatch when we started inner scan */
@@ -331,6 +332,9 @@ typedef struct HashJoinTableData
double partialTuples; /* # tuples obtained from inner plan by me */
double skewTuples; /* # tuples inserted into skew tuples */
+ BufFile *innerOverflowFile; /* temp file for overflow batches */
+ BufFile *outerOverflowFile; /* temp file for overflow batches */
+
/*
* These arrays are allocated for the life of the hash join, but only if
* nbatch > 1. A file is opened only when we first write a tuple into it
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index e4eb7bc6359..fc82ea43c3b 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
#include "access/parallel.h"
#include "nodes/execnodes.h"
+#include "storage/buffile.h"
struct SharedHashJoinBatch;
@@ -46,6 +47,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
uint32 hashvalue,
int *bucketno,
int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+ BufFile **files, BufFile **overflow);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -62,6 +68,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs);
extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1590b643920..eb8eaea89ab 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2755,6 +2755,7 @@ typedef struct HashInstrumentation
int nbuckets_original; /* planned number of buckets */
int nbatch; /* number of batches at end of execution */
int nbatch_original; /* planned number of batches */
+ int nbatch_inmemory; /* number of batches kept in memory */
Size space_peak; /* peak memory usage in bytes */
} HashInstrumentation;
--
2.47.1
Hi,
I kept thinking about this - about alternative approaches, and also
about how hashjoin limits memory in general.
First, I want to discuss one thing I tried that I don't think really
works. The annoying part about the "memory rebalance" patch is that it
relaxes the memory limit. However, the memory limit is a lie anyway,
because we enforce it by adding batches, and that unfortunately is not
free - each batch is a BufFile with a BLCKSZ buffer, so while we may
succeed in keeping the hash table within work_mem, we may end up with
the batches using gigabytes of memory - which is not reported in
EXPLAIN, but is still allocated. This happens even without a batch
explosion; the explosion is just an extreme version of it.
There's no way to work around this ... as long as we use BufFiles. What
if we used plain File(s), without the buffering? Then the per-batch cost
would be considerably lower. Of course, this is acceptable only if
losing the buffering has an acceptable impact on performance. That
seemed unlikely, but I decided to give it a try - see the PoC in the
attached files-poc-patches.tgz (patch 0001).
Unfortunately, the impact does not seem acceptable - it does enforce
the limit, but the lack of buffering makes a huge difference, with some
queries getting ~2x slower.
I experimented a bit with a cross-file buffer - much smaller than the
sum of the BufFile buffers, but still allowing writes to be combined
into larger chunks. Imagine a buffer large enough for (nbatch * 2)
tuples; then we can expect to write about two tuples at a time into
each batch file.
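To illustrate the idea (this is not the actual 0002 patch - BatchBuffer,
stage_tuple and flush_batch are made-up names, and plain stdio stands in
for the file API), a minimal sketch of such a write-combining buffer:

#include <stdio.h>
#include <string.h>

/*
 * One small staging area per batch file; a physical write happens only
 * when the staged bytes would overflow the buffer, so each write
 * combines several tuples. An array of these, one per batch, forms the
 * "shared" buffer - which is exactly why its memory grows with nbatch,
 * as discussed in (a) below.
 */
typedef struct BatchBuffer
{
	char		data[2048];		/* room for ~16 tuples of ~128B */
	size_t		used;			/* bytes currently staged */
	FILE	   *file;			/* the batch's spill file */
} BatchBuffer;

static void
flush_batch(BatchBuffer *buf)
{
	if (buf->used > 0)
	{
		fwrite(buf->data, 1, buf->used, buf->file);	/* one big write */
		buf->used = 0;
	}
}

/* assumes len <= sizeof(buf->data) */
static void
stage_tuple(BatchBuffer *buf, const void *tuple, size_t len)
{
	if (buf->used + len > sizeof(buf->data))
		flush_batch(buf);
	memcpy(buf->data + buf->used, tuple, len);
	buf->used += len;
}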
The 0002 patch in the PoC series tries to do this, but it does not
really help, and in some cases it does even worse than 0001, because
the cost of maintaining the shared buffer increases with the number of
batches. I'm sure some of this is my fault, and it could be improved
and optimized quite a bit.
I decided not to pursue this further, because the experiment made me
realize that:
a) The buffer would need to grow with the number of batches to have any
chance of combining the writes. If we want to combine K tuples into a
single write (on average), we need the buffer to keep (nbatch * K)
tuples, and once nbatch gets large (because who cares about BufFile
allocations with a handful of batches), that may be a lot of memory.
Say we get to 32k batches (which is not that hard) and we want to keep
16 tuples of 128B each - that's ~64MB. Not a huge amount, and much less
than the 512MB the BufFile buffers would need. But still, it means we're
not really enforcing the memory limit - which was the whole point of
using files without buffering.
b) It does enforce the limit on the hash table itself, though. And that
is actually not great, because it means it can't possibly help with a
batch explosion caused by a single "indivisible" batch.
c) It's pretty invasive.
So I still think adjusting the memory limit as we add batches is the
better approach. The question is where to do the adjustments, and based
on what logic ...
I think the general idea and formula explained in [1] are right, but
while working on the PoC patch I started thinking about how to
formalize this, and I ended up creating two tables that I think
visualize it pretty nicely.
Imagine a table (in the spreadsheet sense) with work_mem values in rows
and nbatch values in columns, where each cell is the "total memory" used
to execute such a hash join, i.e.
work_mem + (2 * nbatch * 8kB)
(Yes, there's a multiplier for the hash table size, but I use work_mem
for simplicity.) This is what the two attached PDF files show,
highlighting two interesting patterns, so let's talk about that.
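For concreteness, here's a trivial standalone program computing those
cells under the same assumptions (BLCKSZ = 8kB, one inner + one outer
file buffer per batch):

#include <stdio.h>

#define BLCKSZ 8192

int
main(void)
{
	for (size_t wm_kb = 1024; wm_kb <= 64 * 1024; wm_kb *= 2)
	{
		for (int nbatch = 16; nbatch <= 16384; nbatch *= 2)
		{
			size_t		total_kb = wm_kb + (size_t) 2 * nbatch * BLCKSZ / 1024;

			printf("work_mem=%6zukB  nbatch=%6d  total=%7zukB\n",
				   wm_kb, nbatch, total_kb);
		}
	}
	return 0;
}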
1) hash-memory-model-1.pdf
Imagine you're executing a hash join - you're in a particular cell of
the table - and you've reached the current memory limit, i.e. you've
filled the hash table and need to do something. The only solution is to
"expand" the expected "total hash size" (nbatch * hash_table_size),
which we currently do by simply doubling the number of batches. And
often that's the right thing to do.
For example, let's say we're running with work_mem=4MB and nbatch=16,
we've filled the hash table and are using 4336kB of memory (a little bit
more than work_mem). If we double the number of batches, we may use up
to 4352kB of memory in the next cycle. And that's fine.
But hey, there's another way to double the "total hash size" - allowing
the in-memory hash table to be twice as large. In the above case, that
would be wrong, because doubling work_mem would use up to 8432kB.
So in this case it's clearly right to double the number of batches,
because that minimizes the total memory used in the next step.
However, consider for example the cell with work_mem=4MB, nbatch=8192.
We're using 135MB of memory, and need to decide what to do. Doubling the
batches means we'll use up to 266MB. But doubling work_mem increases the
memory use only to 139MB.
This is what the green/red in the table means. Green means "better to
double nbatch" while red is "better to double work_mem". And clearly,
the table is split into two regions, separated by the diagonal.
The diagonal is the "optimal" path - if you start in any cell, the
red/green decisions will get you to the diagonal, and then along it.
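To spell the rule out: doubling the batches puts the next step at
work_mem + 2 * (2 * nbatch) * 8kB, while doubling work_mem puts it at
2 * work_mem + 2 * nbatch * 8kB, so more batches win exactly when
2 * nbatch * 8kB < work_mem - and the diagonal is where that inequality
flips. As a sketch (a hypothetical predicate, not code from the patch):

#include <stdbool.h>
#include <stddef.h>

#define BLCKSZ 8192

/* true in the green region: doubling nbatch is the cheaper move */
static bool
prefer_more_batches(size_t work_mem, size_t nbatch)
{
	return 2 * nbatch * BLCKSZ < work_mem;
}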
The patch [1] aims to do this, but I think this visual explanation is
much clearer than anything in that patch.
2) hash-memory-model-2.pdf
I've also asked whether maybe the patch should do something about the
choice of the initial nbatch value, which gets me to the second PDF.
Imagine we know the total amount of data in the Hash node is 1GB. There
are different ways to split that into batches. If we have enough memory,
we could do the hash join without batching. With work_mem=1MB we'd need
to split it into 1024 batches, or we might use work_mem=2MB with only
512 batches. And so on - we're moving along the anti-diagonal.
The point is that while this changes work_mem, it can have a pretty
non-intuitive impact on total memory use. For example, with wm=1MB we
actually use 17MB of memory, while with wm=2MB we use only 10MB.
But each anti-diagonal has a minimum - the value on the diagonal. I
believe this is the "optimal starting cell" for the hash join. If we
don't pick this, the rules explained in (1) will eventually get us to
the diagonal anyway.
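Here's a standalone trace of that walk for the 1GB example, mirroring
(under the same assumptions as above) the loop the patch adds to
ExecChooseHashTableSize():

#include <stdio.h>

#define BLCKSZ 8192

int
main(void)
{
	size_t		hash_table_bytes = 1024 * 1024;	/* work_mem = 1MB */
	int			nbatch = 1024;					/* 1GB / 1MB */

	while (nbatch > 1)
	{
		size_t		current = hash_table_bytes + (size_t) 2 * nbatch * BLCKSZ;
		size_t		halved = hash_table_bytes * 2 + (size_t) nbatch * BLCKSZ;

		if (current < halved)
			break;				/* we're on the diagonal, stop */

		hash_table_bytes *= 2;
		nbatch /= 2;
		printf("nbatch=%4d work_mem=%zuMB total=%zuMB\n",
			   nbatch, hash_table_bytes >> 20,
			   (hash_table_bytes + (size_t) 2 * nbatch * BLCKSZ) >> 20);
	}
	return 0;
}

Starting from work_mem=1MB / nbatch=1024 (17MB total) it prints the two
halving steps (10MB, then 8MB) and stops on the diagonal at
work_mem=4MB / nbatch=256.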
A different visualization is in the attached SVG, which is a surface
plot / heat map of the total memory use. It shows that there really is a
"valley" of minimal values on the diagonal, and that the growth for
doubling batches is much steeper than for doubling work_mem.
Attached is an "adjust-size" patch implementing this. In the end it has
pretty much the same effect as the patch in [1], except that it's much
simpler - everything important happens in just two simple blocks, one in
ExecChooseHashTableSize(), the other in ExecHashIncreaseNumBatches().
There's a bit of complexity, because if we allow growing the in-memory
hash table, we probably need to allow increasing the number of buckets
too. But that's not possible with how we split the hashvalue now, so the
patch adjusts that by reversing the hashvalue bits when calculating the
batch. I'm not sure this is the best way to do it - there might well be
a better solution.
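To make the splitting problem concrete: bucketno consumes hash bits from
the bottom, and after the reversal batchno effectively consumes them
from the top, so doubling either count only grabs a fresh bit - an
already computed batchno can never move backwards, it either stays the
same or grows by the old nbatch. A toy demo (using a bit-twiddling
variant of the patch's table-based reverse_uint32):

#include <stdint.h>
#include <stdio.h>

static uint32_t
reverse_uint32(uint32_t x)
{
	x = ((x & 0x55555555u) << 1) | ((x >> 1) & 0x55555555u);
	x = ((x & 0x33333333u) << 2) | ((x >> 2) & 0x33333333u);
	x = ((x & 0x0f0f0f0fu) << 4) | ((x >> 4) & 0x0f0f0f0fu);
	return (x << 24) | ((x & 0xff00u) << 8) | ((x >> 8) & 0xff00u) | (x >> 24);
}

int
main(void)
{
	uint32_t	hashvalue = 0xdeadbeef;

	for (int log2_nbatch = 4; log2_nbatch <= 6; log2_nbatch++)
	{
		uint32_t	nbatch = 1u << log2_nbatch;
		uint32_t	batchno = reverse_uint32(hashvalue) & (nbatch - 1);

		printf("nbatch=%2u batchno=%2u\n",
			   (unsigned) nbatch, (unsigned) batchno);
	}
	return 0;
}

For 0xdeadbeef this prints batchno 11, 27 and 59 as nbatch goes
16 -> 32 -> 64, i.e. each doubling merely adds a high bit.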
I admit all of this seemed a bit weird / wrong to me initially, because
it feels like giving up on the memory limit. But the simple truth is
that the memory limit is pretty much a lie - the fact that we only show
the hash table size in EXPLAIN does not mean we're not using gigabytes
more memory, we're just not making that clear. So I'd argue this
actually does a better job of limiting memory usage.
When thinking about reasons why doubling work_mem might not be the
right thing, I can think of one case - CPU caches. IIRC lookups may be
much faster if the hash table is small enough to fit into L3, and
doubling it might work against that, although I'm not sure how bad the
impact would be. In the batch explosion case it surely doesn't matter -
the cost of spilling/loading many files is much higher. But for regular
(well estimated) cases it might have a negative impact.
This is why the patch only adjusts the initial parameters in the "red"
area, not in the green one. Maybe it should be a bit more conservative
and only kick in when the nbatch value exceeds some threshold.
I'd appreciate opinions and alternative ideas about this.
I'm also attaching the data + SQL script I use to trigger the batch
explosion with up to 2M batches.
regards
[1]: /messages/by-id/7bed6c08-72a0-4ab9-a79c-e01fcdd0940f@vondra.me
--
Tomas Vondra
Attachments:
vadjust-size-0001-hashjoin-sizing-balance.patch (text/x-patch)
From 20d56eb519d5785160b333bbbd1bb9d909529a5c Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Sun, 5 Jan 2025 21:24:23 +0100
Subject: [PATCH vadjust-size 1/2] hashjoin sizing balance
---
src/backend/executor/nodeHash.c | 113 +++++++++++++++++++++++++++++++-
1 file changed, 111 insertions(+), 2 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6f8a379e3b9..271ffeb2228 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -848,6 +848,37 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
nbatch = pg_nextpower2_32(Max(2, minbatch));
}
+ /*
+ * Optimize the total amount of memory consumed by the hash, including
+ * the buffer files. It may be more efficient to use fewer large batches.
+ *
+ * Try to move on the anti-diagonal and see if we'd consume less memory.
+ * If yes, do that, and also adjust the bucket counts. We only move in
+ * the direction of fewer batches, because that part of the "matrix" has
+ * the most issues with memory consumption.
+ *
+ * We might try this for the other direction too, but only as long as
+ * the memory consumption is more than the work_mem value. It can only
+ * be less than 2x worse than the ideal memory allowance, and after one
+ * step we'll get on the "boundary" I think.
+ */
+ while (nbatch > 1)
+ {
+ /* how much memory would we use with half the batches? */
+ size_t space = hash_table_bytes * 2 + (nbatch * BLCKSZ);
+ size_t current = hash_table_bytes + (2 * nbatch * BLCKSZ);
+
+ /* we're already optimal */
+ if (current < space)
+ break;
+
+ /* better to use fewer larger batches */
+ *space_allowed = *space_allowed * 2;
+
+ nbatch /= 2;
+ nbuckets *= 2;
+ }
+
Assert(nbuckets > 0);
Assert(nbatch > 0);
@@ -943,6 +974,29 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
hashtable->nbatch = nbatch;
+ /*
+ * Consider adjusting the allowed batch size, depending on the number of
+ * batches, to minimize the overall amount of memory consumed - for both
+ * the hash table and batch files.
+ *
+ * XXX If we increase the batch size, we should also adjust the optimal
+ * number of buckets. But we can't do that, because right now we split
+ * the hash into batch/bucket like this [...|batch|bucket] so adding
+ * bits to bucket changes the batch in a way that breaks the spilling
+ * (the batchno could move "backwards, but we assume that can't happen).
+ */
+ {
+ size_t optimalSpace = (nbatch * 2 * BLCKSZ);
+
+ while (hashtable->spaceAllowed < optimalSpace)
+ {
+ hashtable->spaceAllowed *= 2;
+
+ hashtable->nbuckets_optimal *= 2;
+ hashtable->log2_nbuckets_optimal++;
+ }
+ }
+
/*
* Scan through the existing hash table entries and dump out any that are
* no longer of the current batch.
@@ -1793,6 +1847,62 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
heap_free_minimal_tuple(tuple);
}
+static inline unsigned char
+reverse_byte(unsigned char x)
+{
+ static const unsigned char table[] = {
+ 0x00, 0x80, 0x40, 0xc0, 0x20, 0xa0, 0x60, 0xe0,
+ 0x10, 0x90, 0x50, 0xd0, 0x30, 0xb0, 0x70, 0xf0,
+ 0x08, 0x88, 0x48, 0xc8, 0x28, 0xa8, 0x68, 0xe8,
+ 0x18, 0x98, 0x58, 0xd8, 0x38, 0xb8, 0x78, 0xf8,
+ 0x04, 0x84, 0x44, 0xc4, 0x24, 0xa4, 0x64, 0xe4,
+ 0x14, 0x94, 0x54, 0xd4, 0x34, 0xb4, 0x74, 0xf4,
+ 0x0c, 0x8c, 0x4c, 0xcc, 0x2c, 0xac, 0x6c, 0xec,
+ 0x1c, 0x9c, 0x5c, 0xdc, 0x3c, 0xbc, 0x7c, 0xfc,
+ 0x02, 0x82, 0x42, 0xc2, 0x22, 0xa2, 0x62, 0xe2,
+ 0x12, 0x92, 0x52, 0xd2, 0x32, 0xb2, 0x72, 0xf2,
+ 0x0a, 0x8a, 0x4a, 0xca, 0x2a, 0xaa, 0x6a, 0xea,
+ 0x1a, 0x9a, 0x5a, 0xda, 0x3a, 0xba, 0x7a, 0xfa,
+ 0x06, 0x86, 0x46, 0xc6, 0x26, 0xa6, 0x66, 0xe6,
+ 0x16, 0x96, 0x56, 0xd6, 0x36, 0xb6, 0x76, 0xf6,
+ 0x0e, 0x8e, 0x4e, 0xce, 0x2e, 0xae, 0x6e, 0xee,
+ 0x1e, 0x9e, 0x5e, 0xde, 0x3e, 0xbe, 0x7e, 0xfe,
+ 0x01, 0x81, 0x41, 0xc1, 0x21, 0xa1, 0x61, 0xe1,
+ 0x11, 0x91, 0x51, 0xd1, 0x31, 0xb1, 0x71, 0xf1,
+ 0x09, 0x89, 0x49, 0xc9, 0x29, 0xa9, 0x69, 0xe9,
+ 0x19, 0x99, 0x59, 0xd9, 0x39, 0xb9, 0x79, 0xf9,
+ 0x05, 0x85, 0x45, 0xc5, 0x25, 0xa5, 0x65, 0xe5,
+ 0x15, 0x95, 0x55, 0xd5, 0x35, 0xb5, 0x75, 0xf5,
+ 0x0d, 0x8d, 0x4d, 0xcd, 0x2d, 0xad, 0x6d, 0xed,
+ 0x1d, 0x9d, 0x5d, 0xdd, 0x3d, 0xbd, 0x7d, 0xfd,
+ 0x03, 0x83, 0x43, 0xc3, 0x23, 0xa3, 0x63, 0xe3,
+ 0x13, 0x93, 0x53, 0xd3, 0x33, 0xb3, 0x73, 0xf3,
+ 0x0b, 0x8b, 0x4b, 0xcb, 0x2b, 0xab, 0x6b, 0xeb,
+ 0x1b, 0x9b, 0x5b, 0xdb, 0x3b, 0xbb, 0x7b, 0xfb,
+ 0x07, 0x87, 0x47, 0xc7, 0x27, 0xa7, 0x67, 0xe7,
+ 0x17, 0x97, 0x57, 0xd7, 0x37, 0xb7, 0x77, 0xf7,
+ 0x0f, 0x8f, 0x4f, 0xcf, 0x2f, 0xaf, 0x6f, 0xef,
+ 0x1f, 0x9f, 0x5f, 0xdf, 0x3f, 0xbf, 0x7f, 0xff,
+ };
+
+ return table[x];
+}
+
+static inline uint32
+reverse_uint32(uint32 x)
+{
+ uint32 ret;
+
+ unsigned char *src = (unsigned char *) &x;
+ unsigned char *dst = (unsigned char *) &ret;
+
+ dst[0] = reverse_byte(src[3]);
+ dst[1] = reverse_byte(src[2]);
+ dst[2] = reverse_byte(src[1]);
+ dst[3] = reverse_byte(src[0]);
+
+ return ret;
+}
/*
* ExecHashGetBucketAndBatch
@@ -1833,8 +1943,7 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
if (nbatch > 1)
{
*bucketno = hashvalue & (nbuckets - 1);
- *batchno = pg_rotate_right32(hashvalue,
- hashtable->log2_nbuckets) & (nbatch - 1);
+ *batchno = reverse_uint32(hashvalue) & (nbatch - 1);
}
else
{
--
2.47.1
hash-collisions.data.gz (application/gzip)