From 678f56caf1c6eceb5195c531190b573c180e5ef2 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 13 Mar 2019 20:05:04 +0200
Subject: [PATCH 1/2] Add SparseBitset, to hold a large set of 64-bit ints
 efficiently.

This doesn't include any use of the code yet, but we have two patches in
the works that would benefit from this:

* the GiST vacuum patch, to track empty GiST pages and internal GiST pages.

* Reducing memory usage, and also allowing more than 1 GB of memory to be
  used, to hold the dead TIDs in VACUUM.
---
 src/backend/lib/Makefile       |   2 +-
 src/backend/lib/README         |   2 +
 src/backend/lib/sparsebitset.c | 986 +++++++++++++++++++++++++++++++++
 src/include/lib/sparsebitset.h |  26 +
 4 files changed, 1015 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/lib/sparsebitset.c
 create mode 100644 src/include/lib/sparsebitset.h

diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 191ea9bca26..32ba388c12d 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -13,6 +13,6 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = binaryheap.o bipartite_match.o bloomfilter.o dshash.o hyperloglog.o \
-       ilist.o knapsack.o pairingheap.o rbtree.o stringinfo.o
+       ilist.o knapsack.o pairingheap.o rbtree.o sparsebitset.o stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/README b/src/backend/lib/README
index ae5debe1bc6..f938795b54a 100644
--- a/src/backend/lib/README
+++ b/src/backend/lib/README
@@ -19,6 +19,8 @@ pairingheap.c - a pairing heap
 
 rbtree.c - a red-black tree
 
+sparsebitset.c - a sparse bitset of integers
+
 stringinfo.c - an extensible string type
 
 
diff --git a/src/backend/lib/sparsebitset.c b/src/backend/lib/sparsebitset.c
new file mode 100644
index 00000000000..6df900b9ebc
--- /dev/null
+++ b/src/backend/lib/sparsebitset.c
@@ -0,0 +1,986 @@
+/*-------------------------------------------------------------------------
+ *
+ * sparsebitset.c
+ *	  Data structure to hold a large set of 64-bit integers efficiently
+ *
+ * This is somewhat similar to a Bitmapset, except that this works on
+ * 64-bit integers, and is more memory-efficient when the bitmap is sparse.
+ * The supported operations are much more limited, though.
+ *
+ * The data structure is a B-tree, with special packed representation
+ * at the leaf level, for clusters of nearby values.
+ *
+ * Usage:
+ *
+ * 1. Create the sparse bitset with sbs_create()
+ *
+ * 2. Add as many values as you want, with sbs_add_member(). Values must be
+ *    added in order!
+ *
+ * 3. You can test for presence of a value with sbs_is_member()
+ *
+ * 4. You can iterate through all values with sbs_begin_iterate() /
+ *    sbs_iterate_next().
+ *
+ *
+ * Limitations:
+ *
+ * - Values must be added in order.  (Random insertions would require
+ *   splitting nodes, which could be done, but it would be a fair amount of
+ *   extra code.)
+ *
+ * - Values cannot be added while iteration is in progress.
+ *
+ * - No support for removing values.
+ *
+ * None of these limitations are fundamental to the data structure, and
+ * could be lifted if needed, by writing some new code.  But the current
+ * users of this facility don't need them.
+ *
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/lib/sparsebitset.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "lib/sparsebitset.h"
+#include "port/pg_bitutils.h"
+#include "utils/memutils.h"
+
+/*
+ * Definitions for bit arrays used on bitmap-type nodes.
+ */
+#define SBS_BITS_PER_BITMAPWORD 32	/* number of bits in one sbsword */
+typedef uint32 sbsword;			/* one word of a bitmap leaf's bit array */
+#define sbsword_rightmost_one_pos(w)	pg_rightmost_one_pos32(w)
+
+#define WORDNUM(x)	((x) / SBS_BITS_PER_BITMAPWORD)	/* word holding bit x */
+#define BITNUM(x)	((x) % SBS_BITS_PER_BITMAPWORD)	/* bit x's position in it */
+
+/* Forward declarations, to break circular references between these structs */
+typedef struct sbs_node sbs_node;
+typedef struct sbs_leaf_node sbs_leaf_node;
+typedef struct sbs_internal_node sbs_internal_node;
+
+/*
+ * Parameters for the size and shape of the B-tree.
+ */
+
+/*
+ * Size of the array/bitmap on leaf nodes.  In bytes; the number of
+ * entries we can fit in that size depends on the format of the node.
+ *
+ * This is the size of the bulky array/bitmap, only. It does not include
+ * any other fields on each node.
+ */
+#define LEAF_NODE_SIZE		1024
+
+/*
+ * Number of children stored on internal nodes.
+ *
+ * An internal node has two arrays, with 64 elements each.  With
+ * sizeof(void *) == 8, they add up to 1024 bytes.
+ *
+ * Note that because nodes are just an in-memory data structure, there is
+ * no need for internal and leaf nodes to be of the same size.
+ *
+ * If you change this, you must recalculate MAX_TREE_LEVELS, too!
+ */
+#define MAX_INTERNAL_ITEMS	64
+
+/*
+ * Maximum height of the tree.
+ *
+ * MAX_INTERNAL_ITEMS determines the "fan-out" of the B-tree.  The theoretical
+ * maximum number of items that we can store in a bitset is 2^64.
+ * MAX_TREE_LEVELS should be set so that:
+ *
+ *   MAX_INTERNAL_ITEMS ^ MAX_TREE_LEVELS >= 2^64.
+ *
+ * In practice, we'll need fewer levels, because leaf nodes will be packed
+ * very densely, as the bitset becomes more full.  But there's no harm in
+ * overshooting this.
+ */
+#define MAX_TREE_LEVELS		11
+
+
+/*
+ * SparseBitset is the main object representing the bitset.
+ *
+ * All the values are stored in an in-memory B-tree structure.  In addition
+ * to the B-tree, SparseBitset tracks information about memory usage, and the
+ * current position, when iterating the tree with sbs_begin_iterate /
+ * sbs_iterate_next.
+ */
+struct SparseBitset
+{
+	/*
+	 * 'context' is a dedicated memory context, used to hold the SparseBitset
+	 * struct itself, as well as all the tree nodes.
+	 *
+	 * 'mem_used' tracks the amount of memory used.  We don't do anything with
+	 * it in the sparse bitset code itself, but the callers can ask for it
+	 * with sbs_memory_usage().
+	 */
+	MemoryContext context;		/* memory context holding everything */
+	uint64		mem_used;		/* amount of memory used */
+
+	/*
+	 * 'root' points to the root node of the tree (or the only leaf node, if
+	 * num_levels == 1).  'leftmost_leaf' points to the leftmost leaf node.
+	 */
+	sbs_node   *root;
+	sbs_leaf_node *leftmost_leaf;
+	int			num_levels;		/* height of the tree */
+
+	uint64		num_entries;	/* total # of values in the tree */
+
+	/*
+	 * Pointer to the rightmost leaf node, and its parent, grandparent etc.
+	 * all the way up to the root.  These are needed when adding new values.
+	 * (Currently, we require that new values are added at the end.)
+	 * 'rightmost_parents' is indexed directly by level.  Internal nodes are
+	 * on levels 1 .. MAX_TREE_LEVELS - 1, so slot 0 (the leaf level) is unused.
+	 */
+	sbs_leaf_node *rightmost_leaf;
+	sbs_internal_node *rightmost_parents[MAX_TREE_LEVELS];
+	uint64		last_item;		/* highest value stored in this bitset */
+
+	/* Iterator support; see sbs_begin_iterate() and sbs_iterate_next(). */
+	sbs_leaf_node *iter_node;	/* leaf being scanned; NULL if not iterating */
+	int			iter_itemno;	/* next array slot to return (U64/U32 leaf) */
+	int			iter_wordno;	/* current bitmap word (BITMAP leaf) */
+	int			iter_bitno;		/* next bit to look at in that word */
+};
+
+
+/*
+ * Node structures, for the in-memory B-tree.
+ *
+ * An internal node holds a number of downlink pointers, to leaf nodes or
+ * to internal nodes on the next lower level.  For each downlink, the key
+ * value of the corresponding lower-level node is stored.  The stored key
+ * values are low keys: if a downlink has key value X, then all items stored
+ * on that child are >= X.
+ *
+ * A leaf node stores a number of values. There are three different formats,
+ * with varying levels of "compression".  Which one is used depends on the
+ * values stored.
+ *
+ * 1. A sorted array of uint64 values.
+ *
+ * 2. A sorted array of uint32 values, with a constant offset or 'prefix'
+ *    that's added to each value.  This representation takes half the space of
+ *    the uint64 array - or to put it another way, it can store twice as many
+ *    items on a node.  But it has the limitation that the difference between
+ *    the lowest and highest entries on the node must be less than 2^32.  The
+ *    lowest entry is used as the 'prefix', and the highest entry must be
+ *    representable with prefix + uint32.
+ *
+ * 3. A bitmap.  On a bitmap node, values are not stored as integers, but as
+ *    a prefix, plus a bitmap indicating which values are present. This is much
+ *    more compact, when a lot of close-by values are set.
+ *
+ * Each leaf node also has a pointer to the next leaf node, so that the leaf
+ * nodes can be easily walked from beginning to end, when iterating.
+ */
+
+/*
+ * Common structure of both leaf and internal nodes.
+ */
+typedef enum
+{
+	SBS_INTERNAL,				/* internal node: key values plus downlinks */
+	SBS_LEAF_U64,				/* leaf: sorted array of uint64 values */
+	SBS_LEAF_U32,				/* leaf: sorted array of uint32 offsets from 'prefix' */
+	SBS_LEAF_BITMAP				/* leaf: bitmap of offsets from 'prefix' */
+} sbs_node_type;
+
+struct sbs_node
+{
+	sbs_node_type type;			/* tells which struct this node really is */
+	uint16		num_items;		/* number of items stored on the node */
+};
+
+/*
+ * Internal node
+ */
+struct sbs_internal_node
+{
+	/* common header, must match sbs_node */
+	sbs_node_type type;			/* always SBS_INTERNAL */
+	uint16		num_items;		/* # of entries in use in items[]/downlinks[] */
+
+	uint16		level;			/* always >= 1 */
+
+	/*
+	 * 'items' is an array of key values, and 'downlinks' are pointers
+	 * to lower-level nodes, corresponding to the key values.
+	 */
+	uint64		items[MAX_INTERNAL_ITEMS];	/* items[i] is the key of downlinks[i] */
+	sbs_node   *downlinks[MAX_INTERNAL_ITEMS];
+};
+
+/*
+ * Leaf node
+ */
+struct sbs_leaf_node
+{
+	/* common header, must match sbs_node */
+	sbs_node_type type;			/* one of the SBS_LEAF_* types */
+	uint16		num_items;		/* # of values stored; on a BITMAP node, # of bits set */
+
+	/* constant to be added to values, in U32 or BITMAP node */
+	uint64		prefix;
+
+	/*
+	 * Space used to hold the items. The format depends on the node type.
+	 *
+	 * All of these arrays should have the same size, in bytes. Otherwise,
+	 * we would waste memory.
+	 */
+#define MAX_U64_ITEMS		(LEAF_NODE_SIZE / sizeof(uint64))
+#define MAX_U32_ITEMS		(LEAF_NODE_SIZE / sizeof(uint32))
+#define MAX_BITMAP_WORDS	(LEAF_NODE_SIZE / sizeof(sbsword))
+	union
+	{
+		uint64		u64[MAX_U64_ITEMS];		/* SBS_LEAF_U64: the values themselves */
+		uint32		u32[MAX_U32_ITEMS];		/* SBS_LEAF_U32: value minus prefix */
+		sbsword		words[MAX_BITMAP_WORDS];	/* SBS_LEAF_BITMAP: bit N set means prefix + N is present */
+	} items;
+
+	sbs_leaf_node *next;		/* right sibling, if any */
+};
+
+/*
+ * On a U32-type leaf node, the difference between the smallest and largest
+ * value on the node can be at most 2^32 - 1.  Otherwise, the largest value
+ * could not be represented as a prefix + 32-bit offset.
+ *
+ * Likewise, on a bitmap node, the size of the bitmap is fixed, which dictates
+ * the highest value that can be stored in the bitmap, given a particular
+ * prefix.
+ */
+#define MAX_U32_DISTANCE	PG_UINT32_MAX
+#define MAX_BITMAP_DISTANCE	(MAX_BITMAP_WORDS * SBS_BITS_PER_BITMAPWORD - 1)
+
+
+/*
+ * prototypes for internal functions.
+ */
+static sbs_leaf_node *sbs_new_leaf(SparseBitset *sbs, uint64 newitem);
+static sbs_internal_node *sbs_new_internal(SparseBitset *sbs, int level,
+				 uint64 newitem, sbs_node *downlink);
+static bool sbs_repack_node(sbs_leaf_node *leaf);
+static void sbs_update_upper(SparseBitset *sbs, int level,
+				 void *new_node, uint64 new_node_item);
+
+static inline int sbs_binsrch_u64(uint64 value, uint64 *arr, int arr_elems,
+				bool nextkey);
+static inline int sbs_binsrch_u32(uint32 value, uint32 *arr, int arr_elems,
+				bool nextkey);
+static uint64 sbs_node_get_first_item(sbs_node *node);
+
+
+/*
+ * sbs_create() -- construct an empty sparse bitset.
+ *
+ * The set and all nodes later added to it live in a private memory context,
+ * created as a child of CurrentMemoryContext; sbs_free() deletes it.
+ */
+SparseBitset *
+sbs_create(void)
+{
+	SparseBitset *result;
+	MemoryContext cxt;
+
+	/*
+	 * Everything goes into a dedicated context.  Nodes are never freed
+	 * individually, which suits the generational allocator.
+	 *
+	 * The block size is deliberately large: the hope is that libc hands such
+	 * big blocks back to the OS when they are released, instead of keeping
+	 * them on a free-list.  (On glibc, see M_MMAP_THRESHOLD; at the time of
+	 * writing the effective threshold is between 128 kB and 4 MB.)
+	 */
+	cxt = GenerationContextCreate(CurrentMemoryContext,
+								  "Sparse bitmap",
+								  SLAB_LARGE_BLOCK_SIZE);
+
+	/* The SparseBitset struct itself is allocated inside the context. */
+	result = (SparseBitset *) MemoryContextAlloc(cxt, sizeof(*result));
+	result->mem_used = GetMemoryChunkSpace(result);
+	result->context = cxt;
+
+	/* There is no tree yet: no levels, no entries, no nodes. */
+	result->num_levels = 0;
+	result->num_entries = 0;
+	result->last_item = 0;
+	result->root = NULL;
+	result->leftmost_leaf = NULL;
+	result->rightmost_leaf = NULL;
+	memset(result->rightmost_parents, 0, sizeof(result->rightmost_parents));
+
+	/* No iteration is in progress. */
+	result->iter_node = NULL;
+	result->iter_itemno = result->iter_wordno = result->iter_bitno = 0;
+
+	return result;
+}
+
+/*
+ * sbs_new_leaf() -- allocate a leaf node holding a single value.
+ *
+ * The node starts out in U64 format with 'newitem' as its only entry.  It is
+ * linked in as the right sibling of the current rightmost leaf (or becomes
+ * 'leftmost_leaf', if it is the first node in the tree), and is made the
+ * new 'rightmost_leaf'.  The caller must add the downlink to the parent.
+ */
+static sbs_leaf_node *
+sbs_new_leaf(SparseBitset *sbs, uint64 newitem)
+{
+	sbs_leaf_node *prev_rightmost = sbs->rightmost_leaf;
+	sbs_leaf_node *leaf;
+
+	leaf = (sbs_leaf_node *) MemoryContextAlloc(sbs->context, sizeof(*leaf));
+	sbs->mem_used += GetMemoryChunkSpace(leaf);
+
+	/* A fresh leaf is a one-element uint64 array; no prefix is needed. */
+	leaf->type = SBS_LEAF_U64;
+	leaf->num_items = 1;
+	leaf->items.u64[0] = newitem;
+	leaf->prefix = 0;
+	leaf->next = NULL;
+
+	if (prev_rightmost == NULL)
+	{
+		/* This is the very first node in the tree. */
+		Assert(sbs->leftmost_leaf == NULL);
+		Assert(sbs->num_entries == 0);
+		sbs->leftmost_leaf = leaf;
+	}
+	else
+		prev_rightmost->next = leaf;	/* chain after the old rightmost leaf */
+
+	sbs->rightmost_leaf = leaf;
+	return leaf;
+}
+
+/*
+ * sbs_new_internal() -- allocate an internal node with a single downlink.
+ *
+ * The node is recorded in 'rightmost_parents' for its level.  Hooking it
+ * into its own parent is the caller's job.
+ */
+static sbs_internal_node *
+sbs_new_internal(SparseBitset *sbs, int level, uint64 newitem,
+				 sbs_node *downlink)
+{
+	sbs_internal_node *inode;
+
+	inode = (sbs_internal_node *)
+		MemoryContextAlloc(sbs->context, sizeof(*inode));
+	sbs->mem_used += GetMemoryChunkSpace(inode);
+
+	inode->type = SBS_INTERNAL;
+	inode->level = level;
+
+	/* The first (and so far only) entry: key 'newitem' -> 'downlink'. */
+	inode->num_items = 1;
+	inode->downlinks[0] = downlink;
+	inode->items[0] = newitem;
+
+	/* From now on, this is the rightmost node on its level. */
+	sbs->rightmost_parents[level] = inode;
+	return inode;
+}
+
+/*
+ * Free a sparse bitset, including the SparseBitset struct and all its nodes.
+ */
+void
+sbs_free(SparseBitset *sbs)
+{
+	/* everything, 'sbs' itself included, is allocated in the memory context */
+	MemoryContextDelete(sbs->context);
+}
+
+/*
+ * Return the number of values that have been added to the bitset so far.
+ */
+uint64
+sbs_num_entries(SparseBitset *sbs)
+{
+	return sbs->num_entries;
+}
+
+/*
+ * Return the memory used by the bitset: the sum of GetMemoryChunkSpace() over the struct and every node.
+ */
+uint64
+sbs_memory_usage(SparseBitset *sbs)
+{
+	return sbs->mem_used;
+}
+
+/*
+ * Add a value to the bitset.
+ *
+ * Values must be added in strictly ascending order; adding a value that is
+ * not larger than the last one added raises an error, as does adding while
+ * an iteration started with sbs_begin_iterate() is still in progress.
+ */
+void
+sbs_add_member(SparseBitset *sbs, uint64 newitem)
+{
+	sbs_leaf_node *leaf;
+
+	if (sbs->iter_node)
+		elog(ERROR, "cannot add new values to sparse bitset when iteration is in progress");
+
+	if (sbs->num_entries == 0)
+	{
+		/* Very first item: allocate the root node, which is also a leaf. */
+		leaf = sbs_new_leaf(sbs, newitem);
+		sbs->root = (sbs_node *) leaf;
+		sbs->num_levels = 1;
+	}
+	else
+	{
+		/* The new value must be larger than the last one added */
+		if (newitem <= sbs->last_item)
+			elog(ERROR, "cannot insert to sparse bitset out of order");
+
+		/*
+		 * The new value will go to the rightmost leaf node.  Or to a new leaf
+		 * node, if the currently rightmost leaf is full.
+		 */
+		leaf = sbs->rightmost_leaf;
+
+retry:
+		switch (leaf->type)
+		{
+			case SBS_LEAF_U64:
+				/*
+				 * On a U64 node, add the new entry to the end of the array,
+				 * if there's room.  If the array is full, try to repack the
+				 * node to a more dense format, to make room.
+				 */
+				if (leaf->num_items < MAX_U64_ITEMS)
+				{
+					/* There is room, add the item to the node. */
+					leaf->items.u64[leaf->num_items] = newitem;
+					leaf->num_items++;
+				}
+				else
+				{
+					if (sbs_repack_node(leaf))
+						goto retry;
+
+					leaf = sbs_new_leaf(sbs, newitem);
+					sbs_update_upper(sbs, 1, leaf, newitem);
+				}
+				break;
+
+			case SBS_LEAF_U32:
+				/*
+				 * The logic for a U32 node is the same as for U64, but we have
+				 * to also check that the new item is not too far from the
+				 * lowest value on the node, the 'prefix'.  Otherwise, it
+				 * cannot be represented as uint32 difference from the prefix.
+				 */
+				if (leaf->num_items < MAX_U32_ITEMS &&
+					newitem - leaf->prefix <= MAX_U32_DISTANCE)
+				{
+					leaf->items.u32[leaf->num_items] = newitem - leaf->prefix;
+					leaf->num_items++;
+				}
+				else
+				{
+					if (leaf->num_items >= MAX_U32_ITEMS)
+					{
+						if (sbs_repack_node(leaf))
+							goto retry;
+					}
+
+					leaf = sbs_new_leaf(sbs, newitem);
+					sbs_update_upper(sbs, 1, leaf, newitem);
+				}
+				break;
+
+			case SBS_LEAF_BITMAP:
+				/*
+				 * If the new item falls within the bitmap, set the
+				 * corresponding bit.  Otherwise, we have to allocate a new
+				 * node.  The bit mask is built from an sbsword constant,
+				 * because shifting a plain signed int 1 into bit 31 would be
+				 * undefined behavior.
+				 */
+				if (newitem - leaf->prefix <= MAX_BITMAP_DISTANCE)
+				{
+					uint32		val = newitem - leaf->prefix;
+
+					leaf->items.words[WORDNUM(val)] |= ((sbsword) 1 << BITNUM(val));
+					leaf->num_items++;
+				}
+				else
+				{
+					leaf = sbs_new_leaf(sbs, newitem);
+					sbs_update_upper(sbs, 1, leaf, newitem);
+				}
+				break;
+
+			default:
+				elog(ERROR, "unknown sparse bitset node type %u", leaf->type);
+		}
+	}
+
+	sbs->last_item = newitem;
+	sbs->num_entries++;
+}
+
+/*
+ * Rewrite a leaf node to a more densely packed format (U64 to U32, or U32 to
+ * BITMAP), if all of its items fit.  Returns true if the node was converted,
+ * false if it was left unchanged.
+ */
+static bool
+sbs_repack_node(sbs_leaf_node *leaf)
+{
+	if (leaf->num_items < 1)
+		return false;
+
+	if (leaf->type == SBS_LEAF_U64)
+	{
+		/* Try to rewrite U64 node into a U32 node. */
+		uint64		lowest = leaf->items.u64[0];
+		uint64		highest = leaf->items.u64[leaf->num_items - 1];
+
+		if (highest - lowest > MAX_U32_DISTANCE)
+			return false;		/* cannot make this more dense */
+
+		/* In place is safe: u32[i] only overlaps u64 slots already read. */
+		for (int i = 0; i < leaf->num_items; i++)
+		{
+			uint64 x = leaf->items.u64[i];
+
+			leaf->items.u32[i] = x - lowest;
+		}
+
+		leaf->prefix = lowest;
+		leaf->type = SBS_LEAF_U32;
+		return true;
+	}
+	else if (leaf->type == SBS_LEAF_U32)
+	{
+		/* Try to rewrite U32 node into a BITMAP node. */
+		uint32		tmpitems[LEAF_NODE_SIZE / sizeof(uint32)];
+
+		/* prefix is already set. */
+		if (leaf->items.u32[leaf->num_items - 1] > MAX_BITMAP_DISTANCE)
+			return false;		/* cannot make this more dense */
+
+		/* copy all the old items out of the way, then clear the bitmap */
+		memcpy(tmpitems, leaf->items.u32, LEAF_NODE_SIZE);
+		memset(leaf->items.words, 0, LEAF_NODE_SIZE);
+
+		/*
+		 * Set all bits, representing the items.  The shifted constant must
+		 * be an sbsword: shifting a signed int 1 into bit 31 is undefined.
+		 */
+		for (int i = 0; i < leaf->num_items; i++)
+		{
+			uint32		x = tmpitems[i];
+
+			leaf->items.words[WORDNUM(x)] |= ((sbsword) 1 << BITNUM(x));
+		}
+
+		leaf->type = SBS_LEAF_BITMAP;
+		return true;
+	}
+	else if (leaf->type == SBS_LEAF_BITMAP)
+	{
+		/* already in the densest format */
+		return false;
+	}
+	else
+	{
+		elog(ERROR, "unknown sparse bitset leaf type %d", leaf->type);
+		return false;
+	}
+}
+
+/*
+ * Insert the downlink into parent node, after creating a new node.
+ *
+ * 'level' is the level of the parent (1 for the parent of a leaf), 'new_node'
+ * is the node just created on the level below, and 'new_node_item' is its
+ * lowest key.  Adds a new root if the tree has no node at 'level' yet, and
+ * recurses if the parent node is full, too.
+ */
+static void
+sbs_update_upper(SparseBitset *sbs, int level, void *new_node,
+				 uint64 new_node_item)
+{
+	sbs_internal_node *parent;
+
+	/* Create a new root node, if necessary. */
+	if (level >= sbs->num_levels)
+	{
+		uint64		old_root_item;
+
+		/* Shouldn't happen; also keeps 'level' inside rightmost_parents[]. */
+		if (sbs->num_levels >= MAX_TREE_LEVELS ||
+			level >= (int) lengthof(sbs->rightmost_parents))
+			elog(ERROR, "could not expand sparse bitset, maximum number of levels reached");
+		sbs->num_levels++;
+
+		/* Get the first item on the old root page, to be used as the downlink. */
+		old_root_item = sbs_node_get_first_item(sbs->root);
+		parent = sbs_new_internal(sbs, level, old_root_item, sbs->root);
+		sbs->root = (sbs_node *) parent;
+	}
+
+	parent = sbs->rightmost_parents[level];
+
+	/* Place the downlink on the parent page. */
+	if (parent->num_items < MAX_INTERNAL_ITEMS)
+	{
+		parent->items[parent->num_items] = new_node_item;
+		parent->downlinks[parent->num_items] = new_node;
+		parent->num_items++;
+	}
+	else
+	{
+		/*
+		 * Doesn't fit.  Allocate new parent, with the downlink, and recursively
+		 * insert the downlink to the new parent to the grandparent.
+		 */
+		parent = sbs_new_internal(sbs, level, new_node_item, new_node);
+		sbs_update_upper(sbs, level + 1, parent, new_node_item);
+	}
+}
+
+/*
+ * Test whether 'item' has been added to the bitset; returns true if so.
+ */
+bool
+sbs_is_member(SparseBitset *sbs, uint64 item)
+{
+	sbs_leaf_node *leaf;
+	sbs_internal_node *node;
+	int			level = sbs->num_levels - 1;
+	int			itemno;
+
+	if (sbs->num_entries == 0)
+		return false;
+
+	/*
+	 * Start from the root, and walk down the B-tree to find the right leaf
+	 * node: on each level, follow the last downlink whose key is <= item.
+	 */
+	node = (sbs_internal_node *) sbs->root;
+	while (level > 0)
+	{
+		itemno = sbs_binsrch_u64(item, node->items, node->num_items, true);
+		if (itemno == 0)
+			return false;		/* smaller than everything in the set */
+		node = (sbs_internal_node *) node->downlinks[itemno - 1];
+		level--;
+	}
+	leaf = (sbs_leaf_node *) node;
+
+	if (leaf->type == SBS_LEAF_U64)
+	{
+		/* Binary search in the leaf node */
+		itemno = sbs_binsrch_u64(item, leaf->items.u64, leaf->num_items, false);
+		return itemno < leaf->num_items && leaf->items.u64[itemno] == item;
+	}
+	else if (leaf->type == SBS_LEAF_U32)
+	{
+		uint32		val;
+
+		/*
+		 * The offset must be checked as a uint64 before narrowing it.  If
+		 * the item is below the prefix, or more than 2^32 - 1 above it, a
+		 * truncated offset could falsely match a stored value.
+		 */
+		if (item < leaf->prefix || item - leaf->prefix > MAX_U32_DISTANCE)
+			return false;
+		val = (uint32) (item - leaf->prefix);
+
+		/* Binary search in the leaf node */
+		itemno = sbs_binsrch_u32(val, leaf->items.u32, leaf->num_items, false);
+
+		return itemno < leaf->num_items && leaf->items.u32[itemno] == val;
+	}
+	else if (leaf->type == SBS_LEAF_BITMAP)
+	{
+		uint32		val;
+
+		/* Likewise, range-check the 64-bit offset before narrowing it. */
+		if (item < leaf->prefix || item - leaf->prefix > MAX_BITMAP_DISTANCE)
+			return false;
+		val = (uint32) (item - leaf->prefix);
+
+		return (leaf->items.words[WORDNUM(val)] & ((sbsword) 1 << BITNUM(val))) != 0;
+	}
+	else
+		elog(ERROR, "invalid sparse bitset leaf node type %d", leaf->type);
+}
+
+
+/*
+ * sbs_begin_iterate() -- start (or restart) an in-order scan of the set.
+ *
+ * No values can be added to the bitset until the scan has run to the end.
+ */
+void
+sbs_begin_iterate(SparseBitset *sbs)
+{
+	sbs->iter_node = sbs->leftmost_leaf;
+
+	/* Start at the very beginning of that leaf, whatever its format. */
+	sbs->iter_itemno = sbs->iter_wordno = sbs->iter_bitno = 0;
+}
+
+/*
+ * Returns the next item, when iterating.
+ *
+ * sbs_begin_iterate() must be called first.  Each call returns the next value
+ * in the bit set and sets *found to true.  When there are no more values,
+ * *found is set to false and the return value is 0.
+ */
+uint64
+sbs_iterate_next(SparseBitset *sbs, bool *found)
+{
+	uint64		result;
+
+	if (!sbs->iter_node)
+	{
+		/* Not iterating, or an earlier call already reached the end. */
+		*found = false;
+		return 0;
+	}
+
+	while (sbs->iter_node)
+	{
+		if (sbs->iter_node->type == SBS_LEAF_U64 ||
+			sbs->iter_node->type == SBS_LEAF_U32)
+		{
+			if (sbs->iter_itemno >= sbs->iter_node->num_items)
+			{
+				/* We have reached end of this node.  Step to the next one. */
+				sbs->iter_node = sbs->iter_node->next;
+				sbs->iter_itemno = 0;
+				sbs->iter_wordno = 0;
+				sbs->iter_bitno = 0;
+				continue;
+			}
+
+			/* Return the next item; on a U32 node, add back the prefix. */
+			if (sbs->iter_node->type == SBS_LEAF_U64)
+				result = sbs->iter_node->items.u64[sbs->iter_itemno];
+			else
+				result = sbs->iter_node->prefix + sbs->iter_node->items.u32[sbs->iter_itemno];
+
+			sbs->iter_itemno++;
+			*found = true;
+			return result;
+		}
+		else if (sbs->iter_node->type == SBS_LEAF_BITMAP)
+		{
+			sbsword		mask;
+
+			/* In the first word, ignore bits that were returned already */
+			mask = (~(sbsword) 0) << sbs->iter_bitno;
+
+			/*
+			 * Scan the bitmap words, from iter_wordno on, for the next set bit.
+			 */
+			while (sbs->iter_wordno < LEAF_NODE_SIZE / sizeof(sbsword))
+			{
+				sbsword		w = sbs->iter_node->items.words[sbs->iter_wordno];
+
+				/* ignore bits before bitno */
+				w &= mask;
+
+				if (w != 0)
+				{
+					/*
+					 * Found a set bit!  Reconstruct the original value that
+					 * the bit represents.
+					 */
+					int			match_bitno = sbsword_rightmost_one_pos(w);
+
+					result = sbs->iter_node->prefix;
+					result += sbs->iter_wordno * SBS_BITS_PER_BITMAPWORD;
+					result += match_bitno;
+
+					/* advance iterator to next bit, for next call */
+					if (match_bitno == SBS_BITS_PER_BITMAPWORD - 1)
+					{
+						sbs->iter_wordno++;
+						sbs->iter_bitno = 0;
+					}
+					else
+						sbs->iter_bitno = match_bitno + 1;
+
+					*found = true;
+					return result;
+				}
+
+				/* in subsequent words, consider all bits */
+				mask = (~(sbsword) 0);
+
+				sbs->iter_wordno++;
+				sbs->iter_bitno = 0;
+			}
+
+			/* No more matches on this bitmap. Step to the next node. */
+			sbs->iter_node = sbs->iter_node->next;
+			sbs->iter_itemno = 0;
+			sbs->iter_wordno = 0;
+			sbs->iter_bitno = 0;
+			continue;
+		}
+		else
+			elog(ERROR, "invalid sparse bitset leaf node type %d", sbs->iter_node->type);
+	}
+
+	/* No more results; iter_node is NULL now, so values can be added again. */
+	*found = false;
+	return 0;
+}
+
+
+/*
+ * sbs_binsrch_u64() -- search a sorted array of uint64s
+ *
+ * Returns the position of the first element that is greater than or equal
+ * to 'item' (if 'nextkey' is false), or strictly greater than 'item' (if
+ * 'nextkey' is true).  If there is no such element, returns 'arr_elems'.
+ * In other words, the result is the "insert" location for the given key:
+ * the position where it would have to be inserted to keep the array sorted.
+ *
+ * 'nextkey' only matters when an equal key is present in the array.  If
+ * true, the returned position is the one immediately after the equal key;
+ * if false, it is the position of the equal key itself.
+ */
+static inline int
+sbs_binsrch_u64(uint64 item, uint64 *arr, int arr_elems, bool nextkey)
+{
+	int			lo = 0;
+	int			hi = arr_elems;
+
+	/*
+	 * Invariant: the answer always lies within [lo, hi].  Each round halves
+	 * the range by discarding the side that cannot contain it.
+	 */
+	while (lo < hi)
+	{
+		int			probe = lo + (hi - lo) / 2;
+		bool		go_right;
+
+		/* Is the element at 'probe' still to the left of the answer? */
+		if (nextkey)
+			go_right = (arr[probe] <= item);
+		else
+			go_right = (arr[probe] < item);
+
+		if (go_right)
+			lo = probe + 1;
+		else
+			hi = probe;
+	}
+
+	return lo;
+}
+
+/*
+ * sbs_binsrch_u32() -- same as sbs_binsrch_u64(), but for uint32 arrays.
+ */
+static inline int
+sbs_binsrch_u32(uint32 item, uint32 *arr, int arr_elems, bool nextkey)
+{
+	int			lo = 0;
+	int			hi = arr_elems;
+
+	/*
+	 * Invariant: the answer always lies within [lo, hi].  Each round halves
+	 * the range by discarding the side that cannot contain it.
+	 */
+	while (lo < hi)
+	{
+		int			probe = lo + (hi - lo) / 2;
+		bool		go_right;
+
+		/* Is the element at 'probe' still to the left of the answer? */
+		if (nextkey)
+			go_right = (arr[probe] <= item);
+		else
+			go_right = (arr[probe] < item);
+
+		if (go_right)
+			lo = probe + 1;
+		else
+			hi = probe;
+	}
+
+	return lo;
+}
+
+/*
+ * sbs_node_get_first_item() -- return the lowest value stored on a node.
+ *
+ * Handles internal nodes and every leaf format.  The node must be non-empty.
+ */
+static uint64
+sbs_node_get_first_item(sbs_node *node)
+{
+	Assert(node->num_items > 0);
+
+	if (node->type == SBS_INTERNAL)
+	{
+		sbs_internal_node *inode = (sbs_internal_node *) node;
+
+		/* The first key is the low key of the leftmost child. */
+		return inode->items[0];
+	}
+	else if (node->type == SBS_LEAF_U64)
+	{
+		sbs_leaf_node *u64leaf = (sbs_leaf_node *) node;
+
+		return u64leaf->items.u64[0];
+	}
+	else if (node->type == SBS_LEAF_U32)
+	{
+		sbs_leaf_node *u32leaf = (sbs_leaf_node *) node;
+
+		/* Stored values are offsets from the prefix. */
+		return u32leaf->prefix + u32leaf->items.u32[0];
+	}
+	else if (node->type == SBS_LEAF_BITMAP)
+	{
+		sbs_leaf_node *bmleaf = (sbs_leaf_node *) node;
+
+		/*
+		 * The prefix of a bitmap node is always its first item (that is how
+		 * sbs_add_member() currently builds them), so bit 0 must be set and
+		 * the prefix itself is the answer.
+		 */
+		Assert(bmleaf->items.words[0] & 1);
+
+		return bmleaf->prefix;
+	}
+
+	elog(ERROR, "invalid sparse bitset node type %d", node->type);
+	return 0;
+}
diff --git a/src/include/lib/sparsebitset.h b/src/include/lib/sparsebitset.h
new file mode 100644
index 00000000000..7449f189f60
--- /dev/null
+++ b/src/include/lib/sparsebitset.h
@@ -0,0 +1,26 @@
+/*
+ * sparsebitset.h
+ *	  Data structure to hold a large set of integers efficiently
+ *
+ * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
+ *
+ * src/include/lib/sparsebitset.h
+ */
+#ifndef SPARSEBITSET_H
+#define SPARSEBITSET_H
+
+typedef struct SparseBitset SparseBitset;
+
+extern SparseBitset *sbs_create(void);
+extern void sbs_free(SparseBitset *sbs);
+
+extern void sbs_add_member(SparseBitset *sbs, uint64 newitem);
+extern bool sbs_is_member(SparseBitset *sbs, uint64 item);
+
+extern uint64 sbs_num_entries(SparseBitset *sbs);
+extern uint64 sbs_memory_usage(SparseBitset *sbs);
+
+extern void sbs_begin_iterate(SparseBitset *sbs);
+extern uint64 sbs_iterate_next(SparseBitset *sbs, bool *found);
+
+#endif							/* SPARSEBITSET_H */
-- 
2.20.1

