Hash tables in dynamic shared memory

Started by Thomas Munro over 9 years ago (13 messages)
#1 Thomas Munro
thomas.munro@enterprisedb.com
2 attachment(s)

Hi hackers,

Here is an experimental hash table implementation that uses DSA memory
so that hash tables can be shared by multiple backends and yet grow
dynamically. Development name: "DHT".

It's impossible to write a general purpose hash table that will be
suitable for every use case, considering all the combinations of
design trade-offs and subsets of functionality someone might want.
What I'm trying to do here is produce something that is generally
useful for many cases and easy to use, along the lines of dynahash,
but using dynamic shared memory. Here is the bullet point summary:

* DSA memory-backed
* open hashing (separate chaining)
* incremental resizing
* built-in partition-based locking
* concurrent iteration
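
To make the partition-based locking point concrete, here is a minimal
sketch of how a hash value could be mapped to a lock partition and to a
bucket. It assumes a 32-bit hash and 128 partitions, which is what the
attached patch uses; the function names are illustrative only and do not
appear in the patch. Because both indexes are taken from the high-order
bits of the hash, a bucket's children after a split stay in the same
partition.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Assumption: 2^7 = 128 lock partitions, as in the attached patch. */
#define NUM_PARTITIONS_LOG2 7
#define HASH_BITS (sizeof(uint32_t) * CHAR_BIT)

/* Lock partition: the top NUM_PARTITIONS_LOG2 bits of the hash. */
static inline uint32_t
partition_for_hash(uint32_t hash)
{
	return hash >> (HASH_BITS - NUM_PARTITIONS_LOG2);
}

/* Bucket index: the top size_log2 bits of the hash. */
static inline uint32_t
bucket_for_hash(uint32_t hash, unsigned size_log2)
{
	/* The table never has fewer buckets than partitions. */
	assert(size_log2 >= NUM_PARTITIONS_LOG2 && size_log2 < HASH_BITS);
	return hash >> (HASH_BITS - size_log2);
}

/* The partition that covers a given bucket at a given table size. */
static inline uint32_t
partition_for_bucket(uint32_t bucket, unsigned size_log2)
{
	return bucket >> (size_log2 - NUM_PARTITIONS_LOG2);
}
```

When the table doubles, bucket b splits into buckets 2b and 2b + 1, and
partition_for_bucket() gives the same answer before and after, so one
lock covers both the old and the new location of every item.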

There is other work being done in this space: I'm aware of Andres's
current thread[1] on Robin Hood-style closed hashing tables with
macro-ised size, hash and comparator operations à la C++ container
templates, and I'm following along with interest. He vigorously
rejects chaining hash tables as the natural enemies of memory
hardware. Obviously there are pros and cons: this node-based chaining
design allows resizing, iteration with stronger guarantees, and users
can point directly to (or into) the entries from other data
structures, but yeah... it has to follow pointers into far away cache
lines to do that. I guess Andres's simplehash should already work in
DSA memory nicely anyway since it doesn't have any internal pointers
IIUC (?). As for concurrency, Robert Haas also did some interesting
experiments[2] with (nearly) lock-free hash tables and hazard
pointers. I'm not sure what the optimum number of different
implementations or levels of configurability is, and I'm certainly not
wedded to this particular set of design trade-offs.

Potential use cases for DHT include caches, in-memory database objects
and working state for parallel execution. (Not a use case: the shared
hash table for my as-yet-unposted DSA-based shared parallel hash join
table work, which follows the existing open-coded dense_alloc-based
approach for reasons I'll explain later.)

There are a couple of things I'm not happy about in this version of
DHT: the space overhead per item is too high (hash + wasted padding +
next pointer), and the iterator system is overly complicated. I have
another version in development which gets rid of the hash and padding
and sometimes gets rid of the next pointer to achieve zero overhead,
and I'm working on a simpler iteration algorithm that keeps the
current guarantees but doesn't need to reorder bucket chains. More on
that, with some notes on performance and a testing module, soon.
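
As a rough illustration of the per-item overhead mentioned above, the
following sketch measures the size of an item header. It assumes a
32-bit hash and a 64-bit next pointer; the struct and function names are
hypothetical stand-ins, not the patch's own types, and the exact numbers
depend on the platform ABI.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical stand-in for the header that precedes each user entry.
 * Assumption: the hash is 32 bits wide and the next pointer is 64 bits.
 */
struct item_header
{
	uint32_t	hash;		/* cached hash of the key */
	uint64_t	next;		/* next item in the same bucket */
};

/* The next pointer can never start before the end of the hash. */
static_assert(offsetof(struct item_header, next) >= sizeof(uint32_t),
			  "next must follow hash");

/* Total bytes spent per item before the user's entry begins. */
static inline size_t
item_header_overhead(void)
{
	return sizeof(struct item_header);
}

/* Bytes of that overhead that are alignment padding. */
static inline size_t
item_header_padding(void)
{
	return sizeof(struct item_header) - sizeof(uint32_t) - sizeof(uint64_t);
}
```

On a typical 64-bit ABI I would expect this to come to 16 bytes per
item, 4 of which are padding, but that is an expectation about common
platforms rather than something the C standard guarantees.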

Please find attached dht-v1.patch, which depends on dsa-v2.patch[3].
I'm posting this version of DHT today because it is a dependency of
another patch due on this list shortly.

See also simple-cache.patch which contains a simple smoke test
extension so you can type SELECT simple_cache_put('42', 'Hello
world'), SELECT simple_cache_get('42') etc.

Thanks to my colleagues Amit Khandekar, Amul Sul, Dilip Kumar and John
Gorman for bug fixes and suggestions, and Robert Haas for feedback.

Please let me know your thoughts, and thanks for reading.

[1]: /messages/by-id/20161003041215.tfrifbeext3i7nkj@alap3.anarazel.de
[2]: /messages/by-id/CA+TgmoYE4t-Pt+v08kMO5u_XN-HNKBWtfMgcUXEGBrQiVgdV9Q@mail.gmail.com
[3]: /messages/by-id/CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

dht-v1.patch (application/octet-stream)
diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index e99ebd2..0c80371 100644
--- a/src/backend/storage/ipc/Makefile
+++ b/src/backend/storage/ipc/Makefile
@@ -8,7 +8,7 @@ subdir = src/backend/storage/ipc
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = dsa.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
+OBJS = dht.o dsa.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
 	procsignal.o  shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o \
 	sinvaladt.o standby.o
 
diff --git a/src/backend/storage/ipc/dht.c b/src/backend/storage/ipc/dht.c
new file mode 100644
index 0000000..0916a3f
--- /dev/null
+++ b/src/backend/storage/ipc/dht.c
@@ -0,0 +1,1426 @@
+/*-------------------------------------------------------------------------
+ *
+ * dht.c
+ *	  Concurrent hash tables backed by dynamic shared memory areas.
+ *
+ * This is an open hashing hash table, with a linked list at each table
+ * entry.  It supports dynamic resizing, as required to prevent the linked
+ * lists from growing too long on average.  Currently, only growing is
+ * supported: the hash table never becomes smaller.
+ *
+ * To deal with concurrency, it has a fixed size set of partitions, each of which
+ * is independently locked.  Each bucket maps to a partition; so insert, find
+ * and iterate operations normally only acquire one lock.  Therefore, good
+ * concurrency is achieved whenever they don't collide at the lock partition
+ * level.  However, when a resize operation begins, all partition locks must
+ * be acquired simultaneously for a brief period.  This is only expected to
+ * happen a small number of times until a stable size is found, since growth is
+ * geometric.
+ *
+ * Resizing is done incrementally so that no individual insert operation pays
+ * for the potentially large cost of splitting all buckets.  A resize
+ * operation begins when any partition exceeds a certain load factor, and
+ * splits every bucket into two new buckets but doesn't yet transfer any items
+ * into the new buckets.  Then, future dht_release operations involving an
+ * exclusive lock transfer one item from an old bucket into the appropriate
+ * new bucket.  If possible, an item from the already-locked partition is
+ * migrated.  If none remain, a lock on another partition will be acquired and
+ * one item from that partition will be migrated.  This continues until there
+ * are no items left in old buckets, and the resize operation is finished.
+ * While a resize is in progress, find, insert and delete operations must look
+ * in a new bucket and an old bucket, but this is not too expensive since they
+ * are covered by the same lock.  That is, a given hash value maps to the same
+ * lock partition, no matter how many times buckets have been split.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/ipc/dht.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/dht.h"
+#include "storage/dsa.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/memutils.h"
+
+/*
+ * An item in the hash table.  This wraps the user's entry object in an
+ * envelope that holds the hashed key and a pointer to the next item in the
+ * bucket.
+ */
+struct dht_hash_table_item
+{
+	/* The hashed key, to avoid having to recompute it. */
+	dht_hash	hash;
+	/* The next item in the same bucket. */
+	dsa_pointer next;
+	/* The user's entry object follows here. */
+	char		entry[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/* The number of partitions for locking purposes. */
+#define DHT_NUM_PARTITIONS_LOG2 7
+#define DHT_NUM_PARTITIONS (1 << DHT_NUM_PARTITIONS_LOG2)
+
+/* A magic value used to identify DHT hash tables. */
+#define DHT_MAGIC 0x75ff6a20
+
+/*
+ * Tracking information for each lock partition.  Initially, each partition
+ * corresponds to one bucket, but each time the hash table grows, the buckets
+ * covered by each partition split so the number of buckets covered doubles.
+ *
+ * We might want to add padding here so that each partition is on a different
+ * cache line, but doing so would bloat this structure considerably.
+ */
+typedef struct
+{
+	LWLock		lock;			/* Protects all buckets in this partition. */
+	Size		count;			/* # of items in active buckets */
+	Size		old_count;		/* # of items in old buckets */
+	Index		next_bucket_to_split;	/* do_incremental_resize progress
+										 * tracker */
+} dht_partition;
+
+/*
+ * The head object for a hash table.  This will be stored in dynamic shared
+ * memory.
+ */
+typedef struct
+{
+	dht_hash_table_handle handle;
+	uint32		magic;
+	dht_partition partitions[DHT_NUM_PARTITIONS];
+	int			lwlock_tranche_id;
+	char		lwlock_tranche_name[NAMEDATALEN];
+
+	/*
+	 * The following members are written to only when ALL partition locks are
+	 * held.  They can be read when any one partition lock is held.
+	 */
+
+	/* Number of buckets expressed as power of 2 (8 = 256 buckets). */
+	Size		size_log2;		/* log2(number of buckets) */
+	dsa_pointer buckets;		/* current bucket array */
+	dsa_pointer old_buckets;	/* previous bucket array, if splitting */
+}	dht_hash_table_control;
+
+/*
+ * Per-backend state for a dynamic hash table.
+ */
+struct dht_hash_table
+{
+	dsa_area   *area;			/* Backing dynamic shared memory area. */
+	dht_parameters params;		/* Parameters. */
+	dht_hash_table_control *control;	/* Control object in DSM. */
+	dsa_pointer *buckets;		/* Current bucket pointers in DSM. */
+	Size		size_log2;		/* log2(number of buckets) */
+	dsa_pointer *old_buckets;	/* Old bucket pointers in DSM. */
+	LWLockTranche lwlock_tranche;		/* Tranche for this table's
+										 * lwlocks. */
+	bool		exclusively_locked;		/* Is an exclusive partition lock
+										 * held? */
+};
+
+/* Given a pointer to an entry, find the pointer to the item that holds it. */
+#define ITEM_FROM_ENTRY(entry)										\
+	((dht_hash_table_item *)((char *)entry -						\
+							 offsetof(dht_hash_table_item, entry)))
+
+/* How many resize operations (bucket splits) have there been? */
+#define NUM_SPLITS(size_log2)					\
+	(size_log2 - DHT_NUM_PARTITIONS_LOG2)
+
+/* How many buckets are there in each partition at a given size? */
+#define BUCKETS_PER_PARTITION(size_log2)		\
+	(1 << NUM_SPLITS(size_log2))
+
+/* Max entries before we need to grow.  Half + quarter = 75% load factor. */
+#define MAX_COUNT_PER_PARTITION(hash_table)				\
+	(BUCKETS_PER_PARTITION(hash_table->size_log2) / 2 + \
+	 BUCKETS_PER_PARTITION(hash_table->size_log2) / 4)
+
+/* Choose partition based on the highest order bits of the hash. */
+#define PARTITION_FOR_HASH(hash)										\
+	(hash >> ((sizeof(dht_hash) * CHAR_BIT) - DHT_NUM_PARTITIONS_LOG2))
+
+/*
+ * Find the bucket index for a given hash and table size.  Each time the table
+ * doubles in size, the appropriate bucket for a given hash value doubles and
+ * possibly adds one, depending on the newly revealed bit, so that all buckets
+ * are split.
+ */
+#define BUCKET_INDEX_FOR_HASH_AND_SIZE(hash, size_log2)		\
+	(hash >> ((sizeof(dht_hash) * CHAR_BIT) - (size_log2)))
+
+/* The index of the first bucket in a given partition. */
+#define BUCKET_INDEX_FOR_PARTITION(partition, size_log2)	\
+	((partition) << NUM_SPLITS(size_log2))
+
+/* The head of the active bucket for a given hash value (lvalue). */
+#define BUCKET_FOR_HASH(hash_table, hash)								\
+	(hash_table->buckets[												\
+		BUCKET_INDEX_FOR_HASH_AND_SIZE(hash,							\
+									   hash_table->size_log2)])
+
+/* The head of the old bucket for a given hash value (lvalue). */
+#define OLD_BUCKET_FOR_HASH(hash_table, hash)							\
+	(hash_table->old_buckets[											\
+		BUCKET_INDEX_FOR_HASH_AND_SIZE(hash,							\
+									   hash_table->size_log2 - 1)])
+
+/* Is a resize currently in progress? */
+#define RESIZE_IN_PROGRESS(hash_table)			\
+	(hash_table->old_buckets != NULL)
+
+static void delete_item(dht_hash_table *hash_table, dht_hash_table_item * item);
+static void advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+				 dsa_pointer *next);
+static dsa_pointer next_in_bucket(dht_iterator *iterator);
+static bool begin_resize(dht_hash_table *hash_table, Size new_size);
+static void finish_resize(dht_hash_table *hash_table);
+static void do_incremental_resize(dht_hash_table *hash_table, int p);
+static void search_for_resize_work(dht_hash_table *hash_table,
+					   int start_partition);
+static inline void ensure_valid_bucket_pointers(dht_hash_table *hash_table);
+static inline dht_hash_table_item *find_in_bucket(dht_hash_table *hash_table,
+			   const void *key,
+			   dsa_pointer item_pointer);
+static void insert_item_into_bucket(dht_hash_table *hash_table,
+						dsa_pointer item_pointer,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket);
+static dht_hash_table_item *insert_into_bucket(dht_hash_table *hash_table,
+				   const void *key,
+				   dsa_pointer *bucket);
+static bool delete_key_from_bucket(dht_hash_table *hash_table, const void *key,
+					   dsa_pointer *bucket_head);
+static bool delete_item_from_bucket(dht_hash_table *hash_table,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket_head);
+
+#define PARTITION_LOCK(hash_table, i)			\
+	(&hash_table->control->partitions[i].lock)
+
+/*
+ * Create a new hash table backed by the given dynamic shared area, with the
+ * given parameters.
+ */
+dht_hash_table *
+dht_create(dsa_area *area, const dht_parameters *params)
+{
+	dht_hash_table *hash_table;
+	dsa_pointer control;
+
+	/* Allocate the backend-local object representing the hash table. */
+	hash_table = palloc(sizeof(dht_hash_table));
+
+	/* Allocate the control object in shared memory. */
+	control = dsa_allocate(area, sizeof(dht_hash_table_control));
+	if (!DsaPointerIsValid(control))
+	{
+		/* Can't allocate memory, give up. */
+		pfree(hash_table);
+		return NULL;
+	}
+
+	/* Set up the local and shared hash table structs. */
+	hash_table->area = area;
+	hash_table->params = *params;
+	hash_table->control = dsa_get_address(area, control);
+	hash_table->control->handle = control;
+	hash_table->control->magic = DHT_MAGIC;
+
+	/* Set up the tranche of locks, and register it in this process. */
+	hash_table->control->lwlock_tranche_id = params->tranche_id;
+	strcpy(hash_table->control->lwlock_tranche_name,
+		   params->tranche_name);
+	hash_table->lwlock_tranche.name = params->tranche_name;
+	hash_table->lwlock_tranche.array_base =
+		&hash_table->control->partitions[0].lock;
+	hash_table->lwlock_tranche.array_stride = sizeof(dht_partition);
+	LWLockRegisterTranche(hash_table->control->lwlock_tranche_id,
+						  &hash_table->lwlock_tranche);
+
+	/* Set up the array of lock partitions. */
+	{
+		int			i;
+
+		for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		{
+			LWLockInitialize(PARTITION_LOCK(hash_table, i),
+							 hash_table->control->lwlock_tranche_id);
+			hash_table->control->partitions[i].count = 0;
+		}
+	}
+
+	hash_table->exclusively_locked = false;
+
+	/*
+	 * Set up the initial array of buckets.  Our initial size is the same as
+	 * the number of partitions.
+	 */
+	hash_table->control->size_log2 = DHT_NUM_PARTITIONS_LOG2;
+	hash_table->control->buckets =
+		dsa_allocate(area, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS);
+	hash_table->buckets = dsa_get_address(area,
+										  hash_table->control->buckets);
+	hash_table->control->old_buckets = InvalidDsaPointer;
+	hash_table->old_buckets = NULL;
+	memset(hash_table->buckets, 0, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS);
+
+	return hash_table;
+}
+
+/*
+ * Attach to an existing hash table using a handle.
+ */
+dht_hash_table *
+dht_attach(dsa_area *area, const dht_parameters *params,
+		   dht_hash_table_handle handle)
+{
+	dht_hash_table *hash_table;
+	dsa_pointer control;
+
+	/* Allocate the backend-local object representing the hash table. */
+	hash_table = palloc(sizeof(dht_hash_table));
+
+	/* Find the control object in shared memory. */
+	control = handle;
+
+	/* Set up the local hash table struct. */
+	hash_table->area = area;
+	hash_table->params = *params;
+	hash_table->control = dsa_get_address(area, control);
+	hash_table->exclusively_locked = false;
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/* Register the tranche of locks in this process. */
+	hash_table->lwlock_tranche.name =
+		hash_table->control->lwlock_tranche_name;
+	hash_table->lwlock_tranche.array_base =
+		&hash_table->control->partitions[0].lock;
+	hash_table->lwlock_tranche.array_stride = sizeof(dht_partition);
+	LWLockRegisterTranche(hash_table->control->lwlock_tranche_id,
+						  &hash_table->lwlock_tranche);
+
+	return hash_table;
+}
+
+/*
+ * Detach from a hash table.  This frees backend-local resources associated
+ * with the hash table, but the hash table will continue to exist until it is
+ * either explicitly destroyed (by a backend that is still attached to it), or
+ * the area that backs it is returned to the operating system.
+ */
+void
+dht_detach(dht_hash_table *hash_table)
+{
+	/* The hash table may have been destroyed.  Just free local memory. */
+	pfree(hash_table);
+}
+
+/*
+ * Destroy a hash table, returning all memory to the area.  The caller must be
+ * certain that no other backend will attempt to access the hash table before
+ * calling this function.  Other backends must explicitly call dht_detach to
+ * free up backend-local memory associated with the hash table.  The backend
+ * that calls dht_destroy must not call dht_detach.
+ */
+void
+dht_destroy(dht_hash_table *hash_table)
+{
+	dht_iterator iterator;
+	void	   *entry;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/* Free all the entries. */
+	dht_iterate_begin(hash_table, &iterator, true);
+	while ((entry = dht_iterate_next(&iterator)))
+		dht_iterate_delete(&iterator);
+	dht_iterate_end(&iterator);
+
+	/*
+	 * Vandalize the control block to help catch programming errors where
+	 * other backends access the memory formerly occupied by this hash table.
+	 */
+	hash_table->control->magic = 0;
+
+	/* Free the active and (if appropriate) old table, and control object. */
+	if (DsaPointerIsValid(hash_table->control->old_buckets))
+		dsa_free(hash_table->area, hash_table->control->old_buckets);
+	dsa_free(hash_table->area, hash_table->control->buckets);
+	dsa_free(hash_table->area, hash_table->control->handle);
+
+	pfree(hash_table);
+}
+
+/*
+ * Get a handle that can be used by other processes to attach to this hash
+ * table.
+ */
+dht_hash_table_handle
+dht_get_hash_table_handle(dht_hash_table *hash_table)
+{
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	return hash_table->control->handle;
+}
+
+/*
+ * Look up an entry, given a key.  Returns a pointer to an entry if one can be
+ * found with the given key.  Returns NULL if the key is not found.  If a
+ * non-NULL value is returned, the entry is locked and must be released by
+ * calling dht_release.  If an error is raised before dht_release is called,
+ * the lock will be released automatically, but the caller must take care to
+ * ensure that the entry is not left corrupted.  The lock mode is either
+ * shared or exclusive depending on 'exclusive'.
+ *
+ * Note that the lock held is in fact an LWLock, so interrupts will be held
+ * on return from this function, and not resumed until dht_release is called.
+ * It is a very good idea for the caller to release the lock quickly.
+ */
+void *
+dht_find(dht_hash_table *hash_table, const void *key, bool exclusive)
+{
+	Size		hash;
+	Size		partition;
+	dht_hash_table_item *item;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+	partition = PARTITION_FOR_HASH(hash);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition),
+				  exclusive ? LW_EXCLUSIVE : LW_SHARED);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/* Search the active bucket. */
+	item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash));
+
+	/* If not found and a resize is in progress, search the old bucket too. */
+	if (!item && RESIZE_IN_PROGRESS(hash_table))
+		item = find_in_bucket(hash_table, key,
+							  OLD_BUCKET_FOR_HASH(hash_table, hash));
+
+	if (!item)
+	{
+		/* Not found. */
+		LWLockRelease(PARTITION_LOCK(hash_table, partition));
+		return NULL;
+	}
+	else
+	{
+		/* The caller will free the lock by calling dht_release. */
+		hash_table->exclusively_locked = exclusive;
+		return &item->entry;
+	}
+}
+
+/*
+ * Returns a pointer to an exclusively locked item which must be released with
+ * dht_release.  If the key is found in the hash table, 'found' is set to true
+ * and a pointer to the existing entry is returned.  If the key is not found,
+ * 'found' is set to false, and a pointer to a newly created entry is returned.
+ */
+void *
+dht_find_or_insert(dht_hash_table *hash_table,
+				   const void *key,
+				   bool *found)
+{
+	Size		hash;
+	Size		partition_index;
+	dht_partition *partition;
+	dht_hash_table_item *item;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+	partition_index = PARTITION_FOR_HASH(hash);
+	partition = &hash_table->control->partitions[partition_index];
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+restart:
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition_index),
+				  LW_EXCLUSIVE);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/* Search the active bucket. */
+	item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash));
+
+	/* If not found and a resize is in progress, search the old bucket too. */
+	if (!item && RESIZE_IN_PROGRESS(hash_table))
+		item = find_in_bucket(hash_table, key,
+							  OLD_BUCKET_FOR_HASH(hash_table, hash));
+
+	if (item)
+		*found = true;
+	else
+	{
+		*found = false;
+
+		/* Check if we are getting too full. */
+		if (partition->count > MAX_COUNT_PER_PARTITION(hash_table) &&
+			!RESIZE_IN_PROGRESS(hash_table))
+		{
+			/*
+			 * The load factor (= keys / buckets) for all buckets protected by
+			 * this partition is > 0.75.  Presumably the same applies
+			 * generally across the whole hash table (though we don't attempt
+			 * to track that directly to avoid contention on some kind of
+			 * central counter; we just assume that this partition is
+			 * representative).  This is a good time to start a new resize
+			 * operation, if there isn't one already in progress.
+			 *
+			 * Give up our existing lock first, because resizing needs to
+			 * reacquire all the locks in the right order.
+			 */
+			LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+			if (!begin_resize(hash_table, hash_table->size_log2 + 1))
+			{
+				/* Out of memory. */
+				return NULL;
+			}
+
+			goto restart;
+		}
+
+		/* Finally we can try to insert the new item. */
+		item = insert_into_bucket(hash_table, key,
+								  &BUCKET_FOR_HASH(hash_table, hash));
+		if (item)
+		{
+			item->hash = hash;
+			/* Adjust per-lock-partition counter for load factor knowledge. */
+			++partition->count;
+		}
+	}
+
+	if (!item)
+	{
+		/* Out of memory. */
+		LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+		return NULL;
+	}
+	else
+	{
+		/* The caller must release the lock with dht_release. */
+		hash_table->exclusively_locked = true;
+		return &item->entry;
+	}
+}
+
+/*
+ * Remove an entry by key.  Returns true if the key was found and the
+ * corresponding entry was removed.
+ *
+ * To delete an entry that you already have a pointer to, see
+ * dht_delete_entry.
+ */
+bool
+dht_delete_key(dht_hash_table *hash_table, const void *key)
+{
+	Size		hash;
+	Size		partition;
+	bool		found;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+	partition = PARTITION_FOR_HASH(hash);
+
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition), LW_EXCLUSIVE);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/*
+	 * Try the active bucket first, and then the outgoing bucket if we are
+	 * resizing.  It can't be in both.
+	 */
+	if (delete_key_from_bucket(hash_table, key,
+							   &BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].count > 0);
+		found = true;
+		--hash_table->control->partitions[partition].count;
+	}
+	else if (RESIZE_IN_PROGRESS(hash_table) &&
+			 delete_key_from_bucket(hash_table, key,
+									&OLD_BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].old_count > 0);
+		found = true;
+		--hash_table->control->partitions[partition].old_count;
+	}
+	else
+		found = false;
+
+	LWLockRelease(PARTITION_LOCK(hash_table, partition));
+
+	return found;
+}
+
+/*
+ * Remove an entry.  The entry must already be exclusively locked, and must
+ * have been obtained by dht_find or dht_find_or_insert.  Note that this
+ * function releases the lock just like dht_release.
+ *
+ * To delete an entry by key, see dht_delete_key.  To delete an entry while
+ * iterating, see dht_iterate_delete.
+ */
+void
+dht_delete_entry(dht_hash_table *hash_table, void *entry)
+{
+	dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+	Size		partition = PARTITION_FOR_HASH(item->hash);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(hash_table->exclusively_locked);
+
+	delete_item(hash_table, item);
+	hash_table->exclusively_locked = false;
+	LWLockRelease(PARTITION_LOCK(hash_table, partition));
+}
+
+/*
+ * Unlock an entry which was locked by dht_find or dht_find_or_insert.
+ */
+void
+dht_release(dht_hash_table *hash_table, void *entry)
+{
+	dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+	Size		partition_index = PARTITION_FOR_HASH(item->hash);
+	bool		deferred_resize_work = false;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/*
+	 * If there is a resize in progress and we took an exclusive lock, try to
+	 * migrate a key from the old bucket array to the new bucket array.
+	 * Ideally, we'll find some work to do in the partition on which we
+	 * already hold a lock, but if not, we'll make a note to search other lock
+	 * partitions after releasing the lock we currently hold.
+	 */
+	if (RESIZE_IN_PROGRESS(hash_table) && hash_table->exclusively_locked)
+	{
+		dht_partition *partition;
+
+		partition = &hash_table->control->partitions[partition_index];
+
+		if (partition->old_count > 0)
+		{
+			/* Do some resizing work in this partition. */
+			do_incremental_resize(hash_table, partition_index);
+		}
+		else
+		{
+			/* Nothing left to do in this partition.  Look elsewhere. */
+			deferred_resize_work = true;
+		}
+	}
+
+	hash_table->exclusively_locked = false;
+	LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+
+	/*
+	 * If we need to contribute some work to the ongoing resize effort, and we
+	 * didn't find something to migrate in our own partition, go search for
+	 * such work in other partitions.  We'll start with the one after ours.
+	 */
+	if (deferred_resize_work)
+		search_for_resize_work(hash_table,
+							   (partition_index + 1) % DHT_NUM_PARTITIONS);
+}
+
+/*
+ * Begin iterating through the whole hash table.  The caller must supply a
+ * dht_iterator object, which can then be used to call dht_iterate_next to get
+ * values until the end is reached.
+ */
+void
+dht_iterate_begin(dht_hash_table *hash_table,
+				  dht_iterator *iterator,
+				  bool exclusive)
+{
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	iterator->hash_table = hash_table;
+	iterator->exclusive = exclusive;
+	iterator->partition = 0;
+	iterator->bucket = 0;
+	iterator->item = NULL;
+	iterator->locked = false;
+
+	/* Snapshot the size (arbitrary lock to prevent size changing). */
+	LWLockAcquire(PARTITION_LOCK(hash_table, 0), LW_SHARED);
+	ensure_valid_bucket_pointers(hash_table);
+	iterator->table_size_log2 = hash_table->size_log2;
+	LWLockRelease(PARTITION_LOCK(hash_table, 0));
+}
+
+/*
+ * Delete the entry that was most recently returned by dht_iterate_next.  The
+ * iterator must have been initialized in exclusive lock mode.
+ */
+void
+dht_iterate_delete(dht_iterator *iterator)
+{
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+	Assert(iterator->exclusive);
+	Assert(iterator->item != NULL);
+	delete_item(iterator->hash_table, iterator->item);
+	iterator->item = NULL;
+}
+
+/*
+ * Move to the next item in the hash table.  Returns a pointer to an entry, or
+ * NULL if the end of the hash table has been reached.  The item is locked in
+ * exclusive or shared mode depending on the argument given to
+ * dht_iterate_begin.  The caller can optionally release the lock by calling
+ * dht_iterate_release, and then call dht_iterate_next again to move to the
+ * next entry.  If the iteration is in exclusive mode, client code can also
+ * call dht_iterate_delete.  When the end of the hash table is reached, or at
+ * any time, the client may call dht_iterate_end to abandon iteration.
+ */
+void *
+dht_iterate_next(dht_iterator *iterator)
+{
+	dsa_pointer item_pointer;
+
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+
+	while (iterator->partition < DHT_NUM_PARTITIONS)
+	{
+		if (!iterator->locked)
+		{
+			LWLockAcquire(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition),
+						  iterator->exclusive ? LW_EXCLUSIVE : LW_SHARED);
+			iterator->locked = true;
+		}
+
+		item_pointer = next_in_bucket(iterator);
+		if (DsaPointerIsValid(item_pointer))
+		{
+			/* Remember this item, so that we can step over it next time. */
+			iterator->last_item_pointer = item_pointer;
+			iterator->item = dsa_get_address(iterator->hash_table->area,
+											 item_pointer);
+			return &(iterator->item->entry);
+		}
+
+		/* We have reached the end of the bucket. */
+		iterator->last_item_pointer = InvalidDsaPointer;
+		iterator->bucket += 1;
+
+		/*
+		 * If the last bucket was split while we were iterating, then we have
+		 * been doing some expensive bucket merging to get each item.  Now
+		 * that we've completed that whole pre-split bucket (by merging all
+		 * its descendants), we can update the table size of our iterator and
+		 * start iterating cheaply again.
+		 */
+		iterator->bucket <<=
+			iterator->hash_table->size_log2 - iterator->table_size_log2;
+		iterator->table_size_log2 = iterator->hash_table->size_log2;
+		/* Have we gone past the last bucket in this partition? */
+		if (iterator->bucket >=
+			BUCKET_INDEX_FOR_PARTITION(iterator->partition + 1,
+									   iterator->table_size_log2))
+		{
+			/* No more buckets.  Try next partition. */
+			LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition));
+			iterator->locked = false;
+			++iterator->partition;
+			iterator->bucket =
+				BUCKET_INDEX_FOR_PARTITION(iterator->partition,
+										   iterator->table_size_log2);
+		}
+	}
+	iterator->item = NULL;
+	return NULL;
+}
+
+/*
+ * Release the most recently obtained lock.  This can optionally be called in
+ * between calls to dht_iterate_next to allow other processes to access the
+ * same partition of the hash table.
+ */
+void
+dht_iterate_release(dht_iterator *iterator)
+{
+	Assert(iterator->locked);
+	LWLockRelease(PARTITION_LOCK(iterator->hash_table, iterator->partition));
+	iterator->locked = false;
+}
+
+/*
+ * Terminate iteration.  This must be called after iteration completes,
+ * whether or not the end was reached.  The iterator object may then be reused
+ * for another iteration.
+ */
+void
+dht_iterate_end(dht_iterator *iterator)
+{
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+	if (iterator->locked)
+		LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+									 iterator->partition));
+}
+
+/*
+ * Print out debugging information about the internal state of the hash table.
+ */
+void
+dht_dump(dht_hash_table *hash_table)
+{
+	Size		i;
+	Size		j;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED);
+
+	ensure_valid_bucket_pointers(hash_table);
+
+	fprintf(stderr,
+			"hash table size = %zu\n", (Size) 1 << hash_table->size_log2);
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+		Size		begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2);
+		Size		end = BUCKET_INDEX_FOR_PARTITION(i + 1, hash_table->size_log2);
+
+		fprintf(stderr, "  partition %zu\n", i);
+		fprintf(stderr,
+				"    active buckets (key count = %zu)\n", partition->count);
+
+		for (j = begin; j < end; ++j)
+		{
+			Size		count = 0;
+			dsa_pointer bucket = hash_table->buckets[j];
+
+			while (DsaPointerIsValid(bucket))
+			{
+				dht_hash_table_item *item;
+
+				item = dsa_get_address(hash_table->area, bucket);
+
+				bucket = item->next;
+				++count;
+			}
+			fprintf(stderr, "      bucket %zu (key count = %zu)\n", j, count);
+		}
+		if (RESIZE_IN_PROGRESS(hash_table))
+		{
+			Size		begin;
+			Size		end;
+
+			begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2 - 1);
+			end = BUCKET_INDEX_FOR_PARTITION(i + 1,
+											 hash_table->size_log2 - 1);
+
+			fprintf(stderr, "    old buckets (key count = %zu)\n",
+					partition->old_count);
+
+			for (j = begin; j < end; ++j)
+			{
+				Size		count = 0;
+				dsa_pointer bucket = hash_table->old_buckets[j];
+
+				while (DsaPointerIsValid(bucket))
+				{
+					dht_hash_table_item *item;
+
+					item = dsa_get_address(hash_table->area, bucket);
+
+					bucket = item->next;
+					++count;
+				}
+				fprintf(stderr,
+						"      bucket %zu (key count = %zu)\n", j, count);
+			}
+		}
+	}
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockRelease(PARTITION_LOCK(hash_table, i));
+}
+
+/*
+ * Delete a locked item to which we have a pointer.
+ */
+static void
+delete_item(dht_hash_table *hash_table, dht_hash_table_item * item)
+{
+	Size		hash = item->hash;
+	Size		partition = PARTITION_FOR_HASH(hash);
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, partition)));
+
+	/*
+	 * Try the active bucket first, and then the outgoing bucket if we are
+	 * resizing.  It can't be in both.
+	 */
+	if (delete_item_from_bucket(hash_table, item,
+								&BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		--hash_table->control->partitions[partition].count;
+		Assert(hash_table->control->partitions[partition].count >= 0);
+	}
+	else if (RESIZE_IN_PROGRESS(hash_table) &&
+			 delete_item_from_bucket(hash_table, item,
+									 &OLD_BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		--hash_table->control->partitions[partition].old_count;
+		Assert(hash_table->control->partitions[partition].old_count >= 0);
+	}
+	else
+	{
+		Assert(false);
+	}
+}
+
+/*
+ * Find the item that comes after iterator->last_item_pointer but before
+ * *next, and write it into *next.
+ *
+ * Calling this on several buckets allows us to 'merge' the items from a
+ * bucket with its ancestors and descendants during resize operations, and
+ * iterate through the items in all of them in a stable order, despite items
+ * being able to move while we iterate.
+ *
+ * This works because buckets are sorted by descending item pointer.
+ */
+static void
+advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+				 dsa_pointer *next)
+{
+	dht_hash_table_item *item;
+
+	while (DsaPointerIsValid(bucket_head))
+	{
+		item = dsa_get_address(iterator->hash_table->area,
+							   bucket_head);
+		if ((!DsaPointerIsValid(iterator->last_item_pointer) ||
+			 bucket_head < iterator->last_item_pointer) &&
+			bucket_head > *next &&
+			BUCKET_INDEX_FOR_HASH_AND_SIZE(item->hash,
+										   iterator->table_size_log2) ==
+			iterator->bucket)
+		{
+			*next = bucket_head;
+			return;
+		}
+		bucket_head = item->next;
+	}
+}
+
+/*
+ * Find the next item in the bucket we are visiting.  If a resize is in
+ * progress or the bucket has been split since we started iterating over this
+ * bucket, we may need to combine items from multiple buckets to synthesize
+ * the original bucket.
+ */
+static dsa_pointer
+next_in_bucket(dht_iterator *iterator)
+{
+	Size		bucket;
+	Size		begin;
+	Size		end;
+	dsa_pointer next = InvalidDsaPointer;
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition)));
+
+	/* Handle old buckets if a resize is in progress. */
+	if (RESIZE_IN_PROGRESS(iterator->hash_table))
+	{
+		if (iterator->table_size_log2 == iterator->hash_table->size_log2)
+		{
+			/*
+			 * The table hasn't grown since we started iterating, but there
+			 * was already a resize in progress so there may be items in an
+			 * old bucket that belong in the bucket we're iterating over which
+			 * haven't been migrated yet.
+			 */
+			advance_iterator(iterator,
+					 iterator->hash_table->old_buckets[iterator->bucket / 2],
+							 &next);
+		}
+		else
+		{
+			/*
+			 * The bucket has been split one or more times, and a resize is
+			 * still in progress so there may be items still in an old bucket.
+			 * If N splits have occurred, there will be 2^N current buckets
+			 * that correspond to the one original bucket.
+			 */
+			Assert(iterator->hash_table->size_log2 > iterator->table_size_log2);
+			begin = iterator->bucket <<
+				(iterator->hash_table->size_log2 - iterator->table_size_log2 - 1);
+			end = (iterator->bucket + 1) <<
+				(iterator->hash_table->size_log2 - iterator->table_size_log2 - 1);
+			for (bucket = begin; bucket < end; bucket += 1)
+				advance_iterator(iterator,
+								 iterator->hash_table->old_buckets[bucket],
+								 &next);
+		}
+	}
+
+	/*
+	 * Scan the bucket.  This is a loop because it may have split any number
+	 * of times since we started iterating through it, but in the common case
+	 * we just loop once.
+	 */
+	begin = iterator->bucket <<
+		(iterator->hash_table->size_log2 - iterator->table_size_log2);
+	end = (iterator->bucket + 1) <<
+		(iterator->hash_table->size_log2 - iterator->table_size_log2);
+	for (bucket = begin; bucket < end; ++bucket)
+		advance_iterator(iterator,
+						 iterator->hash_table->buckets[bucket],
+						 &next);
+
+	return next;
+}
+
+/*
+ * Grow the hash table if necessary to the requested number of buckets.  The
+ * requested size must be double some previously observed size.  Returns true
+ * if the table was successfully expanded or found to be big enough already
+ * (because another backend expanded it).  Returns false for out-of-memory.
+ * The process of transferring items from the old table to the new one will be
+ * carried out with a small tax on all future operations until the work is
+ * done.
+ */
+static bool
+begin_resize(dht_hash_table *hash_table, Size new_size_log2)
+{
+	int			i;
+	dsa_pointer new_buckets;
+	bool		result;
+
+	/*
+	 * Acquire the locks for all lock partitions.  This is expensive, but we
+	 * shouldn't have to do it many times.
+	 */
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+		if (i == 0 && hash_table->control->size_log2 >= new_size_log2)
+		{
+			/*
+			 * Another backend has already increased the size; we can avoid
+			 * obtaining all the locks and return early.
+			 */
+			LWLockRelease(PARTITION_LOCK(hash_table, 0));
+			return true;
+		}
+	}
+
+	Assert(new_size_log2 == hash_table->control->size_log2 + 1);
+	Assert(!RESIZE_IN_PROGRESS(hash_table));
+
+	/* Allocate the space for the new table. */
+	new_buckets = dsa_allocate(hash_table->area,
+							   sizeof(dsa_pointer) * ((Size) 1 << new_size_log2));
+	if (DsaPointerIsValid(new_buckets))
+	{
+		result = true;
+		hash_table->control->old_buckets = hash_table->control->buckets;
+		hash_table->control->buckets = new_buckets;
+		hash_table->control->size_log2 = new_size_log2;
+		hash_table->buckets = dsa_get_address(hash_table->area,
+											  hash_table->control->buckets);
+		hash_table->old_buckets =
+			dsa_get_address(hash_table->area,
+							hash_table->control->old_buckets);
+		hash_table->size_log2 = new_size_log2;
+		/* InvalidDsaPointer in every bucket */
+		memset(hash_table->buckets, 0,
+			   sizeof(dsa_pointer) * ((Size) 1 << new_size_log2));
+		for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		{
+			dht_partition *partition = &hash_table->control->partitions[i];
+
+			partition->old_count = partition->count;
+			partition->count = 0;
+			partition->next_bucket_to_split =
+				BUCKET_INDEX_FOR_PARTITION(i, new_size_log2 - 1);
+		}
+	}
+	else
+	{
+		/* Out of memory. */
+		result = false;
+	}
+
+	/* Release all the locks. */
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockRelease(PARTITION_LOCK(hash_table, i));
+
+	return result;
+}
+
+/*
+ * Check if the resize operation is finished.  If it is, then the old table
+ * can be returned to the shared memory area.
+ */
+static void
+finish_resize(dht_hash_table *hash_table)
+{
+	int			i,
+				j;
+	bool		finished = true;
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+		if (i == 0)
+		{
+			ensure_valid_bucket_pointers(hash_table);
+			if (!RESIZE_IN_PROGRESS(hash_table))
+			{
+				/* Another backend has already finished the resize. */
+				LWLockRelease(PARTITION_LOCK(hash_table, i));
+				return;
+			}
+		}
+
+		/*
+		 * We can only actually finish the resize when every partition has no
+		 * more keys in old buckets.  This might not be true yet, because
+		 * search_for_resize_work calls us when it thinks there might be no
+		 * more resizing work to do, but it could be wrong because it judges
+		 * that with an unsynchronized view of memory.  Now that we can
+		 * examine the synchronized counters, we can check if the resize
+		 * really is complete.
+		 */
+		if (partition->old_count > 0)
+		{
+			finished = false;
+			break;
+		}
+	}
+
+	if (finished)
+	{
+		dsa_free(hash_table->area, hash_table->control->old_buckets);
+		hash_table->control->old_buckets = InvalidDsaPointer;
+	}
+
+	/*
+	 * Only release as many locks as we acquired.  When finished is true, this
+	 * will be all of them, but if we exited the lock-acquire loop early we
+	 * hold only locks 0..i (the break skips the loop's final ++i).
+	 */
+	for (j = 0; j < DHT_NUM_PARTITIONS && j <= i; ++j)
+		LWLockRelease(PARTITION_LOCK(hash_table, j));
+}
+
+/*
+ * Migrate one item from partition 'p' from an old bucket to the appropriate
+ * new bucket.  The partition lock must already be held and there must be at
+ * least one item to be migrated.
+ */
+static void
+do_incremental_resize(dht_hash_table *hash_table, int p)
+{
+	Size		old_size_log2 = hash_table->control->size_log2 - 1;
+	dht_partition *partition = &hash_table->control->partitions[p];
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, p)));
+	Assert(partition->old_count > 0);
+
+	/*
+	 * Walk through all the buckets that are covered by this partition until
+	 * we find one that is not empty, and migrate one item from it.
+	 */
+	for (;;)
+	{
+		dsa_pointer item_pointer;
+
+		item_pointer =
+			hash_table->old_buckets[partition->next_bucket_to_split];
+
+		if (DsaPointerIsValid(item_pointer))
+		{
+			dht_hash_table_item *item;
+
+			item = dsa_get_address(hash_table->area, item_pointer);
+
+			/* Remove it from the old bucket. */
+			hash_table->old_buckets[partition->next_bucket_to_split] =
+				item->next;
+			--partition->old_count;
+
+			/* Add it to the active bucket. */
+			insert_item_into_bucket(hash_table,
+									item_pointer,
+									item,
+									&BUCKET_FOR_HASH(hash_table, item->hash));
+			++partition->count;
+
+			return;
+		}
+		else
+		{
+			/* Move to the next bucket protected by this partition. */
+			partition->next_bucket_to_split += 1;
+
+			/*
+			 * We shouldn't be able to reach past the end of the partition
+			 * without finding an item.  That would imply that the partition's
+			 * old_count was corrupted.
+			 *
+			 * This is an elog() because we want to catch this even in builds
+			 * that are not Assert-enabled.
+			 */
+			if (partition->next_bucket_to_split >=
+				BUCKET_INDEX_FOR_PARTITION(p + 1, old_size_log2))
+				elog(ERROR, "dht corrupted");
+		}
+	}
+}
+
+/*
+ * Try to find a partition that has outstanding resizing work and do some of
+ * that work, possibly completing the resize operation.  The caller must hold
+ * no locks.
+ *
+ * Consider 'start_partition' first.  dht_release varies this value to spread
+ * the work around and reduce contention.
+ */
+static void
+search_for_resize_work(dht_hash_table *hash_table, int start_partition)
+{
+	int			i = start_partition;
+
+	Assert(start_partition >= 0);
+	Assert(start_partition < DHT_NUM_PARTITIONS);
+
+	/*
+	 * Using a non-synchronized view of memory (no locks), see if we can find
+	 * any partition that looks like it still has keys to be migrated.
+	 */
+	do
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		if (partition->old_count > 0)
+		{
+			bool		done = false;
+
+			/* Now check again with a lock. */
+			LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+			ensure_valid_bucket_pointers(hash_table);
+			if (!RESIZE_IN_PROGRESS(hash_table))
+			{
+				/* The resize finished underneath us. */
+				done = true;
+			}
+			else if (partition->old_count > 0)
+			{
+				/* There is in fact work to be done in this partition. */
+				do_incremental_resize(hash_table, i);
+				done = true;
+			}
+			LWLockRelease(PARTITION_LOCK(hash_table, i));
+
+			if (done)
+				return;
+		}
+		i = (i + 1) % DHT_NUM_PARTITIONS;
+	} while (i != start_partition);
+
+	/*
+	 * We didn't find any partitions that seem to have remaining migration
+	 * work, though we can't be 100% sure due to unsynchronized reads of
+	 * memory.  We will try to finish the resize, which may or may not
+	 * actually succeed, depending on what it finds the true state to be.
+	 */
+	finish_resize(hash_table);
+}
+
+/*
+ * Make sure that our backend-local bucket pointers are up to date.  The
+ * caller must have locked one lock partition, which prevents begin_resize()
+ * from running concurrently.
+ */
+static inline void
+ensure_valid_bucket_pointers(dht_hash_table *hash_table)
+{
+	if (hash_table->size_log2 != hash_table->control->size_log2)
+	{
+		/*
+		 * A resize has started since we last looked, so our pointers are out
+		 * of date.
+		 */
+		hash_table->buckets = dsa_get_address(hash_table->area,
+											  hash_table->control->buckets);
+		if (DsaPointerIsValid(hash_table->control->old_buckets))
+			hash_table->old_buckets =
+				dsa_get_address(hash_table->area,
+								hash_table->control->old_buckets);
+		else
+			hash_table->old_buckets = NULL;
+		hash_table->size_log2 = hash_table->control->size_log2;
+	}
+	if (hash_table->old_buckets &&
+		!DsaPointerIsValid(hash_table->control->old_buckets))
+	{
+		/* A resize has finished since we last looked. */
+		hash_table->old_buckets = NULL;
+	}
+}
+
+/*
+ * Scan a locked bucket for a match, using the provided compare-function.
+ */
+static inline dht_hash_table_item *
+find_in_bucket(dht_hash_table *hash_table, const void *key,
+			   dsa_pointer item_pointer)
+{
+	while (DsaPointerIsValid(item_pointer))
+	{
+		dht_hash_table_item *item;
+
+		item = dsa_get_address(hash_table->area, item_pointer);
+		if (hash_table->params.compare_function(key, &item->entry,
+										   hash_table->params.key_size) == 0)
+			return item;
+		item_pointer = item->next;
+	}
+	return NULL;
+}
+
+/*
+ * Insert an already-allocated item into a bucket.
+ */
+static void
+insert_item_into_bucket(dht_hash_table *hash_table,
+						dsa_pointer item_pointer,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket)
+{
+	Assert(item == dsa_get_address(hash_table->area, item_pointer));
+
+	/*
+	 * Find the insertion point for this item in the bucket.  Buckets are
+	 * sorted by descending dsa_pointer, as part of the protocol for ensuring
+	 * that iterators don't visit items twice when buckets are split during a
+	 * scan.  (We don't actually expect them to have more than 1 item unless
+	 * the hash function is of low quality.)
+	 */
+	while (*bucket > item_pointer)
+	{
+		dht_hash_table_item *this_item;
+
+		this_item = dsa_get_address(hash_table->area, *bucket);
+
+		bucket = &this_item->next;
+	}
+
+	item->next = *bucket;
+	*bucket = item_pointer;
+}
+
+/*
+ * Allocate space for an entry with the given key and insert it into the
+ * provided bucket.
+ */
+static dht_hash_table_item *
+insert_into_bucket(dht_hash_table *hash_table,
+				   const void *key,
+				   dsa_pointer *bucket)
+{
+	dsa_pointer item_pointer;
+	dht_hash_table_item *item;
+
+	item_pointer = dsa_allocate(hash_table->area,
+								hash_table->params.entry_size +
+								offsetof(dht_hash_table_item, entry));
+	if (!DsaPointerIsValid(item_pointer))
+		/* Out of memory. */
+		return NULL;
+
+	item = dsa_get_address(hash_table->area, item_pointer);
+	memcpy(&(item->entry), key, hash_table->params.key_size);
+	insert_item_into_bucket(hash_table, item_pointer, item, bucket);
+	return item;
+}
+
+/*
+ * Search a bucket for a matching key and delete it.
+ */
+static bool
+delete_key_from_bucket(dht_hash_table *hash_table,
+					   const void *key,
+					   dsa_pointer *bucket_head)
+{
+	while (DsaPointerIsValid(*bucket_head))
+	{
+		dht_hash_table_item *item;
+
+		item = dsa_get_address(hash_table->area, *bucket_head);
+
+		if (hash_table->params.compare_function(key, &item->entry,
+												hash_table->params.key_size)
+			== 0)
+		{
+			dsa_pointer next;
+
+			next = item->next;
+			dsa_free(hash_table->area, *bucket_head);
+			*bucket_head = next;
+			return true;
+		}
+		bucket_head = &item->next;
+	}
+	return false;
+}
+
+/*
+ * Delete the specified item from the bucket.
+ */
+static bool
+delete_item_from_bucket(dht_hash_table *hash_table,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket_head)
+{
+	while (DsaPointerIsValid(*bucket_head))
+	{
+		dht_hash_table_item *bucket_item;
+
+		bucket_item = dsa_get_address(hash_table->area, *bucket_head);
+
+		if (bucket_item == item)
+		{
+			dsa_pointer next;
+
+			next = item->next;
+			dsa_free(hash_table->area, *bucket_head);
+			*bucket_head = next;
+			return true;
+		}
+		bucket_head = &bucket_item->next;
+	}
+	return false;
+}
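
The merge step performed by advance_iterator and next_in_bucket can be tried outside PostgreSQL. What follows is a minimal sketch, not the patch's code. It keeps each chain sorted by descending "pointer", splits one bucket into two in the middle of a scan, and keeps walking the original bucket by taking the largest not-yet-visited item across both descendants. All toy_* names, the 8-bit hashes and the use of array indexes in place of dsa_pointer values are assumptions made for illustration. The "top bits of the hash select the bucket" mapping is inferred from the way the patch shifts bucket numbers, not copied from its macros.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_ITEMS 16
#define TOY_INVALID 0			/* stands in for InvalidDsaPointer */
#define TOY_HASH_BITS 8

typedef struct
{
	unsigned	hash;			/* only the low 8 bits are used */
	size_t		next;			/* index of next item, or TOY_INVALID */
} toy_item;

static toy_item toy_items[TOY_MAX_ITEMS];

/* Assumed mapping: the top size_log2 bits of the hash pick the bucket. */
static size_t
toy_bucket_index(unsigned hash, unsigned size_log2)
{
	return size_log2 == 0 ? 0 : (size_t) (hash >> (TOY_HASH_BITS - size_log2));
}

/* Insert keeping the chain sorted by descending item index. */
static void
toy_insert(size_t *bucket, size_t item)
{
	assert(item != TOY_INVALID && item < TOY_MAX_ITEMS);
	while (*bucket > item)
		bucket = &toy_items[*bucket].next;
	toy_items[item].next = *bucket;
	*bucket = item;
}

/* Raise *next to the largest item in this chain that is below 'last'. */
static void
toy_advance(size_t last, size_t head, unsigned it_size_log2,
			size_t it_bucket, size_t *next)
{
	while (head != TOY_INVALID)
	{
		if ((last == TOY_INVALID || head < last) &&
			head > *next &&
			toy_bucket_index(toy_items[head].hash, it_size_log2) == it_bucket)
		{
			*next = head;
			return;
		}
		head = toy_items[head].next;
	}
}

/* Scan one bucket, splitting it in two after the second item is returned. */
static int
toy_demo(size_t visited[], int max_visited)
{
	size_t		one_bucket = TOY_INVALID;
	size_t		two_buckets[2] = {TOY_INVALID, TOY_INVALID};
	size_t		last = TOY_INVALID;
	size_t		i;
	int			n = 0;

	for (i = 1; i <= 6; ++i)
	{
		toy_items[i].hash = (i % 2) ? 0x80 : 0x00;
		toy_insert(&one_bucket, i);
	}

	for (;;)
	{
		size_t		next = TOY_INVALID;

		if (n == 2)
		{
			/* Split: rehome every item according to its top hash bit. */
			while (one_bucket != TOY_INVALID)
			{
				size_t		item = one_bucket;

				one_bucket = toy_items[item].next;
				toy_insert(&two_buckets[toy_bucket_index(toy_items[item].hash, 1)],
						   item);
			}
		}
		if (n < 2)
			toy_advance(last, one_bucket, 0, 0, &next);
		else
		{
			/* Merge both descendants of the bucket we started in. */
			toy_advance(last, two_buckets[0], 0, 0, &next);
			toy_advance(last, two_buckets[1], 0, 0, &next);
		}
		if (next == TOY_INVALID)
			break;
		if (n < max_visited)
			visited[n] = next;
		++n;
		last = next;
	}
	return n;
}
```

Calling toy_demo with an array of at least six slots fills it with the item indexes in the order they were visited. In this toy setup every item is reported exactly once even though all of them moved to a different chain part-way through the scan.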
diff --git a/src/include/storage/dht.h b/src/include/storage/dht.h
new file mode 100644
index 0000000..ccf9d17
--- /dev/null
+++ b/src/include/storage/dht.h
@@ -0,0 +1,117 @@
+/*-------------------------------------------------------------------------
+ *
+ * dht.h
+ *	  Concurrent hash tables backed by dynamic shared memory areas.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/include/storage/dht.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DHT_H
+#define DHT_H
+
+#include "postgres.h"
+
+#include "storage/dsa.h"
+
+/* The opaque type representing a hash table. */
+struct dht_hash_table;
+typedef struct dht_hash_table dht_hash_table;
+
+/* The opaque type used for tracking iterator state. */
+struct dht_iterator;
+typedef struct dht_iterator dht_iterator;
+
+/* A handle for a dht_hash_table which can be shared with other processes. */
+typedef dsa_pointer dht_hash_table_handle;
+
+/* The type for hash values. */
+typedef uint32 dht_hash;
+
+/*
+ * The function type for computing hash values for keys.  Compatible with
+ * HashValueFunc in hsearch.h and hash functions like tag_hash.
+ */
+typedef dht_hash (*dht_hash_function)(const void *v, Size size);
+
+/*
+ * The function used for comparing keys.  Compatible with HashCompareFunc
+ * in hsearch.h and memcmp.
+ */
+typedef int (*dht_compare_function)(const void *a, const void *b, Size size);
+
+/*
+ * The set of parameters needed to create or attach to a hash table.  The
+ * members tranche_id and tranche_name do not need to be initialized when
+ * attaching to an existing hash table.
+ */
+typedef struct
+{
+	Size key_size;				/* Size of the key (initial bytes of entry) */
+	Size entry_size;			/* Total size of entry */
+	dht_compare_function compare_function;	/* Compare function */
+	dht_hash_function hash_function;	/* Hash function */
+	int tranche_id;				/* The tranche ID to use for locks. */
+	char tranche_name[NAMEDATALEN];	/* The tranche name to use for locks. */
+} dht_parameters;
+
+/* Forward declaration of private types for use only by dht.c. */
+struct dht_hash_table_bucket;
+struct dht_hash_table_item;
+typedef struct dht_hash_table_item dht_hash_table_item;
+typedef struct dht_hash_table_bucket dht_hash_table_bucket;
+
+/*
+ * The state used to track a walk over all entries in a hash table.  The
+ * members of this struct are only for use by code in dht.c, but it is
+ * included in the header because it's useful to be able to create objects of
+ * this type on the stack to pass into the dht_iterate_XXX functions.
+ */
+struct dht_iterator
+{
+	dht_hash_table *hash_table;	/* The hash table we are iterating over. */
+	bool exclusive;				/* Whether to lock buckets exclusively. */
+	Size partition;				/* The index of the next partition to visit. */
+	Size bucket;				/* The index of the next bucket to visit. */
+	dht_hash_table_item *item;  /* The most recently returned item. */
+	dsa_pointer last_item_pointer; /* The last item visited. */
+	Size table_size_log2; 	/* The table size when we started iterating. */
+	bool locked;			/* Whether the current partition is locked. */
+};
+
+/* Creating, sharing and destroying hash tables. */
+extern dht_hash_table *dht_create(dsa_area *area,
+								  const dht_parameters *params);
+extern dht_hash_table *dht_attach(dsa_area *area,
+								  const dht_parameters *params,
+								  dht_hash_table_handle handle);
+extern void dht_detach(dht_hash_table *hash_table);
+extern dht_hash_table_handle
+dht_get_hash_table_handle(dht_hash_table *hash_table);
+extern void dht_destroy(dht_hash_table *hash_table);
+
+/* Iterating over the whole hash table. */
+extern void dht_iterate_begin(dht_hash_table *hash_table,
+							  dht_iterator *iterator, bool exclusive);
+extern void *dht_iterate_next(dht_iterator *iterator);
+extern void dht_iterate_delete(dht_iterator *iterator);
+extern void dht_iterate_release(dht_iterator *iterator);
+extern void dht_iterate_end(dht_iterator *iterator);
+
+/* Finding, creating, deleting entries. */
+extern void *dht_find(dht_hash_table *hash_table,
+					  const void *key, bool exclusive);
+extern void *dht_find_or_insert(dht_hash_table *hash_table,
+								const void *key, bool *found);
+extern bool dht_delete_key(dht_hash_table *hash_table, const void *key);
+extern void dht_delete_entry(dht_hash_table *hash_table, void *entry);
+extern void dht_release(dht_hash_table *hash_table, void *entry);
+
+/* Debugging support. */
+extern void dht_dump(dht_hash_table *hash_table);
+
+#endif   /* DHT_H */
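
The dht_parameters comment says the key is the initial bytes of the entry, and the compare function is memcmp-compatible. A consequence is that variable-length keys have to be normalised to a fixed width before they are hashed or compared. The following is a minimal standalone sketch of that convention, assuming a 48-byte NUL-padded key as in the simple_cache test module. The toy_* names are hypothetical and nothing here calls the DHT API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define TOY_KEY_SIZE 48

typedef struct
{
	char		key[TOY_KEY_SIZE];	/* key must be the first member */
	size_t		value_size;			/* payload fields follow the key */
} toy_entry;

/*
 * Build a NUL-padded fixed-size key.  Returns false if the source string
 * plus at least one trailing NUL would not fit.
 */
static bool
toy_pad_key(char dst[TOY_KEY_SIZE], const char *src, size_t len)
{
	assert(dst != NULL && src != NULL);
	if (len + 1 > TOY_KEY_SIZE)
		return false;
	memset(dst, 0, TOY_KEY_SIZE);	/* padding bytes must be deterministic */
	memcpy(dst, src, len);
	return true;
}

/*
 * Compare a lookup key with an entry the way a memcmp-style comparator
 * would: only the first TOY_KEY_SIZE bytes of the entry are examined.
 */
static bool
toy_key_matches(const char key[TOY_KEY_SIZE], const toy_entry *entry)
{
	return memcmp(key, entry, TOY_KEY_SIZE) == 0;
}
```

Zeroing the whole buffer before copying matters because a bytewise comparison or hash sees the padding too. Two keys with the same text but different leftover bytes after the text would otherwise be treated as different.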
simple-cache.patch (application/octet-stream)
diff --git a/src/test/modules/simple_cache/Makefile b/src/test/modules/simple_cache/Makefile
new file mode 100644
index 0000000..b9a0b28
--- /dev/null
+++ b/src/test/modules/simple_cache/Makefile
@@ -0,0 +1,18 @@
+# src/test/modules/simple_cache/Makefile
+
+MODULES = simple_cache
+
+EXTENSION = simple_cache
+DATA = simple_cache--1.0.sql
+PGFILEDESC = "simple_cache -- a simple cache extension"
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/simple_cache
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/simple_cache/simple_cache--1.0.sql b/src/test/modules/simple_cache/simple_cache--1.0.sql
new file mode 100644
index 0000000..d5ce5fa
--- /dev/null
+++ b/src/test/modules/simple_cache/simple_cache--1.0.sql
@@ -0,0 +1,39 @@
+/* src/test/modules/simple_cache/simple_cache--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION simple_cache" to load this file. \quit
+
+CREATE FUNCTION simple_cache_put(key text, value text)
+RETURNS boolean
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
+
+CREATE FUNCTION simple_cache_get(key text)
+RETURNS pg_catalog.text
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
+
+CREATE FUNCTION simple_cache_drop(key text)
+RETURNS boolean
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
+
+CREATE FUNCTION simple_cache_list(out key text, out value text)
+RETURNS setof record
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
+
+CREATE FUNCTION simple_cache_set_size_limit(n integer)
+RETURNS VOID
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
+
+CREATE FUNCTION simple_cache_dump_area()
+RETURNS VOID
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
+
+CREATE FUNCTION simple_cache_dump_hash_table()
+RETURNS VOID
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
diff --git a/src/test/modules/simple_cache/simple_cache.c b/src/test/modules/simple_cache/simple_cache.c
new file mode 100644
index 0000000..7ea149d
--- /dev/null
+++ b/src/test/modules/simple_cache/simple_cache.c
@@ -0,0 +1,393 @@
+/* -------------------------------------------------------------------------
+ *
+ * simple_cache.c
+ *		A toy module to demonstrate DHT hash tables.
+ *
+ * Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/simple_cache/simple_cache.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "storage/dht.h"
+#include "storage/dsa.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/hsearch.h" /* for tag_hash */
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(simple_cache_put);
+PG_FUNCTION_INFO_V1(simple_cache_get);
+PG_FUNCTION_INFO_V1(simple_cache_drop);
+PG_FUNCTION_INFO_V1(simple_cache_list);
+PG_FUNCTION_INFO_V1(simple_cache_set_size_limit);
+PG_FUNCTION_INFO_V1(simple_cache_dump_area);
+PG_FUNCTION_INFO_V1(simple_cache_dump_hash_table);
+
+void _PG_init(void);
+
+/*
+ * The shmem struct we use to give dynamic shared area and hash table handles
+ * to other processes.
+ */
+typedef struct
+{
+	/* The handle for attaching to the shared area. */
+	dsa_handle area_handle;
+	/* The handle for attaching to the hash table. */
+	dht_hash_table_handle hash_table_handle;
+} simple_cache_shared;
+
+/*
+ * The state we need for each backend.
+ */
+typedef struct
+{
+	/* The shared memory area. */
+	dsa_area *area;
+	/* The hash table. */
+	dht_hash_table *hash_table;
+} simple_cache_state;
+
+#define SIMPLE_CACHE_KEY_SIZE 48
+
+typedef struct
+{
+	char key[SIMPLE_CACHE_KEY_SIZE];
+	Size value_size;
+	dsa_pointer value;
+} simple_cache_entry;
+
+void
+_PG_init(void)
+{
+}
+
+static simple_cache_state *
+get_simple_cache_state(void)
+{
+	static simple_cache_state result = { 0 };
+
+	if (result.hash_table == NULL)
+	{
+		dht_parameters params = {
+			SIMPLE_CACHE_KEY_SIZE,
+			sizeof(simple_cache_entry),
+			memcmp,
+			tag_hash,
+			0, /* assigned below */
+			"dht lock tranche"
+		};
+		simple_cache_shared *shared;
+		bool found;
+
+		/* Create or attach to the memory area and hash table on demand. */
+		LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+		shared = ShmemInitStruct("simple_cache", sizeof(simple_cache_shared), &found);
+		if (!found)
+		{
+			/* First caller.  Set up 'not yet created' state. */
+			shared->area_handle = 0;
+			shared->hash_table_handle = 0;
+		}
+		/* Create shared area if not yet created. */
+		if (shared->area_handle == 0)
+		{
+			MemoryContext old_context;
+
+			/*
+			 * Here, and in several places below, we switch to
+			 * TopMemoryContext because the backend-local dsa_area
+			 * and dht_hash_table objects are palloc'd and we want them
+			 * to stick around until backend exit.  All of this jiggery-pokery
+			 * and pinning is required because we're creating objects that
+			 * we want to use until cluster shutdown.  None of that stuff
+			 * is necessary when using DSA or DHT for parallel execution:
+			 * in that context we want the automatic cleanup that we're
+			 * effectively disabling here.
+			 */
+			old_context = MemoryContextSwitchTo(TopMemoryContext);
+			result.area = dsa_create_dynamic(LWLockNewTrancheId(),
+											 "simple_cache dsa_area");
+			MemoryContextSwitchTo(old_context);
+			if (result.area == NULL)
+				elog(ERROR, "simple_cache: could not create area");
+
+			/*
+			 * We want this area and its memory to survive even when
+			 * there are no backends attached.
+			 */
+			dsa_pin(result.area);
+			/*
+			 * We want this area's memory to stay mapped in, regardless of
+			 * what happens to the current resource owner.
+			 */
+			dsa_pin_mapping(result.area);
+			/* We want other backends to be able to attach. */
+			shared->area_handle = dsa_get_handle(result.area);
+		}
+		/* Create hash table if not yet created. */
+		if (shared->hash_table_handle == 0)
+		{
+			MemoryContext old_context;
+
+			params.tranche_id = LWLockNewTrancheId();
+			old_context = MemoryContextSwitchTo(TopMemoryContext);
+			result.hash_table = dht_create(result.area, &params);
+			MemoryContextSwitchTo(old_context);
+			if (result.hash_table == NULL)
+				elog(ERROR, "simple_cache: could not create hash table");
+			shared->hash_table_handle = dht_get_hash_table_handle(result.hash_table);
+		}
+		/* Attach to shared area if not yet attached. */
+		if (result.area == NULL)
+		{
+			MemoryContext old_context;
+
+			old_context = MemoryContextSwitchTo(TopMemoryContext);
+			result.area = dsa_attach_dynamic(shared->area_handle);
+			MemoryContextSwitchTo(old_context);
+
+			dsa_pin_mapping(result.area);
+		}
+		/* Attach to hash table if not yet attached. */
+		if (result.hash_table == NULL)
+		{
+			MemoryContext old_context;
+
+			old_context = MemoryContextSwitchTo(TopMemoryContext);
+			result.hash_table = dht_attach(result.area, &params,
+										   shared->hash_table_handle);
+			MemoryContextSwitchTo(old_context);
+		}
+		LWLockRelease(AddinShmemInitLock);
+	}
+
+	return &result;
+}
+
+Datum
+simple_cache_put(PG_FUNCTION_ARGS)
+{
+	bool found;
+	char padded_key[SIMPLE_CACHE_KEY_SIZE];
+	text *key;
+	text *value;
+	simple_cache_state *state;
+	simple_cache_entry *entry;
+	dsa_pointer new_value;
+
+	if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
+		elog(ERROR, "simple_cache_put: arguments must be non-NULL");
+
+	key = PG_GETARG_TEXT_PP(0);
+	value = PG_GETARG_TEXT_PP(1);
+
+	/* Build a NUL-padded version of the key. */
+	if (VARSIZE_ANY_EXHDR(key) + 1 > SIMPLE_CACHE_KEY_SIZE)
+		elog(ERROR, "simple_cache_put: key too long");
+	memset(padded_key, 0, sizeof(padded_key));
+	memcpy(padded_key, VARDATA_ANY(key), VARSIZE_ANY_EXHDR(key));
+
+	state = get_simple_cache_state();
+
+	/* Find or create an exclusively locked hash table entry. */
+	entry = dht_find_or_insert(state->hash_table, padded_key, &found);
+	if (entry == NULL)
+		elog(ERROR, "simple_cache_put: out of memory");
+
+	/* Copy the value into DSA memory. */
+	new_value = dsa_allocate(state->area, VARSIZE_ANY_EXHDR(value));
+	if (!DsaPointerIsValid(new_value))
+	{
+		/*
+		 * No memory.  Delete this entry if we just created it, or just unlock
+		 * if it was already there, and bail out.
+		 */
+		if (!found)
+			dht_delete_entry(state->hash_table, entry);
+		else
+			dht_release(state->hash_table, entry);
+		elog(ERROR, "simple_cache_put: out of memory");
+	}
+	memcpy(dsa_get_address(state->area, new_value),
+		   VARDATA_ANY(value),
+		   VARSIZE_ANY_EXHDR(value));
+	/* Put it into the entry (freeing the old value first if needed). */
+	if (found)
+		dsa_free(state->area, entry->value);
+	entry->value = new_value;
+	entry->value_size = VARSIZE_ANY_EXHDR(value);
+
+	/* Unlock the hash table entry. */
+	dht_release(state->hash_table, entry);
+
+	PG_RETURN_BOOL(found);
+}
+
+Datum
+simple_cache_get(PG_FUNCTION_ARGS)
+{
+	char padded_key[SIMPLE_CACHE_KEY_SIZE];
+	text *key;
+	text *value;
+	simple_cache_state *state;
+	simple_cache_entry *entry;
+
+	/* Return NULL for missing or over-long keys. */
+	if (PG_ARGISNULL(0))
+		PG_RETURN_NULL();
+	key = PG_GETARG_TEXT_PP(0);
+	if (VARSIZE_ANY_EXHDR(key) + 1 > SIMPLE_CACHE_KEY_SIZE)
+		PG_RETURN_NULL();
+
+	/* Build a NUL-padded version of the key. */
+	memset(padded_key, 0, sizeof(padded_key));
+	memcpy(padded_key, VARDATA_ANY(key), VARSIZE_ANY_EXHDR(key));
+
+	state = get_simple_cache_state();
+
+	/* Try to find the key. */
+	entry = dht_find(state->hash_table, padded_key, false);
+	if (entry == NULL)
+		PG_RETURN_NULL();
+	value = palloc(entry->value_size + VARHDRSZ);
+	SET_VARSIZE(value, entry->value_size + VARHDRSZ);
+	memcpy(VARDATA_ANY(value),
+		   dsa_get_address(state->area, entry->value),
+		   entry->value_size);
+	dht_release(state->hash_table, entry);
+
+	PG_RETURN_TEXT_P(value);
+}
+
+Datum
+simple_cache_drop(PG_FUNCTION_ARGS)
+{
+	char padded_key[SIMPLE_CACHE_KEY_SIZE];
+	text *key;
+	simple_cache_state *state;
+	simple_cache_entry *entry;
+
+	/* Return NULL for missing or over-long keys. */
+	if (PG_ARGISNULL(0))
+		PG_RETURN_NULL();
+	key = PG_GETARG_TEXT_PP(0);
+	if (VARSIZE_ANY_EXHDR(key) + 1 > SIMPLE_CACHE_KEY_SIZE)
+		PG_RETURN_NULL();
+
+	/* Build a NUL-padded version of the key. */
+	memset(padded_key, 0, sizeof(padded_key));
+	memcpy(padded_key, VARDATA_ANY(key), VARSIZE_ANY_EXHDR(key));
+
+	state = get_simple_cache_state();
+
+	/* Find it and lock it exclusively (prerequisite for deleting). */
+	entry = dht_find(state->hash_table, padded_key, true);
+	if (entry != NULL)
+	{
+		/* Free the value that we own in DSA memory. */
+		dsa_free(state->area, entry->value);
+		dht_delete_entry(state->hash_table, entry);
+	}
+	PG_RETURN_BOOL(entry != NULL);
+}
+
+Datum
+simple_cache_list(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Tuplestorestate *tupstore;
+	TupleDesc       tupdesc;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	simple_cache_state *state = get_simple_cache_state();
+	dht_iterator iterator;
+	simple_cache_entry *entry;
+
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")))
+			;
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	tupstore = tuplestore_begin_heap(true, false, 1024 * 1024);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	dht_iterate_begin(state->hash_table, &iterator, false);
+	while ((entry = dht_iterate_next(&iterator)))
+	{
+		Datum           values[2];
+		bool            nulls[2] = {false, false};
+
+		Assert(tupdesc->natts == 2);
+		values[0] = PointerGetDatum(cstring_to_text(entry->key));
+		values[1] =
+			PointerGetDatum(cstring_to_text_with_len(dsa_get_address(state->area,
+																	 entry->value),
+													 entry->value_size));
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+	dht_iterate_end(&iterator);
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
+
+Datum
+simple_cache_set_size_limit(PG_FUNCTION_ARGS)
+{
+	simple_cache_state *state = get_simple_cache_state();
+	int limit = PG_GETARG_INT32(0);
+
+	dsa_set_size_limit(state->area, limit);
+
+	return (Datum) 0;
+}
+
+Datum
+simple_cache_dump_area(PG_FUNCTION_ARGS)
+{
+	simple_cache_state *state = get_simple_cache_state();
+
+	dsa_dump(state->area);
+
+	return (Datum) 0;
+}
+
+Datum
+simple_cache_dump_hash_table(PG_FUNCTION_ARGS)
+{
+	simple_cache_state *state = get_simple_cache_state();
+
+	dht_dump(state->hash_table);
+
+	return (Datum) 0;
+}
diff --git a/src/test/modules/simple_cache/simple_cache.control b/src/test/modules/simple_cache/simple_cache.control
new file mode 100644
index 0000000..bcddbd1
--- /dev/null
+++ b/src/test/modules/simple_cache/simple_cache.control
@@ -0,0 +1,5 @@
+# simple_cache extension
+comment = 'Simple cache extension'
+default_version = '1.0'
+module_pathname = '$libdir/simple_cache'
+relocatable = true
\ No newline at end of file
#2Andres Freund
andres@anarazel.de
In reply to: Thomas Munro (#1)
Re: Hash tables in dynamic shared memory

Hi,

It's impossible to write a general purpose hash table that will be
suitable for every use case, considering all the combinations of
design trade offs and subsets of functionality someone might want.

Very much agreed.

There is other work being done in this space: I'm aware of Andres's
current thread[1] on Robin Hood-style closed hashing tables with
macro-ised size, hash and comparator operations à la C++ container
templates, and I'm following along with interest.

He vigorously
rejects chaining hash tables as the natural enemies of memory
hardware.

I'd not go quite that far. There are good arguments for open addressing,
and there are good arguments for separate chaining. The latter is a lot more
convincing if you want concurrent access with partition based locking,
for example...
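
To make the partition-locking point concrete, here is a minimal sketch of how the posted patch maps a hash to a lock partition and a bucket using the high-order bits. It mirrors the PARTITION_FOR_HASH and BUCKET_INDEX_FOR_HASH_AND_SIZE macros, but the function names, the 32-bit hash width and partition_for_bucket are assumptions made for illustration, not code from the patch.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Illustrative constant; the patch uses 7, i.e. 128 lock partitions. */
#define NUM_PARTITIONS_LOG2 7

/* Assumption: hashes are 32 bits wide. */
#define HASH_BITS (sizeof(uint32_t) * CHAR_BIT)

/* Lock partition: the top NUM_PARTITIONS_LOG2 bits of the hash. */
static uint32_t
partition_for_hash(uint32_t hash)
{
	return hash >> (HASH_BITS - NUM_PARTITIONS_LOG2);
}

/*
 * Bucket index: the top size_log2 bits of the hash.  size_log2 must be in
 * the range NUM_PARTITIONS_LOG2..32.
 */
static uint32_t
bucket_for_hash(uint32_t hash, unsigned size_log2)
{
	return hash >> (HASH_BITS - size_log2);
}

/* The partition whose lock covers a given bucket at a given table size. */
static uint32_t
partition_for_bucket(uint32_t bucket, unsigned size_log2)
{
	return bucket >> (size_log2 - NUM_PARTITIONS_LOG2);
}
```

Because both indexes are prefixes of the same hash, doubling the table turns bucket b into 2b or 2b + 1, and both stay under the lock of the same partition. That is why a lookup during a resize can search the old and new bucket under a single lock.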

I guess Andres's simplehash should already work in DSA memory nicely
anyway since it doesn't have any internal pointers IIUC (?).

The data access doesn't have pointers, but the "metadata" does have a
pointer to the data. But that's a very solvable problem. But
incremental resizing and granular locking aren't going to happen for it,
so it'd really only be suitable for presenting a read-only (or possibly
read-mostly) view.
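
One way the metadata pointer could be made position-independent is to store an offset from the start of the shared region and resolve it per process. The sketch below is an assumption for illustration only: table_header, resolve, init_region and get_element are hypothetical names, and this is not how simplehash or DSA is actually laid out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical metadata header placed at the start of a shared region. */
typedef struct
{
	uint32_t	nelements;
	size_t		data_offset;	/* element array offset from region start */
} table_header;

/* Resolve an offset against this process's mapping of the region. */
static void *
resolve(void *region_base, size_t offset)
{
	return (char *) region_base + offset;
}

/* Lay out a header followed by an array of nelements uint32_t values. */
static void
init_region(void *region_base, uint32_t nelements)
{
	table_header *header = region_base;
	uint32_t   *data;
	uint32_t	i;

	header->nelements = nelements;
	header->data_offset = sizeof(table_header);
	data = resolve(region_base, header->data_offset);
	for (i = 0; i < nelements; ++i)
		data[i] = i * 10;
}

/* Read element i through the offset, never through a stored raw pointer. */
static uint32_t
get_element(void *region_base, uint32_t i)
{
	table_header *header = region_base;
	uint32_t   *data = resolve(region_base, header->data_offset);

	return data[i];
}
```

Copying the region's bytes to a different address (for example with malloc and memcpy) stands in for a second backend mapping the segment elsewhere: get_element still works on the copy because nothing inside the region holds an absolute address.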

Potential use cases for DHT include caches, in-memory database objects
and working state for parallel execution.

Is there a more concrete example, i.e. a user we'd convert to this at
the same time as introducing this hashtable?

There are a couple of things I'm not happy about in this version of
DHT: the space overhead per item is too high (hash + wasted padding +
next pointer), and the iterator system is overly complicated. I have
another version in development which gets rid of the hash and padding
and sometimes gets rid of the next pointer to achieve zero overhead,
and I'm working on a simpler iteration algorithm that keeps the
current guarantees but doesn't need to reorder bucket chains. More on
that, with some notes on performance and a testing module, soon.

FWIW, my experimentation showed that getting rid of the hash itself is
quite often *NOT* a worthwhile tradeoff. Comparing keys and recomputing
hashes is often quite expensive (e.g. for GROUP BY it's where the
majority of time is spent after the simplehash patch).
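
As a rough illustration of why keeping the hash in each item can pay off, the sketch below walks a chain and runs the key comparison only when the cached hash matches. It uses ordinary C pointers and string keys rather than dsa_pointer, and chain_item, find_in_chain and key_comparisons are hypothetical names invented for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical chain node that caches the hash of its key. */
typedef struct chain_item
{
	uint32_t	hash;			/* cached hash of key */
	struct chain_item *next;	/* next item in the same bucket */
	const char *key;
} chain_item;

/* Counts how many full key comparisons were actually performed. */
static int	key_comparisons = 0;

/*
 * Search a bucket chain.  The cheap integer comparison filters out most
 * non-matching items, so the potentially expensive key comparison runs only
 * for items whose cached hash equals the probe hash.
 */
static chain_item *
find_in_chain(chain_item *head, uint32_t hash, const char *key)
{
	chain_item *item;

	for (item = head; item != NULL; item = item->next)
	{
		if (item->hash != hash)
			continue;
		++key_comparisons;
		if (strcmp(item->key, key) == 0)
			return item;
	}
	return NULL;
}
```

The same cached value also lets a resize move items to their new buckets without calling the hash function again.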

Greetings,

Andres Freund

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Andres Freund (#2)
Re: Hash tables in dynamic shared memory

On Wed, Oct 5, 2016 at 11:22 AM, Andres Freund <andres@anarazel.de> wrote:

Potential use cases for DHT include caches, in-memory database objects
and working state for parallel execution.

Is there a more concrete example, i.e. a user we'd convert to this at
the same time as introducing this hashtable?

A colleague of mine will shortly post a concrete patch to teach an
existing executor node how to be parallel aware, using DHT. I'll let
him explain.

I haven't looked into whether it would make sense to convert any
existing shmem dynahash hash table to use DHT. The reason for doing
so would be to move it out to DSM segments and enable dynamically
growing. I suspect that the bounded size of things like the hash
tables involved in (for example) predicate locking is considered a
feature, not a bug, so any such cluster-lifetime core-infrastructure
hash table would not be a candidate. More likely candidates would be
ephemeral data used by the executor, as in the above-mentioned patch,
and long lived caches of dynamic size owned by core code or
extensions. Like a shared query plan cache, if anyone can figure out
the invalidation magic required.

--
Thomas Munro
http://www.enterprisedb.com

#4Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Thomas Munro (#3)
Re: Hash tables in dynamic shared memory

On Wed, Oct 5, 2016 at 12:11 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Wed, Oct 5, 2016 at 11:22 AM, Andres Freund <andres@anarazel.de> wrote:

Potential use cases for DHT include caches, in-memory database objects
and working state for parallel execution.

Is there a more concrete example, i.e. a user we'd convert to this at
the same time as introducing this hashtable?

A colleague of mine will shortly post a concrete patch to teach an
existing executor node how to be parallel aware, using DHT. I'll let
him explain.

I haven't looked into whether it would make sense to convert any
existing shmem dynahash hash table to use DHT. The reason for doing
so would be to move it out to DSM segments and enable dynamically
growing. I suspect that the bounded size of things like the hash
tables involved in (for example) predicate locking is considered a
feature, not a bug, so any such cluster-lifetime core-infrastructure
hash table would not be a candidate. More likely candidates would be
ephemeral data used by the executor, as in the above-mentioned patch,
and long lived caches of dynamic size owned by core code or
extensions. Like a shared query plan cache, if anyone can figure out
the invalidation magic required.

Another thought: it could be used to make things like
pg_stat_statements not have to be in shared_preload_libraries.

--
Thomas Munro
http://www.enterprisedb.com

#5Magnus Hagander
magnus@hagander.net
In reply to: Thomas Munro (#4)
Re: Hash tables in dynamic shared memory

On Oct 5, 2016 1:23 AM, "Thomas Munro" <thomas.munro@enterprisedb.com>
wrote:

On Wed, Oct 5, 2016 at 12:11 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Wed, Oct 5, 2016 at 11:22 AM, Andres Freund <andres@anarazel.de> wrote:

Potential use cases for DHT include caches, in-memory database objects
and working state for parallel execution.

Is there a more concrete example, i.e. a user we'd convert to this at
the same time as introducing this hashtable?

A colleague of mine will shortly post a concrete patch to teach an
existing executor node how to be parallel aware, using DHT. I'll let
him explain.

I haven't looked into whether it would make sense to convert any
existing shmem dynahash hash table to use DHT. The reason for doing
so would be to move it out to DSM segments and enable dynamically
growing. I suspect that the bounded size of things like the hash
tables involved in (for example) predicate locking is considered a
feature, not a bug, so any such cluster-lifetime core-infrastructure
hash table would not be a candidate. More likely candidates would be
ephemeral data used by the executor, as in the above-mentioned patch,
and long lived caches of dynamic size owned by core code or
extensions. Like a shared query plan cache, if anyone can figure out
the invalidation magic required.

Another thought: it could be used to make things like
pg_stat_statements not have to be in shared_preload_libraries.

That would indeed be a great improvement. And possibly also allow the
changing of the max number of statements it can track without a restart?

I was also wondering if it might be useful as a replacement for some of
the pgstats stuff, to get rid of the cost of spooling to file and then
rebuilding the hash tables on the receiving end. I've been waiting for this
patch to figure out if that's useful. I mean keep the stats collector doing
what it does now over UDP, but present the results in shared hash tables
instead of files.

/Magnus

#6Dilip Kumar
dilipbalaut@gmail.com
In reply to: Thomas Munro (#1)
Re: Hash tables in dynamic shared memory

On Wed, Oct 5, 2016 at 3:10 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Here is an experimental hash table implementation that uses DSA memory
so that hash tables can be shared by multiple backends and yet grow
dynamically. Development name: "DHT".

+dht_iterate_begin(dht_hash_table *hash_table,
+  dht_iterator *iterator,
+  bool exclusive)
+{
+ Assert(hash_table->control->magic == DHT_MAGIC);
+
+ iterator->hash_table = hash_table;
+ iterator->exclusive = exclusive;
+ iterator->partition = 0;
+ iterator->bucket = 0;
+ iterator->item = NULL;
+ iterator->locked = false;

While reviewing, I found that dht_iterate_begin does not initialize
iterator->last_item_pointer to InvalidDsaPointer, yet advance_iterator
reads that field during the scan. I think this will cause problems,
possibly even the loss of some tuples.

+advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+ dsa_pointer *next)
+{
+ dht_hash_table_item *item;
+
+ while (DsaPointerIsValid(bucket_head))
+ {
+ item = dsa_get_address(iterator->hash_table->area,
+   bucket_head);
+ if ((!DsaPointerIsValid(iterator->last_item_pointer) ||
+ bucket_head < iterator->last_item_pointer) &&
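
To illustrate the concern, here is a small standalone sketch. The demo_* names and the reduced struct are stand-ins invented for this example (the real dht_iterator has more fields), and the predicate mirrors only the part of the advance_iterator condition quoted above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for dsa_pointer and InvalidDsaPointer. */
typedef uint64_t demo_pointer;
#define DEMO_INVALID_POINTER ((demo_pointer) 0)

/* Reduced, hypothetical version of the iterator state. */
typedef struct
{
	int			partition;
	int			bucket;
	demo_pointer last_item_pointer;
	bool		locked;
} demo_iterator;

/* Initialize every field, including last_item_pointer. */
static void
demo_iterate_begin(demo_iterator *iterator)
{
	iterator->partition = 0;
	iterator->bucket = 0;
	iterator->last_item_pointer = DEMO_INVALID_POINTER;
	iterator->locked = false;
}

/*
 * Mirrors the quoted part of the advance_iterator condition: an item is a
 * candidate if no item has been visited yet, or if it sorts before the
 * last visited item.
 */
static bool
demo_item_is_candidate(const demo_iterator *iterator, demo_pointer item)
{
	return iterator->last_item_pointer == DEMO_INVALID_POINTER ||
		item < iterator->last_item_pointer;
}
```

If last_item_pointer starts out holding a small leftover value instead of the invalid pointer, items with larger addresses fail the test and are never returned, which is how tuples could be lost.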

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#7Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Dilip Kumar (#6)
1 attachment(s)
Re: Hash tables in dynamic shared memory

On Thu, Oct 6, 2016 at 12:02 AM, Dilip Kumar <dilipbalaut@gmail.com> wrote:

While reviewing, I found that in dht_iterate_begin function, we are
not initializing iterator->last_item_pointer to InvalidDsaPointer;

Fixed, thanks.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

dht-v2.patch (application/octet-stream)
diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index e99ebd2..0c80371 100644
--- a/src/backend/storage/ipc/Makefile
+++ b/src/backend/storage/ipc/Makefile
@@ -8,7 +8,7 @@ subdir = src/backend/storage/ipc
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = dsa.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
+OBJS = dht.o dsa.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
 	procsignal.o  shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o \
 	sinvaladt.o standby.o
 
diff --git a/src/backend/storage/ipc/dht.c b/src/backend/storage/ipc/dht.c
new file mode 100644
index 0000000..1fc6aa1
--- /dev/null
+++ b/src/backend/storage/ipc/dht.c
@@ -0,0 +1,1427 @@
+/*-------------------------------------------------------------------------
+ *
+ * dht.c
+ *	  Concurrent hash tables backed by dynamic shared memory areas.
+ *
+ * This is an open hashing hash table, with a linked list at each table
+ * entry.  It supports dynamic resizing, as required to prevent the linked
+ * lists from growing too long on average.  Currently, only growing is
+ * supported: the hash table never becomes smaller.
+ *
+ * To deal with concurrency, it has a fixed size set of partitions, each of which
+ * is independently locked.  Each bucket maps to a partition; so insert, find
+ * and iterate operations normally only acquire one lock.  Therefore, good
+ * concurrency is achieved whenever they don't collide at the lock partition
+ * level.  However, when a resize operation begins, all partition locks must
+ * be acquired simultaneously for a brief period.  This is only expected to
+ * happen a small number of times until a stable size is found, since growth is
+ * geometric.
+ *
+ * Resizing is done incrementally so that no individual insert operation pays
+ * for the potentially large cost of splitting all buckets.  A resize
+ * operation begins when any partition exceeds a certain load factor, and
+ * splits every bucket into two new buckets but doesn't yet transfer any items
+ * into the new buckets.  Then, future dht_release operations involving an
+ * exclusive lock transfer one item from an old bucket into the appropriate
+ * new bucket.  If possible, an item from the already-locked partition is
+ * migrated.  If none remain, a lock on another partition will be acquired and
+ * one item from that partition will be migrated.  This continues until there
+ * are no items left in old buckets, and the resize operation is finished.
+ * While a resize is in progress, find, insert and delete operations must look
+ * in a new bucket and an old bucket, but this is not too expensive since they
+ * are covered by the same lock.  That is, a given hash value maps to the same
+ * lock partition, no matter how many times buckets have been split.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/ipc/dht.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/dht.h"
+#include "storage/dsa.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/memutils.h"
+
+/*
+ * An item in the hash table.  This wraps the user's entry object in an
+ * envelope that holds the hashed key and a pointer to the next item in the
+ * same bucket.
+ */
+struct dht_hash_table_item
+{
+	/* The hashed key, to avoid having to recompute it. */
+	dht_hash	hash;
+	/* The next item in the same bucket. */
+	dsa_pointer next;
+	/* The user's entry object follows here. */
+	char		entry[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/* The number of partitions for locking purposes. */
+#define DHT_NUM_PARTITIONS_LOG2 7
+#define DHT_NUM_PARTITIONS (1 << DHT_NUM_PARTITIONS_LOG2)
+
+/* A magic value used to identify DHT hash tables. */
+#define DHT_MAGIC 0x75ff6a20
+
+/*
+ * Tracking information for each lock partition.  Initially, each partition
+ * corresponds to one bucket, but each time the hash table grows, the buckets
+ * covered by each partition split so the number of buckets covered doubles.
+ *
+ * We might want to add padding here so that each partition is on a different
+ * cache line, but doing so would bloat this structure considerably.
+ */
+typedef struct
+{
+	LWLock		lock;			/* Protects all buckets in this partition. */
+	Size		count;			/* # of items in active buckets */
+	Size		old_count;		/* # of items in old buckets */
+	Index		next_bucket_to_split;	/* do_incremental_resize progress
+										 * tracker */
+} dht_partition;
+
+/*
+ * The head object for a hash table.  This will be stored in dynamic shared
+ * memory.
+ */
+typedef struct
+{
+	dht_hash_table_handle handle;
+	uint32		magic;
+	dht_partition partitions[DHT_NUM_PARTITIONS];
+	int			lwlock_tranche_id;
+	char		lwlock_tranche_name[NAMEDATALEN];
+
+	/*
+	 * The following members are written to only when ALL partition locks are
+	 * held.  They can be read when any one partition lock is held.
+	 */
+
+	/* Number of buckets expressed as power of 2 (8 = 256 buckets). */
+	Size		size_log2;		/* log2(number of buckets) */
+	dsa_pointer buckets;		/* current bucket array */
+	dsa_pointer old_buckets;	/* previous bucket array, if splitting */
+}	dht_hash_table_control;
+
+/*
+ * Per-backend state for a dynamic hash table.
+ */
+struct dht_hash_table
+{
+	dsa_area   *area;			/* Backing dynamic shared memory area. */
+	dht_parameters params;		/* Parameters. */
+	dht_hash_table_control *control;	/* Control object in DSM. */
+	dsa_pointer *buckets;		/* Current bucket pointers in DSM. */
+	Size		size_log2;		/* log2(number of buckets) */
+	dsa_pointer *old_buckets;	/* Old bucket pointers in DSM. */
+	LWLockTranche lwlock_tranche;		/* Tranche ID for this table's
+										 * lwlocks. */
+	bool		exclusively_locked;		/* Is an exclusive partition lock
+										 * held? */
+};
+
+/* Given a pointer to an entry, find the pointer to the item that holds it. */
+#define ITEM_FROM_ENTRY(entry)										\
+	((dht_hash_table_item *)((char *)entry -						\
+							 offsetof(dht_hash_table_item, entry)))
+
+/* How many resize operations (bucket splits) have there been? */
+#define NUM_SPLITS(size_log2)					\
+	(size_log2 - DHT_NUM_PARTITIONS_LOG2)
+
+/* How many buckets are there in each partition at a given size? */
+#define BUCKETS_PER_PARTITION(size_log2)		\
+	(1 << NUM_SPLITS(size_log2))
+
+/* Max entries before we need to grow.  Half + quarter = 75% load factor. */
+#define MAX_COUNT_PER_PARTITION(hash_table)				\
+	(BUCKETS_PER_PARTITION(hash_table->size_log2) / 2 + \
+	 BUCKETS_PER_PARTITION(hash_table->size_log2) / 4)
+
+/* Choose partition based on the highest order bits of the hash. */
+#define PARTITION_FOR_HASH(hash)										\
+	(hash >> ((sizeof(dht_hash) * CHAR_BIT) - DHT_NUM_PARTITIONS_LOG2))
+
+/*
+ * Find the bucket index for a given hash and table size.  Each time the table
+ * doubles in size, the appropriate bucket for a given hash value doubles and
+ * possibly adds one, depending on the newly revealed bit, so that all buckets
+ * are split.
+ */
+#define BUCKET_INDEX_FOR_HASH_AND_SIZE(hash, size_log2)		\
+	(hash >> ((sizeof(dht_hash) * CHAR_BIT) - (size_log2)))
+
+/* The index of the first bucket in a given partition. */
+#define BUCKET_INDEX_FOR_PARTITION(partition, size_log2)	\
+	((partition) << NUM_SPLITS(size_log2))
+
+/* The head of the active bucket for a given hash value (lvalue). */
+#define BUCKET_FOR_HASH(hash_table, hash)								\
+	(hash_table->buckets[												\
+		BUCKET_INDEX_FOR_HASH_AND_SIZE(hash,							\
+									   hash_table->size_log2)])
+
+/* The head of the old bucket for a given hash value (lvalue). */
+#define OLD_BUCKET_FOR_HASH(hash_table, hash)							\
+	(hash_table->old_buckets[											\
+		BUCKET_INDEX_FOR_HASH_AND_SIZE(hash,							\
+									   hash_table->size_log2 - 1)])
+
+/* Is a resize currently in progress? */
+#define RESIZE_IN_PROGRESS(hash_table)			\
+	(hash_table->old_buckets != NULL)
+
+static void delete_item(dht_hash_table *hash_table, dht_hash_table_item * item);
+static void advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+				 dsa_pointer *next);
+static dsa_pointer next_in_bucket(dht_iterator *iterator);
+static bool begin_resize(dht_hash_table *hash_table, Size new_size);
+static void finish_resize(dht_hash_table *hash_table);
+static void do_incremental_resize(dht_hash_table *hash_table, int p);
+static void search_for_resize_work(dht_hash_table *hash_table,
+					   int start_partition);
+static inline void ensure_valid_bucket_pointers(dht_hash_table *hash_table);
+static inline dht_hash_table_item *find_in_bucket(dht_hash_table *hash_table,
+			   const void *key,
+			   dsa_pointer item_pointer);
+static void insert_item_into_bucket(dht_hash_table *hash_table,
+						dsa_pointer item_pointer,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket);
+static dht_hash_table_item *insert_into_bucket(dht_hash_table *hash_table,
+				   const void *key,
+				   dsa_pointer *bucket);
+static bool delete_key_from_bucket(dht_hash_table *hash_table, const void *key,
+					   dsa_pointer *bucket_head);
+static bool delete_item_from_bucket(dht_hash_table *hash_table,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket_head);
+
+#define PARTITION_LOCK(hash_table, i)			\
+	(&hash_table->control->partitions[i].lock)
+
+/*
+ * Create a new hash table backed by the given dynamic shared area, with the
+ * given parameters.
+ */
+dht_hash_table *
+dht_create(dsa_area *area, const dht_parameters *params)
+{
+	dht_hash_table *hash_table;
+	dsa_pointer control;
+
+	/* Allocate the backend-local object representing the hash table. */
+	hash_table = palloc(sizeof(dht_hash_table));
+
+	/* Allocate the control object in shared memory. */
+	control = dsa_allocate(area, sizeof(dht_hash_table_control));
+	if (!DsaPointerIsValid(control))
+	{
+		/* Can't allocate memory, give up. */
+		pfree(hash_table);
+		return NULL;
+	}
+
+	/* Set up the local and shared hash table structs. */
+	hash_table->area = area;
+	hash_table->params = *params;
+	hash_table->control = dsa_get_address(area, control);
+	hash_table->control->handle = control;
+	hash_table->control->magic = DHT_MAGIC;
+
+	/* Set up the tranche of locks, and register it in this process. */
+	hash_table->control->lwlock_tranche_id = params->tranche_id;
+	strcpy(hash_table->control->lwlock_tranche_name,
+		   params->tranche_name);
+	hash_table->lwlock_tranche.name = params->tranche_name;
+	hash_table->lwlock_tranche.array_base =
+		&hash_table->control->partitions[0].lock;
+	hash_table->lwlock_tranche.array_stride = sizeof(dht_partition);
+	LWLockRegisterTranche(hash_table->control->lwlock_tranche_id,
+						  &hash_table->lwlock_tranche);
+
+	/* Set up the array of lock partitions. */
+	{
+		int			i;
+
+		for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		{
+			LWLockInitialize(PARTITION_LOCK(hash_table, i),
+							 hash_table->control->lwlock_tranche_id);
+			hash_table->control->partitions[i].count = 0;
+		}
+	}
+
+	hash_table->exclusively_locked = false;
+
+	/*
+	 * Set up the initial array of buckets.  Our initial size is the same as
+	 * the number of partitions.
+	 */
+	hash_table->control->size_log2 = DHT_NUM_PARTITIONS_LOG2;
+	hash_table->control->buckets =
+		dsa_allocate(area, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS);
+	hash_table->buckets = dsa_get_address(area,
+										  hash_table->control->buckets);
+	hash_table->control->old_buckets = InvalidDsaPointer;
+	hash_table->old_buckets = NULL;
+	memset(hash_table->buckets, 0, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS);
+
+	return hash_table;
+}
+
+/*
+ * Attach to an existing hash table using a handle.
+ */
+dht_hash_table *
+dht_attach(dsa_area *area, const dht_parameters *params,
+		   dht_hash_table_handle handle)
+{
+	dht_hash_table *hash_table;
+	dsa_pointer control;
+
+	/* Allocate the backend-local object representing the hash table. */
+	hash_table = palloc(sizeof(dht_hash_table));
+
+	/* Find the control object in shared memory. */
+	control = handle;
+
+	/* Set up the local hash table struct. */
+	hash_table->area = area;
+	hash_table->params = *params;
+	hash_table->control = dsa_get_address(area, control);
+	hash_table->exclusively_locked = false;
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/* Register the tranche of locks in this process. */
+	hash_table->lwlock_tranche.name =
+		hash_table->control->lwlock_tranche_name;
+	hash_table->lwlock_tranche.array_base =
+		&hash_table->control->partitions[0].lock;
+	hash_table->lwlock_tranche.array_stride = sizeof(dht_partition);
+	LWLockRegisterTranche(hash_table->control->lwlock_tranche_id,
+						  &hash_table->lwlock_tranche);
+
+	return hash_table;
+}
+
+/*
+ * Detach from a hash table.  This frees backend-local resources associated
+ * with the hash table, but the hash table will continue to exist until it is
+ * either explicitly destroyed (by a backend that is still attached to it), or
+ * the area that backs it is returned to the operating system.
+ */
+void
+dht_detach(dht_hash_table *hash_table)
+{
+	/* The hash table may have been destroyed.  Just free local memory. */
+	pfree(hash_table);
+}
+
+/*
+ * Destroy a hash table, returning all memory to the area.  The caller must be
+ * certain that no other backend will attempt to access the hash table before
+ * calling this function.  Other backends must explicitly call dht_detach to
+ * free up backend-local memory associated with the hash table.  The backend
+ * that calls dht_destroy must not call dht_detach.
+ */
+void
+dht_destroy(dht_hash_table *hash_table)
+{
+	dht_iterator iterator;
+	void	   *entry;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/* Free all the entries. */
+	dht_iterate_begin(hash_table, &iterator, true);
+	while ((entry = dht_iterate_next(&iterator)))
+		dht_iterate_delete(&iterator);
+	dht_iterate_end(&iterator);
+
+	/*
+	 * Vandalize the control block to help catch programming errors where
+	 * other backends access the memory formerly occupied by this hash table.
+	 */
+	hash_table->control->magic = 0;
+
+	/* Free the active and (if appropriate) old table, and control object. */
+	if (DsaPointerIsValid(hash_table->control->old_buckets))
+		dsa_free(hash_table->area, hash_table->control->old_buckets);
+	dsa_free(hash_table->area, hash_table->control->buckets);
+	dsa_free(hash_table->area, hash_table->control->handle);
+
+	pfree(hash_table);
+}
+
+/*
+ * Get a handle that can be used by other processes to attach to this hash
+ * table.
+ */
+dht_hash_table_handle
+dht_get_hash_table_handle(dht_hash_table *hash_table)
+{
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	return hash_table->control->handle;
+}
+
+/*
+ * Look up an entry, given a key.  Returns a pointer to an entry if one can be
+ * found with the given key.  Returns NULL if the key is not found.  If a
+ * non-NULL value is returned, the entry is locked and must be released by
+ * calling dht_release.  If an error is raised before dht_release is called,
+ * the lock will be released automatically, but the caller must take care to
+ * ensure that the entry is not left corrupted.  The lock mode is either
+ * shared or exclusive depending on 'exclusive'.
+ *
+ * Note that the lock held is in fact an LWLock, so interrupts will be held
+ * on return from this function, and not resumed until dht_release is called.
+ * It is a very good idea for the caller to release the lock quickly.
+ */
+void *
+dht_find(dht_hash_table *hash_table, const void *key, bool exclusive)
+{
+	Size		hash;
+	Size		partition;
+	dht_hash_table_item *item;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+	partition = PARTITION_FOR_HASH(hash);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition),
+				  exclusive ? LW_EXCLUSIVE : LW_SHARED);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/* Search the active bucket. */
+	item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash));
+
+	/* If not found and a resize is in progress, search the old bucket too. */
+	if (!item && RESIZE_IN_PROGRESS(hash_table))
+		item = find_in_bucket(hash_table, key,
+							  OLD_BUCKET_FOR_HASH(hash_table, hash));
+
+	if (!item)
+	{
+		/* Not found. */
+		LWLockRelease(PARTITION_LOCK(hash_table, partition));
+		return NULL;
+	}
+	else
+	{
+		/* The caller will release the lock by calling dht_release. */
+		hash_table->exclusively_locked = exclusive;
+		return &item->entry;
+	}
+}
+
+/*
+ * Returns a pointer to an exclusively locked item which must be released with
+ * dht_release.  If the key is found in the hash table, 'found' is set to true
+ * and a pointer to the existing entry is returned.  If the key is not found,
+ * 'found' is set to false, and a pointer to a newly created entry is returned.
+ */
+void *
+dht_find_or_insert(dht_hash_table *hash_table,
+				   const void *key,
+				   bool *found)
+{
+	Size		hash;
+	Size		partition_index;
+	dht_partition *partition;
+	dht_hash_table_item *item;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+	partition_index = PARTITION_FOR_HASH(hash);
+	partition = &hash_table->control->partitions[partition_index];
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+restart:
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition_index),
+				  LW_EXCLUSIVE);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/* Search the active bucket. */
+	item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash));
+
+	/* If not found and a resize is in progress, search the old bucket too. */
+	if (!item && RESIZE_IN_PROGRESS(hash_table))
+		item = find_in_bucket(hash_table, key,
+							  OLD_BUCKET_FOR_HASH(hash_table, hash));
+
+	if (item)
+		*found = true;
+	else
+	{
+		*found = false;
+
+		/* Check if we are getting too full. */
+		if (partition->count > MAX_COUNT_PER_PARTITION(hash_table) &&
+			!RESIZE_IN_PROGRESS(hash_table))
+		{
+			/*
+			 * The load factor (= keys / buckets) for all buckets protected by
+			 * this partition is > 0.75.  Presumably the same applies
+			 * generally across the whole hash table (though we don't attempt
+			 * to track that directly to avoid contention on some kind of
+			 * central counter; we just assume that this partition is
+			 * representative).  This is a good time to start a new resize
+			 * operation, if there isn't one already in progress.
+			 *
+			 * Give up our existing lock first, because resizing needs to
+			 * reacquire all the locks in the right order.
+			 */
+			LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+			if (!begin_resize(hash_table, hash_table->size_log2 + 1))
+			{
+				/* Out of memory. */
+				return NULL;
+			}
+
+			goto restart;
+		}
+
+		/* Finally we can try to insert the new item. */
+		item = insert_into_bucket(hash_table, key,
+								  &BUCKET_FOR_HASH(hash_table, hash));
+		if (item)
+		{
+			item->hash = hash;
+			/* Adjust per-lock-partition counter for load factor knowledge. */
+			++partition->count;
+		}
+	}
+
+	if (!item)
+	{
+		/* Out of memory. */
+		LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+		return NULL;
+	}
+	else
+	{
+		/* The caller must release the lock with dht_release. */
+		hash_table->exclusively_locked = true;
+		return &item->entry;
+	}
+}
+
+/*
+ * Remove an entry by key.  Returns true if the key was found and the
+ * corresponding entry was removed.
+ *
+ * To delete an entry that you already have a pointer to, see
+ * dht_delete_entry.
+ */
+bool
+dht_delete_key(dht_hash_table *hash_table, const void *key)
+{
+	Size		hash;
+	Size		partition;
+	bool		found;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+	partition = PARTITION_FOR_HASH(hash);
+
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition), LW_EXCLUSIVE);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/*
+	 * Try the active bucket first, and then the outgoing bucket if we are
+	 * resizing.  It can't be in both.
+	 */
+	if (delete_key_from_bucket(hash_table, key,
+							   &BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].count > 0);
+		found = true;
+		--hash_table->control->partitions[partition].count;
+	}
+	else if (RESIZE_IN_PROGRESS(hash_table) &&
+			 delete_key_from_bucket(hash_table, key,
+									&OLD_BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].old_count > 0);
+		found = true;
+		--hash_table->control->partitions[partition].old_count;
+	}
+	else
+		found = false;
+
+	LWLockRelease(PARTITION_LOCK(hash_table, partition));
+
+	return found;
+}
+
+/*
+ * Remove an entry.  The entry must already be exclusively locked, and must
+ * have been obtained by dht_find or dht_find_or_insert.  Note that this
+ * function releases the lock just like dht_release.
+ *
+ * To delete an entry by key, see dht_delete_key.  To delete an entry while
+ * iterating, see dht_iterate_delete.
+ */
+void
+dht_delete_entry(dht_hash_table *hash_table, void *entry)
+{
+	dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+	Size		partition = PARTITION_FOR_HASH(item->hash);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(hash_table->exclusively_locked);
+
+	delete_item(hash_table, item);
+	hash_table->exclusively_locked = false;
+	LWLockRelease(PARTITION_LOCK(hash_table, partition));
+}
+
+/*
+ * Unlock an entry which was locked by dht_find or dht_find_or_insert.
+ */
+void
+dht_release(dht_hash_table *hash_table, void *entry)
+{
+	dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+	Size		partition_index = PARTITION_FOR_HASH(item->hash);
+	bool		deferred_resize_work = false;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/*
+	 * If there is a resize in progress and we took an exclusive lock, try to
+	 * migrate a key from the old bucket array to the new bucket array.
+	 * Ideally, we'll find some work to do in the partition on which we
+	 * already hold a lock, but if not, we'll make a note to search other lock
+	 * partitions after releasing the lock we currently hold.
+	 */
+	if (RESIZE_IN_PROGRESS(hash_table) && hash_table->exclusively_locked)
+	{
+		dht_partition *partition;
+
+		partition = &hash_table->control->partitions[partition_index];
+
+		if (partition->old_count > 0)
+		{
+			/* Do some resizing work in this partition. */
+			do_incremental_resize(hash_table, partition_index);
+		}
+		else
+		{
+			/* Nothing left to do in this partition.  Look elsewhere. */
+			deferred_resize_work = true;
+		}
+	}
+
+	hash_table->exclusively_locked = false;
+	LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+
+	/*
+	 * If we need to contribute some work to the ongoing resize effort, and we
+	 * didn't find something to migrate in our own partition, go search for
+	 * such work in other partitions.  We'll start with the one after ours.
+	 */
+	if (deferred_resize_work)
+		search_for_resize_work(hash_table,
+							   (partition_index + 1) % DHT_NUM_PARTITIONS);
+}
+
+/*
+ * Begin iterating through the whole hash table.  The caller must supply a
+ * dht_iterator object, which can then be used to call dht_iterate_next to get
+ * values until the end is reached.
+ */
+void
+dht_iterate_begin(dht_hash_table *hash_table,
+				  dht_iterator *iterator,
+				  bool exclusive)
+{
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	iterator->hash_table = hash_table;
+	iterator->exclusive = exclusive;
+	iterator->partition = 0;
+	iterator->bucket = 0;
+	iterator->item = NULL;
+	iterator->last_item_pointer = InvalidDsaPointer;
+	iterator->locked = false;
+
+	/* Snapshot the size (arbitrary lock to prevent size changing). */
+	LWLockAcquire(PARTITION_LOCK(hash_table, 0), LW_SHARED);
+	ensure_valid_bucket_pointers(hash_table);
+	iterator->table_size_log2 = hash_table->size_log2;
+	LWLockRelease(PARTITION_LOCK(hash_table, 0));
+}
+
+/*
+ * Delete the entry that was most recently returned by dht_iterate_next.  The
+ * iterator must have been initialized in exclusive lock mode.
+ */
+void
+dht_iterate_delete(dht_iterator *iterator)
+{
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+	Assert(iterator->exclusive);
+	Assert(iterator->item != NULL);
+	delete_item(iterator->hash_table, iterator->item);
+	iterator->item = NULL;
+}
+
+/*
+ * Move to the next item in the hash table.  Returns a pointer to an entry, or
+ * NULL if the end of the hash table has been reached.  The item is locked in
+ * exclusive or shared mode depending on the argument given to
+ * dht_iterate_begin.  The caller can optionally release the lock by calling
+ * dht_iterate_release, and then call dht_iterate_next again to move to the
+ * next entry.  If the iteration is in exclusive mode, client code can also
+ * call dht_iterate_delete.  When the end of the hash table is reached, or at
+ * any time, the client may call dht_iterate_end to abandon iteration.
+ */
+void *
+dht_iterate_next(dht_iterator *iterator)
+{
+	dsa_pointer item_pointer;
+
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+
+	while (iterator->partition < DHT_NUM_PARTITIONS)
+	{
+		if (!iterator->locked)
+		{
+			LWLockAcquire(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition),
+						  iterator->exclusive ? LW_EXCLUSIVE : LW_SHARED);
+			iterator->locked = true;
+		}
+
+		item_pointer = next_in_bucket(iterator);
+		if (DsaPointerIsValid(item_pointer))
+		{
+			/* Remember this item, so that we can step over it next time. */
+			iterator->last_item_pointer = item_pointer;
+			iterator->item = dsa_get_address(iterator->hash_table->area,
+											 item_pointer);
+			return &(iterator->item->entry);
+		}
+
+		/* We have reached the end of the bucket. */
+		iterator->last_item_pointer = InvalidDsaPointer;
+		iterator->bucket += 1;
+
+		/*
+		 * If the last bucket was split while we were iterating, then we have
+		 * been doing some expensive bucket merging to get each item.  Now
+		 * that we've completed that whole pre-split bucket (by merging all
+		 * its descendants), we can update the table size of our iterator and
+		 * start iterating cheaply again.
+		 */
+		iterator->bucket <<=
+			iterator->hash_table->size_log2 - iterator->table_size_log2;
+		iterator->table_size_log2 = iterator->hash_table->size_log2;
+		/* Have we gone past the last bucket in this partition? */
+		if (iterator->bucket >=
+			BUCKET_INDEX_FOR_PARTITION(iterator->partition + 1,
+									   iterator->table_size_log2))
+		{
+			/* No more buckets.  Try next partition. */
+			LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition));
+			iterator->locked = false;
+			++iterator->partition;
+			iterator->bucket =
+				BUCKET_INDEX_FOR_PARTITION(iterator->partition,
+										   iterator->table_size_log2);
+		}
+	}
+	iterator->item = NULL;
+	return NULL;
+}
+
+/*
+ * Release the most recently obtained lock.  This can optionally be called in
+ * between calls to dht_iterate_next to allow other processes to access the
+ * same partition of the hash table.
+ */
+void
+dht_iterate_release(dht_iterator *iterator)
+{
+	Assert(iterator->locked);
+	LWLockRelease(PARTITION_LOCK(iterator->hash_table, iterator->partition));
+	iterator->locked = false;
+}
+
+/*
+ * Terminate iteration.  This must be called after iteration completes,
+ * whether or not the end was reached.  The iterator object may then be reused
+ * for another iteration.
+ */
+void
+dht_iterate_end(dht_iterator *iterator)
+{
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+	if (iterator->locked)
+		LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+									 iterator->partition));
+}
+
+/*
+ * Print out debugging information about the internal state of the hash table.
+ */
+void
+dht_dump(dht_hash_table *hash_table)
+{
+	Size		i;
+	Size		j;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED);
+
+	ensure_valid_bucket_pointers(hash_table);
+
+	fprintf(stderr,
+			"hash table size = %zu\n", (Size) 1 << hash_table->size_log2);
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+		Size		begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2);
+		Size		end = BUCKET_INDEX_FOR_PARTITION(i + 1, hash_table->size_log2);
+
+		fprintf(stderr, "  partition %zu\n", i);
+		fprintf(stderr,
+				"    active buckets (key count = %zu)\n", partition->count);
+
+		for (j = begin; j < end; ++j)
+		{
+			Size		count = 0;
+			dsa_pointer bucket = hash_table->buckets[j];
+
+			while (DsaPointerIsValid(bucket))
+			{
+				dht_hash_table_item *item;
+
+				item = dsa_get_address(hash_table->area, bucket);
+
+				bucket = item->next;
+				++count;
+			}
+			fprintf(stderr, "      bucket %zu (key count = %zu)\n", j, count);
+		}
+		if (RESIZE_IN_PROGRESS(hash_table))
+		{
+			Size		begin;
+			Size		end;
+
+			begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2 - 1);
+			end = BUCKET_INDEX_FOR_PARTITION(i + 1,
+											 hash_table->size_log2 - 1);
+
+			fprintf(stderr, "    old buckets (key count = %zu)\n",
+					partition->old_count);
+
+			for (j = begin; j < end; ++j)
+			{
+				Size		count = 0;
+				dsa_pointer bucket = hash_table->old_buckets[j];
+
+				while (DsaPointerIsValid(bucket))
+				{
+					dht_hash_table_item *item;
+
+					item = dsa_get_address(hash_table->area, bucket);
+
+					bucket = item->next;
+					++count;
+				}
+				fprintf(stderr,
+						"      bucket %zu (key count = %zu)\n", j, count);
+			}
+		}
+	}
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockRelease(PARTITION_LOCK(hash_table, i));
+}
+
+/*
+ * Delete a locked item to which we have a pointer.
+ */
+static void
+delete_item(dht_hash_table *hash_table, dht_hash_table_item * item)
+{
+	Size		hash = item->hash;
+	Size		partition = PARTITION_FOR_HASH(hash);
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, partition)));
+
+	/*
+	 * Try the active bucket first, and then the outgoing bucket if we are
+	 * resizing.  It can't be in both.
+	 */
+	if (delete_item_from_bucket(hash_table, item,
+								&BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].count > 0);
+		--hash_table->control->partitions[partition].count;
+	}
+	else if (RESIZE_IN_PROGRESS(hash_table) &&
+			 delete_item_from_bucket(hash_table, item,
+									 &OLD_BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].old_count > 0);
+		--hash_table->control->partitions[partition].old_count;
+	}
+	else
+	{
+		Assert(false);
+	}
+}
+
+/*
+ * Find the item that comes after iterator->last_item_pointer but before
+ * *next, and write it into *next.
+ *
+ * Calling this on several buckets allows us to 'merge' the items from a
+ * bucket with its ancestors and descendants during resize operations, and
+ * iterate through the items in all of them in a stable order, despite items
+ * being able to move while we iterate.
+ *
+ * This works because buckets are sorted by descending item pointer.
+ */
+static void
+advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+				 dsa_pointer *next)
+{
+	dht_hash_table_item *item;
+
+	while (DsaPointerIsValid(bucket_head))
+	{
+		item = dsa_get_address(iterator->hash_table->area,
+							   bucket_head);
+		if ((!DsaPointerIsValid(iterator->last_item_pointer) ||
+			 bucket_head < iterator->last_item_pointer) &&
+			bucket_head > *next &&
+			BUCKET_INDEX_FOR_HASH_AND_SIZE(item->hash,
+										   iterator->table_size_log2) ==
+			iterator->bucket)
+		{
+			*next = bucket_head;
+			return;
+		}
+		bucket_head = item->next;
+	}
+}
+
+/*
+ * Find the next item in the bucket we are visiting.  If a resize is in
+ * progress or the bucket has been split since we started iterating over this
+ * bucket, we may need to combine items from multiple buckets to synthesize
+ * the original bucket.
+ */
+static dsa_pointer
+next_in_bucket(dht_iterator *iterator)
+{
+	Size		bucket;
+	Size		begin;
+	Size		end;
+	dsa_pointer next = InvalidDsaPointer;
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition)));
+
+	/* Handle old buckets if a resize is in progress. */
+	if (RESIZE_IN_PROGRESS(iterator->hash_table))
+	{
+		if (iterator->table_size_log2 == iterator->hash_table->size_log2)
+		{
+			/*
+			 * The table hasn't grown since we started iterating, but there
+			 * was already a resize in progress so there may be items in an
+			 * old bucket that belong in the bucket we're iterating over which
+			 * haven't been migrated yet.
+			 */
+			advance_iterator(iterator,
+					 iterator->hash_table->old_buckets[iterator->bucket / 2],
+							 &next);
+		}
+		else
+		{
+			/*
+			 * The bucket has been split one or more times, and a resize is
+			 * still in progress so there may be items still in an old bucket.
+			 * If N splits have occurred, there will be 2^N current buckets
+			 * that correspond to the one original bucket.
+			 */
+			Assert(iterator->hash_table->size_log2 > iterator->table_size_log2);
+			begin = iterator->bucket <<
+				(iterator->hash_table->size_log2 - iterator->table_size_log2 - 1);
+			end = (iterator->bucket + 1) <<
+				(iterator->hash_table->size_log2 - iterator->table_size_log2 - 1);
+			for (bucket = begin; bucket < end; bucket += 1)
+				advance_iterator(iterator,
+								 iterator->hash_table->old_buckets[bucket],
+								 &next);
+		}
+	}
+
+	/*
+	 * Scan the bucket.  This is a loop because it may have split any number
+	 * of times since we started iterating through it, but in the common case
+	 * we just loop once.
+	 */
+	begin = iterator->bucket <<
+		(iterator->hash_table->size_log2 - iterator->table_size_log2);
+	end = (iterator->bucket + 1) <<
+		(iterator->hash_table->size_log2 - iterator->table_size_log2);
+	for (bucket = begin; bucket < end; ++bucket)
+		advance_iterator(iterator,
+						 iterator->hash_table->buckets[bucket],
+						 &next);
+
+	return next;
+}
+
+/*
+ * Grow the hash table if necessary to the requested number of buckets.  The
+ * requested size must be double some previously observed size.  Returns true
+ * if the table was successfully expanded or found to be big enough already
+ * (because another backend expanded it).  Returns false for out-of-memory.
+ * The process of transferring items from the old table to the new one will be
+ * carried out with a small tax on all future operations until the work is
+ * done.
+ */
+static bool
+begin_resize(dht_hash_table *hash_table, Size new_size_log2)
+{
+	int			i;
+	dsa_pointer new_buckets;
+	bool		result;
+
+	/*
+	 * Acquire the locks for all lock partitions.  This is expensive, but we
+	 * shouldn't have to do it many times.
+	 */
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+		if (i == 0 && hash_table->control->size_log2 >= new_size_log2)
+		{
+			/*
+			 * Another backend has already increased the size; we can avoid
+			 * obtaining all the locks and return early.
+			 */
+			LWLockRelease(PARTITION_LOCK(hash_table, 0));
+			return true;
+		}
+	}
+
+	Assert(new_size_log2 == hash_table->control->size_log2 + 1);
+	Assert(!RESIZE_IN_PROGRESS(hash_table));
+
+	/* Allocate the space for the new table. */
+	new_buckets = dsa_allocate(hash_table->area,
+							   sizeof(dsa_pointer) * ((Size) 1 << new_size_log2));
+	if (DsaPointerIsValid(new_buckets))
+	{
+		result = true;
+		hash_table->control->old_buckets = hash_table->control->buckets;
+		hash_table->control->buckets = new_buckets;
+		hash_table->control->size_log2 = new_size_log2;
+		hash_table->buckets = dsa_get_address(hash_table->area,
+											  hash_table->control->buckets);
+		hash_table->old_buckets =
+			dsa_get_address(hash_table->area,
+							hash_table->control->old_buckets);
+		hash_table->size_log2 = new_size_log2;
+		/* InvalidDsaPointer in every bucket */
+		memset(hash_table->buckets, 0,
+			   sizeof(dsa_pointer) * ((Size) 1 << new_size_log2));
+		for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		{
+			dht_partition *partition = &hash_table->control->partitions[i];
+
+			partition->old_count = partition->count;
+			partition->count = 0;
+			partition->next_bucket_to_split =
+				BUCKET_INDEX_FOR_PARTITION(i, new_size_log2 - 1);
+		}
+	}
+	else
+	{
+		/* Out of memory. */
+		result = false;
+	}
+
+	/* Release all the locks. */
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockRelease(PARTITION_LOCK(hash_table, i));
+
+	return result;
+}
+
+/*
+ * Check if the resize operation is finished.  If it is, then the old table
+ * can be returned to the shared memory area.
+ */
+static void
+finish_resize(dht_hash_table *hash_table)
+{
+	int			i,
+				j;
+	bool		finished = true;
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+		if (i == 0)
+		{
+			ensure_valid_bucket_pointers(hash_table);
+			if (!RESIZE_IN_PROGRESS(hash_table))
+			{
+				/* Another backend has already finished the resize. */
+				LWLockRelease(PARTITION_LOCK(hash_table, i));
+				return;
+			}
+		}
+
+		/*
+		 * We can only actually finish the resize when every partition has no
+		 * more keys in old buckets.  This might not be true yet, because
+		 * search_for_resize_work calls us when it thinks there might be no
+		 * more resizing work to do, but it could be wrong because it judges
+		 * that with an unsynchronized view of memory.  Now that we can
+		 * examine the synchronized counters, we can check if the resize
+		 * really is complete.
+		 */
+		if (partition->old_count > 0)
+		{
+			finished = false;
+			break;
+		}
+	}
+
+	if (finished)
+	{
+		dsa_free(hash_table->area, hash_table->control->old_buckets);
+		hash_table->control->old_buckets = InvalidDsaPointer;
+	}
+
+	/*
+	 * Only release as many locks as we acquired.  When finished is true, this
+	 * will be all of them (i == DHT_NUM_PARTITIONS), but if we exited the
+	 * lock-acquire loop early we still hold locks 0 through i inclusive.
+	 */
+	for (j = 0; j <= i && j < DHT_NUM_PARTITIONS; ++j)
+		LWLockRelease(PARTITION_LOCK(hash_table, j));
+}
+
+/*
+ * Migrate one item from partition 'p' from an old bucket to the appropriate
+ * new bucket.  The partition lock must already be held and there must be at
+ * least one item to be migrated.
+ */
+static void
+do_incremental_resize(dht_hash_table *hash_table, int p)
+{
+	Size		old_size_log2 = hash_table->control->size_log2 - 1;
+	dht_partition *partition = &hash_table->control->partitions[p];
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, p)));
+	Assert(partition->old_count > 0);
+
+	/*
+	 * Walk through all the buckets that are covered by this partition until
+	 * we find one that is not empty, and migrate one item from it.
+	 */
+	for (;;)
+	{
+		dsa_pointer item_pointer;
+
+		item_pointer =
+			hash_table->old_buckets[partition->next_bucket_to_split];
+
+		if (DsaPointerIsValid(item_pointer))
+		{
+			dht_hash_table_item *item;
+
+			item = dsa_get_address(hash_table->area, item_pointer);
+
+			/* Remove it from the old bucket. */
+			hash_table->old_buckets[partition->next_bucket_to_split] =
+				item->next;
+			--partition->old_count;
+
+			/* Add it to the active bucket. */
+			insert_item_into_bucket(hash_table,
+									item_pointer,
+									item,
+									&BUCKET_FOR_HASH(hash_table, item->hash));
+			++partition->count;
+
+			return;
+		}
+		else
+		{
+			/* Move to the next bucket protected by this partition. */
+			partition->next_bucket_to_split += 1;
+
+			/*
+			 * We shouldn't be able to reach past the end of the partition
+			 * without finding an item.  That would imply that the partition's
+			 * old_count was corrupted.
+			 *
+			 * This is an elog() because we want to catch this even in builds
+			 * that are not Assert-enabled.
+			 */
+			if (partition->next_bucket_to_split >=
+				BUCKET_INDEX_FOR_PARTITION(p + 1, old_size_log2))
+				elog(ERROR, "dht corrupted");
+		}
+	}
+}
+
+/*
+ * Try to find a partition that has outstanding resizing work and do some of
+ * that work, possibly completing the resize operation.  The caller must hold
+ * no locks.
+ *
+ * Consider 'start_partition' first.  dht_release tries to spread this around
+ * to try to reduce contention.
+ */
+static void
+search_for_resize_work(dht_hash_table *hash_table, int start_partition)
+{
+	int			i = start_partition;
+
+	Assert(start_partition >= 0);
+	Assert(start_partition < DHT_NUM_PARTITIONS);
+
+	/*
+	 * Using a non-synchronized view of memory (no locks), see if we can find
+	 * any partition that looks like it still has keys to be migrated.
+	 */
+	do
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		if (partition->old_count > 0)
+		{
+			bool		done = false;
+
+			/* Now check again with a lock. */
+			LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+			ensure_valid_bucket_pointers(hash_table);
+			if (!RESIZE_IN_PROGRESS(hash_table))
+			{
+				/* The resize finished underneath us. */
+				done = true;
+			}
+			else if (partition->old_count > 0)
+			{
+				/* There is in fact work to be done in this partition. */
+				do_incremental_resize(hash_table, i);
+				done = true;
+			}
+			LWLockRelease(PARTITION_LOCK(hash_table, i));
+
+			if (done)
+				return;
+		}
+		i = (i + 1) % DHT_NUM_PARTITIONS;
+	} while (i != start_partition);
+
+	/*
+	 * We didn't find any partitions that seem to have remaining migration
+	 * work, though we can't be 100% sure due to unsynchronized reads of
+	 * memory.  We will try to finish the resize, which may or may not
+	 * actually succeed, depending on what it finds the true state to be.
+	 */
+	finish_resize(hash_table);
+}
+
+/*
+ * Make sure that our backend-local bucket pointers are up to date.  The
+ * caller must have locked one lock partition, which prevents begin_resize()
+ * from running concurrently.
+ */
+static inline void
+ensure_valid_bucket_pointers(dht_hash_table *hash_table)
+{
+	if (hash_table->size_log2 != hash_table->control->size_log2)
+	{
+		/*
+		 * A resize has started since we last looked, so our pointers are out
+		 * of date.
+		 */
+		hash_table->buckets = dsa_get_address(hash_table->area,
+											  hash_table->control->buckets);
+		if (DsaPointerIsValid(hash_table->control->old_buckets))
+			hash_table->old_buckets =
+				dsa_get_address(hash_table->area,
+								hash_table->control->old_buckets);
+		else
+			hash_table->old_buckets = NULL;
+		hash_table->size_log2 = hash_table->control->size_log2;
+	}
+	if (hash_table->old_buckets &&
+		!DsaPointerIsValid(hash_table->control->old_buckets))
+	{
+		/* A resize has finished since we last looked. */
+		hash_table->old_buckets = NULL;
+	}
+}
+
+/*
+ * Scan a locked bucket for a match, using the provided compare-function.
+ */
+static inline dht_hash_table_item *
+find_in_bucket(dht_hash_table *hash_table, const void *key,
+			   dsa_pointer item_pointer)
+{
+	while (DsaPointerIsValid(item_pointer))
+	{
+		dht_hash_table_item *item;
+
+		item = dsa_get_address(hash_table->area, item_pointer);
+		if (hash_table->params.compare_function(key, &item->entry,
+										   hash_table->params.key_size) == 0)
+			return item;
+		item_pointer = item->next;
+	}
+	return NULL;
+}
+
+/*
+ * Insert an already-allocated item into a bucket.
+ */
+static void
+insert_item_into_bucket(dht_hash_table *hash_table,
+						dsa_pointer item_pointer,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket)
+{
+	Assert(item == dsa_get_address(hash_table->area, item_pointer));
+
+	/*
+	 * Find the insertion point for this item in the bucket.  Buckets are
+	 * sorted by descending dsa_pointer, as part of the protocol for ensuring
+	 * that iterators don't visit items twice when buckets are split during a
+	 * scan.  (We don't actually expect them to have more than 1 item unless
+	 * the hash function is of low quality.)
+	 */
+	while (*bucket > item_pointer)
+	{
+		dht_hash_table_item *this_item;
+
+		this_item = dsa_get_address(hash_table->area, *bucket);
+
+		bucket = &this_item->next;
+	}
+
+	item->next = *bucket;
+	*bucket = item_pointer;
+}
+
+/*
+ * Allocate space for an entry with the given key and insert it into the
+ * provided bucket.
+ */
+static dht_hash_table_item *
+insert_into_bucket(dht_hash_table *hash_table,
+				   const void *key,
+				   dsa_pointer *bucket)
+{
+	dsa_pointer item_pointer;
+	dht_hash_table_item *item;
+
+	item_pointer = dsa_allocate(hash_table->area,
+								hash_table->params.entry_size +
+								offsetof(dht_hash_table_item, entry));
+	if (!DsaPointerIsValid(item_pointer))
+		/* Out of memory. */
+		return NULL;
+
+	item = dsa_get_address(hash_table->area, item_pointer);
+	memcpy(&(item->entry), key, hash_table->params.key_size);
+	insert_item_into_bucket(hash_table, item_pointer, item, bucket);
+	return item;
+}
+
+/*
+ * Search a bucket for a matching key and delete it.
+ */
+static bool
+delete_key_from_bucket(dht_hash_table *hash_table,
+					   const void *key,
+					   dsa_pointer *bucket_head)
+{
+	while (DsaPointerIsValid(*bucket_head))
+	{
+		dht_hash_table_item *item;
+
+		item = dsa_get_address(hash_table->area, *bucket_head);
+
+		if (hash_table->params.compare_function(key, &item->entry,
+												hash_table->params.key_size)
+			== 0)
+		{
+			dsa_pointer next;
+
+			next = item->next;
+			dsa_free(hash_table->area, *bucket_head);
+			*bucket_head = next;
+			return true;
+		}
+		bucket_head = &item->next;
+	}
+	return false;
+}
+
+/*
+ * Delete the specified item from the bucket.
+ */
+static bool
+delete_item_from_bucket(dht_hash_table *hash_table,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket_head)
+{
+	while (DsaPointerIsValid(*bucket_head))
+	{
+		dht_hash_table_item *bucket_item;
+
+		bucket_item = dsa_get_address(hash_table->area, *bucket_head);
+
+		if (bucket_item == item)
+		{
+			dsa_pointer next;
+
+			next = item->next;
+			dsa_free(hash_table->area, *bucket_head);
+			*bucket_head = next;
+			return true;
+		}
+		bucket_head = &bucket_item->next;
+	}
+	return false;
+}
diff --git a/src/include/storage/dht.h b/src/include/storage/dht.h
new file mode 100644
index 0000000..ccf9d17
--- /dev/null
+++ b/src/include/storage/dht.h
@@ -0,0 +1,117 @@
+/*-------------------------------------------------------------------------
+ *
+ * dht.h
+ *	  Concurrent hash tables backed by dynamic shared memory areas.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/include/storage/dht.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DHT_H
+#define DHT_H
+
+#include "postgres.h"
+
+#include "storage/dsa.h"
+
+/* The opaque type representing a hash table. */
+struct dht_hash_table;
+typedef struct dht_hash_table dht_hash_table;
+
+/* The opaque type used for tracking iterator state. */
+struct dht_iterator;
+typedef struct dht_iterator dht_iterator;
+
+/* A handle for a dht_hash_table which can be shared with other processes. */
+typedef dsa_pointer dht_hash_table_handle;
+
+/* The type for hash values. */
+typedef uint32 dht_hash;
+
+/*
+ * The function type for computing hash values for keys.  Compatible with
+ * HashValueFunc in hsearch.h and hash functions like tag_hash.
+ */
+typedef dht_hash (*dht_hash_function)(const void *v, Size size);
+
+/*
+ * The function used for comparing keys.  Compatible with HashCompareFunc
+ * in hsearch.h and memcmp.
+ */
+typedef int (*dht_compare_function)(const void *a, const void *b, Size size);
+
+/*
+ * The set of parameters needed to create or attach to a hash table.  The
+ * members tranche_id and tranche_name do not need to be initialized when
+ * attaching to an existing hash table.
+ */
+typedef struct
+{
+	Size key_size;				/* Size of the key (initial bytes of entry) */
+	Size entry_size;			/* Total size of entry */
+	dht_compare_function compare_function;	/* Compare function */
+	dht_hash_function hash_function;	/* Hash function */
+	int tranche_id;				/* The tranche ID to use for locks. */
+	char tranche_name[NAMEDATALEN];	/* The tranche name to use for locks. */
+} dht_parameters;
+
+/* Forward declaration of private types for use only by dht.c. */
+struct dht_hash_table_bucket;
+struct dht_hash_table_item;
+typedef struct dht_hash_table_item dht_hash_table_item;
+typedef struct dht_hash_table_bucket dht_hash_table_bucket;
+
+/*
+ * The state used to track a walk over all entries in a hash table.  The
+ * members of this struct are only for use by code in dht.c, but it is
+ * included in the header because it's useful to be able to create objects of
+ * this type on the stack to pass into the dht_iterate_XXX functions.
+ */
+struct dht_iterator
+{
+	dht_hash_table *hash_table;	/* The hash table we are iterating over. */
+	bool exclusive;				/* Whether to lock buckets exclusively. */
+	Size partition;				/* The index of the next partition to visit. */
+	Size bucket;				/* The index of the next bucket to visit. */
+	dht_hash_table_item *item;  /* The most recently returned item. */
+	dsa_pointer last_item_pointer; /* The last item visited. */
+	Size table_size_log2; 	/* The table size when we started iterating. */
+	bool locked;			/* Whether the current partition is locked. */
+};
+
+/* Creating, sharing and destroying hash tables. */
+extern dht_hash_table *dht_create(dsa_area *area,
+								  const dht_parameters *params);
+extern dht_hash_table *dht_attach(dsa_area *area,
+								  const dht_parameters *params,
+								  dht_hash_table_handle handle);
+extern void dht_detach(dht_hash_table *hash_table);
+extern dht_hash_table_handle
+dht_get_hash_table_handle(dht_hash_table *hash_table);
+extern void dht_destroy(dht_hash_table *hash_table);
+
+/* Iterating over the whole hash table. */
+extern void dht_iterate_begin(dht_hash_table *hash_table,
+							  dht_iterator *iterator, bool exclusive);
+extern void *dht_iterate_next(dht_iterator *iterator);
+extern void dht_iterate_delete(dht_iterator *iterator);
+extern void dht_iterate_release(dht_iterator *iterator);
+extern void dht_iterate_end(dht_iterator *iterator);
+
+/* Finding, creating, deleting entries. */
+extern void *dht_find(dht_hash_table *hash_table,
+					  const void *key, bool exclusive);
+extern void *dht_find_or_insert(dht_hash_table *hash_table,
+								const void *key, bool *found);
+extern bool dht_delete_key(dht_hash_table *hash_table, const void *key);
+extern void dht_delete_entry(dht_hash_table *hash_table, void *entry);
+extern void dht_release(dht_hash_table *hash_table, void *entry);
+
+/* Debugging support. */
+extern void dht_dump(dht_hash_table *hash_table);
+
+#endif   /* DHT_H */
#8Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Magnus Hagander (#5)
Re: Hash tables in dynamic shared memory

On Wed, Oct 5, 2016 at 7:02 PM, Magnus Hagander <magnus@hagander.net> wrote:

On Oct 5, 2016 1:23 AM, "Thomas Munro" <thomas.munro@enterprisedb.com>
wrote:

On Wed, Oct 5, 2016 at 12:11 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

On Wed, Oct 5, 2016 at 11:22 AM, Andres Freund <andres@anarazel.de>
wrote:

Potential use cases for DHT include caches, in-memory database objects
and working state for parallel execution.

Is there a more concrete example, i.e. a user we'd convert to this at
the same time as introducing this hashtable?

A colleague of mine will shortly post a concrete patch to teach an
existing executor node how to be parallel aware, using DHT. I'll let
him explain.

I haven't looked into whether it would make sense to convert any
existing shmem dynahash hash table to use DHT. The reason for doing
so would be to move it out to DSM segments and enable dynamic
growth. I suspect that the bounded size of things like the hash
tables involved in (for example) predicate locking is considered a
feature, not a bug, so any such cluster-lifetime core-infrastructure
hash table would not be a candidate. More likely candidates would be
ephemeral data used by the executor, as in the above-mentioned patch,
and long lived caches of dynamic size owned by core code or
extensions. Like a shared query plan cache, if anyone can figure out
the invalidation magic required.

Another thought: it could be used to make things like
pg_stat_statements not have to be in shared_preload_libraries.

That would indeed be a great improvement. And possibly also allow the
changing of the max number of statements it can track without a restart?

Yeah. You don't explicitly size a DHT hash table; it just grows as
required to keep the load factor low enough, possibly leading the DSA
area it lives in to create DSM segments. Currently it never gets
smaller though, so if you throw out a bunch of entries you will be
freeing up the memory occupied by the entries themselves (meaning:
giving it back to the DSA area, which might eventually give it back to
the OS if the planets are correctly aligned, rendering a DSM segment
entirely unused), but the hash table's bucket array won't ever shrink.

I was also wondering if it might be useful as a replacement for some of the
pgstats stuff, to get rid of the cost of spooling to file and then rebuilding
the hash tables on the receiving end. I've been waiting for this patch to
figure out if that's useful. I mean keep the stats collector doing what it
does now over UDP, but present the results in shared hash tables instead of
files.

Interesting thought. I haven't studied how that stuff works... I've
put it on my reading list.

--
Thomas Munro
http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#9Andres Freund
andres@anarazel.de
In reply to: Magnus Hagander (#5)
Re: Hash tables in dynamic shared memory

On 2016-10-05 08:02:42 +0200, Magnus Hagander wrote:

On Oct 5, 2016 1:23 AM, "Thomas Munro" <thomas.munro@enterprisedb.com>
wrote:

Another thought: it could be used to make things like
pg_stat_statements not have to be in shared_preload_libraries.

That would indeed be a great improvement. And possibly also allow the
changing of the max number of statements it can track without a restart?

There's the issue of having to serialize / load the file around a
restart...

Greetings,

Andres Freund


#10John Gorman
johngorman2@gmail.com
In reply to: Andres Freund (#9)
1 attachment(s)
Re: Hash tables in dynamic shared memory

I reviewed the dht-v2.patch and found that it is in excellent shape.

Benchmarking shows that this performs somewhat faster than
dynahash, which is surprising because it is doing DSA address
translations on the fly.

One area where this could run faster is to reduce the amount of
time when the hash is in the RESIZE_IN_PROGRESS state.

When a hash partition reaches 75% full, a resize begins by allocating
an empty new hash bucket array with double the number of buckets.
This begins the RESIZE_IN_PROGRESS state, where there is both an
old and a new hash bucket array.

During the RESIZE state all operations such as find, insert,
delete and iterate run slower due to having to look items up in
two hash bucket arrays.
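
The two lookups stay under a single partition lock because, per the
macros in the patch, both the lock partition and the bucket index are
taken from the high-order bits of the hash. Here is a minimal
standalone sketch of that mapping, assuming 32-bit hashes and 128
partitions as in the patch. The function names are illustrative and
are not the ones used in dht.c.

```c
#include <assert.h>
#include <stdint.h>

#define NUM_PARTITIONS_LOG2 7	/* 128 lock partitions, as in the patch */
#define HASH_BITS 32			/* dht_hash is a uint32 */

/* Lock partition: the top NUM_PARTITIONS_LOG2 bits of the hash. */
uint32_t
partition_for_hash(uint32_t hash)
{
	return hash >> (HASH_BITS - NUM_PARTITIONS_LOG2);
}

/* Bucket index in a table of 2^size_log2 buckets: the top size_log2 bits. */
uint32_t
bucket_for_hash(uint32_t hash, unsigned size_log2)
{
	assert(size_log2 >= NUM_PARTITIONS_LOG2 && size_log2 < HASH_BITS);
	return hash >> (HASH_BITS - size_log2);
}

/* The partition that covers a given bucket index at a given table size. */
uint32_t
partition_for_bucket(uint32_t bucket, unsigned size_log2)
{
	assert(size_log2 >= NUM_PARTITIONS_LOG2);
	return bucket >> (size_log2 - NUM_PARTITIONS_LOG2);
}
```

For the hash 0xDEADBEEF the partition is 111. While resizing from
size_log2 = 8 to 9, the old bucket is 222 and the new bucket is 445,
and both are covered by partition 111.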

Whenever a new item is inserted into the hash and the hash is in
the RESIZE state, the current code takes the time to move one item
from the old hash bucket array to the new hash bucket array. During
insertion an exclusive lock is already held on the partition, so
this is an efficient time to do the resize cleanup work.

However, since we only clean up one item per insertion, we do not
complete the cleanup and free the old hash bucket array until we
are due to start yet another resize operation.

So we are effectively always in the RESIZE state, which slows down
operations and wastes some space.
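
The arithmetic behind this can be sketched with a small standalone
model. It assumes a single table-wide 75% threshold and an
insert-only workload, which simplifies the per-partition accounting
in the patch. The type and function names are made up for this
illustration.

```c
#include <assert.h>
#include <stddef.h>

typedef struct
{
	size_t		inserts_in_resize;	/* inserts that saw two bucket arrays */
	size_t		inserts_until_next_resize;	/* inserts before the next
											 * resize is triggered */
} resize_sim;

/*
 * Model one resize cycle.  The resize starts with the old bucket array 75%
 * full, the new array has twice as many buckets, and each insert migrates up
 * to 'cleans' items from the old array.
 */
resize_sim
simulate_resize(size_t old_buckets, size_t cleans)
{
	resize_sim	r = {0, 0};
	size_t		old_items = old_buckets / 2 + old_buckets / 4;
	size_t		new_buckets = old_buckets * 2;
	size_t		limit = new_buckets / 2 + new_buckets / 4;
	size_t		total = old_items;

	assert(cleans > 0);

	while (total < limit)
	{
		total++;
		r.inserts_until_next_resize++;
		if (old_items > 0)
		{
			/* This insert ran while the old array was still populated. */
			r.inserts_in_resize++;
			old_items -= (old_items < cleans) ? old_items : cleans;
		}
	}
	return r;
}
```

With 1024 old buckets, one cleanup per insert keeps the old array
populated for all 768 inserts before the next resize. Three cleanups
per insert empty it after 256 inserts, a third of the cycle.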

Here are the timings for insert and find in nanoseconds on a
Macbook Pro. The insert_resize figure includes the resize work to
move one item from the old to the new hash bucket array.

insert_resize: 945.98
insert_clean: 450.39
find_resize: 348.90
find_clean: 293.16

The attached dht-v2-resize-cleanup.patch applies on top of
dht-v2.patch and speeds up the resize cleanup process so that
hashes spend most of their time in the clean state.

It does this by cleaning up more than one old item during
inserts. This patch cleans up three old items.

There is also the case where a hash receives all of its inserts
at the beginning and the rest of the workload is all finds. This
patch also cleans up two items for each find.

The downside of doing extra cleanup early is some additional
latency. Here are the experimental figures and the approximate
formulas for different numbers of cleanups for inserts. Note that
we cannot go lower than one cleanup per insert.

Resize work in inserts (ns): 729.79 insert + 216.19 * cleanups

cleanups  latency (ns)
1          945.98
2         1178.13
3         1388.73
4         1617.04
5         1796.91

Here are the timings for different numbers of cleanups for finds.
Note that since we do not hold an exclusive partition lock on a find
there is the additional overhead of taking that lock.

Resize work in finds (ns): 348.90 dirty_find + 233.45 lock + 275.78 * cleanups

cleanups  latency (ns)
0          348.90
1          872.88
2         1138.98
3         1448.94
4         1665.31
5         1975.91
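
The two approximate formulas above can be checked against the
measured figures with a short standalone sketch. The constants are
copied from the formulas; the function names are illustrative only,
and the zero-cleanup find is modelled as skipping the extra lock, as
the note above describes.

```c
#include <assert.h>

/* Constants from the approximate formulas above, in nanoseconds. */
#define INSERT_BASE_NS		729.79
#define INSERT_PER_CLEAN_NS 216.19
#define FIND_DIRTY_NS		348.90
#define FIND_LOCK_NS		233.45
#define FIND_PER_CLEAN_NS	275.78

/* Predicted insert latency during a resize; at least one cleanup is done. */
double
insert_resize_ns(int cleanups)
{
	assert(cleanups >= 1);
	return INSERT_BASE_NS + INSERT_PER_CLEAN_NS * cleanups;
}

/* Predicted find latency during a resize; zero cleanups takes no extra lock. */
double
find_resize_ns(int cleanups)
{
	assert(cleanups >= 0);
	if (cleanups == 0)
		return FIND_DIRTY_NS;
	return FIND_DIRTY_NS + FIND_LOCK_NS + FIND_PER_CLEAN_NS * cleanups;
}

/* Relative error of a prediction against a measured figure, as a fraction. */
double
relative_error(double predicted, double measured)
{
	double		diff = predicted - measured;

	if (diff < 0)
		diff = -diff;
	return diff / measured;
}
```

For example, insert_resize_ns(3) gives 1378.36 against a measured
1388.73, and across both tables the model stays within about 3% of
every measured value.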

The new effective latencies during the resize vs. the clean states are:

#define DHT_INSERT_CLEANS 3
#define DHT_SEARCH_CLEANS 2

insert_resize: 1388.73 -- 3 cleaned
insert_clean: 450.39
find_resize: 1138.98 -- 2 cleaned
find_clean: 293.16

The overall performance will be faster due to not having to examine
more than one hash bucket array most of the time.

--

John Gorman
http://www.enterprisedb.com

Attachments:

dht-v2-resize-cleanup.patch (application/octet-stream)
diff --git a/src/backend/storage/ipc/dht.c b/src/backend/storage/ipc/dht.c
index 1fc6aa1..c0f68fc 100644
--- a/src/backend/storage/ipc/dht.c
+++ b/src/backend/storage/ipc/dht.c
@@ -73,6 +73,12 @@ struct dht_hash_table_item
 /* A magic value used to identify DHT hash tables. */
 #define DHT_MAGIC 0x75ff6a20
 
+/* Resize cleanup operations per insert in dht_release(exclusive). */
+#define DHT_INSERT_CLEANS 3
+
+/* Resize cleanup operations per find in dht_release(shared). */
+#define DHT_SEARCH_CLEANS 2
+
 /*
  * Tracking information for each lock partition.  Initially, each partition
  * corresponds to one bucket, but each time the hash table grows, the buckets
@@ -613,21 +619,40 @@ dht_release(dht_hash_table *hash_table, void *entry)
 	 * already hold a lock, but if not, we'll make a note to search other lock
 	 * partitions after releasing the lock we currently hold.
 	 */
-	if (RESIZE_IN_PROGRESS(hash_table) && hash_table->exclusively_locked)
+	if (RESIZE_IN_PROGRESS(hash_table))
 	{
-		dht_partition *partition;
+		if (hash_table->exclusively_locked)
+		{
+			dht_partition *partition;
 
-		partition = &hash_table->control->partitions[partition_index];
+			partition = &hash_table->control->partitions[partition_index];
 
-		if (partition->old_count > 0)
-		{
-			/* Do some resizing work in this partition. */
-			do_incremental_resize(hash_table, partition_index);
+			if (partition->old_count > 0)
+			{
+				/* Do some resizing work in this partition. */
+				int ii = 0;
+
+				while (true)
+				{
+					/* We always do at least one. */
+					do_incremental_resize(hash_table, partition_index);
+					if (partition->old_count <= 0)
+						break;
+					ii++;
+					if (ii >= DHT_INSERT_CLEANS)
+						break;
+				}
+			}
+			else
+			{
+				/* Nothing left to do in this partition.  Look elsewhere. */
+				deferred_resize_work = true;
+			}
 		}
 		else
 		{
-			/* Nothing left to do in this partition.  Look elsewhere. */
-			deferred_resize_work = true;
+			/* Do cleanups after dht_find() ... dht_release() */
+			deferred_resize_work = (DHT_SEARCH_CLEANS > 0);
 		}
 	}
 
@@ -636,8 +661,9 @@ dht_release(dht_hash_table *hash_table, void *entry)
 
 	/*
 	 * If we need to contribute some work to the ongoing resize effort, and we
-	 * didn't find something to migrate in our own partition, go search for
-	 * such work in other partitions.  We'll start with the one after ours.
+	 * didn't find something to migrate in our own partition under exclusive
+	 * lock, go search for such work in other partitions.  We'll start with
+	 * the one after ours.
 	 */
 	if (deferred_resize_work)
 		search_for_resize_work(hash_table,
@@ -1240,7 +1266,18 @@ search_for_resize_work(dht_hash_table *hash_table, int start_partition)
 			else if (partition->old_count > 0)
 			{
 				/* There is in fact work to be done in this partition. */
-				do_incremental_resize(hash_table, i);
+				int ii = 0;
+
+				while (true)
+				{
+					/* We always do at least one. */
+					do_incremental_resize(hash_table, i);
+					if (partition->old_count <= 0)
+						break;
+					ii++;
+					if (ii >= DHT_SEARCH_CLEANS)
+						break;
+				}
 				done = true;
 			}
 			LWLockRelease(PARTITION_LOCK(hash_table, i));
#11Haribabu Kommi
kommi.haribabu@gmail.com
In reply to: John Gorman (#10)
Re: Hash tables in dynamic shared memory

On Sun, Nov 20, 2016 at 9:54 AM, John Gorman <johngorman2@gmail.com> wrote:

I reviewed the dht-v2.patch and found that it is in excellent shape.

The overall performance will be faster due to not having to examine
more than one hash bucket array most of the time.

The reviewer didn't find any problems with the approach of the patch, and
suggested some improvements for the author to respond to.

Moved to next CF with "waiting on author" state.

Regards,
Hari Babu
Fujitsu Australia

#12Thomas Munro
thomas.munro@enterprisedb.com
In reply to: John Gorman (#10)
Re: Hash tables in dynamic shared memory

On Sun, Nov 20, 2016 at 11:54 AM, John Gorman <johngorman2@gmail.com> wrote:

I reviewed the dht-v2.patch and found that it is in excellent shape.

Thanks for reviewing! And sorry for the late reply.

Benchmarking shows that this performs somewhat faster than
dynahash which is surprising because it is doing DSA address
translations on the fly.

That is indeed surprising and I think warrants more investigation.

So we are effectively always in the RESIZE state which slows down
operations and wastes some space.

Right, that is the case in a workload that inserts a bunch of data but
then becomes read-only forever. A workload that constantly does a mix
of writing and reading should settle down at a reasonable size and
stop resizing.

The overall performance will be faster due to not having to examine
more than one hash bucket array most of the time.

Thanks for doing all these experiments. Yeah, I think it makes sense
to merge this change to improve that case.

Since Dilip Kumar's Parallel Bitmap Heap Scan project is no longer
using this, I think I should park it here unless/until another
potential use case pops up. Some interesting candidates have been
mentioned already, and I'm fairly sure there are other uses too, but I
don't expect anyone to be interested in committing this patch until
something concrete shows up, so I'll go work on other things until
then.

--
Thomas Munro
http://www.enterprisedb.com


#13Thomas Munro
thomas.munro@enterprisedb.com
In reply to: Thomas Munro (#12)
1 attachment(s)
Re: Hash tables in dynamic shared memory

On Mon, Dec 19, 2016 at 12:33 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:

Since Dilip Kumar's Parallel Bitmap Heap Scan project is no longer
using this, I think I should park it here unless/until another
potential use case pops up. Some interesting candidates have been
mentioned already, and I'm fairly sure there are other uses too, but I
don't expect anyone to be interested in committing this patch until
something concrete shows up, so I'll go work on other things until
then.

Here is a rebased version. Changes since v2:

1. Out-of-memory conditions now raise errors (following
dsa_allocate's change in behaviour to match palloc).
2. Moved into 'lib' where other reusable data structures live.

There are still a couple of things I'd like to adjust in this code
(including feedback from John and improvements to the iterator code
which is overcomplicated) but I'm posting it today as infrastructure
for another patch.

--
Thomas Munro
http://www.enterprisedb.com

Attachments:

dht-v3.patch (application/octet-stream)
diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index f222c6c20d1..7033dfb1aa8 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/lib
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = binaryheap.o bipartite_match.o hyperloglog.o ilist.o knapsack.o \
+OBJS = binaryheap.o bipartite_match.o dht.o hyperloglog.o ilist.o knapsack.o \
        pairingheap.o rbtree.o stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/dht.c b/src/backend/lib/dht.c
new file mode 100644
index 00000000000..2fec70f7742
--- /dev/null
+++ b/src/backend/lib/dht.c
@@ -0,0 +1,1365 @@
+/*-------------------------------------------------------------------------
+ *
+ * dht.c
+ *	  Concurrent hash tables backed by dynamic shared memory areas.
+ *
+ * This is an open hashing hash table, with a linked list at each table
+ * entry.  It supports dynamic resizing, as required to prevent the linked
+ * lists from growing too long on average.  Currently, only growing is
+ * supported: the hash table never becomes smaller.
+ *
+ * To deal with concurrency, it has a fixed size set of partitions, each of which
+ * is independently locked.  Each bucket maps to a partition; so insert, find
+ * and iterate operations normally only acquire one lock.  Therefore, good
+ * concurrency is achieved whenever they don't collide at the lock partition
+ * level.  However, when a resize operation begins, all partition locks must
+ * be acquired simultaneously for a brief period.  This is only expected to
+ * happen a small number of times until a stable size is found, since growth is
+ * geometric.
+ *
+ * Resizing is done incrementally so that no individual insert operation pays
+ * for the potentially large cost of splitting all buckets.  A resize
+ * operation begins when any partition exceeds a certain load factor, and
+ * splits every bucket into two new buckets but doesn't yet transfer any items
+ * into the new buckets.  Then, future dht_release operations involving an
+ * exclusive lock transfer one item from an old bucket into the appropriate
+ * new bucket.  If possible, an item from the already-locked partition is
+ * migrated.  If none remain, a lock on another partition will be acquired and
+ * one item from that partition will be migrated.  This continues until there
+ * are no items left in old buckets, and the resize operation is finished.
+ * While a resize is in progress, find, insert and delete operations must look
+ * in a new bucket and an old bucket, but this is not too expensive since they
+ * are covered by the same lock.  That is, a given hash value maps to the same
+ * lock partition, no matter how many times buckets have been split.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/lib/dht.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "lib/dht.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "utils/dsa.h"
+#include "utils/memutils.h"
+
+/*
+ * An item in the hash table.  This wraps the user's entry object in an
+ * envelope that holds the hashed key and a pointer to the next item in the
+ * bucket.
+ */
+struct dht_hash_table_item
+{
+	/* The hashed key, to avoid having to recompute it. */
+	dht_hash	hash;
+	/* The next item in the same bucket. */
+	dsa_pointer next;
+	/* The user's entry object follows here. */
+	char		entry[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/* The number of partitions for locking purposes. */
+#define DHT_NUM_PARTITIONS_LOG2 7
+#define DHT_NUM_PARTITIONS (1 << DHT_NUM_PARTITIONS_LOG2)
+
+/* A magic value used to identify DHT hash tables. */
+#define DHT_MAGIC 0x75ff6a20
+
+/*
+ * Tracking information for each lock partition.  Initially, each partition
+ * corresponds to one bucket, but each time the hash table grows, the buckets
+ * covered by each partition split so the number of buckets covered doubles.
+ *
+ * We might want to add padding here so that each partition is on a different
+ * cache line, but doing so would bloat this structure considerably.
+ */
+typedef struct
+{
+	LWLock		lock;			/* Protects all buckets in this partition. */
+	size_t		count;			/* # of items in active buckets */
+	size_t		old_count;		/* # of items in old buckets */
+	Index		next_bucket_to_split;	/* do_incremental_resize progress
+										 * tracker */
+} dht_partition;
+
+/*
+ * The head object for a hash table.  This will be stored in dynamic shared
+ * memory.
+ */
+typedef struct
+{
+	dht_hash_table_handle handle;
+	uint32		magic;
+	dht_partition partitions[DHT_NUM_PARTITIONS];
+	int			lwlock_tranche_id;
+
+	/*
+	 * The following members are written to only when ALL partition locks are
+	 * held.  They can be read when any one partition lock is held.
+	 */
+
+	/* Number of buckets expressed as power of 2 (8 = 256 buckets). */
+	size_t		size_log2;		/* log2(number of buckets) */
+	dsa_pointer buckets;		/* current bucket array */
+	dsa_pointer old_buckets;	/* previous bucket array, if splitting */
+}	dht_hash_table_control;
+
+/*
+ * Per-backend state for a dynamic hash table.
+ */
+struct dht_hash_table
+{
+	dsa_area   *area;			/* Backing dynamic shared memory area. */
+	dht_parameters params;		/* Parameters. */
+	dht_hash_table_control *control;	/* Control object in DSM. */
+	dsa_pointer *buckets;		/* Current bucket pointers in DSM. */
+	size_t		size_log2;		/* log2(number of buckets) */
+	dsa_pointer *old_buckets;	/* Old bucket pointers in DSM. */
+	bool		exclusively_locked;		/* Is an exclusive partition lock
+										 * held? */
+};
+
+/* Given a pointer to an entry, find the pointer to the item that holds it. */
+#define ITEM_FROM_ENTRY(entry)										\
+	((dht_hash_table_item *)((char *)entry -						\
+							 offsetof(dht_hash_table_item, entry)))
+
+/* How many resize operations (bucket splits) have there been? */
+#define NUM_SPLITS(size_log2)					\
+	(size_log2 - DHT_NUM_PARTITIONS_LOG2)
+
+/* How many buckets are there in each partition at a given size? */
+#define BUCKETS_PER_PARTITION(size_log2)		\
+	(1 << NUM_SPLITS(size_log2))
+
+/* Max entries before we need to grow.  Half + quarter = 75% load factor. */
+#define MAX_COUNT_PER_PARTITION(hash_table)				\
+	(BUCKETS_PER_PARTITION(hash_table->size_log2) / 2 + \
+	 BUCKETS_PER_PARTITION(hash_table->size_log2) / 4)
+
+/* Choose partition based on the highest order bits of the hash. */
+#define PARTITION_FOR_HASH(hash)										\
+	(hash >> ((sizeof(dht_hash) * CHAR_BIT) - DHT_NUM_PARTITIONS_LOG2))
+
+/*
+ * Find the bucket index for a given hash and table size.  Each time the table
+ * doubles in size, the appropriate bucket for a given hash value doubles and
+ * possibly adds one, depending on the newly revealed bit, so that all buckets
+ * are split.
+ */
+#define BUCKET_INDEX_FOR_HASH_AND_SIZE(hash, size_log2)		\
+	(hash >> ((sizeof(dht_hash) * CHAR_BIT) - (size_log2)))
+
+/* The index of the first bucket in a given partition. */
+#define BUCKET_INDEX_FOR_PARTITION(partition, size_log2)	\
+	((partition) << NUM_SPLITS(size_log2))
+
+/* The head of the active bucket for a given hash value (lvalue). */
+#define BUCKET_FOR_HASH(hash_table, hash)								\
+	(hash_table->buckets[												\
+		BUCKET_INDEX_FOR_HASH_AND_SIZE(hash,							\
+									   hash_table->size_log2)])
+
+/* The head of the old bucket for a given hash value (lvalue). */
+#define OLD_BUCKET_FOR_HASH(hash_table, hash)							\
+	(hash_table->old_buckets[											\
+		BUCKET_INDEX_FOR_HASH_AND_SIZE(hash,							\
+									   hash_table->size_log2 - 1)])
+
+/* Is a resize currently in progress? */
+#define RESIZE_IN_PROGRESS(hash_table)			\
+	(hash_table->old_buckets != NULL)
+
+static void delete_item(dht_hash_table *hash_table, dht_hash_table_item * item);
+static void advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+				 dsa_pointer *next);
+static dsa_pointer next_in_bucket(dht_iterator *iterator);
+static void begin_resize(dht_hash_table *hash_table, size_t new_size);
+static void finish_resize(dht_hash_table *hash_table);
+static void do_incremental_resize(dht_hash_table *hash_table, int p);
+static void search_for_resize_work(dht_hash_table *hash_table,
+					   int start_partition);
+static inline void ensure_valid_bucket_pointers(dht_hash_table *hash_table);
+static inline dht_hash_table_item *find_in_bucket(dht_hash_table *hash_table,
+			   const void *key,
+			   dsa_pointer item_pointer);
+static void insert_item_into_bucket(dht_hash_table *hash_table,
+						dsa_pointer item_pointer,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket);
+static dht_hash_table_item *insert_into_bucket(dht_hash_table *hash_table,
+				   const void *key,
+				   dsa_pointer *bucket);
+static bool delete_key_from_bucket(dht_hash_table *hash_table, const void *key,
+					   dsa_pointer *bucket_head);
+static bool delete_item_from_bucket(dht_hash_table *hash_table,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket_head);
+
+#define PARTITION_LOCK(hash_table, i)			\
+	(&(hash_table)->control->partitions[(i)].lock)
+
+/*
+ * Create a new hash table backed by the given dynamic shared area, with the
+ * given parameters.
+ */
+dht_hash_table *
+dht_create(dsa_area *area, const dht_parameters *params)
+{
+	dht_hash_table *hash_table;
+	dsa_pointer control;
+
+	/* Allocate the backend-local object representing the hash table. */
+	hash_table = palloc(sizeof(dht_hash_table));
+
+	/* Allocate the control object in shared memory. */
+	control = dsa_allocate(area, sizeof(dht_hash_table_control));
+
+	/* Set up the local and shared hash table structs. */
+	hash_table->area = area;
+	hash_table->params = *params;
+	hash_table->control = dsa_get_address(area, control);
+	hash_table->control->handle = control;
+	hash_table->control->magic = DHT_MAGIC;
+	hash_table->control->lwlock_tranche_id = params->tranche_id;
+
+	/* Set up the array of lock partitions. */
+	{
+		int			i;
+
+		for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		{
+			LWLockInitialize(PARTITION_LOCK(hash_table, i),
+							 hash_table->control->lwlock_tranche_id);
+			hash_table->control->partitions[i].count = 0;
+		}
+	}
+
+	hash_table->exclusively_locked = false;
+
+	/*
+	 * Set up the initial array of buckets.  Our initial size is the same as
+	 * the number of partitions.
+	 */
+	hash_table->control->size_log2 = DHT_NUM_PARTITIONS_LOG2;
+	hash_table->control->buckets =
+		dsa_allocate(area, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS);
+	hash_table->buckets = dsa_get_address(area,
+										  hash_table->control->buckets);
+	hash_table->control->old_buckets = InvalidDsaPointer;
+	hash_table->old_buckets = NULL;
+	memset(hash_table->buckets, 0, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS);
+
+	return hash_table;
+}
+
+/*
+ * Attach to an existing hash table using a handle.
+ */
+dht_hash_table *
+dht_attach(dsa_area *area, const dht_parameters *params,
+		   dht_hash_table_handle handle)
+{
+	dht_hash_table *hash_table;
+	dsa_pointer control;
+
+	/* Allocate the backend-local object representing the hash table. */
+	hash_table = palloc(sizeof(dht_hash_table));
+
+	/* Find the control object in shared memory. */
+	control = handle;
+
+	/* Set up the local hash table struct. */
+	hash_table->area = area;
+	hash_table->params = *params;
+	hash_table->control = dsa_get_address(area, control);
+	hash_table->exclusively_locked = false;
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	return hash_table;
+}
+
+/*
+ * Detach from a hash table.  This frees backend-local resources associated
+ * with the hash table, but the hash table will continue to exist until it is
+ * either explicitly destroyed (by a backend that is still attached to it), or
+ * the area that backs it is returned to the operating system.
+ */
+void
+dht_detach(dht_hash_table *hash_table)
+{
+	/* The hash table may have been destroyed.  Just free local memory. */
+	pfree(hash_table);
+}
+
+/*
+ * Destroy a hash table, returning all memory to the area.  The caller must be
+ * certain that no other backend will attempt to access the hash table before
+ * calling this function.  Other backends must explicitly call dht_detach to
+ * free up backend-local memory associated with the hash table.  The backend
+ * that calls dht_destroy must not call dht_detach.
+ */
+void
+dht_destroy(dht_hash_table *hash_table)
+{
+	dht_iterator iterator;
+	void	   *entry;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/* Free all the entries. */
+	dht_iterate_begin(hash_table, &iterator, true);
+	while ((entry = dht_iterate_next(&iterator)))
+		dht_iterate_delete(&iterator);
+	dht_iterate_end(&iterator);
+
+	/*
+	 * Vandalize the control block to help catch programming errors where
+	 * other backends access the memory formerly occupied by this hash table.
+	 */
+	hash_table->control->magic = 0;
+
+	/* Free the active and (if appropriate) old table, and control object. */
+	if (DsaPointerIsValid(hash_table->control->old_buckets))
+		dsa_free(hash_table->area, hash_table->control->old_buckets);
+	dsa_free(hash_table->area, hash_table->control->buckets);
+	dsa_free(hash_table->area, hash_table->control->handle);
+
+	pfree(hash_table);
+}
+
+/*
+ * Get a handle that can be used by other processes to attach to this hash
+ * table.
+ */
+dht_hash_table_handle
+dht_get_hash_table_handle(dht_hash_table *hash_table)
+{
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	return hash_table->control->handle;
+}
+
+/*
+ * Look up an entry, given a key.  Returns a pointer to an entry if one can be
+ * found with the given key.  Returns NULL if the key is not found.  If a
+ * non-NULL value is returned, the entry is locked and must be released by
+ * calling dht_release.  If an error is raised before dht_release is called,
+ * the lock will be released automatically, but the caller must take care to
+ * ensure that the entry is not left corrupted.  The lock mode is either
+ * shared or exclusive depending on 'exclusive'.
+ *
+ * Note that the lock held is in fact an LWLock, so interrupts will be held
+ * on return from this function, and not resumed until dht_release is called.
+ * It is a very good idea for the caller to release the lock quickly.
+ */
+void *
+dht_find(dht_hash_table *hash_table, const void *key, bool exclusive)
+{
+	size_t		hash;
+	size_t		partition;
+	dht_hash_table_item *item;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+	partition = PARTITION_FOR_HASH(hash);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition),
+				  exclusive ? LW_EXCLUSIVE : LW_SHARED);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/* Search the active bucket. */
+	item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash));
+
+	/* If not found and a resize is in progress, search the old bucket too. */
+	if (!item && RESIZE_IN_PROGRESS(hash_table))
+		item = find_in_bucket(hash_table, key,
+							  OLD_BUCKET_FOR_HASH(hash_table, hash));
+
+	if (!item)
+	{
+		/* Not found. */
+		LWLockRelease(PARTITION_LOCK(hash_table, partition));
+		return NULL;
+	}
+	else
+	{
+		/* The caller will release the lock by calling dht_release. */
+		hash_table->exclusively_locked = exclusive;
+		return &item->entry;
+	}
+}
+
+/*
+ * Returns a pointer to an exclusively locked item which must be released with
+ * dht_release.  If the key is found in the hash table, 'found' is set to true
+ * and a pointer to the existing entry is returned.  If the key is not found,
+ * 'found' is set to false, and a pointer to a newly created entry is returned.
+ */
+void *
+dht_find_or_insert(dht_hash_table *hash_table,
+				   const void *key,
+				   bool *found)
+{
+	size_t		hash;
+	size_t		partition_index;
+	dht_partition *partition;
+	dht_hash_table_item *item;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+	partition_index = PARTITION_FOR_HASH(hash);
+	partition = &hash_table->control->partitions[partition_index];
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+restart:
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition_index),
+				  LW_EXCLUSIVE);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/* Search the active bucket. */
+	item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash));
+
+	/* If not found and a resize is in progress, search the old bucket too. */
+	if (!item && RESIZE_IN_PROGRESS(hash_table))
+		item = find_in_bucket(hash_table, key,
+							  OLD_BUCKET_FOR_HASH(hash_table, hash));
+
+	if (item)
+		*found = true;
+	else
+	{
+		*found = false;
+
+		/* Check if we are getting too full. */
+		if (partition->count > MAX_COUNT_PER_PARTITION(hash_table) &&
+			!RESIZE_IN_PROGRESS(hash_table))
+		{
+			/*
+			 * The load factor (= keys / buckets) for all buckets protected by
+			 * this partition is > 0.75.  Presumably the same applies
+			 * generally across the whole hash table (though we don't attempt
+			 * to track that directly to avoid contention on some kind of
+			 * central counter; we just assume that this partition is
+			 * representative).  This is a good time to start a new resize
+			 * operation, if there isn't one already in progress.
+			 *
+			 * Give up our existing lock first, because resizing needs to
+			 * reacquire all the locks in the right order.
+			 */
+			LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+			begin_resize(hash_table, hash_table->size_log2 + 1);
+
+			goto restart;
+		}
+
+		/* Finally we can try to insert the new item. */
+		item = insert_into_bucket(hash_table, key,
+								  &BUCKET_FOR_HASH(hash_table, hash));
+		item->hash = hash;
+		/* Adjust per-lock-partition counter for load factor knowledge. */
+		++partition->count;
+	}
+
+	/* The caller must release the lock with dht_release. */
+	hash_table->exclusively_locked = true;
+	return &item->entry;
+}
+
+/*
+ * Remove an entry by key.  Returns true if the key was found and the
+ * corresponding entry was removed.
+ *
+ * To delete an entry that you already have a pointer to, see
+ * dht_delete_entry.
+ */
+bool
+dht_delete_key(dht_hash_table *hash_table, const void *key)
+{
+	size_t		hash;
+	size_t		partition;
+	bool		found;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+	partition = PARTITION_FOR_HASH(hash);
+
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition), LW_EXCLUSIVE);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/*
+	 * Try the active bucket first, and then the outgoing bucket if we are
+	 * resizing.  It can't be in both.
+	 */
+	if (delete_key_from_bucket(hash_table, key,
+							   &BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].count > 0);
+		found = true;
+		--hash_table->control->partitions[partition].count;
+	}
+	else if (RESIZE_IN_PROGRESS(hash_table) &&
+			 delete_key_from_bucket(hash_table, key,
+									&OLD_BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].old_count > 0);
+		found = true;
+		--hash_table->control->partitions[partition].old_count;
+	}
+	else
+		found = false;
+
+	LWLockRelease(PARTITION_LOCK(hash_table, partition));
+
+	return found;
+}
+
+/*
+ * Remove an entry.  The entry must already be exclusively locked, and must
+ * have been obtained by dht_find or dht_find_or_insert.  Note that this
+ * function releases the lock just like dht_release.
+ *
+ * To delete an entry by key, see dht_delete_key.  To delete an entry while
+ * iterating, see dht_iterate_delete.
+ */
+void
+dht_delete_entry(dht_hash_table *hash_table, void *entry)
+{
+	dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+	size_t		partition = PARTITION_FOR_HASH(item->hash);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(hash_table->exclusively_locked);
+
+	delete_item(hash_table, item);
+	hash_table->exclusively_locked = false;
+	LWLockRelease(PARTITION_LOCK(hash_table, partition));
+}
+
+/*
+ * Unlock an entry which was locked by dht_find or dht_find_or_insert.
+ */
+void
+dht_release(dht_hash_table *hash_table, void *entry)
+{
+	dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+	size_t		partition_index = PARTITION_FOR_HASH(item->hash);
+	bool		deferred_resize_work = false;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/*
+	 * If there is a resize in progress and we took an exclusive lock, try to
+	 * migrate a key from the old bucket array to the new bucket array.
+	 * Ideally, we'll find some work to do in the partition on which we
+	 * already hold a lock, but if not, we'll make a note to search other lock
+	 * partitions after releasing the lock we currently hold.
+	 */
+	if (RESIZE_IN_PROGRESS(hash_table) && hash_table->exclusively_locked)
+	{
+		dht_partition *partition;
+
+		partition = &hash_table->control->partitions[partition_index];
+
+		if (partition->old_count > 0)
+		{
+			/* Do some resizing work in this partition. */
+			do_incremental_resize(hash_table, partition_index);
+		}
+		else
+		{
+			/* Nothing left to do in this partition.  Look elsewhere. */
+			deferred_resize_work = true;
+		}
+	}
+
+	hash_table->exclusively_locked = false;
+	LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+
+	/*
+	 * If we need to contribute some work to the ongoing resize effort, and we
+	 * didn't find something to migrate in our own partition, go search for
+	 * such work in other partitions.  We'll start with the one after ours.
+	 */
+	if (deferred_resize_work)
+		search_for_resize_work(hash_table,
+							   (partition_index + 1) % DHT_NUM_PARTITIONS);
+}
+
+/*
+ * Begin iterating through the whole hash table.  The caller must supply a
+ * dht_iterator object, which can then be used to call dht_iterate_next to get
+ * values until the end is reached.
+ */
+void
+dht_iterate_begin(dht_hash_table *hash_table,
+				  dht_iterator *iterator,
+				  bool exclusive)
+{
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	iterator->hash_table = hash_table;
+	iterator->exclusive = exclusive;
+	iterator->partition = 0;
+	iterator->bucket = 0;
+	iterator->item = NULL;
+	iterator->last_item_pointer = InvalidDsaPointer;
+	iterator->locked = false;
+
+	/* Snapshot the size (arbitrary lock to prevent size changing). */
+	LWLockAcquire(PARTITION_LOCK(hash_table, 0), LW_SHARED);
+	ensure_valid_bucket_pointers(hash_table);
+	iterator->table_size_log2 = hash_table->size_log2;
+	LWLockRelease(PARTITION_LOCK(hash_table, 0));
+}
+
+/*
+ * Delete the entry that was most recently returned by dht_iterate_next.  The
+ * iterator must have been initialized in exclusive lock mode.
+ */
+void
+dht_iterate_delete(dht_iterator *iterator)
+{
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+	Assert(iterator->exclusive);
+	Assert(iterator->item != NULL);
+	delete_item(iterator->hash_table, iterator->item);
+	iterator->item = NULL;
+}
+
+/*
+ * Move to the next item in the hash table.  Returns a pointer to an entry, or
+ * NULL if the end of the hash table has been reached.  The item is locked in
+ * exclusive or shared mode depending on the argument given to
+ * dht_iterate_begin.  The caller can optionally release the lock by calling
+ * dht_iterate_release, and then call dht_iterate_next again to move to the
+ * next entry.  If the iteration is in exclusive mode, client code can also
+ * call dht_iterate_delete.  When the end of the hash table is reached, or at
+ * any time, the client may call dht_iterate_end to abandon iteration.
+ */
+void *
+dht_iterate_next(dht_iterator *iterator)
+{
+	dsa_pointer item_pointer;
+
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+
+	while (iterator->partition < DHT_NUM_PARTITIONS)
+	{
+		if (!iterator->locked)
+		{
+			LWLockAcquire(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition),
+						  iterator->exclusive ? LW_EXCLUSIVE : LW_SHARED);
+			iterator->locked = true;
+		}
+
+		item_pointer = next_in_bucket(iterator);
+		if (DsaPointerIsValid(item_pointer))
+		{
+			/* Remember this item, so that we can step over it next time. */
+			iterator->last_item_pointer = item_pointer;
+			iterator->item = dsa_get_address(iterator->hash_table->area,
+											 item_pointer);
+			return &(iterator->item->entry);
+		}
+
+		/* We have reached the end of the bucket. */
+		iterator->last_item_pointer = InvalidDsaPointer;
+		iterator->bucket += 1;
+
+		/*
+		 * If the last bucket was split while we were iterating, then we have
+		 * been doing some expensive bucket merging to get each item.  Now
+		 * that we've completed that whole pre-split bucket (by merging all
+		 * its descendants), we can update the table size of our iterator and
+		 * start iterating cheaply again.
+		 */
+		iterator->bucket <<=
+			iterator->hash_table->size_log2 - iterator->table_size_log2;
+		iterator->table_size_log2 = iterator->hash_table->size_log2;
+		/* Have we gone past the last bucket in this partition? */
+		if (iterator->bucket >=
+			BUCKET_INDEX_FOR_PARTITION(iterator->partition + 1,
+									   iterator->table_size_log2))
+		{
+			/* No more buckets.  Try next partition. */
+			LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition));
+			iterator->locked = false;
+			++iterator->partition;
+			iterator->bucket =
+				BUCKET_INDEX_FOR_PARTITION(iterator->partition,
+										   iterator->table_size_log2);
+		}
+	}
+	iterator->item = NULL;
+	return NULL;
+}
+
+/*
+ * Release the most recently obtained lock.  This can optionally be called in
+ * between calls to dht_iterate_next to allow other processes to access the
+ * same partition of the hash table.
+ */
+void
+dht_iterate_release(dht_iterator *iterator)
+{
+	Assert(iterator->locked);
+	LWLockRelease(PARTITION_LOCK(iterator->hash_table, iterator->partition));
+	iterator->locked = false;
+}
+
+/*
+ * Terminate iteration.  This must be called after iteration completes,
+ * whether or not the end was reached.  The iterator object may then be reused
+ * for another iteration.
+ */
+void
+dht_iterate_end(dht_iterator *iterator)
+{
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+	if (iterator->locked)
+		LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+									 iterator->partition));
+}
+
+/*
+ * Print out debugging information about the internal state of the hash table.
+ */
+void
+dht_dump(dht_hash_table *hash_table)
+{
+	size_t		i;
+	size_t		j;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED);
+
+	ensure_valid_bucket_pointers(hash_table);
+
+	fprintf(stderr,
+			"hash table size = %zu\n", (size_t) 1 << hash_table->size_log2);
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+		size_t		begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2);
+		size_t		end = BUCKET_INDEX_FOR_PARTITION(i + 1, hash_table->size_log2);
+
+		fprintf(stderr, "  partition %zu\n", i);
+		fprintf(stderr,
+				"    active buckets (key count = %zu)\n", partition->count);
+
+		for (j = begin; j < end; ++j)
+		{
+			size_t		count = 0;
+			dsa_pointer bucket = hash_table->buckets[j];
+
+			while (DsaPointerIsValid(bucket))
+			{
+				dht_hash_table_item *item;
+
+				item = dsa_get_address(hash_table->area, bucket);
+
+				bucket = item->next;
+				++count;
+			}
+			fprintf(stderr, "      bucket %zu (key count = %zu)\n", j, count);
+		}
+		if (RESIZE_IN_PROGRESS(hash_table))
+		{
+			size_t		begin;
+			size_t		end;
+
+			begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2 - 1);
+			end = BUCKET_INDEX_FOR_PARTITION(i + 1,
+											 hash_table->size_log2 - 1);
+
+			fprintf(stderr, "    old buckets (key count = %zu)\n",
+					partition->old_count);
+
+			for (j = begin; j < end; ++j)
+			{
+				size_t		count = 0;
+				dsa_pointer bucket = hash_table->old_buckets[j];
+
+				while (DsaPointerIsValid(bucket))
+				{
+					dht_hash_table_item *item;
+
+					item = dsa_get_address(hash_table->area, bucket);
+
+					bucket = item->next;
+					++count;
+				}
+				fprintf(stderr,
+						"      bucket %zu (key count = %zu)\n", j, count);
+			}
+		}
+	}
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockRelease(PARTITION_LOCK(hash_table, i));
+}
+
+/*
+ * Delete a locked item to which we have a pointer.
+ */
+static void
+delete_item(dht_hash_table *hash_table, dht_hash_table_item * item)
+{
+	size_t		hash = item->hash;
+	size_t		partition = PARTITION_FOR_HASH(hash);
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, partition)));
+
+	/*
+	 * Try the active bucket first, and then the outgoing bucket if we are
+	 * resizing.  It can't be in both.
+	 */
+	if (delete_item_from_bucket(hash_table, item,
+								&BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].count > 0);
+		--hash_table->control->partitions[partition].count;
+	}
+	else if (RESIZE_IN_PROGRESS(hash_table) &&
+			 delete_item_from_bucket(hash_table, item,
+									 &OLD_BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].old_count > 0);
+		--hash_table->control->partitions[partition].old_count;
+	}
+	else
+	{
+		Assert(false);
+	}
+}
+
+/*
+ * Find the item that comes after iterator->last_item_pointer but before
+ * *next, and write it into *next.
+ *
+ * Calling this on several buckets allows us to 'merge' the items from a
+ * bucket with its ancestors and descendants during resize operations, and
+ * iterate through the items in all of them in a stable order, despite items
+ * being able to move while we iterate.
+ *
+ * This works because buckets are sorted by descending item pointer.
+ */
+static void
+advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+				 dsa_pointer *next)
+{
+	dht_hash_table_item *item;
+
+	while (DsaPointerIsValid(bucket_head))
+	{
+		item = dsa_get_address(iterator->hash_table->area,
+							   bucket_head);
+		if ((!DsaPointerIsValid(iterator->last_item_pointer) ||
+			 bucket_head < iterator->last_item_pointer) &&
+			bucket_head > *next &&
+			BUCKET_INDEX_FOR_HASH_AND_SIZE(item->hash,
+										   iterator->table_size_log2) ==
+			iterator->bucket)
+		{
+			*next = bucket_head;
+			return;
+		}
+		bucket_head = item->next;
+	}
+}
+
+/*
+ * Find the next item in the bucket we are visiting.  If a resize is in
+ * progress or the bucket has been split since we started iterating over this
+ * bucket, we may need to combine items from multiple buckets to synthesize
+ * the original bucket.
+ */
+static dsa_pointer
+next_in_bucket(dht_iterator *iterator)
+{
+	size_t		bucket;
+	size_t		begin;
+	size_t		end;
+	dsa_pointer next = InvalidDsaPointer;
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition)));
+
+	/* Handle old buckets if a resize is in progress. */
+	if (RESIZE_IN_PROGRESS(iterator->hash_table))
+	{
+		if (iterator->table_size_log2 == iterator->hash_table->size_log2)
+		{
+			/*
+			 * The table hasn't grown since we started iterating, but there
+			 * was already a resize in progress so there may be items in an
+			 * old bucket that belong in the bucket we're iterating over which
+			 * haven't been migrated yet.
+			 */
+			advance_iterator(iterator,
+					 iterator->hash_table->old_buckets[iterator->bucket / 2],
+							 &next);
+		}
+		else
+		{
+			/*
+			 * The bucket has been split one or more times, and a resize is
+			 * still in progress so there may be items still in an old bucket.
+			 * If N splits have occurred, there will be 2^N current buckets
+			 * that correspond to the one original bucket.
+			 */
+			Assert(iterator->hash_table->size_log2 > iterator->table_size_log2);
+			begin = iterator->bucket <<
+				(iterator->hash_table->size_log2 - iterator->table_size_log2 - 1);
+			end = (iterator->bucket + 1) <<
+				(iterator->hash_table->size_log2 - iterator->table_size_log2 - 1);
+			for (bucket = begin; bucket < end; bucket += 1)
+				advance_iterator(iterator,
+								 iterator->hash_table->old_buckets[bucket],
+								 &next);
+		}
+	}
+
+	/*
+	 * Scan the bucket.  This is a loop because it may have split any number
+	 * of times since we started iterating through it, but in the common case
+	 * we just loop once.
+	 */
+	begin = iterator->bucket <<
+		(iterator->hash_table->size_log2 - iterator->table_size_log2);
+	end = (iterator->bucket + 1) <<
+		(iterator->hash_table->size_log2 - iterator->table_size_log2);
+	for (bucket = begin; bucket < end; ++bucket)
+		advance_iterator(iterator,
+						 iterator->hash_table->buckets[bucket],
+						 &next);
+
+	return next;
+}
+
+/*
+ * Grow the hash table if necessary to the requested size, given as the log2
+ * of the number of buckets.  The requested size must be double some
+ * previously observed size.  Returns without doing anything if the table is
+ * found to be big enough already (because another backend expanded it).
+ * The process of transferring items from the old table to the new one will be
+ * carried out with a small tax on all future operations until the work is
+ * done.
+ */
+static void
+begin_resize(dht_hash_table *hash_table, size_t new_size_log2)
+{
+	int			i;
+	dsa_pointer new_buckets;
+
+	/*
+	 * Acquire the locks for all lock partitions.  This is expensive, but we
+	 * shouldn't have to do it many times.
+	 */
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+		if (i == 0 && hash_table->control->size_log2 >= new_size_log2)
+		{
+			/*
+			 * Another backend has already increased the size; we can avoid
+			 * obtaining all the locks and return early.
+			 */
+			LWLockRelease(PARTITION_LOCK(hash_table, 0));
+			return;
+		}
+	}
+
+	Assert(new_size_log2 == hash_table->control->size_log2 + 1);
+	Assert(!RESIZE_IN_PROGRESS(hash_table));
+
+	/* Allocate the space for the new table. */
+	new_buckets = dsa_allocate(hash_table->area,
+							   sizeof(dsa_pointer) * ((size_t) 1 << new_size_log2));
+	hash_table->control->old_buckets = hash_table->control->buckets;
+	hash_table->control->buckets = new_buckets;
+	hash_table->control->size_log2 = new_size_log2;
+	hash_table->buckets = dsa_get_address(hash_table->area,
+										  hash_table->control->buckets);
+	hash_table->old_buckets =
+		dsa_get_address(hash_table->area,
+						hash_table->control->old_buckets);
+	hash_table->size_log2 = new_size_log2;
+	/* InvalidDsaPointer in every bucket */
+	memset(hash_table->buckets, 0,
+		   sizeof(dsa_pointer) * ((size_t) 1 << new_size_log2));
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		partition->old_count = partition->count;
+		partition->count = 0;
+		partition->next_bucket_to_split =
+			BUCKET_INDEX_FOR_PARTITION(i, new_size_log2 - 1);
+	}
+
+	/* Release all the locks. */
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockRelease(PARTITION_LOCK(hash_table, i));
+}
+
+/*
+ * Check if the resize operation is finished.  If it is, then the old table
+ * can be returned to the shared memory area.
+ */
+static void
+finish_resize(dht_hash_table *hash_table)
+{
+	int			i,
+				j;
+	bool		finished = true;
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+		if (i == 0)
+		{
+			ensure_valid_bucket_pointers(hash_table);
+			if (!RESIZE_IN_PROGRESS(hash_table))
+			{
+				/* Another backend has already finished the resize. */
+				LWLockRelease(PARTITION_LOCK(hash_table, i));
+				return;
+			}
+		}
+
+		/*
+		 * We can only actually finish the resize when every partition has no
+		 * more keys in old buckets.  This might not be true yet, because
+		 * search_for_resize_work calls us when it thinks there might be no
+		 * more resizing work to do, but it could be wrong because it judges
+		 * that with an unsynchronized view of memory.  Now that we can
+		 * examine the synchronized counters, we can check if the resize
+		 * really is complete.
+		 */
+		if (partition->old_count > 0)
+		{
+			finished = false;
+			break;
+		}
+	}
+
+	if (finished)
+	{
+		dsa_free(hash_table->area, hash_table->control->old_buckets);
+		hash_table->control->old_buckets = InvalidDsaPointer;
+	}
+
+	/*
+	 * Only release as many locks as we acquired.  When finished is true, this
+	 * will be all of them, but if we exited the lock-acquire loop early it
+	 * will be partitions 0 through i inclusive.
+	 */
+	for (j = 0; j < DHT_NUM_PARTITIONS && j <= i; ++j)
+		LWLockRelease(PARTITION_LOCK(hash_table, j));
+}
+
+/*
+ * Migrate one item from partition 'p' from an old bucket to the appropriate
+ * new bucket.  The partition lock must already be held and there must be at
+ * least one item to be migrated.
+ */
+static void
+do_incremental_resize(dht_hash_table *hash_table, int p)
+{
+	size_t		old_size_log2 = hash_table->control->size_log2 - 1;
+	dht_partition *partition = &hash_table->control->partitions[p];
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, p)));
+	Assert(partition->old_count > 0);
+
+	/*
+	 * Walk through all the buckets that are covered by this partition until
+	 * we find one that is not empty, and migrate one item from it.
+	 */
+	for (;;)
+	{
+		dsa_pointer item_pointer;
+
+		item_pointer =
+			hash_table->old_buckets[partition->next_bucket_to_split];
+
+		if (DsaPointerIsValid(item_pointer))
+		{
+			dht_hash_table_item *item;
+
+			item = dsa_get_address(hash_table->area, item_pointer);
+
+			/* Remove it from the old bucket. */
+			hash_table->old_buckets[partition->next_bucket_to_split] =
+				item->next;
+			--partition->old_count;
+
+			/* Add it to the active bucket. */
+			insert_item_into_bucket(hash_table,
+									item_pointer,
+									item,
+									&BUCKET_FOR_HASH(hash_table, item->hash));
+			++partition->count;
+
+			return;
+		}
+		else
+		{
+			/* Move to the next bucket protected by this partition. */
+			partition->next_bucket_to_split += 1;
+
+			/*
+			 * We shouldn't be able to reach past the end of the partition
+			 * without finding an item.  That would imply that the partition's
+			 * old_count was corrupted.
+			 *
+			 * This is an elog() because we want to catch this even in builds
+			 * that are not Assert-enabled.
+			 */
+			if (partition->next_bucket_to_split >=
+				BUCKET_INDEX_FOR_PARTITION(p + 1, old_size_log2))
+				elog(ERROR, "dht corrupted");
+		}
+	}
+}
+
+/*
+ * Try to find a partition that has outstanding resizing work and do some of
+ * that work, possibly completing the resize operation.  The caller must hold
+ * no locks.
+ *
+ * Consider 'start_partition' first.  dht_release tries to spread this around
+ * to try to reduce contention.
+ */
+static void
+search_for_resize_work(dht_hash_table *hash_table, int start_partition)
+{
+	int			i = start_partition;
+
+	Assert(start_partition >= 0);
+	Assert(start_partition < DHT_NUM_PARTITIONS);
+
+	/*
+	 * Using a non-synchronized view of memory (no locks), see if we can find
+	 * any partition that looks like it still has keys to be migrated.
+	 */
+	do
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		if (partition->old_count > 0)
+		{
+			bool		done = false;
+
+			/* Now check again with a lock. */
+			LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+			ensure_valid_bucket_pointers(hash_table);
+			if (!RESIZE_IN_PROGRESS(hash_table))
+			{
+				/* The resize finished underneath us. */
+				done = true;
+			}
+			else if (partition->old_count > 0)
+			{
+				/* There is in fact work to be done in this partition. */
+				do_incremental_resize(hash_table, i);
+				done = true;
+			}
+			LWLockRelease(PARTITION_LOCK(hash_table, i));
+
+			if (done)
+				return;
+		}
+		i = (i + 1) % DHT_NUM_PARTITIONS;
+	} while (i != start_partition);
+
+	/*
+	 * We didn't find any partitions that seem to have remaining migration
+	 * work, though we can't be 100% sure due to unsynchronized reads of
+	 * memory.  We will try to finish the resize, which may or may not
+	 * actually succeed, depending on what it finds the true state to be.
+	 */
+	finish_resize(hash_table);
+}
+
+/*
+ * Make sure that our backend-local bucket pointers are up to date.  The
+ * caller must have locked one lock partition, which prevents begin_resize()
+ * from running concurrently.
+ */
+static inline void
+ensure_valid_bucket_pointers(dht_hash_table *hash_table)
+{
+	if (hash_table->size_log2 != hash_table->control->size_log2)
+	{
+		/*
+		 * A resize has started since we last looked, so our pointers are out
+		 * of date.
+		 */
+		hash_table->buckets = dsa_get_address(hash_table->area,
+											  hash_table->control->buckets);
+		if (DsaPointerIsValid(hash_table->control->old_buckets))
+			hash_table->old_buckets =
+				dsa_get_address(hash_table->area,
+								hash_table->control->old_buckets);
+		else
+			hash_table->old_buckets = NULL;
+		hash_table->size_log2 = hash_table->control->size_log2;
+	}
+	if (hash_table->old_buckets &&
+		!DsaPointerIsValid(hash_table->control->old_buckets))
+	{
+		/* A resize has finished since we last looked. */
+		hash_table->old_buckets = NULL;
+	}
+}
+
+/*
+ * Scan a locked bucket for a match, using the provided compare-function.
+ */
+static inline dht_hash_table_item *
+find_in_bucket(dht_hash_table *hash_table, const void *key,
+			   dsa_pointer item_pointer)
+{
+	while (DsaPointerIsValid(item_pointer))
+	{
+		dht_hash_table_item *item;
+
+		item = dsa_get_address(hash_table->area, item_pointer);
+		if (hash_table->params.compare_function(key, &item->entry,
+										   hash_table->params.key_size) == 0)
+			return item;
+		item_pointer = item->next;
+	}
+	return NULL;
+}
+
+/*
+ * Insert an already-allocated item into a bucket.
+ */
+static void
+insert_item_into_bucket(dht_hash_table *hash_table,
+						dsa_pointer item_pointer,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket)
+{
+	Assert(item == dsa_get_address(hash_table->area, item_pointer));
+
+	/*
+	 * Find the insertion point for this item in the bucket.  Buckets are
+	 * sorted by descending dsa_pointer, as part of the protocol for ensuring
+	 * that iterators don't visit items twice when buckets are split during a
+	 * scan.  (We don't actually expect them to have more than 1 item unless
+	 * the hash function is of low quality.)
+	 */
+	while (*bucket > item_pointer)
+	{
+		dht_hash_table_item *this_item;
+
+		this_item = dsa_get_address(hash_table->area, *bucket);
+
+		bucket = &this_item->next;
+	}
+
+	item->next = *bucket;
+	*bucket = item_pointer;
+}
+
+/*
+ * Allocate space for an entry with the given key and insert it into the
+ * provided bucket.
+ */
+static dht_hash_table_item *
+insert_into_bucket(dht_hash_table *hash_table,
+				   const void *key,
+				   dsa_pointer *bucket)
+{
+	dsa_pointer item_pointer;
+	dht_hash_table_item *item;
+
+	item_pointer = dsa_allocate(hash_table->area,
+								hash_table->params.entry_size +
+								offsetof(dht_hash_table_item, entry));
+	item = dsa_get_address(hash_table->area, item_pointer);
+	memcpy(&(item->entry), key, hash_table->params.key_size);
+	insert_item_into_bucket(hash_table, item_pointer, item, bucket);
+	return item;
+}
+
+/*
+ * Search a bucket for a matching key and delete it.
+ */
+static bool
+delete_key_from_bucket(dht_hash_table *hash_table,
+					   const void *key,
+					   dsa_pointer *bucket_head)
+{
+	while (DsaPointerIsValid(*bucket_head))
+	{
+		dht_hash_table_item *item;
+
+		item = dsa_get_address(hash_table->area, *bucket_head);
+
+		if (hash_table->params.compare_function(key, &item->entry,
+												hash_table->params.key_size)
+			== 0)
+		{
+			dsa_pointer next;
+
+			next = item->next;
+			dsa_free(hash_table->area, *bucket_head);
+			*bucket_head = next;
+			return true;
+		}
+		bucket_head = &item->next;
+	}
+	return false;
+}
+
+/*
+ * Delete the specified item from the bucket.
+ */
+static bool
+delete_item_from_bucket(dht_hash_table *hash_table,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket_head)
+{
+	while (DsaPointerIsValid(*bucket_head))
+	{
+		dht_hash_table_item *bucket_item;
+
+		bucket_item = dsa_get_address(hash_table->area, *bucket_head);
+
+		if (bucket_item == item)
+		{
+			dsa_pointer next;
+
+			next = item->next;
+			dsa_free(hash_table->area, *bucket_head);
+			*bucket_head = next;
+			return true;
+		}
+		bucket_head = &bucket_item->next;
+	}
+	return false;
+}
diff --git a/src/include/lib/dht.h b/src/include/lib/dht.h
new file mode 100644
index 00000000000..fea8c5f7c71
--- /dev/null
+++ b/src/include/lib/dht.h
@@ -0,0 +1,114 @@
+/*-------------------------------------------------------------------------
+ *
+ * dht.h
+ *	  Concurrent hash tables backed by dynamic shared memory areas.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/include/lib/dht.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DHT_H
+#define DHT_H
+
+#include "utils/dsa.h"
+
+/* The opaque type representing a hash table. */
+struct dht_hash_table;
+typedef struct dht_hash_table dht_hash_table;
+
+/* The opaque type used for tracking iterator state. */
+struct dht_iterator;
+typedef struct dht_iterator dht_iterator;
+
+/* A handle for a dht_hash_table which can be shared with other processes. */
+typedef dsa_pointer dht_hash_table_handle;
+
+/* The type for hash values. */
+typedef uint32 dht_hash;
+
+/*
+ * The function type for computing hash values for keys.  Compatible with
+ * HashValueFunc in hsearch.h and hash functions like tag_hash.
+ */
+typedef dht_hash (*dht_hash_function)(const void *v, Size size);
+
+/*
+ * The function type for comparing keys.  Compatible with HashCompareFunc in
+ * hsearch.h and memcmp.
+ */
+typedef int (*dht_compare_function)(const void *a, const void *b, Size size);
+
+/*
+ * The set of parameters needed to create or attach to a hash table.  The
+ * member tranche_id does not need to be initialized when attaching to an
+ * existing hash table.
+ */
+typedef struct
+{
+	Size key_size;				/* Size of the key (initial bytes of entry) */
+	Size entry_size;			/* Total size of entry */
+	dht_compare_function compare_function;	/* Compare function */
+	dht_hash_function hash_function;	/* Hash function */
+	int tranche_id;				/* The tranche ID to use for locks. */
+} dht_parameters;
+
+/* Forward declaration of private types for use only by dht.c. */
+struct dht_hash_table_bucket;
+struct dht_hash_table_item;
+typedef struct dht_hash_table_item dht_hash_table_item;
+typedef struct dht_hash_table_bucket dht_hash_table_bucket;
+
+/*
+ * The state used to track a walk over all entries in a hash table.  The
+ * members of this struct are only for use by code in dht.c, but it is
+ * included in the header because it's useful to be able to create objects of
+ * this type on the stack to pass into the dht_iterate_XXX functions.
+ */
+struct dht_iterator
+{
+	dht_hash_table *hash_table;	/* The hash table we are iterating over. */
+	bool exclusive;				/* Whether to lock buckets exclusively. */
+	Size partition;				/* The index of the next partition to visit. */
+	Size bucket;				/* The index of the next bucket to visit. */
+	dht_hash_table_item *item;  /* The most recently returned item. */
+	dsa_pointer last_item_pointer; /* The last item visited. */
+	Size table_size_log2; 	/* The table size when we started iterating. */
+	bool locked;			/* Whether the current partition is locked. */
+};
+
+/* Creating, sharing and destroying hash tables. */
+extern dht_hash_table *dht_create(dsa_area *area,
+								  const dht_parameters *params);
+extern dht_hash_table *dht_attach(dsa_area *area,
+								  const dht_parameters *params,
+								  dht_hash_table_handle handle);
+extern void dht_detach(dht_hash_table *hash_table);
+extern dht_hash_table_handle
+dht_get_hash_table_handle(dht_hash_table *hash_table);
+extern void dht_destroy(dht_hash_table *hash_table);
+
+/* Iterating over the whole hash table. */
+extern void dht_iterate_begin(dht_hash_table *hash_table,
+							  dht_iterator *iterator, bool exclusive);
+extern void *dht_iterate_next(dht_iterator *iterator);
+extern void dht_iterate_delete(dht_iterator *iterator);
+extern void dht_iterate_release(dht_iterator *iterator);
+extern void dht_iterate_end(dht_iterator *iterator);
+
+/* Finding, creating, deleting entries. */
+extern void *dht_find(dht_hash_table *hash_table,
+					  const void *key, bool exclusive);
+extern void *dht_find_or_insert(dht_hash_table *hash_table,
+								const void *key, bool *found);
+extern bool dht_delete_key(dht_hash_table *hash_table, const void *key);
+extern void dht_delete_entry(dht_hash_table *hash_table, void *entry);
+extern void dht_release(dht_hash_table *hash_table, void *entry);
+
+/* Debugging support. */
+extern void dht_dump(dht_hash_table *hash_table);
+
+#endif   /* DHT_H */
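
For readers following the bucket deletion helpers in dht.c (for example delete_item_from_bucket): they walk the chain through a pointer to the link that leads to the current item, so removing the head of a bucket and removing an item in the middle are the same code path. Below is a minimal standalone sketch of that idea. It is not part of the patch, and it makes simplifying assumptions: plain C pointers and malloc/free stand in for dsa_pointer, dsa_get_address and dsa_free, and the names item, push_item, delete_key and chain_length are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for a chained hash table item (not the patch's type). */
typedef struct item
{
	struct item *next;			/* next item in the same bucket chain */
	int			key;			/* simplified key; the patch uses opaque bytes */
} item;

/* Push a new item onto the front of a chain.  Returns false if malloc fails. */
bool
push_item(item **head, int key)
{
	item	   *new_item = malloc(sizeof(item));

	if (new_item == NULL)
		return false;
	new_item->key = key;
	new_item->next = *head;
	*head = new_item;
	return true;
}

/*
 * Unlink and free the first item whose key matches.  'link' always points at
 * the pointer that leads to the item being examined: initially the bucket
 * head, afterwards the 'next' member of the previous item.  Overwriting
 * *link therefore removes the item without a special case for the head.
 */
bool
delete_key(item **link, int key)
{
	assert(link != NULL);

	while (*link != NULL)
	{
		item	   *current = *link;

		if (current->key == key)
		{
			*link = current->next;	/* bypass the matching item */
			free(current);
			return true;
		}
		link = &current->next;	/* advance to the next link */
	}
	return false;
}

/* Count the items in a chain. */
size_t
chain_length(const item *head)
{
	size_t		n = 0;

	for (; head != NULL; head = head->next)
		n++;
	return n;
}
```

The patch applies the same shape to dsa_pointer values: it saves the victim's next pointer, frees the item, then stores the saved value through bucket_head.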