>From fbb2baf9b60d9f57182be3b58b3678cfe52b2931 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@pgaddict.com>
Date: Sun, 27 Dec 2015 18:26:34 +0100
Subject: [PATCH 2/2] hash bloom filters v2

- extend bloom filters to unbatched case

- always start with nbatch=1 even when we assume we'll need to batch
  (helps with over-estimates and also sizing of the bloom filter)

- use HyperLogLog to count distinct values and size the bloom filter
  (we can do this thanks to always starting with nbatch=1)

- refactorings and cleanups
---
 src/backend/executor/nodeHash.c     | 259 +++++++++++++++++++++++++++++
 src/backend/executor/nodeHashjoin.c |  24 ++-
 src/backend/utils/adt/Makefile      |   2 +-
 src/backend/utils/adt/murmur3.c     | 315 ++++++++++++++++++++++++++++++++++++
 src/backend/utils/misc/guc.c        |   9 ++
 src/include/executor/hashjoin.h     |  18 +++
 src/include/optimizer/cost.h        |   1 +
 src/include/utils/murmur3.h         |  29 ++++
 8 files changed, 655 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/utils/adt/murmur3.c
 create mode 100644 src/include/utils/murmur3.h

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index c53d485..ebfc6c2 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -31,12 +31,15 @@
 #include "executor/hashjoin.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "lib/hyperloglog.h"
 #include "miscadmin.h"
 #include "utils/dynahash.h"
 #include "utils/memutils.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
+#include "utils/murmur3.h"
 
+bool		enable_hashjoin_bloom = true;
 
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
 static void ExecHashBuildBuckets(HashJoinTable hashtable);
@@ -47,9 +50,17 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
 						uint32 hashvalue,
 						int bucketNumber);
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
+static void ExecHashHLLAddValue(HashJoinTable hashtable, uint32 hashvalue);
+static void ExecHashBloomAddValue(HashJoinTable hashtable, uint32 hashvalue);
 
 static void *dense_alloc(HashJoinTable hashtable, Size size);
 
+static BloomFilter BloomFilterInit(double nrows, double error);
+
+/* let's shoot for 5% false positives error rate (arbitrary value) */
+#define BLOOM_ERROR_RATE 0.05
+
+
 /* ----------------------------------------------------------------
  *		ExecHash
  *
@@ -112,6 +123,22 @@ MultiExecHash(HashState *node)
 		{
 			int			bucketNumber;
 
+			/*
+			 * If we're interested in building the bloom filter, add the hash
+			 * value either to the hyperloglog counter or to the bloom filter,
+			 * depending on which phase we're in. If we're still not batching,
+			 * we only have the hyperloglog counter, otherwise we have the
+			 * bloom filter.
+			 *
+			 * XXX We may still not build any of the two things (HLL, bloom),
+			 *     e.g. if we decide not to based on estimates or at runtime.
+			 */
+			if (enable_hashjoin_bloom)
+			{
+				ExecHashHLLAddValue(hashtable, hashvalue);
+				ExecHashBloomAddValue(hashtable, hashvalue);
+			}
+
 			bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
 			if (bucketNumber != INVALID_SKEW_BUCKET_NO)
 			{
@@ -307,6 +334,35 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 		hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 
+	/* only one of those is assumed to be non-NULL */
+	hashtable->bloomFilter = NULL;
+	hashtable->hll = NULL;
+
+	/*
+	 * We don't quite know how many distinct values to expect, so we'll use
+	 * the estimated number of rowsto be added to the hash table, and assume
+	 * they're all distinct.
+	 *
+	 * But if we already know there'll be multiple batches, we can't use hll
+	 * because we need to start building the bloom filter right away. In that
+	 * case we simply assume all the values are unique - we'll probably use
+	 * larger (and thus less efficient) bloom filter. It may not matter that
+	 * much because the possible savings (reduced I/O etc.) are much more
+	 * significant.
+	 *
+	 * XXX What we could do is always build the first batch in memory, thus
+	 *     get ndistinct estimate from hyperloglog and then immediately
+	 *     switch to the originally estimated number of batches.
+	 *
+	 * XXX Not really sure how to size the HLL, so the bwidth value may be
+	 *     too high (and the counter too big).
+	 */
+	if (enable_hashjoin_bloom)
+	{
+		hashtable->hll = palloc0(sizeof(hyperLogLogState));
+		initHyperLogLogError(hashtable->hll, 0.05);
+	}
+
 	/*
 	 * Get info about the hash functions to be used for each hash key. Also
 	 * remember whether the join operators are strict.
@@ -588,6 +644,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	long		ninmemory;
 	long		nfreed;
 	HashMemoryChunk oldchunks;
+	bool		build_bloom_filter = false;
 
 	/* do nothing if we've decided to shut off growth */
 	if (!hashtable->growEnabled)
@@ -649,6 +706,61 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	ninmemory = nfreed = 0;
 
 	/*
+	 * If we're switching to batched mode (and the bloom filters are enabled),
+	 * we need to start build the bloom filter.
+	 *
+	 * To size the bloom filter, we'll use the hyperloglog counter to estimate
+	 * the number of distinct values in the first batch, multiplied by the
+	 * expected number of batches. This may not work when the estimate is wrong,
+	 * but we can't really do better.
+	 *
+	 * XXX The problem here is that if we started with nbatch=1 and we're
+	 *     starting to batch now, it means we've under-estimated the cardinality
+	 *     somehow. And we don't know how much, but it's quite likely it's not
+	 *     just by a factor of 2 (which is assumed in the next block). So maybe
+	 *     we should be more pessimistic and use a higher number of batches.
+	 *
+	 * XXX We also need to set some maximum number of tuples when the false
+	 *     positive rate gets too bad, and stop using the bloom filter if we
+	 *     reach it (we can't resize the filter).
+	 *
+	 * XXX We can't really resize the bloom filter if we reach the number of
+	 *     distinct values, but there was a paper about adding a larger bloom
+	 *     filter once we fill the existing one, and using them at the same
+	 *     time. Might be worth implementing if the whole bloom filter idea
+	 *     works in general.
+	 *
+	 * We also need to make sure we added the hash values into the bloom
+	 * filter in this case (that's what build_bloom_filter is for).
+	 */
+	if (enable_hashjoin_bloom && (oldnbatch == 1))
+	{
+		double ndistinct;
+
+		/* must have the counter to size bloom filter properly */
+		Assert(hashtable->hll);
+
+		ndistinct = estimateHyperLogLog(hashtable->hll);
+
+		/* We don't really want tiny bloom filters. */
+		if (ndistinct < 1024)
+			ndistinct = 1024;
+
+		elog(WARNING, "ExecHashIncreaseNumBatches: ndistinct %ld",
+					  (int64)(nbatch * ndistinct));
+
+		hashtable->bloomFilter = BloomFilterInit(nbatch * ndistinct,
+												 BLOOM_ERROR_RATE);
+
+		/* Free the HLL state (entirely). */
+		freeHyperLogLog(hashtable->hll);
+		pfree(hashtable->hll);
+		hashtable->hll = NULL;
+
+		build_bloom_filter = true;
+	}
+
+	/*
 	 * We will scan through the chunks directly, so that we can reset the
 	 * buckets now and not have to keep track which tuples in the buckets have
 	 * already been processed. We will free the old chunks as we go.
@@ -677,6 +789,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
 									  &bucketno, &batchno);
 
+			if (build_bloom_filter)
+				ExecHashBloomAddValue(hashtable, hashTuple->hashvalue);
+
 			if (batchno == curbatch)
 			{
 				/* keep tuple in memory - copy it into the new chunk */
@@ -736,6 +851,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 static void
 ExecHashBuildBuckets(HashJoinTable hashtable)
 {
+	bool build_bloom_filter = false;
 	HashMemoryChunk chunk;
 
 #ifdef HJDEBUG
@@ -760,6 +876,33 @@ ExecHashBuildBuckets(HashJoinTable hashtable)
 	/* Don't forget to zero the buckets (AllocHuge does not do that). */
 	memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple));
 
+	/*
+	 * If we're still in the 'count-ndistinct phase, we need to build the
+	 * bloom filter at this point.
+	 */
+	if (hashtable->hll != NULL)
+	{
+		double ndistinct;
+
+		ndistinct = estimateHyperLogLog(hashtable->hll);
+
+		if (ndistinct < 1024)
+			ndistinct = 1024;
+
+		elog(WARNING, "ExecHashBuildBuckets: building bloom filter (ndistinct %ld)",
+					  (int64)(2*ndistinct));
+
+		hashtable->bloomFilter = BloomFilterInit(2 * ndistinct,
+												 BLOOM_ERROR_RATE);
+
+		/* Free the HLL state. */
+		freeHyperLogLog(hashtable->hll);
+		pfree(hashtable->hll);
+		hashtable->hll = NULL;
+
+		build_bloom_filter = true;
+	}
+
 	/* scan through all tuples in all chunks to rebuild the hash table */
 	for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
 	{
@@ -775,6 +918,9 @@ ExecHashBuildBuckets(HashJoinTable hashtable)
 			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
 									  &bucketno, &batchno);
 
+			if (build_bloom_filter)
+				ExecHashBloomAddValue(hashtable, hashTuple->hashvalue);
+
 			/* add the tuple to the proper bucket */
 			hashTuple->next = hashtable->buckets[bucketno];
 			hashtable->buckets[bucketno] = hashTuple;
@@ -1683,3 +1829,116 @@ dense_alloc(HashJoinTable hashtable, Size size)
 	/* return pointer to the start of the tuple memory */
 	return ptr;
 }
+
+static void
+ExecHashHLLAddValue(HashJoinTable hashtable, uint32 hashvalue)
+{
+	hyperLogLogState   *hll = hashtable->hll;
+
+	/*
+	 * If we don't have a HLL counter, then we're not supposed to build
+	 * it and we can just bail out.
+	 */
+	if (hll == NULL)
+		return;
+
+	/* We can't have both counter and filter at the same time. */
+	Assert(hashtable->bloomFilter == NULL);
+
+	addHyperLogLog(hashtable->hll, hashvalue);
+}
+
+static void
+ExecHashBloomAddValue(HashJoinTable hashtable, uint32 hashvalue)
+{
+	int			i, byteIdx, bitIdx;
+	BloomFilter	filter = hashtable->bloomFilter;
+
+	/*
+	 * If we don't have a bloom filter, then we're not supposed to build
+	 * it and we can just bail out.
+	 */
+	if (filter == NULL)
+		return;
+
+	/* We can't have both counter and filter at the same time. */
+	Assert(hashtable->hll == NULL);
+
+	/*
+	 * We only build bloom filters while in the first batch (with all
+	 * tuples, so it makes no sense to re-add them over and over).
+	 */
+	if (hashtable->curbatch > 0)
+		return;
+
+	/*
+	 * To get multiple independent hash functions, we simply use
+	 * different seeds for them. We use murmur3 with 32-bit values here,
+	 * but we might use anything sufficiently random and fast (e.g.
+	 * jenkinks hash or such).
+	 */
+	for (i = 0; i < filter->nhashes; i++)
+	{
+		uint32_t seed = i;
+		uint32_t hash = 0;
+
+		MurmurHash3_x86_32(&hashvalue, sizeof(uint32), seed, &hash);
+
+		hash = hash % filter->nbits;
+
+		byteIdx = (hash / 8);
+		bitIdx = (hash % 8);
+
+		filter->data[byteIdx] |= (0x01 << bitIdx);
+	}
+}
+
+bool
+ExecHashBloomCheckValue(HashJoinTable hashtable, uint32 hashvalue)
+{
+	int			i, byteIdx, bitIdx;
+	BloomFilter	filter = hashtable->bloomFilter;
+
+	if (! filter)
+		return true;
+
+	filter->nlookups++;
+
+	for (i = 0; i < filter->nhashes; i++)
+	{
+		uint32_t seed = i;
+		uint32_t hash = 0;
+
+		MurmurHash3_x86_32(&hashvalue, sizeof(uint32), seed, &hash);
+
+		hash = hash % filter->nbits;
+
+		byteIdx = (hash / 8);
+		bitIdx = (hash % 8);
+
+		if (! (filter->data[byteIdx] & (0x01 << bitIdx)))
+			return false;
+	}
+
+	/* if we got here, we know it's a match */
+	filter->nmatches++;
+
+	return true;
+}
+
+static BloomFilter
+BloomFilterInit(double nrows, double error)
+{
+	/* perhaps we should round nbits to multiples of 8 ? */
+	int nbits = ceil((nrows * log(error)) / log(1.0 / (pow(2.0, log(2.0)))));
+	int nhashes = round(log(2.0) * nbits / nrows);
+
+	BloomFilter filter = palloc0(offsetof(BloomFilterData, data) + ((nbits + 7) / 8));
+
+	filter->nbits = nbits;
+	filter->nhashes = nhashes;
+
+	elog(WARNING, "bloom filter: %d bits (%d bytes), %d hashes", nbits, (nbits + 7) / 8, nhashes);
+
+	return filter;
+}
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 1d78cdf..101f39a 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -48,7 +48,6 @@ static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
 						  TupleTableSlot *tupleSlot);
 static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
 
-
 /* ----------------------------------------------------------------
  *		ExecHashJoin
  *
@@ -210,6 +209,7 @@ ExecHashJoin(HashJoinState *node)
 				outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
 														   node,
 														   &hashvalue);
+
 				if (TupIsNull(outerTupleSlot))
 				{
 					/* end of batch, or maybe whole join */
@@ -238,6 +238,15 @@ ExecHashJoin(HashJoinState *node)
 																 hashvalue);
 				node->hj_CurTuple = NULL;
 
+				/* If still in the first batch, we check the bloom filter. */
+				if ((hashtable->curbatch == 0) &&
+					(! ExecHashBloomCheckValue(hashtable, hashvalue)))
+				{
+						/* no matches; check for possible outer-join fill */
+						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+						continue;
+				}
+
 				/*
 				 * The tuple might not belong to the current batch (where
 				 * "current batch" includes the skew buckets if any).
@@ -608,6 +617,19 @@ ExecEndHashJoin(HashJoinState *node)
 	 */
 	if (node->hj_HashTable)
 	{
+		HashJoinTable hashtable = node->hj_HashTable;
+
+		/*
+		 * If there's a bloom filter, print some debug info before destroying the
+		 * hash table.
+		 */
+		if (hashtable->bloomFilter)
+		{
+			BloomFilter filter = hashtable->bloomFilter;
+			elog(WARNING, "bloom filter lookups=%lu matches=%lu eliminated=%lu%%",
+						  filter->nlookups, filter->nmatches, 100 - (100 * filter->nmatches) / Max(1,filter->nlookups));
+		}
+
 		ExecHashTableDestroy(node->hj_HashTable);
 		node->hj_HashTable = NULL;
 	}
diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile
index 2cb7bab..365123b 100644
--- a/src/backend/utils/adt/Makefile
+++ b/src/backend/utils/adt/Makefile
@@ -16,7 +16,7 @@ OBJS = acl.o arrayfuncs.o array_expanded.o array_selfuncs.o \
 	float.o format_type.o formatting.o genfile.o \
 	geo_ops.o geo_selfuncs.o inet_cidr_ntop.o inet_net_pton.o int.o \
 	int8.o json.o jsonb.o jsonb_gin.o jsonb_op.o jsonb_util.o \
-	jsonfuncs.o like.o lockfuncs.o mac.o misc.o nabstime.o name.o \
+	jsonfuncs.o like.o lockfuncs.o mac.o misc.o murmur3.o nabstime.o name.o \
 	network.o network_gist.o network_selfuncs.o \
 	numeric.o numutils.o oid.o oracle_compat.o \
 	orderedsetaggs.o pg_locale.o pg_lsn.o pg_upgrade_support.o \
diff --git a/src/backend/utils/adt/murmur3.c b/src/backend/utils/adt/murmur3.c
new file mode 100644
index 0000000..764aeab
--- /dev/null
+++ b/src/backend/utils/adt/murmur3.c
@@ -0,0 +1,315 @@
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+// Note - The x86 and x64 versions do _not_ produce the same results, as the
+// algorithms are optimized for their respective platforms. You can still
+// compile and run any of them on any platform, but your performance with the
+// non-native version will be less than optimal.
+
+#include "utils/murmur3.h"
+
+//-----------------------------------------------------------------------------
+// Platform-specific functions and macros
+
+#ifdef __GNUC__
+#define FORCE_INLINE __attribute__((always_inline)) inline
+#else
+#define FORCE_INLINE inline
+#endif
+
+static FORCE_INLINE uint32_t rotl32 ( uint32_t x, int8_t r )
+{
+  return (x << r) | (x >> (32 - r));
+}
+
+static FORCE_INLINE uint64_t rotl64 ( uint64_t x, int8_t r )
+{
+  return (x << r) | (x >> (64 - r));
+}
+
+#define	ROTL32(x,y)	rotl32(x,y)
+#define ROTL64(x,y)	rotl64(x,y)
+
+#define BIG_CONSTANT(x) (x##LLU)
+
+//-----------------------------------------------------------------------------
+// Block read - if your platform needs to do endian-swapping or can only
+// handle aligned reads, do the conversion here
+
+#define getblock(p, i) (p[i])
+
+//-----------------------------------------------------------------------------
+// Finalization mix - force all bits of a hash block to avalanche
+
+static FORCE_INLINE uint32_t fmix32 ( uint32_t h )
+{
+  h ^= h >> 16;
+  h *= 0x85ebca6b;
+  h ^= h >> 13;
+  h *= 0xc2b2ae35;
+  h ^= h >> 16;
+
+  return h;
+}
+
+//----------
+
+static FORCE_INLINE uint64_t fmix64 ( uint64_t k )
+{
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xff51afd7ed558ccd);
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
+  k ^= k >> 33;
+
+  return k;
+}
+
+//-----------------------------------------------------------------------------
+
+void MurmurHash3_x86_32 ( const void * key, int len,
+                          uint32_t seed, void * out )
+{
+  const uint8_t * data = (const uint8_t*)key;
+  const int nblocks = len / 4;
+  int i;
+
+  uint32_t h1 = seed;
+
+  uint32_t c1 = 0xcc9e2d51;
+  uint32_t c2 = 0x1b873593;
+
+  //----------
+  // body
+
+  const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
+
+  for(i = -nblocks; i; i++)
+  {
+    uint32_t k1 = getblock(blocks,i);
+
+    k1 *= c1;
+    k1 = ROTL32(k1,15);
+    k1 *= c2;
+    
+    h1 ^= k1;
+    h1 = ROTL32(h1,13); 
+    h1 = h1*5+0xe6546b64;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
+
+  uint32_t k1 = 0;
+
+  switch(len & 3)
+  {
+  case 3: k1 ^= tail[2] << 16;
+  case 2: k1 ^= tail[1] << 8;
+  case 1: k1 ^= tail[0];
+          k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
+  };
+
+  //----------
+  // finalization
+
+  h1 ^= len;
+
+  h1 = fmix32(h1);
+
+  *(uint32_t*)out = h1;
+} 
+
+//-----------------------------------------------------------------------------
+
+void MurmurHash3_x86_128 ( const void * key, const int len,
+                           uint32_t seed, void * out )
+{
+  const uint8_t * data = (const uint8_t*)key;
+  const int nblocks = len / 16;
+  int i;
+
+  uint32_t h1 = seed;
+  uint32_t h2 = seed;
+  uint32_t h3 = seed;
+  uint32_t h4 = seed;
+
+  uint32_t c1 = 0x239b961b; 
+  uint32_t c2 = 0xab0e9789;
+  uint32_t c3 = 0x38b34ae5; 
+  uint32_t c4 = 0xa1e38b93;
+
+  //----------
+  // body
+
+  const uint32_t * blocks = (const uint32_t *)(data + nblocks*16);
+
+  for(i = -nblocks; i; i++)
+  {
+    uint32_t k1 = getblock(blocks,i*4+0);
+    uint32_t k2 = getblock(blocks,i*4+1);
+    uint32_t k3 = getblock(blocks,i*4+2);
+    uint32_t k4 = getblock(blocks,i*4+3);
+
+    k1 *= c1; k1  = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
+
+    h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b;
+
+    k2 *= c2; k2  = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
+
+    h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747;
+
+    k3 *= c3; k3  = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
+
+    h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35;
+
+    k4 *= c4; k4  = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
+
+    h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t * tail = (const uint8_t*)(data + nblocks*16);
+
+  uint32_t k1 = 0;
+  uint32_t k2 = 0;
+  uint32_t k3 = 0;
+  uint32_t k4 = 0;
+
+  switch(len & 15)
+  {
+  case 15: k4 ^= tail[14] << 16;
+  case 14: k4 ^= tail[13] << 8;
+  case 13: k4 ^= tail[12] << 0;
+           k4 *= c4; k4  = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
+
+  case 12: k3 ^= tail[11] << 24;
+  case 11: k3 ^= tail[10] << 16;
+  case 10: k3 ^= tail[ 9] << 8;
+  case  9: k3 ^= tail[ 8] << 0;
+           k3 *= c3; k3  = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
+
+  case  8: k2 ^= tail[ 7] << 24;
+  case  7: k2 ^= tail[ 6] << 16;
+  case  6: k2 ^= tail[ 5] << 8;
+  case  5: k2 ^= tail[ 4] << 0;
+           k2 *= c2; k2  = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
+
+  case  4: k1 ^= tail[ 3] << 24;
+  case  3: k1 ^= tail[ 2] << 16;
+  case  2: k1 ^= tail[ 1] << 8;
+  case  1: k1 ^= tail[ 0] << 0;
+           k1 *= c1; k1  = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
+  };
+
+  //----------
+  // finalization
+
+  h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len;
+
+  h1 += h2; h1 += h3; h1 += h4;
+  h2 += h1; h3 += h1; h4 += h1;
+
+  h1 = fmix32(h1);
+  h2 = fmix32(h2);
+  h3 = fmix32(h3);
+  h4 = fmix32(h4);
+
+  h1 += h2; h1 += h3; h1 += h4;
+  h2 += h1; h3 += h1; h4 += h1;
+
+  ((uint32_t*)out)[0] = h1;
+  ((uint32_t*)out)[1] = h2;
+  ((uint32_t*)out)[2] = h3;
+  ((uint32_t*)out)[3] = h4;
+}
+
+//-----------------------------------------------------------------------------
+
+void MurmurHash3_x64_128 ( const void * key, const int len,
+                           const uint32_t seed, void * out )
+{
+  const uint8_t * data = (const uint8_t*)key;
+  const int nblocks = len / 16;
+  int i;
+
+  uint64_t h1 = seed;
+  uint64_t h2 = seed;
+
+  uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
+  uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
+
+  //----------
+  // body
+
+  const uint64_t * blocks = (const uint64_t *)(data);
+
+  for(i = 0; i < nblocks; i++)
+  {
+    uint64_t k1 = getblock(blocks,i*2+0);
+    uint64_t k2 = getblock(blocks,i*2+1);
+
+    k1 *= c1; k1  = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
+
+    h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729;
+
+    k2 *= c2; k2  = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
+
+    h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t * tail = (const uint8_t*)(data + nblocks*16);
+
+  uint64_t k1 = 0;
+  uint64_t k2 = 0;
+
+  switch(len & 15)
+  {
+  case 15: k2 ^= (uint64_t)(tail[14]) << 48;
+  case 14: k2 ^= (uint64_t)(tail[13]) << 40;
+  case 13: k2 ^= (uint64_t)(tail[12]) << 32;
+  case 12: k2 ^= (uint64_t)(tail[11]) << 24;
+  case 11: k2 ^= (uint64_t)(tail[10]) << 16;
+  case 10: k2 ^= (uint64_t)(tail[ 9]) << 8;
+  case  9: k2 ^= (uint64_t)(tail[ 8]) << 0;
+           k2 *= c2; k2  = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
+
+  case  8: k1 ^= (uint64_t)(tail[ 7]) << 56;
+  case  7: k1 ^= (uint64_t)(tail[ 6]) << 48;
+  case  6: k1 ^= (uint64_t)(tail[ 5]) << 40;
+  case  5: k1 ^= (uint64_t)(tail[ 4]) << 32;
+  case  4: k1 ^= (uint64_t)(tail[ 3]) << 24;
+  case  3: k1 ^= (uint64_t)(tail[ 2]) << 16;
+  case  2: k1 ^= (uint64_t)(tail[ 1]) << 8;
+  case  1: k1 ^= (uint64_t)(tail[ 0]) << 0;
+           k1 *= c1; k1  = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
+  };
+
+  //----------
+  // finalization
+
+  h1 ^= len; h2 ^= len;
+
+  h1 += h2;
+  h2 += h1;
+
+  h1 = fmix64(h1);
+  h2 = fmix64(h2);
+
+  h1 += h2;
+  h2 += h1;
+
+  ((uint64_t*)out)[0] = h1;
+  ((uint64_t*)out)[1] = h2;
+}
+
+//-----------------------------------------------------------------------------
+
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a185749..ffc1281 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -864,6 +864,15 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"enable_hashjoin_bloom", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the use of bloom filters in hash joins."),
+			NULL
+		},
+		&enable_hashjoin_bloom,
+		true,
+		NULL, NULL, NULL
+	},
+	{
 		{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
 			gettext_noop("Enables genetic query optimization."),
 			gettext_noop("This algorithm attempts to do planning without "
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 255c506..df22361 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -16,6 +16,7 @@
 
 #include "nodes/execnodes.h"
 #include "storage/buffile.h"
+#include "lib/hyperloglog.h"
 
 /* ----------------------------------------------------------------
  *				hash-join hash table structures
@@ -72,6 +73,17 @@ typedef struct HashJoinTupleData
 #define HJTUPLE_MINTUPLE(hjtup)  \
 	((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
 
+typedef struct BloomFilterData
+{
+	uint64	nlookups;		/* number of lookups */
+	uint64	nmatches;		/* number of matches */
+	int		nbits;			/* m */
+	int		nhashes;		/* k */
+	char	data[1];		/* bits */
+}	BloomFilterData;
+
+typedef BloomFilterData *BloomFilter;
+
 /*
  * If the outer relation's distribution is sufficiently nonuniform, we attempt
  * to optimize the join by treating the hash values corresponding to the outer
@@ -181,6 +193,12 @@ typedef struct HashJoinTableData
 
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
+
+	/* used only when the hash join has multiple batches */
+	BloomFilter	bloomFilter;	/* bloom filter on the hash values */
+	hyperLogLogState   *hll;	/* used to to size bloom filter */
 }	HashJoinTableData;
 
+bool ExecHashBloomCheckValue(HashJoinTable hashtable, uint32 hashvalue);
+
 #endif   /* HASHJOIN_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index ac21a3a..999bf84 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -66,6 +66,7 @@ extern bool enable_nestloop;
 extern bool enable_material;
 extern bool enable_mergejoin;
 extern bool enable_hashjoin;
+extern bool enable_hashjoin_bloom;
 extern int	constraint_exclusion;
 
 extern double clamp_row_est(double nrows);
diff --git a/src/include/utils/murmur3.h b/src/include/utils/murmur3.h
new file mode 100644
index 0000000..e12bf08
--- /dev/null
+++ b/src/include/utils/murmur3.h
@@ -0,0 +1,29 @@
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the
+// public domain. The author hereby disclaims copyright to this source
+// code.
+
+#ifndef _MURMURHASH3_H_
+#define _MURMURHASH3_H_
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+//-----------------------------------------------------------------------------
+
+void MurmurHash3_x86_32 (const void *key, int len, uint32_t seed, void *out);
+
+void MurmurHash3_x86_128(const void *key, int len, uint32_t seed, void *out);
+
+void MurmurHash3_x64_128(const void *key, int len, uint32_t seed, void *out);
+
+//-----------------------------------------------------------------------------
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // _MURMURHASH3_H_
-- 
2.1.0

