From 8c60a1fdec40b8c7b29843e40c8a85abef454461 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Sun, 5 Jan 2025 21:24:23 +0100
Subject: [PATCH v20250125 1/3] Balance memory usage with hashjoin batch
 explosion

The basic logic to pick the number of hashjoin batches is concerned only
with the in-memory hash table, adding batches to keep the hash table
within the limit defined by work_mem and hash_mem_multiplier. It ignores
the memory needed by the batch files, but with enough batches this may
be a substantial amount of memory, easily orders of magnitude more than
the hash table.

We've seen reports of hash joins with hundreds of thousands or millions
of batch files, consuming gigabytes of memory, and triggering OOM. These
cases are not too common, but it's clearly possible to hit them.

This patch improves the situation by rebalancing how the memory is
distributed between the hash table and batch files, to minimize the
total memory consumption.

Whenever we need to increase the capacity of the hash node, we can do
that by either doubling the number of batches or doubling the size of
the in-memory hash table. The outcome is the same, allowing the hash
node to handle a relation twice the size. But the memory requirements
may be substantially different, depending on the current hashjoin
parameters (for low nbatch values it's better to add batches, for high
nbatch values it's better to allow a larger hash table).

It may seem a bit strange, as it clearly allows exceeding the memory
limit specified by the GUC parameters. But it has always been like this,
except that the code assumed adding batches is free. The patch just
makes this visible and explicit.

Increasing the hashtable memory limit may also help to prevent the batch
explosion in the first place. Given enough hash collisions or duplicate
hashes it's easy to get a batch that can't be split, resulting in a
cycle of quickly doubling the number of batches. Allowing the hashtable
to get larger may stop this.
---
 src/backend/executor/nodeHash.c     | 243 +++++++++++++++++++++++++++-
 src/backend/utils/misc/guc_tables.c |  11 ++
 src/include/executor/hashjoin.h     |   2 +
 3 files changed, 254 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6f8a379e3b9..ce3dc6f1fa6 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,8 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+/* GUC: allow growing the hash table limit instead of adding batches */
+bool	enable_hashjoin_adjust = false;
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -848,6 +850,84 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 		nbatch = pg_nextpower2_32(Max(2, minbatch));
 	}
 
+	/*
+	 * Optimize the total amount of memory consumed by the hash node.
+	 *
+	 * The nbatch calculation above focuses on the size of the in-memory hash
+	 * table, ignoring the memory used by batch files. But that can be a lot
+	 * of memory - each batch file has a BLCKSZ buffer, and we may need two
+	 * files per batch (inner and outer side). So with enough batches this can
+	 * be significantly more memory than the hashtable itself, and it grows
+	 * quickly as we're adding more batches.
+	 *
+	 * It might seem cleaner to adjust the calculation above, to consider
+	 * memory for both the in-memory hashtable and the batch files, and ensure
+	 * it fits into hash_table_bytes. That is, look for a nbatch value so that
+	 *
+	 * (inner_rel_bytes / nbatch) + (2 * nbatch * BLCKSZ) <= hash_table_bytes
+	 *
+	 * But that has a flaw - for sufficiently large inner_rel_bytes value it
+	 * may not have a solution (either the hash table is too large or it
+	 * requires too many batches). So instead we merely try to minimize the
+	 * impact, and use as little memory as possible, instead of strictly
+	 * enforcing the memory limit. (But we haven't really enforced it before
+	 * either, as we simply ignored the batch files.)
+	 *
+	 * The basic observation is that given an inner relation of a given size,
+	 * we may divide it in arbitrary number of batches, which determines the
+	 * memory consumption per the already mentioned formula:
+	 *
+	 * (inner_rel_bytes / nbatch) + (2 * nbatch * BLCKSZ)
+	 *
+	 * That is, we can reduce the number of batches to (nbatch/2), at the
+	 * cost of doubling the size of the in-memory hash table. But these two
+	 * terms work in opposite ways - size of the in-memory part decreases
+	 * with nbatch, while the batch file memory grows very quickly. Initially
+	 * the memory usage is dominated by the in-memory hash table (nbatch=1),
+	 * then at some point the batch files start to consume more memory.
+	 *
+	 * If you combine these two, the memory consumption (for a fixed size of
+	 * the inner relation) has a u-shape, with a minimum at some nbatch value.
+	 * Our goal is to look for this minimum. We do that by calculating memory
+	 * usage for (nbatch/2), and accepting it if it's lower than current.
+	 *
+	 * This means we're only ever reducing nbatch values, we'll never increase
+	 * it. We could try moving in both directions, depending on which part of
+	 * the curve we're at initially, but that does not seem very useful. Yes,
+	 * it might reduce the memory, but in this part the memory is dominated by
+	 * the hash table, which we used for the initial nbatch calculation, so
+	 * there is little risk of OOM or major memory overruns. The impact on
+	 * memory consumption is less significant than when reducing nbatches, and
+	 * adding batches can have significant impact. It does not seem worth it.
+	 *
+	 * While reducing the nbatch value, we increase the nbucket value. With
+	 * fewer batches we accept the in-memory hashtable will get larger, but
+	 * the earlier calculation sized it for the hash_table_bytes.
+	 */
+	while (nbatch > 0)
+	{
+		/* how much memory would we use with half the batches? */
+		size_t	space = hash_table_bytes * 2 + (nbatch * BLCKSZ);
+		size_t	current = hash_table_bytes + (2 * nbatch * BLCKSZ);
+
+		/* Is the adaptive behavior enabled? */
+		if (!enable_hashjoin_adjust)
+			break;
+
+		/* If the memory usage does not decrease, we have the optimum. */
+		if (current < space)
+			break;
+
+		/*
+		 * It's better to use half the batches, so do that and adjust the
+		 * nbucket in the opposite direction, and the allowance.
+		 */
+		nbatch /= 2;
+		nbuckets *= 2;
+
+		*space_allowed = *space_allowed * 2;
+	}
+
 	Assert(nbuckets > 0);
 	Assert(nbatch > 0);
 
@@ -890,6 +970,99 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	pfree(hashtable);
 }
 
+/*
+ * Consider doubling the allowed in-memory hash table size instead of nbatch,
+ * whichever adds less memory (hash table vs. batch file buffers). Returns
+ * true if spaceAllowed was doubled, false if the caller should add batches.
+ */
+static bool
+ExecHashIncreaseBatchSize(HashJoinTable hashtable)
+{
+	HashMemoryChunk chunk;
+
+	/*
+	 * How much memory would doubling nbatch add? Each batch may require two
+	 * BLCKSZ-buffered files (inner/outer). Use size_t, int math may overflow.
+	 */
+	size_t	batchSpace = ((size_t) hashtable->nbatch * 2 * BLCKSZ);
+
+	/* Do nothing if the adaptive behavior is disabled. */
+	if (!enable_hashjoin_adjust)
+		return false;
+
+	/*
+	 * Compare the new space needed for doubling nbatch and for enlarging the
+	 * in-memory hash table. If doubling the hash table needs less memory,
+	 * just do that. Otherwise, continue with doubling the nbatch.
+	 *
+	 * XXX We're comparing the current spaceAllowed/batchSpace values, because
+	 * doubling either one adds exactly its current value in new memory.
+	 */
+	if (hashtable->spaceAllowed > batchSpace)
+	{
+		/* continue with nbatch doubling */
+		return false;
+	}
+
+	/*
+	 * Better to double the size of the in-memory hashtable. Also raise the
+	 * optimal bucket count; hashtable->nbuckets itself is not changed here.
+	 */
+	hashtable->spaceAllowed *= 2;
+	hashtable->nbuckets_optimal *= 2;
+	hashtable->log2_nbuckets_optimal++;
+
+	/*
+	 * We scan through the chunks directly, so that we can reset the buckets
+	 * now and not have to keep track which tuples in the buckets have
+	 * already been processed. The chunks are kept; nothing is freed here.
+	 *
+	 * XXX This is copied from ExecHashIncreaseNumBatches, except that we don't
+	 * need to copy/shuffle tuples to other batches, we're only rebuilding
+	 * the buckets.
+	 */
+	memset(hashtable->buckets.unshared, 0,
+		   sizeof(HashJoinTuple) * hashtable->nbuckets);
+	chunk = hashtable->chunks;
+
+	/* so, let's scan through the chunks, and all tuples in each chunk */
+	while (chunk != NULL)
+	{
+		HashMemoryChunk nextchunk = chunk->next.unshared;
+
+		/* position within the buffer (up to chunk->used) */
+		size_t		idx = 0;
+
+		/* process all tuples stored in this chunk */
+		while (idx < chunk->used)
+		{
+			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+			int			bucketno;
+			int			batchno;
+
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+									  &bucketno, &batchno);
+
+			/* and add it back to the appropriate bucket */
+			hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+			hashtable->buckets.unshared[bucketno] = hashTuple;
+
+			/* next tuple in this chunk */
+			idx += MAXALIGN(hashTupleSize);
+
+			/* allow this loop to be cancellable */
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/* we're done with this chunk - proceed to the next one */
+		chunk = nextchunk;
+	}
+
+	return true;
+}
+
 /*
  * ExecHashIncreaseNumBatches
  *		increase the original number of batches in order to reduce
@@ -913,6 +1086,10 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
 		return;
 
+	/* consider increasing size of the in-memory hash table instead */
+	if (ExecHashIncreaseBatchSize(hashtable))
+		return;
+
 	nbatch = oldnbatch * 2;
 	Assert(nbatch > 1);
 
@@ -1793,6 +1970,69 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
 		heap_free_minimal_tuple(tuple);
 }
 
+static inline unsigned char
+reverse_byte(unsigned char x)
+{
+	static const unsigned char table[] = {
+		0x00, 0x80, 0x40, 0xc0, 0x20, 0xa0, 0x60, 0xe0,
+		0x10, 0x90, 0x50, 0xd0, 0x30, 0xb0, 0x70, 0xf0,
+		0x08, 0x88, 0x48, 0xc8, 0x28, 0xa8, 0x68, 0xe8,
+		0x18, 0x98, 0x58, 0xd8, 0x38, 0xb8, 0x78, 0xf8,
+		0x04, 0x84, 0x44, 0xc4, 0x24, 0xa4, 0x64, 0xe4,
+		0x14, 0x94, 0x54, 0xd4, 0x34, 0xb4, 0x74, 0xf4,
+		0x0c, 0x8c, 0x4c, 0xcc, 0x2c, 0xac, 0x6c, 0xec,
+		0x1c, 0x9c, 0x5c, 0xdc, 0x3c, 0xbc, 0x7c, 0xfc,
+		0x02, 0x82, 0x42, 0xc2, 0x22, 0xa2, 0x62, 0xe2,
+		0x12, 0x92, 0x52, 0xd2, 0x32, 0xb2, 0x72, 0xf2,
+		0x0a, 0x8a, 0x4a, 0xca, 0x2a, 0xaa, 0x6a, 0xea,
+		0x1a, 0x9a, 0x5a, 0xda, 0x3a, 0xba, 0x7a, 0xfa,
+		0x06, 0x86, 0x46, 0xc6, 0x26, 0xa6, 0x66, 0xe6,
+		0x16, 0x96, 0x56, 0xd6, 0x36, 0xb6, 0x76, 0xf6,
+		0x0e, 0x8e, 0x4e, 0xce, 0x2e, 0xae, 0x6e, 0xee,
+		0x1e, 0x9e, 0x5e, 0xde, 0x3e, 0xbe, 0x7e, 0xfe,
+		0x01, 0x81, 0x41, 0xc1, 0x21, 0xa1, 0x61, 0xe1,
+		0x11, 0x91, 0x51, 0xd1, 0x31, 0xb1, 0x71, 0xf1,
+		0x09, 0x89, 0x49, 0xc9, 0x29, 0xa9, 0x69, 0xe9,
+		0x19, 0x99, 0x59, 0xd9, 0x39, 0xb9, 0x79, 0xf9,
+		0x05, 0x85, 0x45, 0xc5, 0x25, 0xa5, 0x65, 0xe5,
+		0x15, 0x95, 0x55, 0xd5, 0x35, 0xb5, 0x75, 0xf5,
+		0x0d, 0x8d, 0x4d, 0xcd, 0x2d, 0xad, 0x6d, 0xed,
+		0x1d, 0x9d, 0x5d, 0xdd, 0x3d, 0xbd, 0x7d, 0xfd,
+		0x03, 0x83, 0x43, 0xc3, 0x23, 0xa3, 0x63, 0xe3,
+		0x13, 0x93, 0x53, 0xd3, 0x33, 0xb3, 0x73, 0xf3,
+		0x0b, 0x8b, 0x4b, 0xcb, 0x2b, 0xab, 0x6b, 0xeb,
+		0x1b, 0x9b, 0x5b, 0xdb, 0x3b, 0xbb, 0x7b, 0xfb,
+		0x07, 0x87, 0x47, 0xc7, 0x27, 0xa7, 0x67, 0xe7,
+		0x17, 0x97, 0x57, 0xd7, 0x37, 0xb7, 0x77, 0xf7,
+		0x0f, 0x8f, 0x4f, 0xcf, 0x2f, 0xaf, 0x6f, 0xef,
+		0x1f, 0x9f, 0x5f, 0xdf, 0x3f, 0xbf, 0x7f, 0xff,
+	};
+	/* table[i] holds i with its eight bits in mirrored order */
+	return table[x];
+}
+
+/*
+ * reverse_uint32
+ *		Return x with the order of all 32 bits mirrored.
+ *
+ * Every byte goes through the reverse_byte() lookup table, as described at
+ *
+ * https://graphics.stanford.edu/~seander/bithacks.html#BitReverseTable
+ *
+ * and the bytes then swap positions (lowest becomes highest, and so on).
+ * FWIW the 4-ops algorithm seems to be a bit faster.
+ */
+static inline uint32
+reverse_uint32(uint32 x)
+{
+	uint32		lo = reverse_byte((unsigned char) (x & 0xff));
+	uint32		midlo = reverse_byte((unsigned char) ((x >> 8) & 0xff));
+	uint32		midhi = reverse_byte((unsigned char) ((x >> 16) & 0xff));
+	uint32		hi = reverse_byte((unsigned char) ((x >> 24) & 0xff));
+
+	/* each mirrored byte lands at the opposite end of the word */
+	return (lo << 24) | (midlo << 16) | (midhi << 8) | hi;
+}
 
 /*
  * ExecHashGetBucketAndBatch
@@ -1833,8 +2073,7 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	if (nbatch > 1)
 	{
 		*bucketno = hashvalue & (nbuckets - 1);
-		*batchno = pg_rotate_right32(hashvalue,
-									 hashtable->log2_nbuckets) & (nbatch - 1);
+		*batchno = reverse_uint32(hashvalue) & (nbatch - 1);
 	}
 	else
 	{
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 38cb9e970d5..874cc3547a2 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -46,6 +46,7 @@
 #include "commands/vacuum.h"
 #include "common/file_utils.h"
 #include "common/scram-common.h"
+#include "executor/hashjoin.h"
 #include "jit/jit.h"
 #include "libpq/auth.h"
 #include "libpq/libpq.h"
@@ -900,6 +901,16 @@ struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_hashjoin_adjust", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables adjusting hashtable size to minimize memory usage."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_hashjoin_adjust,
+		false,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_gathermerge", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of gather merge plans."),
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 67ae89c8257..e16a03a87f8 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -71,6 +71,8 @@
  * ----------------------------------------------------------------
  */
 
+extern PGDLLIMPORT bool enable_hashjoin_adjust;
+
 /* these are in nodes/execnodes.h: */
 /* typedef struct HashJoinTupleData *HashJoinTuple; */
 /* typedef struct HashJoinTableData *HashJoinTable; */
-- 
2.47.1

