From c0afa4986b246fd7b10bd9432caa100bce592760 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 25 Apr 2018 15:40:14 +0300
Subject: [PATCH 2/2] Allow SetOps to spill.

---
 src/backend/executor/Makefile        |   2 +-
 src/backend/executor/execGrouping.c  |  37 +++++
 src/backend/executor/execHashSpill.c | 206 +++++++++++++++++++++++++++
 src/backend/executor/nodeSetOp.c     | 263 ++++++++++++++++++++++++++++++++++-
 src/include/executor/executor.h      |  11 ++
 src/include/lib/simplehash.h         |  81 ++++++-----
 src/include/nodes/execnodes.h        |  18 ++-
 7 files changed, 576 insertions(+), 42 deletions(-)
 create mode 100644 src/backend/executor/execHashSpill.c

diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index cc09895fa5..12593dea81 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
-       execGrouping.o execIndexing.o execJunk.o \
+       execGrouping.o execHashSpill.o execIndexing.o execJunk.o \
        execMain.o execParallel.o execPartition.o execProcnode.o \
        execReplication.o execScan.o execSRF.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c
index c4d0e04058..e02ab8df87 100644
--- a/src/backend/executor/execGrouping.c
+++ b/src/backend/executor/execGrouping.c
@@ -180,6 +180,7 @@ BuildTupleHashTable(PlanState *parent,
 	hashtable->inputslot = NULL;
 	hashtable->in_hash_funcs = NULL;
 	hashtable->cur_eq_func = NULL;
+	hashtable->usedMem = sizeof(TupleHashTableData);
 
 	/*
 	 * If parallelism is in use, even if the master backend is performing the
@@ -195,6 +196,7 @@ BuildTupleHashTable(PlanState *parent,
 		hashtable->hash_iv = 0;
 
 	hashtable->hashtab = tuplehash_create(tablecxt, nbuckets, hashtable);
+	hashtable->usedMem += nbuckets * sizeof(TupleHashEntryData);
 
 	oldcontext = MemoryContextSwitchTo(hashtable->tablecxt);
 
@@ -266,6 +268,7 @@ LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
 			MemoryContextSwitchTo(hashtable->tablecxt);
 			/* Copy the first tuple into the table context */
 			entry->firstTuple = ExecCopySlotMinimalTuple(slot);
+			hashtable->usedMem += GetMemoryChunkSpace(entry->firstTuple);
 		}
 	}
 	else
@@ -313,6 +316,40 @@ FindTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
 }
 
 /*
+ * Choose a victim tuple to remove from hash table, and return it.
+ *
+ * The caller should remove the entry from the hash table, pfree(firstTuple),
+ * and 'additional'.
+ */
+TupleHashEntryData *
+SpillTupleHashTable(TupleHashTable hashtable)
+{
+	TupleHashEntry entry;
+
+	if (hashtable->hashtab->members == 0)
+	{
+		/* hash table is empty. Nothing to spill */
+		return NULL;
+	}
+
+	if (!hashtable->spill_iter_inited)
+	{
+		tuplehash_start_iterate(hashtable->hashtab, &hashtable->spill_iter);
+		hashtable->spill_iter_inited = true;
+	}
+
+	entry = tuplehash_iterate(hashtable->hashtab, &hashtable->spill_iter);
+	if (!entry)
+	{
+		/* reached end, restart */
+		tuplehash_start_iterate(hashtable->hashtab, &hashtable->spill_iter);
+		entry = tuplehash_iterate(hashtable->hashtab, &hashtable->spill_iter);
+	}
+
+	return entry;
+}
+
+/*
  * Compute the hash value for a tuple
  *
  * The passed-in key is a pointer to TupleHashEntryData.  In an actual hash
diff --git a/src/backend/executor/execHashSpill.c b/src/backend/executor/execHashSpill.c
new file mode 100644
index 0000000000..88e1ccec94
--- /dev/null
+++ b/src/backend/executor/execHashSpill.c
@@ -0,0 +1,206 @@
+/*
+ *
+ *
+ * TODO:
+ *
+ * - need a smarter strategy to choose spill victim. Must avoid infinite looping,
+ *   where the same tuples get spilled again and again.
+ */
+
+#include "postgres.h"
+
+#include "executor/executor.h"
+#include "utils/hashutils.h"
+#include "utils/memutils.h"
+#include "storage/buffile.h"
+
+/*
+ * Spill Set
+ *
+ */
+
+#define FANOUT_SHIFT 4
+#define FANOUT 16
+#define LEVELS 8		/* 32 bits, 4 bits per level */
+#define HASH_HIGH_MASK 0xf0000000
+
+typedef struct SubSpillSet SubSpillSet;
+
+/*
+ * Spill files form a radix tree, based on the hash key. Whenever a sub-spillset
+ * grows too large, it is split. The entries that had already been written to
+ * the file for that sub-spillset are kept in the old file, but any new entries
+ * are written to the child nodes instead.
+ *
+ * XXX: I think that doesn't do the right thing with ordered aggregates, where
+ * we have to take care to feed the input rows to the aggregate in the input
+ * order.
+ */
+struct SubSpillSet
+{
+	BufFile	   *file;
+	int			num_children;
+
+	SubSpillSet   *children[FANOUT];
+};
+
+struct HashSpillSet
+{
+	SubSpillSet root;
+	uint64		max_size;
+};
+
+/*
+ * When processing input:
+ *
+ * 1. If has table is full, choose victim.
+ * 2. Call GetSpillFile() on the victim
+ * 3. Dump the entry to the file returned by GetSpillFile .
+ * 4. Remove entry from hash table.
+ */
+
+/*
+ * To finalize:
+ *
+ * 1. Dump remaining entries from in-memory hash table, where !firstbatch
+ * 2. Return remaining entries from in-memory hash table (with firstbatch==true)
+ *
+ * 3. Call ReadNextSpillFile(). Read entries from the file until it's empty.
+ * 4. Load each entry to in-memory hash table.
+ *
+ */
+
+HashSpillSet *
+CreateHashSpillSet(int64 target_file_size)
+{
+	HashSpillSet *sp;
+
+	sp = palloc0(sizeof(HashSpillSet));
+	sp->max_size = target_file_size;
+
+	return sp;
+}
+
+static SubSpillSet *
+CreateSubSpillSet(void)
+{
+	SubSpillSet *subsp;
+
+	subsp = palloc0(sizeof(SubSpillSet));
+	subsp->file = BufFileCreateTemp(false);
+	subsp->num_children = 0;
+
+	return subsp;
+}
+
+BufFile *
+GetSpillFile(HashSpillSet *sp, uint32 hash)
+{
+	uint32		level;
+	SubSpillSet *subsp;
+	uint32		hash_shifted;
+
+	level = 0;
+	subsp = &sp->root;
+	hash_shifted = hash;
+	for (;;)
+	{
+		uint32		childno;
+
+		childno = (hash_shifted & HASH_HIGH_MASK) >> (32 - FANOUT_SHIFT);
+
+		if (!subsp->children[childno])
+		{
+			subsp->num_children++;
+			subsp = subsp->children[childno] = CreateSubSpillSet();
+			break;
+		}
+
+		subsp = subsp->children[childno];
+		if (subsp->num_children == 0)
+		{
+			/*
+			 * We found the correct sub-spillset that this tuple belongs to.
+			 *
+			 * But is it too full? If so, continue to recurse into it, to create
+			 * a new sub-spillsets at lower level.
+			 */
+			if (level == LEVELS - 1 || BufFileSize(subsp->file) < sp->max_size)
+				break;
+		}
+		Assert(level < LEVELS - 1);
+
+		level++;
+		hash_shifted <<= FANOUT_SHIFT;
+	}
+
+	return subsp->file;
+}
+
+static BufFile *
+OpenNextSpillFileRecurse(HashSpillSet *sp, SubSpillSet *thissp, bool *respill)
+{
+	BufFile	   *file = NULL;
+	int			i;
+
+	if (thissp->file)
+	{
+		file = thissp->file;
+		thissp->file = NULL;
+		*respill = (thissp->num_children > 0);
+		return file;
+	}
+
+	for (i = 0; i < FANOUT; i++)
+	{
+		SubSpillSet *subsp;
+
+		if (thissp->children[i])
+		{
+			subsp = thissp->children[i];
+
+			file = OpenNextSpillFileRecurse(sp, subsp, respill);
+
+			if (!subsp->file && subsp->num_children == 0)
+			{
+				/* This leaf entry is no longer needed. Prune it. */
+				pfree(thissp->children[i]);
+				thissp->children[i] = NULL;
+				thissp->num_children--;
+			}
+			return file;
+		}
+	}
+	return NULL;
+}
+
+/*
+ * Open the next spill file to process.
+ *
+ * The spill files form a tree. This returns the spill files in an order,
+ * so that a parent is always returned before its children.
+ */
+BufFile *
+OpenNextSpillFile(HashSpillSet *sp, bool *respill)
+{
+	/* Scan the radix tree for next batch. */
+	BufFile *file;
+	file = OpenNextSpillFileRecurse(sp, &sp->root, respill);
+	if (file)
+	{
+		BufFileSeek(file, 0, 0, SEEK_SET);
+		/* XXX check error */
+	}
+	return file;
+}
+void
+CloseHashSpillSet(HashSpillSet *sp)
+{
+	BufFile	   *file;
+	bool		respill;
+
+	while((file = OpenNextSpillFile(sp, &respill)) != NULL)
+		BufFileClose(file);
+
+	pfree(sp);
+}
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 8dd017b2ef..4f241d6104 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -22,6 +22,7 @@
  * scan the hashtable and generate the correct output using those counts.
  * We can avoid making hashtable entries for any tuples appearing only in the
  * second input relation, since they cannot result in any output.
+ * (XXX: except that we do, if the hash table has been spilled)
  *
  * This node type is not used for UNION or UNION ALL, since those can be
  * implemented more cheaply (there's no need for the junk attribute to
@@ -82,6 +83,7 @@ static TupleTableSlot *setop_retrieve_direct(SetOpState *setopstate);
 static void setop_fill_hash_table(SetOpState *setopstate);
 static TupleTableSlot *setop_retrieve_hash_table(SetOpState *setopstate);
 
+static void setop_spill_entry(SetOpState *setopstate, TupleHashEntry entry, bool in_hashtab);
 
 /*
  * Initialize state for a new group of input values.
@@ -134,6 +136,13 @@ alloc_pergroup(SetOpState *setopstate)
 	return pergroup;
 }
 
+static void
+free_pergroup(SetOpState *setopstate, SetOpStatePerGroup pergroup)
+{
+	pergroup->next = setopstate->free_pergroups;
+	setopstate->free_pergroups = pergroup;
+}
+
 /*
  * Fetch the "flag" column from an input tuple.
  * This is an integer column with value 0 for left side, 1 for right side.
@@ -177,6 +186,7 @@ build_hash_table(SetOpState *setopstate)
 												setopstate->tableContext,
 												econtext->ecxt_per_tuple_memory,
 												false);
+	setopstate->hashtable->allowedMem = work_mem * 1024L;
 }
 
 /*
@@ -371,6 +381,188 @@ setop_retrieve_direct(SetOpState *setopstate)
 	return NULL;
 }
 
+static void
+BufFileWriteNoError(BufFile *file, void *ptr, size_t size)
+{
+	if (BufFileWrite(file, ptr, size) != size)
+		elog(ERROR, "io error"); /* XXX ereport */
+}
+
+/* Callback to dump the 'additional' data */
+static void
+setop_spill_entries(SetOpState *setopstate)
+{
+	if (setopstate->hashtable->usedMem < setopstate->hashtable->allowedMem)
+		return;
+
+	if (!setopstate->spillset)
+	{
+		setopstate->spillset = CreateHashSpillSet(setopstate->hashtable->allowedMem / 2);
+
+		setopstate->spillslot = MakeSingleTupleTableSlot(CreateTupleDescCopy(ExecGetResultType(outerPlanState(setopstate))));
+	}
+
+	while (setopstate->hashtable->usedMem > setopstate->hashtable->allowedMem &&
+		setopstate->hashtable->hashtab->members > 1)
+	{
+		TupleHashEntry entry;
+#if 0
+		elog(NOTICE, "spilling: used %ld allowed %ld tuples %d",
+			 setopstate->hashtable->usedMem, setopstate->hashtable->allowedMem,
+			setopstate->hashtable->hashtab->members);
+#endif
+		entry = SpillTupleHashTable(setopstate->hashtable);
+		if (!entry)
+			break;
+
+		setop_spill_entry(setopstate, entry, true);
+	}
+}
+
+static void
+setop_spill_entry(SetOpState *setopstate, TupleHashEntry entry, bool in_hashtab)
+{
+	BufFile	   *file;
+	MinimalTuple tup;
+
+	file = GetSpillFile(setopstate->spillset, entry->hash);
+
+	BufFileWriteNoError(file, &entry->hash, sizeof(uint32));
+	BufFileWriteNoError(file, &entry->firstTuple->t_len, sizeof(uint32));
+	BufFileWriteNoError(file, entry->firstTuple, entry->firstTuple->t_len);
+	BufFileWriteNoError(file, entry->additional, sizeof(SetOpStatePerGroupData));
+
+	if (in_hashtab)
+	{
+		/* Free the entry. */
+		setopstate->hashtable->usedMem -= sizeof(SetOpStatePerGroupData);
+		setopstate->hashtable->usedMem -= GetMemoryChunkSpace(entry->firstTuple);
+		free_pergroup(setopstate, entry->additional);
+		tup = entry->firstTuple;
+		tuplehash_delete_elem(setopstate->hashtable->hashtab, entry, entry->hash);
+		pfree(tup);
+
+		if (!setopstate->spilled)
+		{
+			setopstate->spilled = true;
+
+			elog(NOTICE, "spilling: used %ld allowed %ld tuples %d",
+				 setopstate->hashtable->usedMem, setopstate->hashtable->allowedMem,
+				 setopstate->hashtable->hashtab->members);
+		}
+	}
+}
+
+static void
+BufFileReadExact(BufFile *file, void *ptr, size_t size)
+{
+	if (BufFileRead(file, ptr, size) != size)
+		elog(ERROR, "error reading file");
+}
+
+/*
+ * Reload next batch into memory.
+ *
+ * May cause further spilling. Returns false if there were no more batches.
+ */
+static bool
+setop_reload_batch(SetOpState *setopstate)
+{
+	uint32		loaded_hash;
+	uint32		tuplen;
+	MinimalTuple loaded_firstTuple;
+	SetOpStatePerGroup pergroup;
+	SetOpStatePerGroupData loaded_pergroup;
+	int			readlen;
+	BufFile	   *file;
+	TupleHashEntry entry;
+	bool		isnew;
+	bool		respill;
+
+	Assert(setopstate->hashtable->hashtab->members == 0);
+	setopstate->spilled = false;
+
+	file = OpenNextSpillFile(setopstate->spillset, &respill);
+	if (!file)
+		return false;
+
+	setopstate->spilled = false;
+
+	for (;;)
+	{
+		readlen = BufFileRead(file, &loaded_hash, sizeof(uint32));
+		if (readlen == 0)
+		{
+			//elog(NOTICE, "reloaded batch %p with %d tuples", file, setopstate->hashtable->hashtab->members);
+			BufFileClose(file);
+			break;
+		}
+		if (readlen != sizeof(uint32))
+			elog(ERROR, "error reading file");
+		BufFileReadExact(file, &tuplen, sizeof(uint32));
+		loaded_firstTuple = palloc(tuplen);
+		BufFileReadExact(file, loaded_firstTuple, tuplen);
+		Assert(loaded_firstTuple->t_len == tuplen);
+		BufFileReadExact(file, &loaded_pergroup, sizeof(SetOpStatePerGroupData));
+
+		ExecStoreMinimalTuple(loaded_firstTuple, setopstate->spillslot, true);
+
+		if (setopstate->hashtable->usedMem < setopstate->hashtable->allowedMem ||
+			setopstate->hashtable->hashtab->members < 1)
+		{
+			entry = LookupTupleHashEntry(setopstate->hashtable, setopstate->spillslot,
+										 &isnew);
+
+			if (isnew)
+			{
+				entry->additional = alloc_pergroup(setopstate);
+				setopstate->hashtable->usedMem += sizeof(SetOpStatePerGroupData);
+				initialize_counts((SetOpStatePerGroup) entry->additional);
+				if (setopstate->spilled || respill)
+					entry->firstbatch = false;
+				else
+					entry->firstbatch = true;
+			}
+
+			pergroup = (SetOpStatePerGroup) entry->additional;
+			pergroup->data.numLeft += loaded_pergroup.data.numLeft;
+			pergroup->data.numRight += loaded_pergroup.data.numRight;
+		}
+		else
+		{
+			/*
+			 * We are already over the limit. Check if it happens to be in the
+			 * hash table, but if not, re-spill it immediately. This is
+			 * important, so that we process entries in LIFO order. Otherwise,
+			 * if there are enough entries with the same hash value to not fit
+			 * in memory at the time, we might loop and not make any progress.
+			 */
+			entry = LookupTupleHashEntry(setopstate->hashtable,
+										 setopstate->spillslot,
+										 NULL);
+			if (entry)
+			{
+				pergroup = (SetOpStatePerGroup) entry->additional;
+				pergroup->data.numLeft += loaded_pergroup.data.numLeft;
+				pergroup->data.numRight += loaded_pergroup.data.numRight;
+			}
+			else
+			{
+				TupleHashEntryData loaded_entry;
+
+				loaded_entry.hash = loaded_hash;
+				loaded_entry.firstTuple = loaded_firstTuple;
+				loaded_entry.additional = &loaded_pergroup;
+
+				setop_spill_entry(setopstate, &loaded_entry, false);
+			}
+		}
+		ExecClearTuple(setopstate->spillslot);
+	}
+
+	return true;
+}
+
 /*
  * ExecSetOp for hashed case: phase 1, read input and build hash table
  */
@@ -413,6 +605,9 @@ setop_fill_hash_table(SetOpState *setopstate)
 		/* Identify whether it's left or right input */
 		flag = fetch_tuple_flag(setopstate, outerslot);
 
+		/* If we're out of memory, spill some tuples from the hash table. */
+		setop_spill_entries(setopstate);
+
 		if (flag == firstFlag)
 		{
 			/* (still) in first input relation */
@@ -426,7 +621,12 @@ setop_fill_hash_table(SetOpState *setopstate)
 			if (isnew)
 			{
 				entry->additional = alloc_pergroup(setopstate);
+				setopstate->hashtable->usedMem += sizeof(SetOpStatePerGroupData);
 				initialize_counts((SetOpStatePerGroup) entry->additional);
+				if (setopstate->spilled)
+					entry->firstbatch = false;
+				else
+					entry->firstbatch = true;
 			}
 
 			/* Advance the counts */
@@ -438,12 +638,30 @@ setop_fill_hash_table(SetOpState *setopstate)
 			in_first_rel = false;
 
 			/* For tuples not seen previously, do not make hashtable entry */
+			/* If the hash table has already been spilled, we must make entries
+			 * for everything, because we might have spilled an entry for this
+			 * key already.
+			 */
+			isnew = false;
 			entry = LookupTupleHashEntry(setopstate->hashtable, outerslot,
-										 NULL);
+										 setopstate->spilled ? &isnew : NULL);
 
 			/* Advance the counts if entry is already present */
 			if (entry)
+			{
+				/* If new tuple group, initialize counts */
+				if (isnew)
+				{
+					entry->additional = alloc_pergroup(setopstate);
+					setopstate->hashtable->usedMem += sizeof(SetOpStatePerGroupData);
+					initialize_counts((SetOpStatePerGroup) entry->additional);
+					if (setopstate->spilled)
+						entry->firstbatch = false;
+					else
+						entry->firstbatch = true;
+				}
 				advance_counts((SetOpStatePerGroup) entry->additional, flag);
+			}
 		}
 
 		/* Must reset expression context after each hashtable lookup */
@@ -453,6 +671,8 @@ setop_fill_hash_table(SetOpState *setopstate)
 	setopstate->table_filled = true;
 	/* Initialize to walk the hash table */
 	ResetTupleHashIterator(setopstate->hashtable, &setopstate->hashiter);
+
+	elog(NOTICE, "table filled");
 }
 
 /*
@@ -474,31 +694,63 @@ setop_retrieve_hash_table(SetOpState *setopstate)
 	 */
 	while (!setopstate->setop_done)
 	{
+		MinimalTuple tuple;
+
 		CHECK_FOR_INTERRUPTS();
 
 		/*
 		 * Find the next entry in the hash table
 		 */
 		entry = ScanTupleHashTable(setopstate->hashtable, &setopstate->hashiter);
+
 		if (entry == NULL)
 		{
-			/* No more entries in hashtable, so done */
+			/* No more entries in hashtable. But do we have spill files to process?  */
+			if (setopstate->spillset)
+			{
+				if (setop_reload_batch(setopstate))
+				{
+					ResetTupleHashIterator(setopstate->hashtable, &setopstate->hashiter);
+					continue;
+				}
+			}
 			setopstate->setop_done = true;
 			return NULL;
 		}
 
+		if (!entry->firstbatch)
+		{
+			/*
+			 * re-spill this entry, because we might have spilled some earlier entries
+			 * with this key already.
+			 */
+			setop_spill_entry(setopstate, entry, true);
+			continue;
+		}
+
 		/*
 		 * See if we should emit any copies of this tuple, and if so return
 		 * the first copy.
 		 */
 		set_output_count(setopstate, (SetOpStatePerGroup) entry->additional);
 
+		tuple = entry->firstTuple;
+
+		free_pergroup(setopstate, entry->additional);
+		setopstate->hashtable->usedMem -= sizeof(SetOpStatePerGroupData);
+		tuplehash_delete_elem(setopstate->hashtable->hashtab, entry, entry->hash);
+		setopstate->hashtable->usedMem -= GetMemoryChunkSpace(tuple);
+
 		if (setopstate->numOutput > 0)
 		{
 			setopstate->numOutput--;
-			return ExecStoreMinimalTuple(entry->firstTuple,
+			return ExecStoreMinimalTuple(tuple,
 										 resultTupleSlot,
-										 false);
+										 true);
+		}
+		else
+		{
+			pfree(tuple);
 		}
 	}
 
@@ -619,6 +871,9 @@ ExecEndSetOp(SetOpState *node)
 	/* clean up tuple table */
 	ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
+	if (node->spillset)
+		CloseHashSpillSet(node->spillset);
+
 	/* free subsidiary stuff including hashtable */
 	if (node->tableContext)
 		MemoryContextDelete(node->tableContext);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index a7ea3c7d10..0f972bf47b 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -17,6 +17,7 @@
 #include "executor/execdesc.h"
 #include "nodes/parsenodes.h"
 #include "utils/memutils.h"
+#include "storage/buffile.h"
 
 
 /*
@@ -136,6 +137,16 @@ extern TupleHashEntry FindTupleHashEntry(TupleHashTable hashtable,
 				   TupleTableSlot *slot,
 				   ExprState *eqcomp,
 				   FmgrInfo *hashfunctions);
+extern TupleHashEntry SpillTupleHashTable(TupleHashTable hashtable);
+
+/*
+ * prototypes from functions in execHashSpill.c
+ */
+extern HashSpillSet *CreateHashSpillSet(int64 target_file_size);
+extern BufFile *GetSpillFile(HashSpillSet *sp, uint32 hash);
+extern BufFile *OpenNextSpillFile(HashSpillSet *sp, bool *respill);
+extern void CloseHashSpillSet(HashSpillSet *sp);
+
 
 /*
  * prototypes from functions in execJunk.c
diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h
index 5273d49460..5785c578b0 100644
--- a/src/include/lib/simplehash.h
+++ b/src/include/lib/simplehash.h
@@ -74,6 +74,7 @@
 #define SH_DESTROY SH_MAKE_NAME(destroy)
 #define SH_INSERT SH_MAKE_NAME(insert)
 #define SH_DELETE SH_MAKE_NAME(delete)
+#define SH_DELETE_ELEM SH_MAKE_NAME(delete_elem)
 #define SH_LOOKUP SH_MAKE_NAME(lookup)
 #define SH_GROW SH_MAKE_NAME(grow)
 #define SH_START_ITERATE SH_MAKE_NAME(start_iterate)
@@ -144,6 +145,7 @@ SH_SCOPE void SH_GROW(SH_TYPE * tb, uint32 newsize);
 SH_SCOPE	SH_ELEMENT_TYPE *SH_INSERT(SH_TYPE * tb, SH_KEY_TYPE key, bool *found);
 SH_SCOPE	SH_ELEMENT_TYPE *SH_LOOKUP(SH_TYPE * tb, SH_KEY_TYPE key);
 SH_SCOPE bool SH_DELETE(SH_TYPE * tb, SH_KEY_TYPE key);
+SH_SCOPE void SH_DELETE_ELEM(SH_TYPE * tb, SH_ELEMENT_TYPE *elem, uint32 hash);
 SH_SCOPE void SH_START_ITERATE(SH_TYPE * tb, SH_ITERATOR * iter);
 SH_SCOPE void SH_START_ITERATE_AT(SH_TYPE * tb, SH_ITERATOR * iter, uint32 at);
 SH_SCOPE	SH_ELEMENT_TYPE *SH_ITERATE(SH_TYPE * tb, SH_ITERATOR * iter);
@@ -697,54 +699,61 @@ SH_DELETE(SH_TYPE * tb, SH_KEY_TYPE key)
 		if (entry->status == SH_STATUS_IN_USE &&
 			SH_COMPARE_KEYS(tb, hash, key, entry))
 		{
-			SH_ELEMENT_TYPE *lastentry = entry;
+			SH_DELETE_ELEM(tb, entry, hash);
+			return true;
+		}
 
-			tb->members--;
+		/* TODO: return false; if distance too big */
 
-			/*
-			 * Backward shift following elements till either an empty element
-			 * or an element at its optimal position is encountered.
-			 *
-			 * While that sounds expensive, the average chain length is short,
-			 * and deletions would otherwise require tombstones.
-			 */
-			while (true)
-			{
-				SH_ELEMENT_TYPE *curentry;
-				uint32		curhash;
-				uint32		curoptimal;
+		curelem = SH_NEXT(tb, curelem, startelem);
+	}
+}
 
-				curelem = SH_NEXT(tb, curelem, startelem);
-				curentry = &tb->data[curelem];
+SH_SCOPE void
+SH_DELETE_ELEM(SH_TYPE * tb, SH_ELEMENT_TYPE *entry, uint32 hash)
+{
+	SH_ELEMENT_TYPE *lastentry = entry;
+	uint32		startelem = SH_INITIAL_BUCKET(tb, hash);
+	uint32		curelem = entry - &tb->data[0];
 
-				if (curentry->status != SH_STATUS_IN_USE)
-				{
-					lastentry->status = SH_STATUS_EMPTY;
-					break;
-				}
+	tb->members--;
 
-				curhash = SH_ENTRY_HASH(tb, curentry);
-				curoptimal = SH_INITIAL_BUCKET(tb, curhash);
+	/*
+	 * Backward shift following elements till either an empty element
+	 * or an element at its optimal position is encountered.
+	 *
+	 * While that sounds expensive, the average chain length is short,
+	 * and deletions would otherwise require tombstones.
+	 */
+	while (true)
+	{
+		SH_ELEMENT_TYPE *curentry;
+		uint32		curhash;
+		uint32		curoptimal;
 
-				/* current is at optimal position, done */
-				if (curoptimal == curelem)
-				{
-					lastentry->status = SH_STATUS_EMPTY;
-					break;
-				}
+		curelem = SH_NEXT(tb, curelem, startelem);
+		curentry = &tb->data[curelem];
 
-				/* shift */
-				memcpy(lastentry, curentry, sizeof(SH_ELEMENT_TYPE));
+		if (curentry->status != SH_STATUS_IN_USE)
+		{
+			lastentry->status = SH_STATUS_EMPTY;
+			break;
+		}
 
-				lastentry = curentry;
-			}
+		curhash = SH_ENTRY_HASH(tb, curentry);
+		curoptimal = SH_INITIAL_BUCKET(tb, curhash);
 
-			return true;
+		/* current is at optimal position, done */
+		if (curoptimal == curelem)
+		{
+			lastentry->status = SH_STATUS_EMPTY;
+			break;
 		}
 
-		/* TODO: return false; if distance too big */
+		/* shift */
+		memcpy(lastentry, curentry, sizeof(SH_ELEMENT_TYPE));
 
-		curelem = SH_NEXT(tb, curelem, startelem);
+		lastentry = curentry;
 	}
 }
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a62d299e67..58c8036003 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -650,7 +650,8 @@ typedef struct TupleHashEntryData
 {
 	MinimalTuple firstTuple;	/* copy of first tuple in this group */
 	void	   *additional;		/* user data */
-	uint32		status;			/* hash status */
+	bool		firstbatch;
+	uint8		status;			/* hash status */
 	uint32		hash;			/* hash value (cached) */
 } TupleHashEntryData;
 
@@ -679,6 +680,13 @@ typedef struct TupleHashTableData
 	ExprState  *cur_eq_func;	/* comparator for for input vs. table */
 	uint32		hash_iv;		/* hash-function IV */
 	ExprContext *exprcontext;	/* expression context */
+
+	int64		usedMem;
+	int64		allowedMem;
+
+	/* batches */
+	tuplehash_iterator spill_iter;
+	bool		spill_iter_inited;
 }			TupleHashTableData;
 
 typedef tuplehash_iterator TupleHashIterator;
@@ -697,6 +705,9 @@ typedef tuplehash_iterator TupleHashIterator;
 #define ScanTupleHashTable(htable, iter) \
 	tuplehash_iterate(htable->hashtab, iter)
 
+/* Support for spilling hash tables */
+typedef struct HashSpillSet HashSpillSet; /* private to execHashSpill.c */
+
 
 /* ----------------------------------------------------------------
  *				 Expression State Nodes
@@ -2167,6 +2178,11 @@ typedef struct SetOpState
 	TupleHashIterator hashiter; /* for iterating through hash table */
 
 	SetOpStatePerGroup free_pergroups; /* list of free per-group structs */
+
+	HashSpillSet *spillset;
+	bool		spilled;		/* has the current batch spilled? */
+	TupleTableSlot *spillslot;
+
 } SetOpState;
 
 /* ----------------
-- 
2.11.0


