From 610fed13c7e2f418c8d574e85e9fa6fef97dbef6 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Mon, 19 Feb 2018 18:56:10 +0100
Subject: [PATCH] v2 of hashjoin-bloom patch

---
 src/backend/commands/explain.c         |  24 +++
 src/backend/executor/nodeHash.c        | 276 ++++++++++++++++++++++++++++++++-
 src/backend/executor/nodeHashjoin.c    |  22 +++
 src/backend/utils/misc/guc.c           |  18 +++
 src/include/executor/hashjoin.h        |  16 ++
 src/include/nodes/execnodes.h          |   5 +
 src/include/optimizer/cost.h           |   2 +
 src/include/utils/hashutils.h          |  35 +++++
 src/test/regress/expected/sysviews.out |   3 +-
 9 files changed, 399 insertions(+), 2 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 900fa74..2ff7949 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2474,6 +2474,30 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 							 spacePeakKb);
 		}
 	}
+
+	if (hinstrument.bloom_nbytes > 0)
+	{
+		if (es->format != EXPLAIN_FORMAT_TEXT)
+		{
+			ExplainPropertyLong("Bloom Filter Bytes", hinstrument.bloom_nbytes, es);
+			ExplainPropertyLong("Bloom Filter Hashes", hinstrument.bloom_nhashes, es);
+			ExplainPropertyLong("Bloom Filter Lookups", hinstrument.bloom_nlookups, es);
+			ExplainPropertyLong("Bloom Filter Matches", hinstrument.bloom_nmatches, es);
+			ExplainPropertyFloat("Bloom Filter Bits Set",
+				hinstrument.bloom_nbits * 100.0 / (hinstrument.bloom_nbytes * 8), 2, es);
+		}
+		else
+		{
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str,
+							 "Bloom Filter Size: %ldkB  Hashes: %d  Lookups: %d  Matches: %d  Bits Set: %.2f%%\n",
+							 hinstrument.bloom_nbytes/1024,
+							 hinstrument.bloom_nhashes,
+							 hinstrument.bloom_nlookups,
+							 hinstrument.bloom_nmatches,
+							 hinstrument.bloom_nbits * 100.0 / (hinstrument.bloom_nbytes * 8));
+		}
+	}
 }
 
 /*
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 06bb44b..aef423a 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -38,11 +38,11 @@
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "utils/dynahash.h"
+#include "utils/hashutils.h"
 #include "utils/memutils.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
-
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
 static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
 static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable);
@@ -54,6 +54,10 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
 						uint32 hashvalue,
 						int bucketNumber);
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
+static void ExecHashBloomAddValue(HashJoinTable hashtable, uint32 hashvalue);
+
+static BloomFilter BloomFilterInitRows(double nrows, double error);
+static BloomFilter BloomFilterInitBytes(Size nbytes, double error);
 
 static void *dense_alloc(HashJoinTable hashtable, Size size);
 static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
@@ -80,6 +84,12 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+/* should we build bloom filter? should we force building it? */
+bool		enable_hashjoin_bloom = true;
+bool		force_hashjoin_bloom = false;
+
+/* Shoot for 5% false positives error rate (arbitrary value). */
+#define BLOOM_ERROR_RATE	0.05
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -172,6 +182,9 @@ MultiExecPrivateHash(HashState *node)
 		{
 			int			bucketNumber;
 
+			/* Add the hash value to the bloom filter (if needed). */
+			ExecHashBloomAddValue(hashtable, hashvalue);
+
 			bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
 			if (bucketNumber != INVALID_SKEW_BUCKET_NO)
 			{
@@ -198,6 +211,56 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->spaceUsed > hashtable->spacePeak)
 		hashtable->spacePeak = hashtable->spaceUsed;
 
+	/*
+	 * If forced to build a bloom filter, do that now, but only when
+	 * there's a single batch. For batched mode the decision would have
+	 * happened elsewehere earlier (we can't make it here, as the hash
+	 * table contains only subset of the data).
+	 */
+	if (enable_hashjoin_bloom &&
+		force_hashjoin_bloom &&
+		(hashtable->nbatch == 1))
+	{
+		HashMemoryChunk chunks;
+
+		/* no bloom filter allocated yet */
+		Assert(!hashtable->bloomFilter);
+
+		/* batching, so build bloom filter (assume all values are unique) */
+		hashtable->bloomFilter
+			= BloomFilterInitRows(hashtable->totalTuples, BLOOM_ERROR_RATE);
+
+		chunks = hashtable->chunks;
+
+		/* so, let's scan through the chunks, and all tuples in each chunk */
+		while (chunks != NULL)
+		{
+			HashMemoryChunk nextchunks = chunks->next.unshared;
+
+			/* position within the buffer (up to chunks->used) */
+			size_t		idx = 0;
+
+			/* process all tuples stored in this chunk */
+			while (idx < chunks->used)
+			{
+				HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunks) + idx);
+				MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+				int	hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+				ExecHashBloomAddValue(hashtable, hashTuple->hashvalue);
+
+				/* next tuple in this chunk */
+				idx += MAXALIGN(hashTupleSize);
+
+				/* allow this loop to be cancellable */
+				CHECK_FOR_INTERRUPTS();
+			}
+
+			/* we're done with this chunk - proceed to the next one */
+			chunks = nextchunks;
+		}
+	}
+
 	hashtable->partialTuples = hashtable->totalTuples;
 }
 
@@ -509,6 +572,56 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
 
+	/*
+	 * We don't quite know how many distinct values to expect, which is
+	 * essential for proper sizing of the bloom filter. One option is to
+	 * just use the estimated number of rows, and assume they are all
+	 * distinct. That value may be inaccurate for a number of reasons.
+	 * The actual number of rows may be very different, there may be
+	 * duplicate rows, etc.
+	 *
+	 * If we start in multi-batch mode, we rely on the estimated total
+	 * number of rows, and assume all hash values are unique (which is
+	 * often the case, e.g. when joining on PK-FK pair), and use that
+	 * to size the filter. We could also assume there is certain number
+	 * of duplicate values (and use e.g. nrows/10 to size the filter),
+	 * but that would simply hurt other cases.
+	 *
+	 * When starting in a single-batch mode, we do nothing initially.
+	 * If the whole hash table fits into a single batch, we can get
+	 * sufficiently accurate ndistinct estimate by simply counting
+	 * occupied buckets (thanks to shooting for NTUP_PER_BUCKET=1),
+	 * or perhaps we could use something more elaborate (e.g. HLL).
+	 * But we only build the bloom filter if the hash table is large
+	 * enough to exceed on-CPU caches (e.g. 4MB).
+	 *
+	 * If we have to start batching, we don't have much choice. We
+	 * can't really use the original rows estimate, because if it was
+	 * correct we would probably start batching right away.
+	 *
+	 * The best we can do is using some multiple of the ndistinct
+	 * estimate derived from the current hash table. We could double
+	 * it, but it's quite likely the misestimate is worse and we will
+	 * end up adding more batches. So we don't really know what's the
+	 * proper number.
+	 *
+	 * But we can approach this from another angle and consider memory
+	 * consumption. We cerainly don't want to end up with huge bloom
+	 * filter, so we can limit it to 1/8 of the work_mem, and size the
+	 * bloom filter using that.
+	 *
+	 * XXX There are cases where we know the hash table contains all
+	 * rows, e.g. FK-PK join with no restrictions on the PK side.
+	 */
+	if (enable_hashjoin_bloom && (nbatch > 1) && (!node->plan.parallel_aware))
+	{
+		/* batching, so build bloom filter */
+		hashtable->bloomFilter
+			= BloomFilterInitRows(rows, BLOOM_ERROR_RATE);
+	}
+	else	/* not batching, so no bloom filter */
+		hashtable->bloomFilter = NULL;
+
 #ifdef HJDEBUG
 	printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
 		   hashtable, nbatch, nbuckets);
@@ -954,6 +1067,24 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	}
 
 	/*
+	 * If switching to batched mode, we start building the bloom filter.
+	 * But we don't know how to size it, because the estimates we have are
+	 * clearly off (if they were correct, we'd start in batch mode right away).
+	 *
+	 * Instead, we size the bloom filter to use work_mem/8 (mostly arbitrary
+	 * fraction).
+	 *
+	 * We also need to make sure we added the hash values into the bloom
+	 * filter in this case (that's what build_bloom_filter is for).
+	 */
+	if (enable_hashjoin_bloom && (oldnbatch == 1))
+	{
+		hashtable->bloomFilter
+			= BloomFilterInitBytes(work_mem * 1024L/8,
+						BLOOM_ERROR_RATE);
+	}
+
+	/*
 	 * We will scan through the chunks directly, so that we can reset the
 	 * buckets now and not have to keep track which tuples in the buckets have
 	 * already been processed. We will free the old chunks as we go.
@@ -984,6 +1115,13 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
 									  &bucketno, &batchno);
 
+			/*
+			 * Only bother to add the values to bloom filter during the initial
+			 * doubling. For subsequent doublings the bloom filter is up to date.
+			 */
+			if (oldnbatch == 1)
+				ExecHashBloomAddValue(hashtable, hashTuple->hashvalue);
+
 			if (batchno == curbatch)
 			{
 				/* keep tuple in memory - copy it into the new chunk */
@@ -2640,6 +2778,26 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
 	instrument->nbatch = hashtable->nbatch;
 	instrument->nbatch_original = hashtable->nbatch_original;
 	instrument->space_peak = hashtable->spacePeak;
+
+	if (hashtable->bloomFilter)
+	{
+		int i, j;
+
+		instrument->bloom_nhashes = hashtable->bloomFilter->nhashes;
+		instrument->bloom_nbytes = hashtable->bloomFilter->nbits/8;
+		instrument->bloom_nlookups = hashtable->bloomFilter->nlookups;
+		instrument->bloom_nmatches = hashtable->bloomFilter->nmatches;
+		instrument->bloom_nbits = 0;
+
+		for (i = 0; i < hashtable->bloomFilter->nbits/8; i++)
+		{
+			for (j = 0; j < 8; j++)
+			{
+				if (hashtable->bloomFilter->data[i] & (0x01 << j))
+					instrument->bloom_nbits++;
+			}
+		}
+	}
 }
 
 /*
@@ -3306,3 +3464,119 @@ ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
 
 	return true;
 }
+
+static void
+ExecHashBloomAddValue(HashJoinTable hashtable, uint32 hashvalue)
+{
+	int		i;
+	BloomFilter	filter;
+
+	/* If bloom filter not initialized/requested, we're done. */
+	if (!hashtable->bloomFilter)
+		return;
+
+	/*
+	 * We only build the bloom filter during the initial pass, so do
+	 * nothing if we're processing the other batches.
+	 */
+	if (hashtable->curbatch > 0)
+		return;
+
+	/* for convenience */
+	filter = hashtable->bloomFilter;
+
+	/*
+	 * To get multiple independent hash functions, we simply use
+	 * 32-bit murmur3 with seeds. Seems to be working fine.
+	 */
+	for (i = 0; i < filter->nhashes; i++)
+	{
+		int	byteIdx, bitIdx;
+		uint32	hash;
+
+		hash = murmurhash32_seed(i, hashvalue) % filter->nbits;
+
+		byteIdx = (hash / 8);
+		bitIdx = (hash % 8);
+
+		filter->data[byteIdx] |= (0x01 << bitIdx);
+	}
+}
+
+bool
+ExecHashBloomCheckValue(HashJoinTable hashtable, uint32 hashvalue)
+{
+	int		i;
+	BloomFilter	filter = hashtable->bloomFilter;
+
+	/* if there's no filter, we simply assume match */
+	if (!filter)
+		return true;
+
+	++filter->nlookups;
+
+	for (i = 0; i < filter->nhashes; i++)
+	{
+		int byteIdx, bitIdx;
+		uint32 hash;
+
+		hash = murmurhash32_seed(i, hashvalue) % filter->nbits;
+
+		byteIdx = (hash / 8);
+		bitIdx = (hash % 8);
+
+		/* found missing bit -> mismatch */
+		if (! (filter->data[byteIdx] & (0x01 << bitIdx)))
+			return false;
+	}
+
+	/* if we got here, we know it's a match */
+	++filter->nmatches;
+
+	return true;
+}
+
+/*
+ * Size the bloom filter starting with expected number of entries and
+ * requested error rate.
+ */
+static BloomFilter
+BloomFilterInitRows(double nrows, double error)
+{
+	BloomFilter filter;
+	Size nbits = ceil((nrows * log(error)) / log(1.0 / (pow(2.0, log(2.0)))));
+	int nhashes = round(log(2.0) * nbits / nrows);
+
+	/* round the size to whole bytes */
+	nbits = 8 * ((nbits + 7) / 8);
+
+	filter = palloc0(offsetof(BloomFilterData, data) + (nbits/8));
+
+	filter->nbits = nbits;
+	filter->nhashes = nhashes;
+
+	return filter;
+}
+
+/*
+ * Size the bloom filter starting with available space and error
+ * rate. We do compute the number of entries it can handle with the
+ * requested error rate, but that is mostly useless as we only use
+ * this version when switching from non-batched to batched mode, i.e.
+ * when the original estimate turned out to be incorrect.
+ */
+static BloomFilter
+BloomFilterInitBytes(Size nbytes, double error)
+{
+	BloomFilter filter;
+
+	double nrows = 8 * nbytes * log(1.0 / (pow(2.0, log(2.0)))) / log(error);
+	int nhashes = round(log(2.0) * (8 * nbytes) / nrows);
+
+	filter = palloc0(offsetof(BloomFilterData, data) + nbytes);
+
+	filter->nbits = 8 * nbytes;
+	filter->nhashes = nhashes;
+
+	return filter;
+}
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ab91eb2..cd3d975 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -382,6 +382,15 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 																 hashvalue);
 				node->hj_CurTuple = NULL;
 
+				/* If still in the first batch, we check the bloom filter. */
+				if ((hashtable->curbatch == 0) &&
+					(! ExecHashBloomCheckValue(hashtable, hashvalue)))
+				{
+						/* no matches; check for possible outer-join fill */
+						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+						continue;
+				}
+
 				/*
 				 * The tuple might not belong to the current batch (where
 				 * "current batch" includes the skew buckets if any).
@@ -763,6 +772,19 @@ ExecEndHashJoin(HashJoinState *node)
 	 */
 	if (node->hj_HashTable)
 	{
+		HashJoinTable hashtable = node->hj_HashTable;
+
+		/*
+		 * If there's a bloom filter, print some debug info before destroying the
+		 * hash table.
+		 */
+		if (hashtable->bloomFilter)
+		{
+			BloomFilter filter = hashtable->bloomFilter;
+			elog(LOG, "bloom filter lookups=%lu matches=%lu eliminated=%lu%%",
+					  filter->nlookups, filter->nmatches, 100 - (100 * filter->nmatches) / Max(1,filter->nlookups));
+		}
+
 		ExecHashTableDestroy(node->hj_HashTable);
 		node->hj_HashTable = NULL;
 	}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 1db7845..9de2840 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -941,6 +941,24 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"enable_hashjoin_bloom", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the use of bloom filters in hash joins."),
+			NULL
+		},
+		&enable_hashjoin_bloom,
+		true,
+		NULL, NULL, NULL
+	},
+	{
+		{"force_hashjoin_bloom", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Forces the use of bloom filters in hash joins."),
+			NULL
+		},
+		&force_hashjoin_bloom,
+		false,
+		NULL, NULL, NULL
+	},
+	{
 		{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
 			gettext_noop("Enables genetic query optimization."),
 			gettext_noop("This algorithm attempts to do planning without "
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 6fb2dc0..64978e5 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -80,6 +80,17 @@ typedef struct HashJoinTupleData
 #define HJTUPLE_MINTUPLE(hjtup)  \
 	((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
 
+typedef struct BloomFilterData
+{
+	uint64	nlookups;		/* number of lookups */
+	uint64	nmatches;		/* number of matches */
+	int		nbits;			/* m */
+	int		nhashes;		/* k */
+	char	data[1];		/* bits */
+}	BloomFilterData;
+
+typedef BloomFilterData *BloomFilter;
+
 /*
  * If the outer relation's distribution is sufficiently nonuniform, we attempt
  * to optimize the join by treating the hash values corresponding to the outer
@@ -350,6 +361,9 @@ typedef struct HashJoinTableData
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
 
+	/* bloom filter on Hash (used with batched hash joins) */
+	BloomFilter	bloomFilter;	/* bloom filter on the hash values */
+
 	/* Shared and private state for Parallel Hash. */
 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
 	dsa_area   *area;			/* DSA area to allocate memory from */
@@ -358,4 +372,6 @@ typedef struct HashJoinTableData
 	dsa_pointer current_chunk_shared;
 }			HashJoinTableData;
 
+bool ExecHashBloomCheckValue(HashJoinTable hashtable, uint32 hashvalue);
+
 #endif							/* HASHJOIN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a953820..249a7a0 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2037,6 +2037,11 @@ typedef struct HashInstrumentation
 	int			nbatch;			/* number of batches at end of execution */
 	int			nbatch_original;	/* planned number of batches */
 	size_t		space_peak;		/* speak memory usage in bytes */
+	int			bloom_nhashes;		/* number of hash functions */
+	size_t		bloom_nbytes;		/* number of bytes */
+	int			bloom_nlookups;		/* number of lookups */
+	int			bloom_nmatches;		/* number of matches */
+	int			bloom_nbits;		/* number of bits set */
 } HashInstrumentation;
 
 /* ----------------
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 132e355..5db07ba 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -66,6 +66,8 @@ extern PGDLLIMPORT bool enable_nestloop;
 extern PGDLLIMPORT bool enable_material;
 extern PGDLLIMPORT bool enable_mergejoin;
 extern PGDLLIMPORT bool enable_hashjoin;
+extern PGDLLIMPORT bool enable_hashjoin_bloom;
+extern PGDLLIMPORT bool force_hashjoin_bloom;
 extern PGDLLIMPORT bool enable_gathermerge;
 extern PGDLLIMPORT bool enable_partitionwise_join;
 extern PGDLLIMPORT bool enable_parallel_append;
diff --git a/src/include/utils/hashutils.h b/src/include/utils/hashutils.h
index 5e9fe65..3582c9f 100644
--- a/src/include/utils/hashutils.h
+++ b/src/include/utils/hashutils.h
@@ -50,4 +50,39 @@ murmurhash32(uint32 data)
 	return h;
 }
 
+#define	ROTL32(x,r)	((x << r) | (x >> (32 - r)))
+
+/*
+ * Simple inline murmur hash implementation hashing a 32 bit integer and
+ * 32 bit seed, for performance.
+ *
+ * XXX Check this actually produces same results as MurmurHash3_x86_32.
+ */
+static inline uint32
+murmurhash32_seed(uint32 seed, uint32 data)
+{
+	uint32	h = seed;
+	uint32	k = data;
+	uint32	c1 = 0xcc9e2d51;
+	uint32	c2 = 0x1b873593;
+
+	k *= c1;
+	k = ROTL32(k,15);
+	k *= c2;
+
+	h ^= k;
+	h = ROTL32(h,13); 
+	h = h * 5 + 0xe6546b64;
+
+	h ^= sizeof(uint32);
+
+	h ^= h >> 16;
+	h *= 0x85ebca6b;
+	h ^= h >> 13;
+	h *= 0xc2b2ae35;
+	h ^= h >> 16;
+
+	return h;
+}
+
 #endif							/* HASHUTILS_H */
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 759f7d9..de2e716 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -76,6 +76,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_gathermerge        | on
  enable_hashagg            | on
  enable_hashjoin           | on
+ enable_hashjoin_bloom     | on
  enable_indexonlyscan      | on
  enable_indexscan          | on
  enable_material           | on
@@ -87,7 +88,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan            | on
  enable_sort               | on
  enable_tidscan            | on
-(15 rows)
+(16 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
-- 
2.9.5

