From f1e033f308b9bbd2f1b1a3bb4a71f0fe2c538e82 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 29 Jun 2018 16:41:04 +0900
Subject: [PATCH 1/3] Give dshash ability to make a local snapshot

Add snapshot feature to DSHASH, that makes a dynahash that consists of
the same data.
---
 src/backend/lib/dshash.c | 164 +++++++++++++++++++++++++++++++++++++++++++++++
 src/include/lib/dshash.h |  21 +++++-
 2 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/src/backend/lib/dshash.c b/src/backend/lib/dshash.c
index b46f7c4cfd..758435efe7 100644
--- a/src/backend/lib/dshash.c
+++ b/src/backend/lib/dshash.c
@@ -354,6 +354,48 @@ dshash_destroy(dshash_table *hash_table)
 	pfree(hash_table);
 }
 
+/*
+ * take a local snapshot of a dshash table
+ */
+HTAB *
+dshash_take_snapshot(dshash_table *org_table)
+{
+	HTAB	*dest_hash;
+	HASHCTL ctl;
+	int		num_entries = 0;
+	dshash_seq_status s;
+	void *ps;
+
+	ctl.keysize = org_table->params.key_size;
+	ctl.entrysize = org_table->params.entry_size;
+
+	/*
+	 *  num_entries is not a strict parameter. We don't care if new entries
+	 *	are added before taking snapshot
+	 */
+	num_entries = dshash_get_num_entries(org_table);
+
+	/*
+	 * dshash only supports binary hash and comparator functions, which won't
+	 * stop at intemediate NUL(\x0) bytes. Just specify HASH_BLOBS so that the
+	 * local hash behaves in the same way.
+	 */
+	dest_hash = hash_create("local snapshot of dshash",
+							num_entries, &ctl,
+							HASH_ELEM | HASH_BLOBS);
+							
+	dshash_seq_init(&s, org_table, true);
+	while ((ps = dshash_seq_next(&s)) != NULL)
+	{
+		bool found;
+		void *pd = hash_search(dest_hash, ps, HASH_ENTER, &found);
+		Assert(!found);
+		memcpy(pd, ps, ctl.entrysize);
+	}
+
+	return dest_hash;
+}
+
 /*
  * Get a handle that can be used by other processes to attach to this hash
  * table.
@@ -592,6 +634,128 @@ dshash_memhash(const void *v, size_t size, void *arg)
 	return tag_hash(v, size);
 }
 
+/*
+ * Initialize a sequential scan on the hash_table. Allows no modifications
+ * during a scan if consistent = true.
+ */
+void
+dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
+				bool consistent)
+{
+	status->hash_table = hash_table;
+	status->curbucket = 0;
+	status->nbuckets = ((size_t) 1) << hash_table->control->size_log2;
+	status->curitem = NULL;
+	status->curpartition = -1;
+	status->consistent = consistent;
+
+	/*
+	 * Protect all partitions from modification if the caller wants a
+	 * consistent result.
+	 */
+	if (consistent)
+	{
+		int i;
+
+		for (i = 0; i < DSHASH_NUM_PARTITIONS; ++i)
+		{
+			Assert(!LWLockHeldByMe(PARTITION_LOCK(hash_table, i)));
+
+			LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED);
+		}
+	}
+	ensure_valid_bucket_pointers(hash_table);
+}
+
+void *
+dshash_seq_next(dshash_seq_status *status)
+{
+	dsa_pointer next_item_pointer;
+
+	if (status->curitem == NULL)
+	{
+		Assert (status->curbucket == 0);
+		Assert(!status->hash_table->find_locked);
+
+		/* first shot. grab the first item. */
+		next_item_pointer = status->hash_table->buckets[status->curbucket];
+		status->hash_table->find_locked = true;
+	}
+	else
+		next_item_pointer = status->curitem->next;
+
+	/* Move to the next bucket if we finished the current bucket */
+	while (!DsaPointerIsValid(next_item_pointer))
+	{
+		if (++status->curbucket >= status->nbuckets)
+		{
+			/* all buckets have been scanned. finsih. */
+			dshash_seq_release(status);
+			return NULL;
+		}
+		Assert(status->hash_table->find_locked);
+
+		next_item_pointer = status->hash_table->buckets[status->curbucket];
+
+		/*
+		 * we need a lock on the scanning partition even if the caller don't
+		 * requested a consistent snapshot.
+		 */
+		if (!status->consistent && DsaPointerIsValid(next_item_pointer))
+		{
+			dshash_table_item  *item = dsa_get_address(status->hash_table->area,
+													   next_item_pointer);
+			int next_partition = PARTITION_FOR_HASH(item->hash);
+			if (status->curpartition != next_partition)
+			{
+				if (status->curpartition >= 0)
+					LWLockRelease(PARTITION_LOCK(status->hash_table,
+												 status->curpartition));
+				LWLockAcquire(PARTITION_LOCK(status->hash_table,
+											 next_partition),
+							  LW_SHARED);
+				status->curpartition = next_partition;
+			}
+		}
+	}
+
+	status->curitem =
+		dsa_get_address(status->hash_table->area, next_item_pointer);
+	return ENTRY_FROM_ITEM(status->curitem);
+}
+
+void
+dshash_seq_release(dshash_seq_status *status)
+{
+	Assert(status->hash_table->find_locked);
+	status->hash_table->find_locked = false;
+
+	if (status->consistent)
+	{
+		int i;
+
+		for (i = 0; i < DSHASH_NUM_PARTITIONS; ++i)
+			LWLockRelease(PARTITION_LOCK(status->hash_table, i));
+	}
+	else if (status->curpartition >= 0)
+		LWLockRelease(PARTITION_LOCK(status->hash_table, status->curpartition));
+}
+
+int
+dshash_get_num_entries(dshash_table *hash_table)
+{
+	/* a shotcut implement. should be improved  */
+	dshash_seq_status s;
+	void *p;
+	int n = 0;
+
+	dshash_seq_init(&s, hash_table, false);
+	while ((p = dshash_seq_next(&s)) != NULL)
+		n++;
+
+	return n;
+}
+
 /*
  * Print debugging information about the internal state of the hash table to
  * stderr.  The caller must hold no partition locks.
diff --git a/src/include/lib/dshash.h b/src/include/lib/dshash.h
index 8c733bfe25..eb2e45cf66 100644
--- a/src/include/lib/dshash.h
+++ b/src/include/lib/dshash.h
@@ -15,6 +15,7 @@
 #define DSHASH_H
 
 #include "utils/dsa.h"
+#include "utils/hsearch.h"
 
 /* The opaque type representing a hash table. */
 struct dshash_table;
@@ -59,6 +60,18 @@ typedef struct dshash_parameters
 struct dshash_table_item;
 typedef struct dshash_table_item dshash_table_item;
 
+struct dshash_seq_status
+{
+	dshash_table	   *hash_table;
+	int					curbucket;
+	int					nbuckets;
+	dshash_table_item  *curitem;
+	int					curpartition;
+	bool				consistent;
+};
+
+typedef struct dshash_seq_status dshash_seq_status;
+
 /* Creating, sharing and destroying from hash tables. */
 extern dshash_table *dshash_create(dsa_area *area,
 			  const dshash_parameters *params,
@@ -70,7 +83,7 @@ extern dshash_table *dshash_attach(dsa_area *area,
 extern void dshash_detach(dshash_table *hash_table);
 extern dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table);
 extern void dshash_destroy(dshash_table *hash_table);
-
+extern HTAB * dshash_take_snapshot(dshash_table *org_table);
 /* Finding, creating, deleting entries. */
 extern void *dshash_find(dshash_table *hash_table,
 			const void *key, bool exclusive);
@@ -80,6 +93,12 @@ extern bool dshash_delete_key(dshash_table *hash_table, const void *key);
 extern void dshash_delete_entry(dshash_table *hash_table, void *entry);
 extern void dshash_release_lock(dshash_table *hash_table, void *entry);
 
+/* seq scan support */
+extern void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
+							bool exclusive);
+extern void *dshash_seq_next(dshash_seq_status *status);
+extern void dshash_seq_release(dshash_seq_status *status);
+extern int dshash_get_num_entries(dshash_table *hash_table);
 /* Convenience hash and compare functions wrapping memcmp and tag_hash. */
 extern int	dshash_memcmp(const void *a, const void *b, size_t size, void *arg);
 extern dshash_hash dshash_memhash(const void *v, size_t size, void *arg);
-- 
2.16.3

