Cube extension split algorithm fix

Started by Stas Kelvichover 12 years ago3 messages
#1Stas Kelvich
stas.kelvich@gmail.com
1 attachment(s)

Hello, hackers.

I've fixed split algorithm that was implemented in cube extension. I've changed it according to the original Guttman paper (old version was more simple algorithm) and also ported Alexander Korotkov's algorithm from box datatype indexing that work faster and better on low dimensions.

On my test dataset (1M records, 7 dimensions, real world database of goods) numbers was following:

Building index over table (on expression):
old: 67.296058 seconds
new: 48.842391 seconds

Cube point search, mean, 100 queries
old: 0.001025 seconds
new: 0.000427 seconds

Index on field size:
old: 562 MB
new: 283 MB

Stas.

Attachments:

split.diffapplication/octet-stream; name=split.diff; x-unix-mode=0644Download
diff --git a/contrib/cube/cube.c b/contrib/cube/cube.c
index dab0e6e..0ff7f8d 100644
--- a/contrib/cube/cube.c
+++ b/contrib/cube/cube.c
@@ -131,21 +131,32 @@ Datum		cube_enlarge(PG_FUNCTION_ARGS);
 /*
 ** For internal use only
 */
-int32		cube_cmp_v0(NDBOX *a, NDBOX *b);
-bool		cube_contains_v0(NDBOX *a, NDBOX *b);
-bool		cube_overlap_v0(NDBOX *a, NDBOX *b);
-NDBOX	   *cube_union_v0(NDBOX *a, NDBOX *b);
-void		rt_cube_size(NDBOX *a, double *sz);
+static int				interval_cmp_lower(const void *i1, const void *i2);
+static int				interval_cmp_upper(const void *i1, const void *i2);
+static int				common_entry_cmp(const void *i1, const void *i2);
+static void				guttman_split(GistEntryVector *entryvec, GIST_SPLITVEC *v);
+static void				korotkov_split(GistEntryVector *entryvec, GIST_SPLITVEC *v);
+static void				fallback_split(GistEntryVector *entryvec, GIST_SPLITVEC *v);
+
+static inline int32		cube_cmp_v0(NDBOX *a, NDBOX *b);
+static inline bool		cube_contains_v0(NDBOX *a, NDBOX *b);
+static inline bool		cube_overlap_v0(NDBOX *a, NDBOX *b);
+static inline NDBOX	   *cube_union_v0(NDBOX *a, NDBOX *b);
+static inline NDBOX	   *_cube_inter(NDBOX *a, NDBOX *b);
+static inline void		rt_cube_size(NDBOX *a, double *sz);
+static inline double	distance_1D(double a1, double a2, double b1, double b2);
+static inline double	cube_penalty(NDBOX *origentry, NDBOX *newentry);
+
+static inline float 	non_negative(float val);
+static inline NDBOX	   *adjust_box(NDBOX *b, NDBOX *addon);
+static inline void		cube_consider_split(ConsiderSplitContext *context, int dimNum,
+						  double rightLower, int minLeftCount,
+						  double leftUpper, int maxLeftCount);
+
 NDBOX	   *g_cube_binary_union(NDBOX *r1, NDBOX *r2, int *sizep);
 bool		g_cube_leaf_consistent(NDBOX *key, NDBOX *query, StrategyNumber strategy);
 bool		g_cube_internal_consistent(NDBOX *key, NDBOX *query, StrategyNumber strategy);
 
-/*
-** Auxiliary funxtions
-*/
-static double distance_1D(double a1, double a2, double b1, double b2);
-
-
 /*****************************************************************************
  * Input/Output functions
  *****************************************************************************/
@@ -454,15 +465,11 @@ g_cube_penalty(PG_FUNCTION_ARGS)
 	GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
 	GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
 	float	   *result = (float *) PG_GETARG_POINTER(2);
-	NDBOX	   *ud;
-	double		tmp1,
-				tmp2;
 
-	ud = cube_union_v0(DatumGetNDBOX(origentry->key),
-					   DatumGetNDBOX(newentry->key));
-	rt_cube_size(ud, &tmp1);
-	rt_cube_size(DatumGetNDBOX(origentry->key), &tmp2);
-	*result = (float) (tmp1 - tmp2);
+	*result = cube_penalty(
+		DatumGetNDBOX(origentry->key),
+		DatumGetNDBOX(newentry->key)
+	);
 
 	/*
 	 * fprintf(stderr, "penalty\n"); fprintf(stderr, "\t%g\n", *result);
@@ -470,8 +477,6 @@ g_cube_penalty(PG_FUNCTION_ARGS)
 	PG_RETURN_FLOAT8(*result);
 }
 
-
-
 /*
 ** The GiST PickSplit method for boxes
 ** We use Guttman's poly time split algorithm
@@ -479,148 +484,13 @@ g_cube_penalty(PG_FUNCTION_ARGS)
 Datum
 g_cube_picksplit(PG_FUNCTION_ARGS)
 {
-	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
-	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
-	OffsetNumber i,
-				j;
-	NDBOX	   *datum_alpha,
-			   *datum_beta;
-	NDBOX	   *datum_l,
-			   *datum_r;
-	NDBOX	   *union_d,
-			   *union_dl,
-			   *union_dr;
-	NDBOX	   *inter_d;
-	bool		firsttime;
-	double		size_alpha,
-				size_beta,
-				size_union,
-				size_inter;
-	double		size_waste,
-				waste;
-	double		size_l,
-				size_r;
-	int			nbytes;
-	OffsetNumber seed_1 = 1,
-				seed_2 = 2;
-	OffsetNumber *left,
-			   *right;
-	OffsetNumber maxoff;
-
-	/*
-	 * fprintf(stderr, "picksplit\n");
-	 */
-	maxoff = entryvec->n - 2;
-	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
-	v->spl_left = (OffsetNumber *) palloc(nbytes);
-	v->spl_right = (OffsetNumber *) palloc(nbytes);
-
-	firsttime = true;
-	waste = 0.0;
-
-	for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
-	{
-		datum_alpha = DatumGetNDBOX(entryvec->vector[i].key);
-		for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
-		{
-			datum_beta = DatumGetNDBOX(entryvec->vector[j].key);
-
-			/* compute the wasted space by unioning these guys */
-			/* size_waste = size_union - size_inter; */
-			union_d = cube_union_v0(datum_alpha, datum_beta);
-			rt_cube_size(union_d, &size_union);
-			inter_d = DatumGetNDBOX(DirectFunctionCall2(cube_inter,
-						  entryvec->vector[i].key, entryvec->vector[j].key));
-			rt_cube_size(inter_d, &size_inter);
-			size_waste = size_union - size_inter;
-
-			/*
-			 * are these a more promising split than what we've already seen?
-			 */
-
-			if (size_waste > waste || firsttime)
-			{
-				waste = size_waste;
-				seed_1 = i;
-				seed_2 = j;
-				firsttime = false;
-			}
-		}
-	}
-
-	left = v->spl_left;
-	v->spl_nleft = 0;
-	right = v->spl_right;
-	v->spl_nright = 0;
-
-	datum_alpha = DatumGetNDBOX(entryvec->vector[seed_1].key);
-	datum_l = cube_union_v0(datum_alpha, datum_alpha);
-	rt_cube_size(datum_l, &size_l);
-	datum_beta = DatumGetNDBOX(entryvec->vector[seed_2].key);
-	datum_r = cube_union_v0(datum_beta, datum_beta);
-	rt_cube_size(datum_r, &size_r);
-
-	/*
-	 * Now split up the regions between the two seeds.	An important property
-	 * of this split algorithm is that the split vector v has the indices of
-	 * items to be split in order in its left and right vectors.  We exploit
-	 * this property by doing a merge in the code that actually splits the
-	 * page.
-	 *
-	 * For efficiency, we also place the new index tuple in this loop. This is
-	 * handled at the very end, when we have placed all the existing tuples
-	 * and i == maxoff + 1.
-	 */
-
-	maxoff = OffsetNumberNext(maxoff);
-	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-	{
-		/*
-		 * If we've already decided where to place this item, just put it on
-		 * the right list.	Otherwise, we need to figure out which page needs
-		 * the least enlargement in order to store the item.
-		 */
-
-		if (i == seed_1)
-		{
-			*left++ = i;
-			v->spl_nleft++;
-			continue;
-		}
-		else if (i == seed_2)
-		{
-			*right++ = i;
-			v->spl_nright++;
-			continue;
-		}
-
-		/* okay, which page needs least enlargement? */
-		datum_alpha = DatumGetNDBOX(entryvec->vector[i].key);
-		union_dl = cube_union_v0(datum_l, datum_alpha);
-		union_dr = cube_union_v0(datum_r, datum_alpha);
-		rt_cube_size(union_dl, &size_alpha);
-		rt_cube_size(union_dr, &size_beta);
-
-		/* pick which page to add it to */
-		if (size_alpha - size_l < size_beta - size_r)
-		{
-			datum_l = union_dl;
-			size_l = size_alpha;
-			*left++ = i;
-			v->spl_nleft++;
-		}
-		else
-		{
-			datum_r = union_dr;
-			size_r = size_beta;
-			*right++ = i;
-			v->spl_nright++;
-		}
-	}
-	*left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
+	GistEntryVector	*entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
+	GIST_SPLITVEC	*v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
 
-	v->spl_ldatum = PointerGetDatum(datum_l);
-	v->spl_rdatum = PointerGetDatum(datum_r);
+	if (DatumGetNDBOX(entryvec->vector[FirstOffsetNumber].key)->dim >= SPLIT_THRESHOLD)
+		guttman_split(entryvec, v);
+	else
+		korotkov_split(entryvec, v);
 
 	PG_RETURN_POINTER(v);
 }
@@ -724,7 +594,7 @@ g_cube_binary_union(NDBOX *r1, NDBOX *r2, int *sizep)
 
 
 /* cube_union_v0 */
-NDBOX *
+static inline NDBOX *
 cube_union_v0(NDBOX *a, NDBOX *b)
 {
 	int			i;
@@ -794,76 +664,82 @@ cube_union(PG_FUNCTION_ARGS)
 }
 
 /* cube_inter */
-Datum
-cube_inter(PG_FUNCTION_ARGS)
+static inline NDBOX*
+_cube_inter(NDBOX *a, NDBOX *b)
 {
-	NDBOX	   *a = PG_GETARG_NDBOX(0);
-	NDBOX	   *b = PG_GETARG_NDBOX(1);
-	NDBOX	   *result;
-	bool		swapped = false;
+	NDBOX	   *result, *cube1, *cube2;
 	int			i;
+	double		edge1, edge2;
 
-	if (a->dim >= b->dim)
-	{
-		result = palloc0(VARSIZE(a));
-		SET_VARSIZE(result, VARSIZE(a));
-		result->dim = a->dim;
-	}
-	else
-	{
-		result = palloc0(VARSIZE(b));
-		SET_VARSIZE(result, VARSIZE(b));
-		result->dim = b->dim;
-	}
+	cube1 = (a->dim > b->dim) ? a : b;
+	cube2 = (a->dim <= b->dim) ? a : b;
 
-	/* swap the box pointers if needed */
-	if (a->dim < b->dim)
+	result = palloc0(VARSIZE(cube1));
+	SET_VARSIZE(result, VARSIZE(cube1));
+	result->dim = cube1->dim;
+
+	/* compute the intersection */
+	for (i = 0; i < cube2->dim; i++)
 	{
-		NDBOX	   *tmp = b;
+		edge1 = Max(
+			Min(cube1->x[i], cube1->x[i + cube1->dim]),
+			Min(cube2->x[i], cube2->x[i + cube2->dim])
+		);
+		edge2 = Min(
+			Max(cube1->x[i], cube1->x[i + cube1->dim]),
+			Max(cube2->x[i], cube2->x[i + cube2->dim])
+		);
 
-		b = a;
-		a = tmp;
-		swapped = true;
-	}
 
-	/*
-	 * use the potentially	smaller of the two boxes (b) to fill in the
-	 * result, padding absent dimensions with zeroes
-	 */
-	for (i = 0; i < b->dim; i++)
-	{
-		result->x[i] = Min(b->x[i], b->x[i + b->dim]);
-		result->x[i + a->dim] = Max(b->x[i], b->x[i + b->dim]);
-	}
-	for (i = b->dim; i < a->dim; i++)
-	{
-		result->x[i] = 0;
-		result->x[i + a->dim] = 0;
+		if (edge1 <= edge2)
+		{
+			result->x[i] = edge1;
+			result->x[i + result->dim] = edge2;
+		}
+		else
+		{
+			result->x[i] = 0;
+			result->x[i + result->dim] = 0;
+		}
 	}
 
-	/* compute the intersection */
-	for (i = 0; i < a->dim; i++)
+	for (; i < cube2->dim; i++)
 	{
-		result->x[i] =
-			Max(Min(a->x[i], a->x[i + a->dim]), result->x[i]);
-		result->x[i + a->dim] = Min(Max(a->x[i],
-								   a->x[i + a->dim]), result->x[i + a->dim]);
+		edge1 = Max(
+			Min(cube1->x[i], cube1->x[i + cube1->dim]),
+			0
+		);
+		edge2 = Min(
+			Max(cube1->x[i], cube1->x[i + cube1->dim]),
+			0
+		);
+
+		if (edge1 <= edge2)
+		{
+			result->x[i] = edge1;
+			result->x[i + result->dim] = edge2;
+		}
+		else
+		{
+			result->x[i] = 0;
+			result->x[i + result->dim] = 0;
+		}
 	}
 
-	if (swapped)
-	{
-		PG_FREE_IF_COPY(b, 0);
-		PG_FREE_IF_COPY(a, 1);
-	}
-	else
-	{
-		PG_FREE_IF_COPY(a, 0);
-		PG_FREE_IF_COPY(b, 1);
-	}
+	return result;
+}
 
-	/*
-	 * Is it OK to return a non-null intersection for non-overlapping boxes?
-	 */
+Datum
+cube_inter(PG_FUNCTION_ARGS)
+{
+	NDBOX	   *a = PG_GETARG_NDBOX(0);
+	NDBOX	   *b = PG_GETARG_NDBOX(1);
+	NDBOX	   *result;
+
+	result = _cube_inter(a, b);
+
+	PG_FREE_IF_COPY(a, 0);
+	PG_FREE_IF_COPY(b, 1);
 	PG_RETURN_NDBOX(result);
 }
 
@@ -884,7 +760,7 @@ cube_size(PG_FUNCTION_ARGS)
 	PG_RETURN_FLOAT8(result);
 }
 
-void
+static inline void
 rt_cube_size(NDBOX *a, double *size)
 {
 	int			i,
@@ -903,7 +779,7 @@ rt_cube_size(NDBOX *a, double *size)
 
 /* make up a metric in which one box will be 'lower' than the other
    -- this can be useful for sorting and to determine uniqueness */
-int32
+static inline int32
 cube_cmp_v0(NDBOX *a, NDBOX *b)
 {
 	int			i;
@@ -983,6 +859,101 @@ cube_cmp_v0(NDBOX *a, NDBOX *b)
 	return 0;
 }
 
+static inline NDBOX*
+adjust_box(NDBOX *b, NDBOX *addon)
+{
+	int			i, size;
+	NDBOX	   *cube;
+
+	if (b->dim >= addon->dim)
+		cube = b;
+	else
+	{
+		size = offsetof(NDBOX, x[0]) + 2*sizeof(double)*addon->dim;
+		cube = (NDBOX *) palloc0(size);
+		SET_VARSIZE(cube, size);
+		cube->dim = addon->dim;
+
+		for (i=0; i<b->dim; i++)
+			cube->x[cube->dim + i] = b->x[b->dim + i];
+
+		for (; i<cube->dim; i++)
+			cube->x[i] = cube->x[cube->dim + i] = 0;
+	}
+
+	for(i=0; i < addon->dim; i++)
+	{
+		if (cube->x[i + cube->dim] < addon->x[i + addon->dim])
+			cube->x[i + cube->dim] = addon->x[i + addon->dim];
+		if (cube->x[i] > addon->x[i])
+			cube->x[i] = addon->x[i];
+	}
+
+	return cube;
+}
+
+static int
+interval_cmp_lower(const void *i1, const void *i2)
+{
+	double		lower1 = ((const SplitInterval *) i1)->lower,
+				lower2 = ((const SplitInterval *) i2)->lower;
+
+	if (lower1 < lower2)
+		return -1;
+	else if (lower1 > lower2)
+		return 1;
+	else
+		return 0;
+}
+
+static int
+interval_cmp_upper(const void *i1, const void *i2)
+{
+	double		upper1 = ((const SplitInterval *) i1)->upper,
+				upper2 = ((const SplitInterval *) i2)->upper;
+
+	if (upper1 < upper2)
+		return -1;
+	else if (upper1 > upper2)
+		return 1;
+	else
+		return 0;
+}
+
+static inline float
+non_negative(float val)
+{
+	if (val >= 0.0f)
+		return val;
+	else
+		return 0.0f;
+}
+
+static int
+common_entry_cmp(const void *i1, const void *i2)
+{
+	double		delta1 = ((const CommonEntry *) i1)->delta,
+				delta2 = ((const CommonEntry *) i2)->delta;
+
+	if (delta1 < delta2)
+		return -1;
+	else if (delta1 > delta2)
+		return 1;
+	else
+		return 0;
+}
+
+static inline double
+cube_penalty(NDBOX *origentry, NDBOX *newentry)
+{
+	double		tmp1,
+				tmp2;
+
+	rt_cube_size(cube_union_v0(origentry, newentry), &tmp1);
+	rt_cube_size(origentry, &tmp2);
+	return (tmp1 - tmp2);
+}
+
 Datum
 cube_cmp(PG_FUNCTION_ARGS)
 {
@@ -1090,7 +1061,7 @@ cube_ge(PG_FUNCTION_ARGS)
 
 /* Contains */
 /* Box(A) CONTAINS Box(B) IFF pt(A) < pt(B) */
-bool
+static inline bool
 cube_contains_v0(NDBOX *a, NDBOX *b)
 {
 	int			i;
@@ -1160,7 +1131,7 @@ cube_contained(PG_FUNCTION_ARGS)
 
 /* Overlap */
 /* Box(A) Overlap Box(B) IFF (pt(a)LL < pt(B)UR) && (pt(b)LL < pt(a)UR) */
-bool
+static inline bool
 cube_overlap_v0(NDBOX *a, NDBOX *b)
 {
 	int			i;
@@ -1274,7 +1245,7 @@ cube_distance(PG_FUNCTION_ARGS)
 	PG_RETURN_FLOAT8(sqrt(distance));
 }
 
-static double
+static inline double
 distance_1D(double a1, double a2, double b1, double b2)
 {
 	/* interval (a) is entirely on the left of (b) */
@@ -1494,3 +1465,681 @@ cube_c_f8_f8(PG_FUNCTION_ARGS)
 	PG_FREE_IF_COPY(c, 0);
 	PG_RETURN_NDBOX(result);
 }
+
+
+/*
+ * Split routines.
+ */
+
+static void
+guttman_split(GistEntryVector *entryvec, GIST_SPLITVEC *v)
+{
+	OffsetNumber	i,
+					j,
+					nelems,
+					seed_1,
+					seed_2,
+					selected_offset;
+	NDBOX		   *cube_alpha,
+				   *cube_beta,
+				   *union_cube,
+				   *inter_cube,
+				   *cube_left,
+				   *cube_right,
+				   *current_cube,
+				   *left_union,
+				   *right_union,
+				   *selected_cube,
+				   *cube;
+	double			size_waste,
+					waste,
+					volume_left,
+					volume_right,
+					current_increase_l,
+					current_increase_r,
+					current_diff_increase,
+					diff_increase,
+					increase_l,
+					increase_r,
+					sz1,
+					sz2;
+	int				nbytes,
+					min_count,
+					remaining_count;
+	bool			firsttime,
+				   *inserted_cubes;
+
+	nelems = entryvec->n - 1;
+	nbytes = nelems*sizeof(OffsetNumber) + 1;
+	v->spl_left = (OffsetNumber *) palloc(nbytes);
+	v->spl_right = (OffsetNumber *) palloc(nbytes);
+	inserted_cubes = palloc0(nelems + 1);
+	v->spl_nleft = v->spl_nright = 0;
+
+	firsttime = true;
+	waste = 0.0;
+
+	/* 
+	 * pick two seed elements
+	 */
+	for (i = FirstOffsetNumber; i <= nelems; i = OffsetNumberNext(i))
+	{
+		cube_alpha = DatumGetNDBOX(entryvec->vector[i].key);
+		for (j = OffsetNumberNext(i); j <= nelems; j = OffsetNumberNext(j))
+		{
+			cube_beta = DatumGetNDBOX(entryvec->vector[j].key);
+
+			/* compute the wasted space by unioning these guys */
+			/* size_waste = size_union - size_inter; */
+			union_cube = cube_union_v0(cube_alpha, cube_beta);
+			inter_cube = _cube_inter(cube_alpha, cube_beta);
+			rt_cube_size(union_cube, &sz1);
+			rt_cube_size(inter_cube, &sz2);
+			size_waste = sz1 - sz2;
+
+			/*
+			 * are these a more promising split than what we've already seen?
+			 */
+			if (size_waste > waste || firsttime)
+			{
+				waste = size_waste;
+				seed_1 = i;
+				seed_2 = j;
+				firsttime = false;
+			}
+		}
+	}
+
+	/* selected elements became first elements in groups */
+	v->spl_left[v->spl_nleft++] = seed_1;
+	v->spl_right[v->spl_nright++] = seed_2;
+	inserted_cubes[seed_1] = inserted_cubes[seed_2] = true;
+	
+	/* bounding boxes for groups */
+	cube = DatumGetNDBOX(entryvec->vector[seed_1].key);
+	cube_left = palloc(VARSIZE(cube));
+	memcpy(cube_left, cube, VARSIZE(cube));
+
+	cube = DatumGetNDBOX(entryvec->vector[seed_2].key);
+	cube_right = palloc(VARSIZE(cube));
+	memcpy(cube_right, cube, VARSIZE(cube));
+
+	rt_cube_size(cube_left, &volume_left);
+	rt_cube_size(cube_right, &volume_right);
+
+	/*
+	 * insert cubes
+	 */
+	remaining_count = nelems-2;
+	min_count = ceil(LIMIT_RATIO*(nelems - 1));
+
+	while (remaining_count > 0)
+	{
+		diff_increase = 0.0;
+
+		/* check if there ehough cubes to satisfy mininum elements restriction */
+		if ((v->spl_nleft + remaining_count) <= min_count || (v->spl_nright + remaining_count) <= min_count )
+			break;
+
+		/* find cube with biggest diff_increase to insert it */
+		for (i = FirstOffsetNumber; i <= nelems; i = OffsetNumberNext(i))
+		{
+			current_cube = DatumGetNDBOX(entryvec->vector[i].key);
+		
+			/* skip already inserted cubes */
+			if (inserted_cubes[i])
+				continue;
+
+			left_union = cube_union_v0(cube_left, current_cube);
+			rt_cube_size(union_cube, &sz1);
+			current_increase_l = sz1 - volume_left;
+
+			right_union = cube_union_v0(cube_right, current_cube);
+			rt_cube_size(union_cube, &sz2);
+			current_increase_r = sz2 - volume_right;
+
+			current_diff_increase = Abs(current_increase_l-current_increase_r);
+
+			if (current_diff_increase >= diff_increase)
+			{
+				diff_increase = current_diff_increase;
+				increase_l = current_increase_l;
+				increase_r = current_increase_r;
+				selected_offset = i;
+			}
+		}
+
+		/* now insert selected cube on leaf with minimal increase */
+		selected_cube = DatumGetNDBOX(entryvec->vector[selected_offset].key);
+		if (increase_l < increase_r)
+		{
+			v->spl_left[v->spl_nleft++] = selected_offset;
+			cube_left = adjust_box(cube_left, selected_cube);
+			rt_cube_size(cube_left, &volume_left);
+		}
+		else
+		{
+			v->spl_right[v->spl_nright++] = selected_offset;
+			cube_right = adjust_box(cube_right, selected_cube);
+			rt_cube_size(cube_right, &volume_right);
+		}
+
+		inserted_cubes[selected_offset] = true;	/* mark this cube as inserted */
+		remaining_count--;
+	}
+
+	/* insert all remaining entries (if any) to one branch */
+	if (remaining_count > 0)
+	{
+		if (v->spl_nleft < v->spl_nright)
+		{
+			for (i = FirstOffsetNumber; i <= nelems; i = OffsetNumberNext(i))
+			{
+				if (!inserted_cubes[i]){
+					v->spl_left[v->spl_nleft++] = i;
+					cube_left = adjust_box(cube_left, DatumGetNDBOX(entryvec->vector[i].key));
+				}
+			}
+		}
+		else
+		{
+			for (i = FirstOffsetNumber; i <= nelems; i = OffsetNumberNext(i))
+			{
+				if (!inserted_cubes[i]){
+					v->spl_right[v->spl_nright++] = i;
+					cube_right = adjust_box(cube_right, DatumGetNDBOX(entryvec->vector[i].key));
+				}
+			}
+		}
+	}
+
+	v->spl_ldatum = PointerGetDatum(cube_left);
+	v->spl_rdatum = PointerGetDatum(cube_right);
+	return;
+}
+
+static void
+korotkov_split(GistEntryVector *entryvec, GIST_SPLITVEC *v)
+{
+	ConsiderSplitContext context;
+	OffsetNumber	i,
+					maxoff;
+	NDBOX		   *box,
+				   *leftBox,
+				   *rightBox;
+	int				dim,
+					commonEntriesCount;
+	SplitInterval  *intervalsLower,
+				   *intervalsUpper;
+	CommonEntry	   *commonEntries;
+	int				nentries;
+
+	memset(&context, 0, sizeof(ConsiderSplitContext));
+
+	maxoff = entryvec->n - 1;
+	nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
+
+	/* Allocate arrays for intervals along axes */
+	intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
+	intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
+
+	/*
+	 * Calculate the overall minimum bounding box over all the entries.
+	 */
+	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+	{
+		box = DatumGetNDBOX(entryvec->vector[i].key);
+		if (i == FirstOffsetNumber)
+		{
+			context.boundingBox = palloc(VARSIZE(box));
+			memcpy(context.boundingBox, box, VARSIZE(box));
+		}
+		else
+			context.boundingBox = adjust_box(context.boundingBox, box);
+	}
+
+	/*
+	 * Iterate over axes for optimal split searching.
+	 */
+	context.first = true;		/* nothing selected yet */
+	for (dim = 0; dim < box->dim; dim++)
+	{
+		double		leftUpper,
+					rightLower;
+		int			i1,
+					i2;
+
+		/* Project each entry as an interval on the selected axis. */
+		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+		{
+			box = DatumGetNDBOX(entryvec->vector[i].key);
+			intervalsLower[i - FirstOffsetNumber].lower = box->x[dim];
+			intervalsLower[i - FirstOffsetNumber].upper = box->x[dim+box->dim];
+		}
+
+		/*
+		 * Make two arrays of intervals: one sorted by lower bound and another
+		 * sorted by upper bound.
+		 */
+		memcpy(intervalsUpper, intervalsLower,
+			   sizeof(SplitInterval) * nentries);
+		qsort(intervalsLower, nentries, sizeof(SplitInterval),
+			  interval_cmp_lower);
+		qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+			  interval_cmp_upper);
+
+		/*----
+		 * The goal is to form a left and right interval, so that every entry
+		 * interval is contained by either left or right interval (or both).
+		 *
+		 * For example, with the intervals (0,1), (1,3), (2,3), (2,4):
+		 *
+		 * 0 1 2 3 4
+		 * +-+
+		 *	 +---+
+		 *	   +-+
+		 *	   +---+
+		 *
+		 * The left and right intervals are of the form (0,a) and (b,4).
+		 * We first consider splits where b is the lower bound of an entry.
+		 * We iterate through all entries, and for each b, calculate the
+		 * smallest possible a. Then we consider splits where a is the
+		 * uppper bound of an entry, and for each a, calculate the greatest
+		 * possible b.
+		 *
+		 * In the above example, the first loop would consider splits:
+		 * b=0: (0,1)-(0,4)
+		 * b=1: (0,1)-(1,4)
+		 * b=2: (0,3)-(2,4)
+		 *
+		 * And the second loop:
+		 * a=1: (0,1)-(1,4)
+		 * a=3: (0,3)-(2,4)
+		 * a=4: (0,4)-(2,4)
+		 */
+
+		/*
+		 * Iterate over lower bound of right group, finding smallest possible
+		 * upper bound of left group.
+		 */
+		i1 = 0;
+		i2 = 0;
+		rightLower = intervalsLower[i1].lower;
+		leftUpper = intervalsUpper[i2].lower;
+		while (true)
+		{
+			/*
+			 * Find next lower bound of right group.
+			 */
+			while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+			{
+				leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+				i1++;
+			}
+			if (i1 >= nentries)
+				break;
+			rightLower = intervalsLower[i1].lower;
+
+			/*
+			 * Find count of intervals which anyway should be placed to the
+			 * left group.
+			 */
+			while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+				i2++;
+
+			/*
+			 * Consider found split.
+			 */
+			cube_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+		}
+
+		/*
+		 * Iterate over upper bound of left group finding greates possible
+		 * lower bound of right group.
+		 */
+		i1 = nentries - 1;
+		i2 = nentries - 1;
+		rightLower = intervalsLower[i1].upper;
+		leftUpper = intervalsUpper[i2].upper;
+		while (true)
+		{
+			/*
+			 * Find next upper bound of left group.
+			 */
+			while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+			{
+				rightLower = Min(rightLower, intervalsUpper[i2].lower);
+				i2--;
+			}
+			if (i2 < 0)
+				break;
+			leftUpper = intervalsUpper[i2].upper;
+
+			/*
+			 * Find count of intervals which anyway should be placed to the
+			 * right group.
+			 */
+			while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+				i1--;
+
+			/*
+			 * Consider found split.
+			 */
+			cube_consider_split(&context, dim,
+								 rightLower, i1 + 1, leftUpper, i2 + 1);
+		}
+	}
+
+	/*
+	 * If we failed to find any acceptable splits, use trivial split.
+	 */
+	if (context.first)
+	{
+		fallback_split(entryvec, v);
+		return;
+	}
+
+	/*
+	 * Ok, we have now selected the split across one axis.
+	 *
+	 * While considering the splits, we already determined that there will be
+	 * enough entries in both groups to reach the desired ratio, but we did
+	 * not memorize which entries go to which group. So determine that now.
+	 */
+
+	/* Allocate vectors for results */
+	v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+	v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+	v->spl_nleft = 0;
+	v->spl_nright = 0;
+
+	/* Allocate bounding boxes of left and right groups */
+	leftBox = palloc0(VARSIZE(context.boundingBox));
+	rightBox = palloc0(VARSIZE(context.boundingBox));
+
+	/*
+	 * Allocate an array for "common entries" - entries which can be placed to
+	 * either group without affecting overlap along selected axis.
+	 */
+	commonEntriesCount = 0;
+	commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
+
+	/* Helper macros to place an entry in the left or right group */
+#define PLACE_LEFT(box, off)					\
+	do {										\
+		if (v->spl_nleft > 0)					\
+			leftBox = adjust_box(leftBox, box);	\
+		else									\
+			*leftBox = *(box);					\
+		v->spl_left[v->spl_nleft++] = off;		\
+	} while(0)
+
+#define PLACE_RIGHT(box, off)					\
+	do {										\
+		if (v->spl_nright > 0)					\
+			rightBox = adjust_box(rightBox, box);\
+		else									\
+			*rightBox = *(box);					\
+		v->spl_right[v->spl_nright++] = off;	\
+	} while(0)
+
+	/*
+	 * Distribute entries which can be distributed unambiguously, and collect
+	 * common entries.
+	 */
+	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+	{
+		double		lower,
+					upper;
+
+		/*
+		 * Get upper and lower bounds along selected axis.
+		 */
+		box = DatumGetNDBOX(entryvec->vector[i].key);
+		lower = box->x[context.dim];
+		upper = box->x[context.dim + box->dim];
+
+		if (upper <= context.leftUpper)
+		{
+			/* Fits to the left group */
+			if (lower >= context.rightLower)
+			{
+				/* Fits also to the right group, so "common entry" */
+				commonEntries[commonEntriesCount++].index = i;
+			}
+			else
+			{
+				/* Doesn't fit to the right group, so join to the left group */
+				PLACE_LEFT(box, i);
+			}
+		}
+		else
+		{
+			/*
+			 * Each entry should fit on either left or right group. Since this
+			 * entry didn't fit on the left group, it better fit in the right
+			 * group.
+			 */
+			Assert(lower >= context.rightLower);
+
+			/* Doesn't fit to the left group, so join to the right group */
+			PLACE_RIGHT(box, i);
+		}
+	}
+
+	/*
+	 * Distribute "common entries", if any.
+	 */
+	if (commonEntriesCount > 0)
+	{
+		/*
+		 * Calculate minimum number of entries that must be placed in both
+		 * groups, to reach LIMIT_RATIO.
+		 */
+		int			m = ceil(LIMIT_RATIO * (double) nentries);
+
+		/*
+		 * Calculate delta between penalties of join "common entries" to
+		 * different groups.
+		 */
+		for (i = 0; i < commonEntriesCount; i++)
+		{
+			box = DatumGetNDBOX(entryvec->vector[commonEntries[i].index].key);
+			commonEntries[i].delta = Abs(cube_penalty(leftBox, box) - cube_penalty(rightBox, box));
+		}
+
+		/*
+		 * Sort "common entries" by calculated deltas in order to distribute
+		 * the most ambiguous entries first.
+		 */
+		qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
+
+		/*
+		 * Distribute "common entries" between groups.
+		 */
+		for (i = 0; i < commonEntriesCount; i++)
+		{
+			box = DatumGetNDBOX(entryvec->vector[commonEntries[i].index].key);
+
+			/*
+			 * Check if we have to place this entry in either group to achieve
+			 * LIMIT_RATIO.
+			 */
+			if (v->spl_nleft + (commonEntriesCount - i) <= m)
+				PLACE_LEFT(box, commonEntries[i].index);
+			else if (v->spl_nright + (commonEntriesCount - i) <= m)
+				PLACE_RIGHT(box, commonEntries[i].index);
+			else
+			{
+				/* Otherwise select the group by minimal penalty */
+				if (cube_penalty(leftBox, box) < cube_penalty(rightBox, box))
+					PLACE_LEFT(box, commonEntries[i].index);
+				else
+					PLACE_RIGHT(box, commonEntries[i].index);
+			}
+		}
+	}
+
+	v->spl_ldatum = PointerGetDatum(leftBox);
+	v->spl_rdatum = PointerGetDatum(rightBox);
+
+	return;
+}
+
+static void
+fallback_split(GistEntryVector *entryvec, GIST_SPLITVEC *v)
+{
+	OffsetNumber i,
+				maxoff;
+	NDBOX	   *unionL = NULL,
+			   *unionR = NULL;
+	int			nbytes;
+
+	maxoff = entryvec->n - 1;
+
+	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
+	v->spl_left = (OffsetNumber *) palloc(nbytes);
+	v->spl_right = (OffsetNumber *) palloc(nbytes);
+	v->spl_nleft = v->spl_nright = 0;
+
+	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+	{
+		NDBOX		   *cur = DatumGetNDBOX(entryvec->vector[i].key);
+
+		if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
+		{
+			v->spl_left[v->spl_nleft] = i;
+			if (unionL == NULL)
+			{
+				unionL = (NDBOX *) palloc(VARSIZE(cur));
+				*unionL = *cur;
+			}
+			else
+				unionL = adjust_box(unionL, cur);
+
+			v->spl_nleft++;
+		}
+		else
+		{
+			v->spl_right[v->spl_nright] = i;
+			if (unionR == NULL)
+			{
+				unionR = (NDBOX *) palloc(VARSIZE(cur));
+				*unionR = *cur;
+			}
+			else
+				unionR = adjust_box(unionR, cur);
+
+			v->spl_nright++;
+		}
+	}
+
+	v->spl_ldatum = PointerGetDatum(unionL);
+	v->spl_rdatum = PointerGetDatum(unionR);
+}
+
+/*
+ * Consider replacement of currently selected split with the better one.
+ */
+static inline void
+cube_consider_split(ConsiderSplitContext *context, int dimNum,
+					 double rightLower, int minLeftCount,
+					 double leftUpper, int maxLeftCount)
+{
+	int			leftCount,
+				rightCount;
+	float4		ratio,
+				overlap;
+	double		range;
+
+	/*
+	 * Calculate entries distribution ratio assuming most uniform distribution
+	 * of common entries.
+	 */
+	if (minLeftCount >= (context->entriesCount + 1) / 2)
+	{
+		leftCount = minLeftCount;
+	}
+	else
+	{
+		if (maxLeftCount <= context->entriesCount / 2)
+			leftCount = maxLeftCount;
+		else
+			leftCount = context->entriesCount / 2;
+	}
+	rightCount = context->entriesCount - leftCount;
+
+	/*
+	 * Ratio of split - quotient between size of lesser group and total
+	 * entries count.
+	 */
+	ratio = ((float4) Min(leftCount, rightCount)) /
+		((float4) context->entriesCount);
+
+	if (ratio > LIMIT_RATIO)
+	{
+		bool		selectthis = false;
+
+		/*
+		 * The ratio is acceptable, so compare current split with previously
+		 * selected one. Between splits of one dimension we search for minimal
+		 * overlap (allowing negative values) and minimal ration (between same
+		 * overlaps. We switch dimension if find less overlap (non-negative)
+		 * or less range with same overlap.
+		 */
+		range = (context->boundingBox)->x[dimNum+(context->boundingBox)->dim]
+		 - (context->boundingBox)->x[dimNum];
+
+		overlap = (leftUpper - rightLower) / range;
+
+		/* If there is no previous selection, select this */
+		if (context->first)
+			selectthis = true;
+		else if (context->dim == dimNum)
+		{
+			/*
+			 * Within the same dimension, choose the new split if it has a
+			 * smaller overlap, or same overlap but better ratio.
+			 */
+			if (overlap < context->overlap ||
+				(overlap == context->overlap && ratio > context->ratio))
+				selectthis = true;
+		}
+		else
+		{
+			/*
+			 * Across dimensions, choose the new split if it has a smaller
+			 * *non-negative* overlap, or same *non-negative* overlap but
+			 * bigger range. This condition differs from the one described in
+			 * the article. On the datasets where leaf MBRs don't overlap
+			 * themselves, non-overlapping splits (i.e. splits which have zero
+			 * *non-negative* overlap) are frequently possible. In this case
+			 * splits tends to be along one dimension, because most distant
+			 * non-overlapping splits (i.e. having lowest negative overlap)
+			 * appears to be in the same dimension as in the previous split.
+			 * Therefore MBRs appear to be very prolonged along another
+			 * dimension, which leads to bad search performance. Using range
+			 * as the second split criteria makes MBRs more quadratic. Using
+			 * *non-negative* overlap instead of overlap as the first split
+			 * criteria gives to range criteria a chance to matter, because
+			 * non-overlapping splits are equivalent in this criteria.
+			 */
+			if (non_negative(overlap) < non_negative(context->overlap) ||
+				(range > context->range &&
+				 non_negative(overlap) <= non_negative(context->overlap)))
+				selectthis = true;
+		}
+
+		if (selectthis)
+		{
+			/* save information about selected split */
+			context->first = false;
+			context->ratio = ratio;
+			context->range = range;
+			context->overlap = overlap;
+			context->rightLower = rightLower;
+			context->leftUpper = leftUpper;
+			context->dim = dimNum;
+		}
+	}
+}
diff --git a/contrib/cube/cubedata.h b/contrib/cube/cubedata.h
index fd0c26a..7f5fe11 100644
--- a/contrib/cube/cubedata.h
+++ b/contrib/cube/cubedata.h
@@ -12,3 +12,57 @@ typedef struct NDBOX
 #define DatumGetNDBOX(x)	((NDBOX*)DatumGetPointer(x))
 #define PG_GETARG_NDBOX(x)	DatumGetNDBOX( PG_DETOAST_DATUM(PG_GETARG_DATUM(x)) )
 #define PG_RETURN_NDBOX(x)	PG_RETURN_POINTER(x)
+
+/*
+ * Represents information about an entry that can be placed to either group
+ * without affecting overlap over selected axis ("common entry").
+ */
+typedef struct
+{
+	/* Index of entry in the initial array */
+	int			index;
+	/* Delta between penalties of entry insertion into different groups */
+	double		delta;
+} CommonEntry;
+
+/*
+ * Context for g_box_consider_split. Contains information about currently
+ * selected split and some general information.
+ */
+typedef struct
+{
+	int			entriesCount;	/* total number of entries being split */
+	NDBOX	   *boundingBox;	/* minimum bounding box across all entries */
+
+	/* Information about currently selected split follows */
+
+	bool		first;			/* true if no split was selected yet */
+
+	double		leftUpper;		/* upper bound of left interval */
+	double		rightLower;		/* lower bound of right interval */
+
+	float4		ratio;
+	float4		overlap;
+	int			dim;			/* axis of this split */
+	double		range;			/* width of general MBR projection to the
+								 * selected axis */
+} ConsiderSplitContext;
+
+/*
+ * Interval represents projection of box to axis.
+ */
+typedef struct
+{
+	double		lower,
+				upper;
+} SplitInterval;
+
+/* Minimum accepted ratio of split */
+#define LIMIT_RATIO 0.3
+
+/* 
+ * We use Guttman split when cube dim >= SPLIT_THRESHOLD 
+ * and Korotkov split otherwise.
+ */
+#define SPLIT_THRESHOLD 3
+
#2Peter Eisentraut
peter_e@gmx.net
In reply to: Stas Kelvich (#1)
Re: Cube extension split algorithm fix

On 9/25/13 7:14 AM, Stas Kelvich wrote:

I've fixed split algorithm that was implemented in cube extension.

This patch creates a bunch of new compiler warnings. Please fix those.

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Alexander Korotkov
aekorotkov@gmail.com
In reply to: Stas Kelvich (#1)
Re: Cube extension split algorithm fix

Hi!

On Wed, Sep 25, 2013 at 3:14 PM, Stas Kelvich <stas.kelvich@gmail.com>wrote:

I've fixed split algorithm that was implemented in cube extension. I've
changed it according to the original Guttman paper (old version was more
simple algorithm) and also ported Alexander Korotkov's algorithm from box
datatype indexing that work faster and better on low dimensions.

On my test dataset (1M records, 7 dimensions, real world database of
goods) numbers was following:

Building index over table (on expression):
old: 67.296058 seconds
new: 48.842391 seconds

Cube point search, mean, 100 queries
old: 0.001025 seconds
new: 0.000427 seconds

Index on field size:
old: 562 MB
new: 283 MB

Thanks for patch! Here is my review.

1) After points for cubes were committed, this patch doesn't applies
anymore to head.

patching file contrib/cube/cube.c
Hunk #1 FAILED at 131.
Hunk #2 succeeded at 482 (offset 17 lines).
Hunk #3 succeeded at 494 (offset 17 lines).
Hunk #4 succeeded at 501 (offset 17 lines).
Hunk #5 succeeded at 611 (offset 17 lines).
Hunk #6 FAILED at 681.
Hunk #7 succeeded at 790 with fuzz 1 (offset 30 lines).
Hunk #8 succeeded at 808 (offset 29 lines).
Hunk #9 succeeded at 888 (offset 29 lines).
Hunk #10 succeeded at 1090 (offset 29 lines).
Hunk #11 succeeded at 1160 (offset 29 lines).
Hunk #12 succeeded at 1272 (offset 27 lines).
Hunk #13 succeeded at 1560 with fuzz 1 (offset 95 lines).
2 out of 13 hunks FAILED -- saving rejects to file contrib/cube/cube.c.rej
(Stripping trailing CRs from patch.)
patching file contrib/cube/cubedata.h
Hunk #1 succeeded at 47 (offset 35 lines).

2) Split algorithms are choosing by number of dimensions in first cube.
This is generally weak idea :) At least, cubes could have different number
of dimensions in the same index. This rule would give different answers for
different orders of cubes. Also, as corner case n+1 dimensional cubes with
confluent (n+1)th dimension are generally same as n dimensional cubes.
Choosing split algorithm shouldn't depend on it. We should try to
extrapolate this principle little more and detect useless for split
dimensions to do a better choice. Also, as responsible part as choosing
split algorithm needs to be documented (with references to the papers if
needed).

3) We need more comprehensive testing. One dataset is not enough. We need
much more to prove the rule of choosing split algorithm. Description of set
of queries 'Cube point search' is not detail. Can you share dataset you
use? We need the tests that anyone can reproduce.

------
With best regards,
Alexander Korotkov.