*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2884,2889 **** include_dir 'conf.d'
--- 2884,2904 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-enable-hashagg-disk" xreflabel="enable_hashagg_disk">
+       <term><varname>enable_hashagg_disk</varname> (<type>boolean</type>)
+       <indexterm>
+        <primary><varname>enable_hashagg_disk</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Enables or disables the query planner's use of hashed aggregation plan
+         types when the planner expects the hash table size to exceed
+         <varname>work_mem</varname>. The default is <literal>on</>.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       <varlistentry id="guc-enable-hashjoin" xreflabel="enable_hashjoin">
        <term><varname>enable_hashjoin</varname> (<type>boolean</type>)
        <indexterm>
*** a/src/backend/executor/execGrouping.c
--- b/src/backend/executor/execGrouping.c
***************
*** 331,336 **** TupleHashEntry
--- 331,385 ----
  LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
  					 bool *isnew)
  {
+ 	uint32 hashvalue;
+ 
+ 	hashvalue = TupleHashEntryHash(hashtable, slot);
+ 	return LookupTupleHashEntryHash(hashtable, slot, hashvalue, isnew);
+ }
+ 
+ /*
+  * TupleHashEntryHash
+  *
+  * Compute the tuple's hash value with the table's own hash functions.
+  */
+ uint32
+ TupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot)
+ {
+ 	TupleHashTable		prevHT = CurTupleHashTable;
+ 	TupleHashEntryData	probe;
+ 	uint32				result;
+ 
+ 	/*
+ 	 * Point the hash callback at this table and at the caller's slot.
+ 	 *
+ 	 * CurTupleHashTable was saved above and is put back below, in case this
+ 	 * code is ever entered re-entrantly.
+ 	 */
+ 	CurTupleHashTable = hashtable;
+ 	hashtable->inputslot = slot;
+ 	hashtable->in_hash_funcs = hashtable->tab_hash_funcs;
+ 	hashtable->cur_eq_funcs = hashtable->tab_eq_funcs;
+ 
+ 	/* a NULL firstTuple is the flag to reference inputslot */
+ 	probe.firstTuple = NULL;
+ 	result = TupleHashTableHash(&probe, sizeof(TupleHashEntryData));
+ 
+ 	/* restore whichever table was current before */
+ 	CurTupleHashTable = prevHT;
+ 
+ 	return result;
+ }
+ 
+ /*
+  * LookupTupleHashEntryHash
+  *
+  * Like LookupTupleHashEntry, but allows the caller to specify the tuple's
+  * hash value, to avoid recalculating it.
+  */
+ TupleHashEntry
+ LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot,
+ 						 uint32 hashvalue, bool *isnew)
+ {
  	TupleHashEntry entry;
  	MemoryContext oldContext;
  	TupleHashTable saveCurHT;
***************
*** 371,380 **** LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
  
  	/* Search the hash table */
  	dummy.firstTuple = NULL;	/* flag to reference inputslot */
! 	entry = (TupleHashEntry) hash_search(hashtable->hashtab,
! 										 &dummy,
! 										 isnew ? HASH_ENTER : HASH_FIND,
! 										 &found);
  
  	if (isnew)
  	{
--- 420,428 ----
  
  	/* Search the hash table */
  	dummy.firstTuple = NULL;	/* flag to reference inputslot */
! 	entry = (TupleHashEntry) hash_search_with_hash_value(
! 		hashtable->hashtab, &dummy, hashvalue, isnew ? HASH_ENTER : HASH_FIND,
! 		&found);
  
  	if (isnew)
  	{
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
***************
*** 108,121 ****
--- 108,126 ----
  #include "optimizer/tlist.h"
  #include "parser/parse_agg.h"
  #include "parser/parse_coerce.h"
+ #include "storage/buffile.h"
  #include "utils/acl.h"
  #include "utils/builtins.h"
+ #include "utils/dynahash.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"
  #include "utils/tuplesort.h"
  #include "utils/datum.h"
  
+ #define HASH_DISK_MIN_PARTITIONS		1
+ #define HASH_DISK_DEFAULT_PARTITIONS	16
+ #define HASH_DISK_MAX_PARTITIONS		256
  
  /*
   * AggStatePerAggData - per-aggregate working state for the Agg scan
***************
*** 301,306 **** typedef struct AggHashEntryData
--- 306,321 ----
  	AggStatePerGroupData pergroup[1];	/* VARIABLE LENGTH ARRAY */
  }	AggHashEntryData;	/* VARIABLE LENGTH STRUCT */
  
+ typedef struct HashWork			/* one iteration of HashAgg still to be done */
+ {
+ 	BufFile		 *input_file;	/* input partition, NULL for outer plan */
+ 	int			  input_bits;	/* number of bits for input partition mask */
+ 
+ 	int			  n_output_partitions; /* number of output partitions */
+ 	BufFile		**output_partitions; /* output partition files */
+ 	int			 *output_ntuples; /* number of tuples in each partition */
+ 	int			  output_bits; /* log2(n_output_partitions); input_bits excluded */
+ } HashWork;
  
  static void initialize_aggregates(AggState *aggstate,
  					  AggStatePerAgg peragg,
***************
*** 322,331 **** static void finalize_aggregate(AggState *aggstate,
  static Bitmapset *find_unaggregated_cols(AggState *aggstate);
  static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
  static void build_hash_table(AggState *aggstate);
! static AggHashEntry lookup_hash_entry(AggState *aggstate,
! 				  TupleTableSlot *inputslot);
  static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
! static void agg_fill_hash_table(AggState *aggstate);
  static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
  static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
  
--- 337,349 ----
  static Bitmapset *find_unaggregated_cols(AggState *aggstate);
  static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
  static void build_hash_table(AggState *aggstate);
! static AggHashEntry lookup_hash_entry(AggState *aggstate, HashWork *work,
! 				   TupleTableSlot *inputslot, uint32 hashvalue);
! static HashWork *hash_work(BufFile *input_file, int input_bits);
! static void save_tuple(AggState *aggstate, HashWork *work,
! 					   TupleTableSlot *slot, uint32 hashvalue);
  static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
! static bool agg_fill_hash_table(AggState *aggstate);
  static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
  static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
  
***************
*** 946,953 **** build_hash_table(AggState *aggstate)
  											  aggstate->hashfunctions,
  											  node->numGroups,
  											  entrysize,
! 											  aggstate->aggcontext,
  											  tmpmem);
  }
  
  /*
--- 964,974 ----
  											  aggstate->hashfunctions,
  											  node->numGroups,
  											  entrysize,
! 											  aggstate->hashcontext,
  											  tmpmem);
+ 
+ 	aggstate->hash_mem_min = MemoryContextGetAllocated(
+ 		aggstate->hashcontext, true);
  }
  
  /*
***************
*** 1024,1035 **** hash_agg_entry_size(int numAggs)
   * When called, CurrentMemoryContext should be the per-query context.
   */
  static AggHashEntry
! lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot)
  {
  	TupleTableSlot *hashslot = aggstate->hashslot;
  	ListCell   *l;
  	AggHashEntry entry;
! 	bool		isnew;
  
  	/* if first time through, initialize hashslot by cloning input slot */
  	if (hashslot->tts_tupleDescriptor == NULL)
--- 1045,1059 ----
   * When called, CurrentMemoryContext should be the per-query context.
   */
  static AggHashEntry
! lookup_hash_entry(AggState *aggstate, HashWork *work,
! 				  TupleTableSlot *inputslot, uint32 hashvalue)
  {
  	TupleTableSlot *hashslot = aggstate->hashslot;
  	ListCell   *l;
  	AggHashEntry entry;
! 	int64		hash_mem;
! 	bool		isnew = false;
! 	bool	   *p_isnew;
  
  	/* if first time through, initialize hashslot by cloning input slot */
  	if (hashslot->tts_tupleDescriptor == NULL)
***************
*** 1049,1058 **** lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot)
  		hashslot->tts_isnull[varNumber] = inputslot->tts_isnull[varNumber];
  	}
  
  	/* find or create the hashtable entry using the filtered tuple */
! 	entry = (AggHashEntry) LookupTupleHashEntry(aggstate->hashtable,
! 												hashslot,
! 												&isnew);
  
  	if (isnew)
  	{
--- 1073,1089 ----
  		hashslot->tts_isnull[varNumber] = inputslot->tts_isnull[varNumber];
  	}
  
+ 	hash_mem = MemoryContextGetAllocated(aggstate->hashcontext, true);
+ 	if (hash_mem == aggstate->hash_mem_min ||
+ 		hash_mem < work_mem * 1024L)
+ 		p_isnew = &isnew;
+ 	else
+ 		p_isnew = NULL;
+ 
  	/* find or create the hashtable entry using the filtered tuple */
! 	entry = (AggHashEntry) LookupTupleHashEntryHash(aggstate->hashtable,
! 													hashslot, hashvalue,
! 													p_isnew);
  
  	if (isnew)
  	{
***************
*** 1060,1068 **** lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot)
--- 1091,1242 ----
  		initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup);
  	}
  
+ 	if (entry == NULL)
+ 		save_tuple(aggstate, work, inputslot, hashvalue);
+ 
  	return entry;
  }
  
+ 
+ /*
+  * hash_work
+  *
+  * Build a HashWork item describing one pending HashAgg pass over either the
+  * outer plan or a spilled partition. Call in the aggregate's memory context.
+  */
+ static HashWork *
+ hash_work(BufFile *input_file, int input_bits)
+ {
+ 	HashWork *item;
+ 
+ 	item = (HashWork *) palloc(sizeof(HashWork));
+ 
+ 	/* output side stays empty until save_tuple() has to spill a tuple */
+ 	item->output_bits = 0;
+ 	item->output_ntuples = NULL;
+ 	item->output_partitions = NULL;
+ 	item->n_output_partitions = 0;
+ 
+ 	/* where this pass reads from, and its partition mask width */
+ 	item->input_bits = input_bits;
+ 	item->input_file = input_file;
+ 
+ 	return item;
+ }
+ 
+ /*
+  * save_tuple
+  *
+  * Not enough memory to add tuple as new entry in hash table. Save it (hash
+  * value first, then the minimal tuple) in the appropriate output partition.
+  */
+ static void
+ save_tuple(AggState *aggstate, HashWork *work, TupleTableSlot *slot,
+ 		   uint32 hashvalue)
+ {
+ 	int					 partition;
+ 	MinimalTuple		 tuple;
+ 	BufFile				*file;
+ 	size_t				 written;
+ 
+ 	if (work->output_partitions == NULL)
+ 	{
+ 		int npartitions = HASH_DISK_DEFAULT_PARTITIONS; /* TODO choose */
+ 		int partition_bits;
+ 		int i;
+ 
+ 		aggstate->hash_disk = true;	/* rescans can't reuse the hash table */
+ 
+ 		if (npartitions < HASH_DISK_MIN_PARTITIONS)
+ 			npartitions = HASH_DISK_MIN_PARTITIONS;
+ 		if (npartitions > HASH_DISK_MAX_PARTITIONS)
+ 			npartitions = HASH_DISK_MAX_PARTITIONS;
+ 		partition_bits = my_log2(npartitions);
+ 
+ 		/* make sure that we don't exhaust the hash bits */
+ 		if (partition_bits + work->input_bits >= 32)
+ 			partition_bits = 32 - work->input_bits;
+ 
+ 		/* number of partitions will be a power of two */
+ 		npartitions = 1 << partition_bits;
+ 		work->output_bits = partition_bits;
+ 		work->n_output_partitions = npartitions;
+ 		work->output_partitions = palloc(sizeof(BufFile *) * npartitions);
+ 		work->output_ntuples = palloc0(sizeof(int) * npartitions);
+ 
+ 		for (i = 0; i < npartitions; i++)
+ 			work->output_partitions[i] = BufFileCreateTemp(false);
+ 	}
+ 
+ 	/* skip the input_bits already used, then take the next output_bits */
+ 	if (work->output_bits == 0)
+ 		partition = 0;
+ 	else
+ 		partition = (hashvalue << work->input_bits) >> (32 - work->output_bits);
+ 
+ 	work->output_ntuples[partition]++;
+ 	file = work->output_partitions[partition];
+ 	tuple = ExecFetchSlotMinimalTuple(slot);
+ 
+ 	written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
+ 	if (written != sizeof(uint32))
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write to HashAgg temporary file: %m")));
+ 
+ 	written = BufFileWrite(file, (void *) tuple, tuple->t_len);
+ 	if (written != tuple->t_len)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write to HashAgg temporary file: %m")));
+ }
+ 
+ 
+ /*
+  * read_saved_tuple
+  *		fetch the next spilled tuple from a partition file, or NULL at EOF.
+  *
+  * When a tuple is found, its saved hash value is returned in *hashvalue and
+  * the tuple is stored into tupleSlot; at EOF the slot is cleared instead.
+  *
+  * Closely modelled on ExecHashJoinGetSavedTuple.
+  */
+ static TupleTableSlot *
+ read_saved_tuple(BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)
+ {
+ 	uint32		prefix[2];		/* [0] = hash value, [1] = tuple length */
+ 	uint32		tuplen;
+ 	size_t		restlen;
+ 	size_t		got;
+ 	MinimalTuple mtup;
+ 
+ 	/* hash value and length word are both uint32: fetch them together */
+ 	got = BufFileRead(file, (void *) prefix, sizeof(prefix));
+ 	if (got == 0)				/* end of file */
+ 	{
+ 		ExecClearTuple(tupleSlot);
+ 		return NULL;
+ 	}
+ 	if (got != sizeof(prefix))
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not read from HashAgg temporary file: %m")));
+ 
+ 	*hashvalue = prefix[0];
+ 	tuplen = prefix[1];
+ 	restlen = tuplen - sizeof(uint32);
+ 	/* length word is already in hand; read the rest in just behind it */
+ 	mtup = (MinimalTuple) palloc(tuplen);
+ 	mtup->t_len = tuplen;
+ 	got = BufFileRead(file, (void *) ((char *) mtup + sizeof(uint32)), restlen);
+ 	if (got != restlen)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not read from HashAgg temporary file: %m")));
+ 	return ExecStoreMinimalTuple(mtup, tupleSlot, true);
+ }
+ 
+ 
  /*
   * ExecAgg -
   *
***************
*** 1107,1115 **** ExecAgg(AggState *node)
  	/* Dispatch based on strategy */
  	if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
  	{
! 		if (!node->table_filled)
! 			agg_fill_hash_table(node);
! 		return agg_retrieve_hash_table(node);
  	}
  	else
  		return agg_retrieve_direct(node);
--- 1281,1296 ----
  	/* Dispatch based on strategy */
  	if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
  	{
! 		TupleTableSlot *slot = NULL;
! 
! 		while (slot == NULL)
! 		{
! 			if (!node->table_filled)
! 				if (!agg_fill_hash_table(node))
! 					break;
! 			slot = agg_retrieve_hash_table(node);
! 		}
! 		return slot;
  	}
  	else
  		return agg_retrieve_direct(node);
***************
*** 1325,1337 **** agg_retrieve_direct(AggState *aggstate)
  /*
   * ExecAgg for hashed case: phase 1, read input and build hash table
   */
! static void
  agg_fill_hash_table(AggState *aggstate)
  {
  	PlanState  *outerPlan;
  	ExprContext *tmpcontext;
  	AggHashEntry entry;
  	TupleTableSlot *outerslot;
  
  	/*
  	 * get state info from node
--- 1506,1520 ----
  /*
   * ExecAgg for hashed case: phase 1, read input and build hash table
   */
! static bool
  agg_fill_hash_table(AggState *aggstate)
  {
  	PlanState  *outerPlan;
  	ExprContext *tmpcontext;
  	AggHashEntry entry;
  	TupleTableSlot *outerslot;
+ 	HashWork	*work;
+ 	int			 i;
  
  	/*
  	 * get state info from node
***************
*** 1340,1359 **** agg_fill_hash_table(AggState *aggstate)
  	/* tmpcontext is the per-input-tuple expression context */
  	tmpcontext = aggstate->tmpcontext;
  
  	/*
  	 * Process each outer-plan tuple, and then fetch the next one, until we
  	 * exhaust the outer plan.
  	 */
  	for (;;)
  	{
! 		outerslot = ExecProcNode(outerPlan);
! 		if (TupIsNull(outerslot))
! 			break;
  		/* set up for advance_aggregates call */
  		tmpcontext->ecxt_outertuple = outerslot;
  
  		/* Find or build hashtable entry for this tuple's group */
! 		entry = lookup_hash_entry(aggstate, outerslot);
  
  		/* Advance the aggregates */
  		advance_aggregates(aggstate, entry->pergroup);
--- 1523,1581 ----
  	/* tmpcontext is the per-input-tuple expression context */
  	tmpcontext = aggstate->tmpcontext;
  
+ 	if (aggstate->hash_work == NIL)
+ 	{
+ 		aggstate->agg_done = true;
+ 		return false;
+ 	}
+ 
+ 	work = linitial(aggstate->hash_work);
+ 	aggstate->hash_work = list_delete_first(aggstate->hash_work);
+ 
+ 	/* if not the first time through, reinitialize */
+ 	if (!aggstate->hash_init_state)
+ 	{
+ 		MemoryContextResetAndDeleteChildren(aggstate->hashcontext);
+ 		build_hash_table(aggstate);
+ 	}
+ 
+ 	aggstate->hash_init_state = false;
+ 
  	/*
  	 * Process each outer-plan tuple, and then fetch the next one, until we
  	 * exhaust the outer plan.
  	 */
  	for (;;)
  	{
! 		uint32 hashvalue;
! 
! 		CHECK_FOR_INTERRUPTS();
! 
! 		if (work->input_file == NULL)
! 		{
! 			outerslot = ExecProcNode(outerPlan);
! 			if (TupIsNull(outerslot))
! 				break;
! 
! 			hashvalue = TupleHashEntryHash(aggstate->hashtable, outerslot);
! 		}
! 		else
! 		{
! 			outerslot = read_saved_tuple(work->input_file, &hashvalue,
! 										 aggstate->hashslot);
! 			if (TupIsNull(outerslot))
! 			{
! 				BufFileClose(work->input_file);
! 				work->input_file = NULL;
! 				break;
! 			}
! 		}
! 
  		/* set up for advance_aggregates call */
  		tmpcontext->ecxt_outertuple = outerslot;
  
  		/* Find or build hashtable entry for this tuple's group */
! 		entry = lookup_hash_entry(aggstate, work, outerslot, hashvalue);
  
  		/* Advance the aggregates */
  		advance_aggregates(aggstate, entry->pergroup);
***************
*** 1362,1370 **** agg_fill_hash_table(AggState *aggstate)
--- 1584,1619 ----
  		ResetExprContext(tmpcontext);
  	}
  
+ 	/* add each non-empty output partition as a new work item */
+ 	for (i = 0; i < work->n_output_partitions; i++)
+ 	{
+ 		BufFile			*file = work->output_partitions[i];
+ 		MemoryContext	 oldContext;
+ 
+ 		if (work->output_ntuples[i] == 0)	/* partition is empty */
+ 		{
+ 			BufFileClose(file);
+ 			continue;
+ 		}
+ 
+ 		/* rewind file for reading */
+ 		if (BufFileSeek(file, 0, 0L, SEEK_SET))
+ 			ereport(ERROR,
+ 					(errcode_for_file_access(),
+ 					 errmsg("could not rewind HashAgg temporary file: %m")));
+ 		oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
+ 		aggstate->hash_work = lappend(aggstate->hash_work,
+ 									  hash_work(file,
+ 												work->output_bits + work->input_bits));
+ 		MemoryContextSwitchTo(oldContext);
+ 	}
+ 
+ 	pfree(work);
+ 
  	aggstate->table_filled = true;
  	/* Initialize to walk the hash table */
  	ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter);
+ 
+ 	return true;
  }
  
  /*
***************
*** 1396,1411 **** agg_retrieve_hash_table(AggState *aggstate)
  	 * We loop retrieving groups until we find one satisfying
  	 * aggstate->ss.ps.qual
  	 */
! 	while (!aggstate->agg_done)
  	{
  		/*
  		 * Find the next entry in the hash table
  		 */
  		entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter);
  		if (entry == NULL)
  		{
! 			/* No more entries in hashtable, so done */
! 			aggstate->agg_done = TRUE;
  			return NULL;
  		}
  
--- 1645,1662 ----
  	 * We loop retrieving groups until we find one satisfying
  	 * aggstate->ss.ps.qual
  	 */
! 	for (;;)
  	{
+ 		CHECK_FOR_INTERRUPTS();
+ 
  		/*
  		 * Find the next entry in the hash table
  		 */
  		entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter);
  		if (entry == NULL)
  		{
! 			/* No more entries in hashtable, so done with this batch */
! 			aggstate->table_filled = false;
  			return NULL;
  		}
  
***************
*** 1636,1645 **** ExecInitAgg(Agg *node, EState *estate, int eflags)
--- 1887,1914 ----
  
  	if (node->aggstrategy == AGG_HASHED)
  	{
+ 		MemoryContext oldContext;
+ 
+ 		aggstate->hashcontext =
+ 			AllocSetContextCreateTracked(aggstate->aggcontext,
+ 										 "HashAgg Hash Table Context",
+ 										 ALLOCSET_DEFAULT_MINSIZE,
+ 										 ALLOCSET_DEFAULT_INITSIZE,
+ 										 ALLOCSET_DEFAULT_MAXSIZE,
+ 										 true);
+ 
  		build_hash_table(aggstate);
+ 		aggstate->hash_init_state = true;
  		aggstate->table_filled = false;
+ 		aggstate->hash_disk = false;
  		/* Compute the columns we actually need to hash on */
  		aggstate->hash_needed = find_hash_columns(aggstate);
+ 
+ 		/* prime with initial work item to read from outer plan */
+ 		oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
+ 		aggstate->hash_work = lappend(aggstate->hash_work,
+ 									  hash_work(NULL, 0));
+ 		MemoryContextSwitchTo(oldContext);
  	}
  	else
  	{
***************
*** 2058,2079 **** ExecReScanAgg(AggState *node)
  	if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
  	{
  		/*
! 		 * In the hashed case, if we haven't yet built the hash table then we
! 		 * can just return; nothing done yet, so nothing to undo. If subnode's
! 		 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
! 		 * else no reason to re-scan it at all.
  		 */
! 		if (!node->table_filled)
  			return;
  
  		/*
! 		 * If we do have the hash table and the subplan does not have any
! 		 * parameter changes, then we can just rescan the existing hash table;
! 		 * no need to build it again.
  		 */
! 		if (node->ss.ps.lefttree->chgParam == NULL)
  		{
  			ResetTupleHashIterator(node->hashtable, &node->hashiter);
  			return;
  		}
  	}
--- 2327,2349 ----
  	if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
  	{
  		/*
! 		 * In the hashed case, if we haven't done any execution work yet, we
! 		 * can just return; nothing to undo. If subnode's chgParam is not NULL
! 		 * then it will be re-scanned by ExecProcNode, else no reason to
! 		 * re-scan it at all.
  		 */
! 		if (node->hash_init_state)
  			return;
  
  		/*
! 		 * If we do have the hash table, it never went to disk, and the
! 		 * subplan does not have any parameter changes, then we can just
! 		 * rescan the existing hash table; no need to build it again.
  		 */
! 		if (node->ss.ps.lefttree->chgParam == NULL && !node->hash_disk)
  		{
  			ResetTupleHashIterator(node->hashtable, &node->hashiter);
+ 			node->table_filled = true;
  			return;
  		}
  	}
***************
*** 2112,2120 **** ExecReScanAgg(AggState *node)
--- 2382,2409 ----
  
  	if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
  	{
+ 		MemoryContext oldContext;
+ 
+ 		node->hashcontext =
+ 			AllocSetContextCreateTracked(node->aggcontext,
+ 										 "HashAgg Hash Table Context",
+ 										 ALLOCSET_DEFAULT_MINSIZE,
+ 										 ALLOCSET_DEFAULT_INITSIZE,
+ 										 ALLOCSET_DEFAULT_MAXSIZE,
+ 										 true);
+ 
  		/* Rebuild an empty hash table */
  		build_hash_table(node);
+ 		node->hash_init_state = true;
  		node->table_filled = false;
+ 		node->hash_disk = false;
+ 		node->hash_work = NIL;
+ 
+ 		/* prime with initial work item to read from outer plan */
+ 		oldContext = MemoryContextSwitchTo(node->aggcontext);
+ 		node->hash_work = lappend(node->hash_work,
+ 								  hash_work(NULL, 0));
+ 		MemoryContextSwitchTo(oldContext);
  	}
  	else
  	{
*** a/src/backend/optimizer/path/costsize.c
--- b/src/backend/optimizer/path/costsize.c
***************
*** 113,118 **** bool		enable_bitmapscan = true;
--- 113,119 ----
  bool		enable_tidscan = true;
  bool		enable_sort = true;
  bool		enable_hashagg = true;
+ bool		enable_hashagg_disk = true;
  bool		enable_nestloop = true;
  bool		enable_material = true;
  bool		enable_mergejoin = true;
*** a/src/backend/optimizer/plan/planner.c
--- b/src/backend/optimizer/plan/planner.c
***************
*** 2741,2747 **** choose_hashed_grouping(PlannerInfo *root,
  	/* plus the per-hash-entry overhead */
  	hashentrysize += hash_agg_entry_size(agg_costs->numAggs);
  
! 	if (hashentrysize * dNumGroups > work_mem * 1024L)
  		return false;
  
  	/*
--- 2741,2748 ----
  	/* plus the per-hash-entry overhead */
  	hashentrysize += hash_agg_entry_size(agg_costs->numAggs);
  
! 	if (!enable_hashagg_disk &&
! 		hashentrysize * dNumGroups > work_mem * 1024L)
  		return false;
  
  	/*
***************
*** 2907,2913 **** choose_hashed_distinct(PlannerInfo *root,
  	/* plus the per-hash-entry overhead */
  	hashentrysize += hash_agg_entry_size(0);
  
! 	if (hashentrysize * dNumDistinctRows > work_mem * 1024L)
  		return false;
  
  	/*
--- 2908,2915 ----
  	/* plus the per-hash-entry overhead */
  	hashentrysize += hash_agg_entry_size(0);
  
! 	if (!enable_hashagg_disk &&
! 		hashentrysize * dNumDistinctRows > work_mem * 1024L)
  		return false;
  
  	/*
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 752,757 **** static struct config_bool ConfigureNamesBool[] =
--- 752,766 ----
  		NULL, NULL, NULL
  	},
  	{
+ 		{"enable_hashagg_disk", PGC_USERSET, QUERY_TUNING_METHOD,
+ 			gettext_noop("Enables the planner's use of disk-based hashed aggregation plans."),
+ 			NULL
+ 		},
+ 		&enable_hashagg_disk,
+ 		true,
+ 		NULL, NULL, NULL
+ 	},
+ 	{
  		{"enable_material", PGC_USERSET, QUERY_TUNING_METHOD,
  			gettext_noop("Enables the planner's use of materialization."),
  			NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 266,271 ****
--- 266,272 ----
  
  #enable_bitmapscan = on
  #enable_hashagg = on
+ #enable_hashagg_disk = on
  #enable_hashjoin = on
  #enable_indexscan = on
  #enable_indexonlyscan = on
*** a/src/backend/utils/mmgr/aset.c
--- b/src/backend/utils/mmgr/aset.c
***************
*** 242,247 **** typedef struct AllocChunkData
--- 242,249 ----
  #define AllocChunkGetPointer(chk)	\
  					((AllocPointer)(((char *)(chk)) + ALLOC_CHUNKHDRSZ))
  
+ static void update_allocation(MemoryContext context, int64 size);
+ 
  /*
   * These functions implement the MemoryContext API for AllocSet contexts.
   */
***************
*** 250,256 **** static void AllocSetFree(MemoryContext context, void *pointer);
  static void *AllocSetRealloc(MemoryContext context, void *pointer, Size size);
  static void AllocSetInit(MemoryContext context);
  static void AllocSetReset(MemoryContext context);
! static void AllocSetDelete(MemoryContext context);
  static Size AllocSetGetChunkSpace(MemoryContext context, void *pointer);
  static bool AllocSetIsEmpty(MemoryContext context);
  static void AllocSetStats(MemoryContext context, int level);
--- 252,258 ----
  static void *AllocSetRealloc(MemoryContext context, void *pointer, Size size);
  static void AllocSetInit(MemoryContext context);
  static void AllocSetReset(MemoryContext context);
! static void AllocSetDelete(MemoryContext context, MemoryContext parent);
  static Size AllocSetGetChunkSpace(MemoryContext context, void *pointer);
  static bool AllocSetIsEmpty(MemoryContext context);
  static void AllocSetStats(MemoryContext context, int level);
***************
*** 430,435 **** randomize_mem(char *ptr, size_t size)
--- 432,440 ----
   * minContextSize: minimum context size
   * initBlockSize: initial allocation block size
   * maxBlockSize: maximum allocation block size
+  *
+  * The flag determining whether this context tracks memory usage is inherited
+  * from the parent context.
   */
  MemoryContext
  AllocSetContextCreate(MemoryContext parent,
***************
*** 438,443 **** AllocSetContextCreate(MemoryContext parent,
--- 443,468 ----
  					  Size initBlockSize,
  					  Size maxBlockSize)
  {
+ 	return AllocSetContextCreateTracked(
+ 		parent, name, minContextSize, initBlockSize, maxBlockSize,
+ 		(parent == NULL) ? false : parent->track_mem);
+ }
+ 
+ /*
+  * AllocSetContextCreateTracked
+  *		Create a new AllocSet context.
+  *
+  * Implementation for AllocSetContextCreate, but also allows the caller to
+  * specify whether memory usage should be tracked or not.
+  */
+ MemoryContext
+ AllocSetContextCreateTracked(MemoryContext parent,
+ 							 const char *name,
+ 							 Size minContextSize,
+ 							 Size initBlockSize,
+ 							 Size maxBlockSize,
+ 							 bool track_mem)
+ {
  	AllocSet	context;
  
  	/* Do the type-independent part of context creation */
***************
*** 445,451 **** AllocSetContextCreate(MemoryContext parent,
  											 sizeof(AllocSetContext),
  											 &AllocSetMethods,
  											 parent,
! 											 name);
  
  	/*
  	 * Make sure alloc parameters are reasonable, and save them.
--- 470,477 ----
  											 sizeof(AllocSetContext),
  											 &AllocSetMethods,
  											 parent,
! 											 name,
! 											 track_mem);
  
  	/*
  	 * Make sure alloc parameters are reasonable, and save them.
***************
*** 500,505 **** AllocSetContextCreate(MemoryContext parent,
--- 526,534 ----
  					 errdetail("Failed while creating memory context \"%s\".",
  							   name)));
  		}
+ 
+ 		update_allocation((MemoryContext) context, blksize);
+ 
  		block->aset = context;
  		block->freeptr = ((char *) block) + ALLOC_BLOCKHDRSZ;
  		block->endptr = ((char *) block) + blksize;
***************
*** 590,595 **** AllocSetReset(MemoryContext context)
--- 619,625 ----
  		else
  		{
  			/* Normal case, release the block */
+ 			update_allocation(context, -(block->endptr - ((char*) block)));
  #ifdef CLOBBER_FREED_MEMORY
  			wipe_mem(block, block->freeptr - ((char *) block));
  #endif
***************
*** 611,617 **** AllocSetReset(MemoryContext context)
   * But note we are not responsible for deleting the context node itself.
   */
  static void
! AllocSetDelete(MemoryContext context)
  {
  	AllocSet	set = (AllocSet) context;
  	AllocBlock	block = set->blocks;
--- 641,647 ----
   * But note we are not responsible for deleting the context node itself.
   */
  static void
! AllocSetDelete(MemoryContext context, MemoryContext parent)
  {
  	AllocSet	set = (AllocSet) context;
  	AllocBlock	block = set->blocks;
***************
*** 623,628 **** AllocSetDelete(MemoryContext context)
--- 653,668 ----
  	AllocSetCheck(context);
  #endif
  
+ 	/*
+ 	 * Parent is already unlinked from context, so can't use
+ 	 * update_allocation(). Like it, stop at the first untracked ancestor.
+ 	 */
+ 	while (parent != NULL && parent->track_mem)
+ 	{
+ 		parent->total_allocated -= context->total_allocated;
+ 		parent = parent->parent;
+ 	}
+ 
  	/* Make it look empty, just in case... */
  	MemSetAligned(set->freelist, 0, sizeof(set->freelist));
  	set->blocks = NULL;
***************
*** 678,683 **** AllocSetAlloc(MemoryContext context, Size size)
--- 718,726 ----
  					 errmsg("out of memory"),
  					 errdetail("Failed on request of size %zu.", size)));
  		}
+ 
+ 		update_allocation(context, blksize);
+ 
  		block->aset = set;
  		block->freeptr = block->endptr = ((char *) block) + blksize;
  
***************
*** 873,878 **** AllocSetAlloc(MemoryContext context, Size size)
--- 916,923 ----
  					 errdetail("Failed on request of size %zu.", size)));
  		}
  
+ 		update_allocation(context, blksize);
+ 
  		block->aset = set;
  		block->freeptr = ((char *) block) + ALLOC_BLOCKHDRSZ;
  		block->endptr = ((char *) block) + blksize;
***************
*** 976,981 **** AllocSetFree(MemoryContext context, void *pointer)
--- 1021,1027 ----
  			set->blocks = block->next;
  		else
  			prevblock->next = block->next;
+ 		update_allocation(context, -(block->endptr - ((char*) block)));
  #ifdef CLOBBER_FREED_MEMORY
  		wipe_mem(block, block->freeptr - ((char *) block));
  #endif
***************
*** 1088,1093 **** AllocSetRealloc(MemoryContext context, void *pointer, Size size)
--- 1134,1140 ----
  		AllocBlock	prevblock = NULL;
  		Size		chksize;
  		Size		blksize;
+ 		Size		oldblksize;
  
  		while (block != NULL)
  		{
***************
*** 1105,1110 **** AllocSetRealloc(MemoryContext context, void *pointer, Size size)
--- 1152,1159 ----
  		/* Do the realloc */
  		chksize = MAXALIGN(size);
  		blksize = chksize + ALLOC_BLOCKHDRSZ + ALLOC_CHUNKHDRSZ;
+ 		oldblksize = block->endptr - ((char *)block);
+ 
  		block = (AllocBlock) realloc(block, blksize);
  		if (block == NULL)
  		{
***************
*** 1114,1119 **** AllocSetRealloc(MemoryContext context, void *pointer, Size size)
--- 1163,1169 ----
  					 errmsg("out of memory"),
  					 errdetail("Failed on request of size %zu.", size)));
  		}
+ 		update_allocation(context, blksize - oldblksize);
  		block->freeptr = block->endptr = ((char *) block) + blksize;
  
  		/* Update pointers since block has likely been moved */
***************
*** 1277,1282 **** AllocSetStats(MemoryContext context, int level)
--- 1327,1359 ----
  }
  
  
+ /*
+  * update_allocation
+  *
+  * Account for "size" bytes gained (positive) or released (negative) by
+  * "context", in it and in each tracked ancestor up to the first untracked one.
+  */
+ static void
+ update_allocation(MemoryContext context, int64 size)
+ {
+ 	MemoryContext cxt = context;
+ 
+ 	if (!context->track_mem)
+ 		return;
+ 
+ 	context->self_allocated += size;
+ 
+ 	/* context's own total is bumped on the first iteration */
+ 	while (cxt != NULL && cxt->track_mem)
+ 	{
+ 		cxt->total_allocated += size;
+ 		Assert(cxt->self_allocated >= 0);
+ 		Assert(cxt->total_allocated >= 0);
+ 
+ 		cxt = cxt->parent;
+ 	}
+ }
+ 
  #ifdef MEMORY_CONTEXT_CHECKING
  
  /*
*** a/src/backend/utils/mmgr/mcxt.c
--- b/src/backend/utils/mmgr/mcxt.c
***************
*** 187,192 **** MemoryContextResetChildren(MemoryContext context)
--- 187,194 ----
  void
  MemoryContextDelete(MemoryContext context)
  {
+ 	MemoryContext parent = context->parent;
+ 
  	AssertArg(MemoryContextIsValid(context));
  	/* We had better not be deleting TopMemoryContext ... */
  	Assert(context != TopMemoryContext);
***************
*** 202,208 **** MemoryContextDelete(MemoryContext context)
  	 */
  	MemoryContextSetParent(context, NULL);
  
! 	(*context->methods->delete_context) (context);
  	VALGRIND_DESTROY_MEMPOOL(context);
  	pfree(context);
  }
--- 204,211 ----
  	 */
  	MemoryContextSetParent(context, NULL);
  
! 	/* pass the parent in case it's needed, however */
! 	(*context->methods->delete_context) (context, parent);
  	VALGRIND_DESTROY_MEMPOOL(context);
  	pfree(context);
  }
***************
*** 324,329 **** MemoryContextAllowInCriticalSection(MemoryContext context, bool allow)
--- 327,349 ----
  }
  
  /*
+  * MemoryContextGetAllocated
+  *
+  * Return the tracked allocation counter of this context: total_allocated
+  * (child contexts included) if total is true, otherwise self_allocated.
+  * The context must be valid and have track_mem set (checked by asserts).
+  */
+ int64
+ MemoryContextGetAllocated(MemoryContext context, bool total)
+ {
+ 	AssertArg(MemoryContextIsValid(context));
+ 	Assert(context->track_mem);
+ 
+ 	/* total covers this context plus its children; self only this one */
+ 	return total ? context->total_allocated : context->self_allocated;
+ }
+ 
+ /*
   * GetMemoryChunkSpace
   *		Given a currently-allocated chunk, determine the total space
   *		it occupies (including all memory-allocation overhead).
***************
*** 546,552 **** MemoryContext
  MemoryContextCreate(NodeTag tag, Size size,
  					MemoryContextMethods *methods,
  					MemoryContext parent,
! 					const char *name)
  {
  	MemoryContext node;
  	Size		needed = size + strlen(name) + 1;
--- 566,573 ----
  MemoryContextCreate(NodeTag tag, Size size,
  					MemoryContextMethods *methods,
  					MemoryContext parent,
! 					const char *name,
! 					bool track_mem)
  {
  	MemoryContext node;
  	Size		needed = size + strlen(name) + 1;
***************
*** 576,581 **** MemoryContextCreate(NodeTag tag, Size size,
--- 597,605 ----
  	node->firstchild = NULL;
  	node->nextchild = NULL;
  	node->isReset = true;
+ 	node->track_mem = track_mem;
+ 	node->total_allocated = 0;
+ 	node->self_allocated = 0;
  	node->name = ((char *) node) + size;
  	strcpy(node->name, name);
  
*** a/src/include/executor/executor.h
--- b/src/include/executor/executor.h
***************
*** 144,149 **** extern TupleHashTable BuildTupleHashTable(int numCols, AttrNumber *keyColIdx,
--- 144,155 ----
  extern TupleHashEntry LookupTupleHashEntry(TupleHashTable hashtable,
  					 TupleTableSlot *slot,
  					 bool *isnew);
+ extern uint32 TupleHashEntryHash(TupleHashTable hashtable,
+ 					 TupleTableSlot *slot);
+ extern TupleHashEntry LookupTupleHashEntryHash(TupleHashTable hashtable,
+ 					 TupleTableSlot *slot,
+ 					 uint32 hashvalue,
+ 					 bool *isnew);
  extern TupleHashEntry FindTupleHashEntry(TupleHashTable hashtable,
  				   TupleTableSlot *slot,
  				   FmgrInfo *eqfunctions,
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
***************
*** 1718,1728 **** typedef struct AggState
--- 1718,1733 ----
  	AggStatePerGroup pergroup;	/* per-Aggref-per-group working state */
  	HeapTuple	grp_firstTuple; /* copy of first tuple of current group */
  	/* these fields are used in AGG_HASHED mode: */
+ 	MemoryContext hashcontext;	/* subcontext to use for hash table */
  	TupleHashTable hashtable;	/* hash table with one entry per group */
  	TupleTableSlot *hashslot;	/* slot for loading hash table */
  	List	   *hash_needed;	/* list of columns needed in hash table */
+ 	bool		hash_init_state; /* in initial state before execution? */
  	bool		table_filled;	/* hash table filled yet? */
+ 	bool		hash_disk;		/* have we exceeded memory yet? */
+ 	int64		hash_mem_min;	/* memory used by empty hash table */
  	TupleHashIterator hashiter; /* for iterating through hash table */
+ 	List	   *hash_work;		/* remaining work to be done */
  } AggState;
  
  /* ----------------
*** a/src/include/nodes/memnodes.h
--- b/src/include/nodes/memnodes.h
***************
*** 41,47 **** typedef struct MemoryContextMethods
  	void	   *(*realloc) (MemoryContext context, void *pointer, Size size);
  	void		(*init) (MemoryContext context);
  	void		(*reset) (MemoryContext context);
! 	void		(*delete_context) (MemoryContext context);
  	Size		(*get_chunk_space) (MemoryContext context, void *pointer);
  	bool		(*is_empty) (MemoryContext context);
  	void		(*stats) (MemoryContext context, int level);
--- 41,48 ----
  	void	   *(*realloc) (MemoryContext context, void *pointer, Size size);
  	void		(*init) (MemoryContext context);
  	void		(*reset) (MemoryContext context);
! 	void		(*delete_context) (MemoryContext context,
! 								   MemoryContext parent);
  	Size		(*get_chunk_space) (MemoryContext context, void *pointer);
  	bool		(*is_empty) (MemoryContext context);
  	void		(*stats) (MemoryContext context, int level);
***************
*** 60,65 **** typedef struct MemoryContextData
--- 61,69 ----
  	MemoryContext nextchild;	/* next child of same parent */
  	char	   *name;			/* context name (just for debugging) */
  	bool		isReset;		/* T = no space alloced since last reset */
+ 	bool		track_mem;		/* whether to track memory usage */
+ 	int64		total_allocated; /* including child contexts */
+ 	int64		self_allocated; /* not including child contexts */
  #ifdef USE_ASSERT_CHECKING
  	bool		allowInCritSection;	/* allow palloc in critical section */
  #endif
*** a/src/include/optimizer/cost.h
--- b/src/include/optimizer/cost.h
***************
*** 57,62 **** extern bool enable_bitmapscan;
--- 57,63 ----
  extern bool enable_tidscan;
  extern bool enable_sort;
  extern bool enable_hashagg;
+ extern bool enable_hashagg_disk;
  extern bool enable_nestloop;
  extern bool enable_material;
  extern bool enable_mergejoin;
*** a/src/include/utils/memutils.h
--- b/src/include/utils/memutils.h
***************
*** 96,101 **** extern void MemoryContextDeleteChildren(MemoryContext context);
--- 96,102 ----
  extern void MemoryContextResetAndDeleteChildren(MemoryContext context);
  extern void MemoryContextSetParent(MemoryContext context,
  					   MemoryContext new_parent);
+ extern int64 MemoryContextGetAllocated(MemoryContext context, bool total);
  extern Size GetMemoryChunkSpace(void *pointer);
  extern MemoryContext GetMemoryChunkContext(void *pointer);
  extern MemoryContext MemoryContextGetParent(MemoryContext context);
***************
*** 117,123 **** extern bool MemoryContextContains(MemoryContext context, void *pointer);
  extern MemoryContext MemoryContextCreate(NodeTag tag, Size size,
  					MemoryContextMethods *methods,
  					MemoryContext parent,
! 					const char *name);
  
  
  /*
--- 118,125 ----
  extern MemoryContext MemoryContextCreate(NodeTag tag, Size size,
  					MemoryContextMethods *methods,
  					MemoryContext parent,
! 					const char *name,
! 					bool track_mem);
  
  
  /*
***************
*** 130,135 **** extern MemoryContext AllocSetContextCreate(MemoryContext parent,
--- 132,143 ----
  					  Size minContextSize,
  					  Size initBlockSize,
  					  Size maxBlockSize);
+ extern MemoryContext AllocSetContextCreateTracked(MemoryContext parent,
+ 					  const char *name,
+ 					  Size minContextSize,
+ 					  Size initBlockSize,
+ 					  Size maxBlockSize,
+ 					  bool track_mem);
  
  /*
   * Recommended default alloc parameters, suitable for "ordinary" contexts
*** a/src/test/regress/expected/rangefuncs.out
--- b/src/test/regress/expected/rangefuncs.out
***************
*** 3,8 **** SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
--- 3,9 ----
  ----------------------+---------
   enable_bitmapscan    | on
   enable_hashagg       | on
+  enable_hashagg_disk  | on
   enable_hashjoin      | on
   enable_indexonlyscan | on
   enable_indexscan     | on
***************
*** 12,18 **** SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
   enable_seqscan       | on
   enable_sort          | on
   enable_tidscan       | on
! (11 rows)
  
  CREATE TABLE foo2(fooid int, f2 int);
  INSERT INTO foo2 VALUES(1, 11);
--- 13,19 ----
   enable_seqscan       | on
   enable_sort          | on
   enable_tidscan       | on
! (12 rows)
  
  CREATE TABLE foo2(fooid int, f2 int);
  INSERT INTO foo2 VALUES(1, 11);
