From def25162acace9b9fdb32b1865ccc037de3b69c1 Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Mon, 11 Nov 2024 21:28:31 -0800
Subject: [PATCH v1 2/2] HashAgg: avoid unexpected hash table growth near
 memory limit.

When the simplehash table size is already a significant fraction of
the memory limit, and growing it would risk exceeding that limit, then
enter a mode in which new groups can be added iff there's room in the
existing bucket array (i.e. it wouldn't trigger SH_GROW()).
---
 src/backend/executor/execGrouping.c | 61 ++++++++++++++++++++++++-----
 src/backend/executor/nodeAgg.c      | 61 +++++++++++++++++++++++------
 src/include/executor/executor.h     | 11 ++++--
 src/include/nodes/execnodes.h       |  3 +-
 4 files changed, 111 insertions(+), 25 deletions(-)

diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c
index 774e4de882..44dc48c981 100644
--- a/src/backend/executor/execGrouping.c
+++ b/src/backend/executor/execGrouping.c
@@ -25,7 +25,8 @@ static inline uint32 TupleHashTableHash_internal(struct tuplehash_hash *tb,
 												 const MinimalTuple tuple);
 static inline TupleHashEntry LookupTupleHashEntry_internal(TupleHashTable hashtable,
 														   TupleTableSlot *slot,
-														   bool *isnew, uint32 hash);
+														   bool grow_ok, bool *isnew,
+														   uint32 hash);
 
 /*
  * Define parameters for tuple hash table code generation. The interface is
@@ -320,7 +321,42 @@ LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
 	hashtable->cur_eq_func = hashtable->tab_eq_func;
 
 	local_hash = TupleHashTableHash_internal(hashtable->hashtab, NULL);
-	entry = LookupTupleHashEntry_internal(hashtable, slot, isnew, local_hash);
+	entry = LookupTupleHashEntry_internal(hashtable, slot, true, isnew,
+										  local_hash);
+
+	if (hash != NULL)
+		*hash = local_hash;
+
+	Assert(entry == NULL || entry->hash == local_hash);
+
+	MemoryContextSwitchTo(oldContext);
+
+	return entry;
+}
+
+/*
+ * Like LookupTupleHashEntry, but if grow_ok is false, returns NULL instead
+ * of growing the internal bucket array to make room for a new entry.
+ */
+TupleHashEntry
+LookupTupleHashEntryExt(TupleHashTable hashtable, TupleTableSlot *slot,
+						bool grow_ok, bool *isnew, uint32 *hash)
+{
+	TupleHashEntry entry;
+	MemoryContext oldContext;
+	uint32		local_hash;
+
+	/* Need to run the hash functions in short-lived context */
+	oldContext = MemoryContextSwitchTo(hashtable->tempcxt);
+
+	/* set up data needed by hash and match functions */
+	hashtable->inputslot = slot;
+	hashtable->in_hash_funcs = hashtable->tab_hash_funcs;
+	hashtable->cur_eq_func = hashtable->tab_eq_func;
+
+	local_hash = TupleHashTableHash_internal(hashtable->hashtab, NULL);
+	entry = LookupTupleHashEntry_internal(hashtable, slot, grow_ok, isnew,
+										  local_hash);
 
 	if (hash != NULL)
 		*hash = local_hash;
@@ -356,11 +392,12 @@ TupleHashTableHash(TupleHashTable hashtable, TupleTableSlot *slot)
 
 /*
  * A variant of LookupTupleHashEntry for callers that have already computed
- * the hash value.
+ * the hash value. If grow_ok is false, returns NULL instead of growing the
+ * internal bucket array to make room for a new entry.
  */
 TupleHashEntry
-LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot,
-						 bool *isnew, uint32 hash)
+LookupTupleHashEntryHashExt(TupleHashTable hashtable, TupleTableSlot *slot,
+							bool grow_ok, bool *isnew, uint32 hash)
 {
 	TupleHashEntry entry;
 	MemoryContext oldContext;
@@ -373,7 +410,8 @@ LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot,
 	hashtable->in_hash_funcs = hashtable->tab_hash_funcs;
 	hashtable->cur_eq_func = hashtable->tab_eq_func;
 
-	entry = LookupTupleHashEntry_internal(hashtable, slot, isnew, hash);
+	entry = LookupTupleHashEntry_internal(hashtable, slot, grow_ok, isnew,
+										  hash);
 	Assert(entry == NULL || entry->hash == hash);
 
 	MemoryContextSwitchTo(oldContext);
@@ -490,12 +528,15 @@ TupleHashTableHash_internal(struct tuplehash_hash *tb,
  * so that we can avoid switching the memory context multiple times for
  * LookupTupleHashEntry.
  *
+ * If grow_ok is false, returns NULL if adding a new entry would cause the
+ * internal tuplehash bucket array to grow (that is, to double in size).
+ *
  * NB: This function may or may not change the memory context. Caller is
  * expected to change it back.
  */
 static inline TupleHashEntry
 LookupTupleHashEntry_internal(TupleHashTable hashtable, TupleTableSlot *slot,
-							  bool *isnew, uint32 hash)
+							  bool grow_ok, bool *isnew, uint32 hash)
 {
 	TupleHashEntryData *entry;
 	bool		found;
@@ -505,11 +546,11 @@ LookupTupleHashEntry_internal(TupleHashTable hashtable, TupleTableSlot *slot,
 
 	if (isnew)
 	{
-		entry = tuplehash_insert_hash(hashtable->hashtab, key, hash, &found);
+		entry = tuplehash_insert_ext(hashtable->hashtab, key, hash, grow_ok,
+									 &found);
 
-		if (found)
+		if (entry == NULL || found)
 		{
-			/* found pre-existing entry */
 			*isnew = false;
 		}
 		else
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 53ead77ece..221cee1cd5 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1494,6 +1494,7 @@ build_hash_tables(AggState *aggstate)
 	}
 
 	aggstate->hash_ngroups_current = 0;
+	aggstate->hash_grow_ok = true;
 }
 
 /*
@@ -1861,6 +1862,19 @@ hash_agg_check_limits(AggState *aggstate)
 	Size		hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
 														true);
 
+	/*
+	 * The bucket array is stored in hash_metacxt, and each time it grows, it
+	 * doubles. If doubling the bucket array would put us over the memory
+	 * limit, then we must set hash_grow_ok to false to prevent it.
+	 *
+	 * When multiple grouping sets are involved, this calculation is overly
+	 * conservative, because growing a single hashtable will not double the
+	 * overall meta_mem. But that only affects the first batch; subsequent
+	 * batches only deal with a single grouping set at a time.
+	 */
+	if (meta_mem * 2 + hashkey_mem > aggstate->hash_mem_limit)
+		aggstate->hash_grow_ok = false;
+
 	/*
 	 * Don't spill unless there's at least one group in the hash table so we
 	 * can be sure to make progress even in edge cases.
@@ -2103,7 +2117,7 @@ lookup_hash_entries(AggState *aggstate)
 		AggStatePerHash perhash = &aggstate->perhash[setno];
 		TupleHashTable hashtable = perhash->hashtable;
 		TupleTableSlot *hashslot = perhash->hashslot;
-		TupleHashEntry entry;
+		TupleHashEntry entry = NULL;
 		uint32		hash;
 		bool		isnew = false;
 		bool	   *p_isnew;
@@ -2116,16 +2130,25 @@ lookup_hash_entries(AggState *aggstate)
 						  outerslot,
 						  hashslot);
 
-		entry = LookupTupleHashEntry(hashtable, hashslot,
-									 p_isnew, &hash);
-
-		if (entry != NULL)
+		entry = LookupTupleHashEntryExt(hashtable, hashslot,
+										aggstate->hash_grow_ok,
+										p_isnew, &hash);
+		if (entry)
 		{
 			if (isnew)
 				initialize_hash_entry(aggstate, hashtable, entry);
 			pergroup[setno] = entry->additional;
 		}
-		else
+		else if (!aggstate->hash_spill_mode)
+		{
+			/*
+			 * New entry would grow hash table bucket array, and may put us
+			 * over the memory limit.
+			 */
+			hash_agg_enter_spill_mode(aggstate);
+		}
+
+		if (!entry)
 		{
 			HashAggSpill *spill = &aggstate->hash_spills[setno];
 			TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
@@ -2626,6 +2649,7 @@ agg_refill_hash_table(AggState *aggstate)
 		ResetTupleHashTable(aggstate->perhash[setno].hashtable);
 
 	aggstate->hash_ngroups_current = 0;
+	aggstate->hash_grow_ok = true;
 
 	/*
 	 * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
@@ -2656,7 +2680,7 @@ agg_refill_hash_table(AggState *aggstate)
 	{
 		TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
 		TupleTableSlot *hashslot = perhash->hashslot;
-		TupleHashEntry entry;
+		TupleHashEntry entry = NULL;
 		MinimalTuple tuple;
 		uint32		hash;
 		bool		isnew = false;
@@ -2674,17 +2698,28 @@ agg_refill_hash_table(AggState *aggstate)
 		prepare_hash_slot(perhash,
 						  aggstate->tmpcontext->ecxt_outertuple,
 						  hashslot);
-		entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot,
-										 p_isnew, hash);
 
-		if (entry != NULL)
+		entry = LookupTupleHashEntryHashExt(perhash->hashtable, hashslot,
+											aggstate->hash_grow_ok, p_isnew,
+											hash);
+
+		if (entry)
 		{
 			if (isnew)
 				initialize_hash_entry(aggstate, perhash->hashtable, entry);
 			aggstate->hash_pergroup[batch->setno] = entry->additional;
 			advance_aggregates(aggstate);
 		}
-		else
+		else if (!aggstate->hash_spill_mode)
+		{
+			/*
+			 * New entry would grow hash table bucket array, and may put us
+			 * over the memory limit.
+			 */
+			hash_agg_enter_spill_mode(aggstate);
+		}
+
+		if (!entry)
 		{
 			if (!spill_initialized)
 			{
@@ -2723,6 +2758,7 @@ agg_refill_hash_table(AggState *aggstate)
 	else
 		hash_agg_update_metrics(aggstate, true, 0);
 
+	aggstate->hash_grow_ok = true;
 	aggstate->hash_spill_mode = false;
 
 	/* prepare to walk the first hash table */
@@ -3082,6 +3118,7 @@ hashagg_finish_initial_spills(AggState *aggstate)
 
 	hash_agg_update_metrics(aggstate, false, total_npartitions);
 	aggstate->hash_spill_mode = false;
+	aggstate->hash_grow_ok = true;
 }
 
 /*
@@ -3225,6 +3262,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	aggstate->grp_firstTuple = NULL;
 	aggstate->sort_in = NULL;
 	aggstate->sort_out = NULL;
+	aggstate->hash_grow_ok = true;
 
 	/*
 	 * phases[0] always exists, but is dummy in sorted/plain mode
@@ -4449,6 +4487,7 @@ ExecReScanAgg(AggState *node)
 	{
 		hashagg_reset_spill_state(node);
 
+		node->hash_grow_ok = true;
 		node->hash_ever_spilled = false;
 		node->hash_spill_mode = false;
 		node->hash_ngroups_current = 0;
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 69c3ebff00..100facd155 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -152,11 +152,16 @@ extern TupleHashTable BuildTupleHashTableExt(PlanState *parent,
 extern TupleHashEntry LookupTupleHashEntry(TupleHashTable hashtable,
 										   TupleTableSlot *slot,
 										   bool *isnew, uint32 *hash);
+extern TupleHashEntry LookupTupleHashEntryExt(TupleHashTable hashtable,
+											  TupleTableSlot *slot,
+											  bool grow_ok, bool *isnew,
+											  uint32 *hash);
 extern uint32 TupleHashTableHash(TupleHashTable hashtable,
 								 TupleTableSlot *slot);
-extern TupleHashEntry LookupTupleHashEntryHash(TupleHashTable hashtable,
-											   TupleTableSlot *slot,
-											   bool *isnew, uint32 hash);
+extern TupleHashEntry LookupTupleHashEntryHashExt(TupleHashTable hashtable,
+												  TupleTableSlot *slot,
+												  bool grow_ok, bool *isnew,
+												  uint32 hash);
 extern TupleHashEntry FindTupleHashEntry(TupleHashTable hashtable,
 										 TupleTableSlot *slot,
 										 ExprState *eqcomp,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 182a6956bb..0c25d706f9 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2543,6 +2543,7 @@ typedef struct AggState
 	TupleTableSlot *hash_spill_rslot;	/* for reading spill files */
 	TupleTableSlot *hash_spill_wslot;	/* for writing spill files */
 	List	   *hash_batches;	/* hash batches remaining to be processed */
+	bool	    hash_grow_ok;	/* ok for hash table bucket array to grow? */
 	bool		hash_ever_spilled;	/* ever spilled during this execution? */
 	bool		hash_spill_mode;	/* we hit a limit during the current batch
 									 * and we must not create new groups */
@@ -2562,7 +2563,7 @@ typedef struct AggState
 										 * per-group pointers */
 
 	/* support for evaluation of agg input expressions: */
-#define FIELDNO_AGGSTATE_ALL_PERGROUPS 53
+#define FIELDNO_AGGSTATE_ALL_PERGROUPS 54
 	AggStatePerGroup *all_pergroups;	/* array of first ->pergroups, than
 										 * ->hash_pergroup */
 	SharedAggInfo *shared_info; /* one entry per worker */
-- 
2.34.1

