diff --git a/contrib/cube/cube.c b/contrib/cube/cube.c index dab0e6e..0ff7f8d 100644 --- a/contrib/cube/cube.c +++ b/contrib/cube/cube.c @@ -131,21 +131,32 @@ Datum cube_enlarge(PG_FUNCTION_ARGS); /* ** For internal use only */ -int32 cube_cmp_v0(NDBOX *a, NDBOX *b); -bool cube_contains_v0(NDBOX *a, NDBOX *b); -bool cube_overlap_v0(NDBOX *a, NDBOX *b); -NDBOX *cube_union_v0(NDBOX *a, NDBOX *b); -void rt_cube_size(NDBOX *a, double *sz); +static int interval_cmp_lower(const void *i1, const void *i2); +static int interval_cmp_upper(const void *i1, const void *i2); +static int common_entry_cmp(const void *i1, const void *i2); +static void guttman_split(GistEntryVector *entryvec, GIST_SPLITVEC *v); +static void korotkov_split(GistEntryVector *entryvec, GIST_SPLITVEC *v); +static void fallback_split(GistEntryVector *entryvec, GIST_SPLITVEC *v); + +static inline int32 cube_cmp_v0(NDBOX *a, NDBOX *b); +static inline bool cube_contains_v0(NDBOX *a, NDBOX *b); +static inline bool cube_overlap_v0(NDBOX *a, NDBOX *b); +static inline NDBOX *cube_union_v0(NDBOX *a, NDBOX *b); +static inline NDBOX *_cube_inter(NDBOX *a, NDBOX *b); +static inline void rt_cube_size(NDBOX *a, double *sz); +static inline double distance_1D(double a1, double a2, double b1, double b2); +static inline double cube_penalty(NDBOX *origentry, NDBOX *newentry); + +static inline float non_negative(float val); +static inline NDBOX *adjust_box(NDBOX *b, NDBOX *addon); +static inline void cube_consider_split(ConsiderSplitContext *context, int dimNum, + double rightLower, int minLeftCount, + double leftUpper, int maxLeftCount); + NDBOX *g_cube_binary_union(NDBOX *r1, NDBOX *r2, int *sizep); bool g_cube_leaf_consistent(NDBOX *key, NDBOX *query, StrategyNumber strategy); bool g_cube_internal_consistent(NDBOX *key, NDBOX *query, StrategyNumber strategy); -/* -** Auxiliary funxtions -*/ -static double distance_1D(double a1, double a2, double b1, double b2); - - /***************************************************************************** * Input/Output functions *****************************************************************************/ @@ -454,15 +465,11 @@ g_cube_penalty(PG_FUNCTION_ARGS) GISTENTRY *origentry = (GISTENTRY *) PG_GETARG_POINTER(0); GISTENTRY *newentry = (GISTENTRY *) PG_GETARG_POINTER(1); float *result = (float *) PG_GETARG_POINTER(2); - NDBOX *ud; - double tmp1, - tmp2; - ud = cube_union_v0(DatumGetNDBOX(origentry->key), - DatumGetNDBOX(newentry->key)); - rt_cube_size(ud, &tmp1); - rt_cube_size(DatumGetNDBOX(origentry->key), &tmp2); - *result = (float) (tmp1 - tmp2); + *result = cube_penalty( + DatumGetNDBOX(origentry->key), + DatumGetNDBOX(newentry->key) + ); /* * fprintf(stderr, "penalty\n"); fprintf(stderr, "\t%g\n", *result); @@ -470,8 +477,6 @@ g_cube_penalty(PG_FUNCTION_ARGS) PG_RETURN_FLOAT8(*result); } - - /* ** The GiST PickSplit method for boxes ** We use Guttman's poly time split algorithm @@ -479,148 +484,13 @@ g_cube_penalty(PG_FUNCTION_ARGS) Datum g_cube_picksplit(PG_FUNCTION_ARGS) { - GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0); - GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1); - OffsetNumber i, - j; - NDBOX *datum_alpha, - *datum_beta; - NDBOX *datum_l, - *datum_r; - NDBOX *union_d, - *union_dl, - *union_dr; - NDBOX *inter_d; - bool firsttime; - double size_alpha, - size_beta, - size_union, - size_inter; - double size_waste, - waste; - double size_l, - size_r; - int nbytes; - OffsetNumber seed_1 = 1, - seed_2 = 2; - OffsetNumber *left, - *right; - OffsetNumber maxoff; - - /* - * fprintf(stderr, "picksplit\n"); - */ - maxoff = entryvec->n - 2; - nbytes = (maxoff + 2) * sizeof(OffsetNumber); - v->spl_left = (OffsetNumber *) palloc(nbytes); - v->spl_right = (OffsetNumber *) palloc(nbytes); - - firsttime = true; - waste = 0.0; - - for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) - { - datum_alpha = DatumGetNDBOX(entryvec->vector[i].key); - for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) - { - datum_beta = DatumGetNDBOX(entryvec->vector[j].key); - - /* compute the wasted space by unioning these guys */ - /* size_waste = size_union - size_inter; */ - union_d = cube_union_v0(datum_alpha, datum_beta); - rt_cube_size(union_d, &size_union); - inter_d = DatumGetNDBOX(DirectFunctionCall2(cube_inter, - entryvec->vector[i].key, entryvec->vector[j].key)); - rt_cube_size(inter_d, &size_inter); - size_waste = size_union - size_inter; - - /* - * are these a more promising split than what we've already seen? - */ - - if (size_waste > waste || firsttime) - { - waste = size_waste; - seed_1 = i; - seed_2 = j; - firsttime = false; - } - } - } - - left = v->spl_left; - v->spl_nleft = 0; - right = v->spl_right; - v->spl_nright = 0; - - datum_alpha = DatumGetNDBOX(entryvec->vector[seed_1].key); - datum_l = cube_union_v0(datum_alpha, datum_alpha); - rt_cube_size(datum_l, &size_l); - datum_beta = DatumGetNDBOX(entryvec->vector[seed_2].key); - datum_r = cube_union_v0(datum_beta, datum_beta); - rt_cube_size(datum_r, &size_r); - - /* - * Now split up the regions between the two seeds. An important property - * of this split algorithm is that the split vector v has the indices of - * items to be split in order in its left and right vectors. We exploit - * this property by doing a merge in the code that actually splits the - * page. - * - * For efficiency, we also place the new index tuple in this loop. This is - * handled at the very end, when we have placed all the existing tuples - * and i == maxoff + 1. - */ - - maxoff = OffsetNumberNext(maxoff); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - { - /* - * If we've already decided where to place this item, just put it on - * the right list. Otherwise, we need to figure out which page needs - * the least enlargement in order to store the item. - */ - - if (i == seed_1) - { - *left++ = i; - v->spl_nleft++; - continue; - } - else if (i == seed_2) - { - *right++ = i; - v->spl_nright++; - continue; - } - - /* okay, which page needs least enlargement? */ - datum_alpha = DatumGetNDBOX(entryvec->vector[i].key); - union_dl = cube_union_v0(datum_l, datum_alpha); - union_dr = cube_union_v0(datum_r, datum_alpha); - rt_cube_size(union_dl, &size_alpha); - rt_cube_size(union_dr, &size_beta); - - /* pick which page to add it to */ - if (size_alpha - size_l < size_beta - size_r) - { - datum_l = union_dl; - size_l = size_alpha; - *left++ = i; - v->spl_nleft++; - } - else - { - datum_r = union_dr; - size_r = size_beta; - *right++ = i; - v->spl_nright++; - } - } - *left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */ + GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0); + GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1); - v->spl_ldatum = PointerGetDatum(datum_l); - v->spl_rdatum = PointerGetDatum(datum_r); + if (DatumGetNDBOX(entryvec->vector[FirstOffsetNumber].key)->dim >= SPLIT_THRESHOLD) + guttman_split(entryvec, v); + else + korotkov_split(entryvec, v); PG_RETURN_POINTER(v); } @@ -724,7 +594,7 @@ g_cube_binary_union(NDBOX *r1, NDBOX *r2, int *sizep) /* cube_union_v0 */ -NDBOX * +static inline NDBOX * cube_union_v0(NDBOX *a, NDBOX *b) { int i; @@ -794,76 +664,82 @@ cube_union(PG_FUNCTION_ARGS) } /* cube_inter */ -Datum -cube_inter(PG_FUNCTION_ARGS) +static inline NDBOX* +_cube_inter(NDBOX *a, NDBOX *b) { - NDBOX *a = PG_GETARG_NDBOX(0); - NDBOX *b = PG_GETARG_NDBOX(1); - NDBOX *result; - bool swapped = false; + NDBOX *result, *cube1, *cube2; int i; + double edge1, edge2; - if (a->dim >= b->dim) - { - result = palloc0(VARSIZE(a)); - SET_VARSIZE(result, VARSIZE(a)); - result->dim = a->dim; - } - else - { - result = palloc0(VARSIZE(b)); - SET_VARSIZE(result, VARSIZE(b)); - result->dim = b->dim; - } + cube1 = (a->dim > b->dim) ? a : b; + cube2 = (a->dim <= b->dim) ? a : b; - /* swap the box pointers if needed */ - if (a->dim < b->dim) + result = palloc0(VARSIZE(cube1)); + SET_VARSIZE(result, VARSIZE(cube1)); + result->dim = cube1->dim; + + /* compute the intersection */ + for (i = 0; i < cube2->dim; i++) { - NDBOX *tmp = b; + edge1 = Max( + Min(cube1->x[i], cube1->x[i + cube1->dim]), + Min(cube2->x[i], cube2->x[i + cube2->dim]) + ); + edge2 = Min( + Max(cube1->x[i], cube1->x[i + cube1->dim]), + Max(cube2->x[i], cube2->x[i + cube2->dim]) + ); - b = a; - a = tmp; - swapped = true; - } - /* - * use the potentially smaller of the two boxes (b) to fill in the - * result, padding absent dimensions with zeroes - */ - for (i = 0; i < b->dim; i++) - { - result->x[i] = Min(b->x[i], b->x[i + b->dim]); - result->x[i + a->dim] = Max(b->x[i], b->x[i + b->dim]); - } - for (i = b->dim; i < a->dim; i++) - { - result->x[i] = 0; - result->x[i + a->dim] = 0; + if (edge1 <= edge2) + { + result->x[i] = edge1; + result->x[i + result->dim] = edge2; + } + else + { + result->x[i] = 0; + result->x[i + result->dim] = 0; + } } - /* compute the intersection */ - for (i = 0; i < a->dim; i++) + for (; i < cube2->dim; i++) { - result->x[i] = - Max(Min(a->x[i], a->x[i + a->dim]), result->x[i]); - result->x[i + a->dim] = Min(Max(a->x[i], - a->x[i + a->dim]), result->x[i + a->dim]); + edge1 = Max( + Min(cube1->x[i], cube1->x[i + cube1->dim]), + 0 + ); + edge2 = Min( + Max(cube1->x[i], cube1->x[i + cube1->dim]), + 0 + ); + + if (edge1 <= edge2) + { + result->x[i] = edge1; + result->x[i + result->dim] = edge2; + } + else + { + result->x[i] = 0; + result->x[i + result->dim] = 0; + } } - if (swapped) - { - PG_FREE_IF_COPY(b, 0); - PG_FREE_IF_COPY(a, 1); - } - else - { - PG_FREE_IF_COPY(a, 0); - PG_FREE_IF_COPY(b, 1); - } + return result; +} - /* - * Is it OK to return a non-null intersection for non-overlapping boxes? - */ +Datum +cube_inter(PG_FUNCTION_ARGS) +{ + NDBOX *a = PG_GETARG_NDBOX(0); + NDBOX *b = PG_GETARG_NDBOX(1); + NDBOX *result; + + result = _cube_inter(a, b); + + PG_FREE_IF_COPY(a, 0); + PG_FREE_IF_COPY(b, 1); PG_RETURN_NDBOX(result); } @@ -884,7 +760,7 @@ cube_size(PG_FUNCTION_ARGS) PG_RETURN_FLOAT8(result); } -void +static inline void rt_cube_size(NDBOX *a, double *size) { int i, @@ -903,7 +779,7 @@ rt_cube_size(NDBOX *a, double *size) /* make up a metric in which one box will be 'lower' than the other -- this can be useful for sorting and to determine uniqueness */ -int32 +static inline int32 cube_cmp_v0(NDBOX *a, NDBOX *b) { int i; @@ -983,6 +859,101 @@ cube_cmp_v0(NDBOX *a, NDBOX *b) return 0; } +static inline NDBOX* +adjust_box(NDBOX *b, NDBOX *addon) +{ + int i, size; + NDBOX *cube; + + if (b->dim >= addon->dim) + cube = b; + else + { + size = offsetof(NDBOX, x[0]) + 2*sizeof(double)*addon->dim; + cube = (NDBOX *) palloc0(size); + SET_VARSIZE(cube, size); + cube->dim = addon->dim; + + for (i=0; idim; i++) + cube->x[cube->dim + i] = b->x[b->dim + i]; + + for (; idim; i++) + cube->x[i] = cube->x[cube->dim + i] = 0; + } + + for(i=0; i < addon->dim; i++) + { + if (cube->x[i + cube->dim] < addon->x[i + addon->dim]) + cube->x[i + cube->dim] = addon->x[i + addon->dim]; + if (cube->x[i] > addon->x[i]) + cube->x[i] = addon->x[i]; + } + + return cube; +} + +static int +interval_cmp_lower(const void *i1, const void *i2) +{ + double lower1 = ((const SplitInterval *) i1)->lower, + lower2 = ((const SplitInterval *) i2)->lower; + + if (lower1 < lower2) + return -1; + else if (lower1 > lower2) + return 1; + else + return 0; +} + +static int +interval_cmp_upper(const void *i1, const void *i2) +{ + double upper1 = ((const SplitInterval *) i1)->upper, + upper2 = ((const SplitInterval *) i2)->upper; + + if (upper1 < upper2) + return -1; + else if (upper1 > upper2) + return 1; + else + return 0; +} + +static inline float +non_negative(float val) +{ + if (val >= 0.0f) + return val; + else + return 0.0f; +} + +static int +common_entry_cmp(const void *i1, const void *i2) +{ + double delta1 = ((const CommonEntry *) i1)->delta, + delta2 = ((const CommonEntry *) i2)->delta; + + if (delta1 < delta2) + return -1; + else if (delta1 > delta2) + return 1; + else + return 0; +} + +static inline double +cube_penalty(NDBOX *origentry, NDBOX *newentry) +{ + double tmp1, + tmp2; + + rt_cube_size(cube_union_v0(origentry, newentry), &tmp1); + rt_cube_size(origentry, &tmp2); + return (tmp1 - tmp2); +} + Datum cube_cmp(PG_FUNCTION_ARGS) { @@ -1090,7 +1061,7 @@ cube_ge(PG_FUNCTION_ARGS) /* Contains */ /* Box(A) CONTAINS Box(B) IFF pt(A) < pt(B) */ -bool +static inline bool cube_contains_v0(NDBOX *a, NDBOX *b) { int i; @@ -1160,7 +1131,7 @@ cube_contained(PG_FUNCTION_ARGS) /* Overlap */ /* Box(A) Overlap Box(B) IFF (pt(a)LL < pt(B)UR) && (pt(b)LL < pt(a)UR) */ -bool +static inline bool cube_overlap_v0(NDBOX *a, NDBOX *b) { int i; @@ -1274,7 +1245,7 @@ cube_distance(PG_FUNCTION_ARGS) PG_RETURN_FLOAT8(sqrt(distance)); } -static double +static inline double distance_1D(double a1, double a2, double b1, double b2) { /* interval (a) is entirely on the left of (b) */ @@ -1494,3 +1465,681 @@ cube_c_f8_f8(PG_FUNCTION_ARGS) PG_FREE_IF_COPY(c, 0); PG_RETURN_NDBOX(result); } + + +/* + * Split routines. + */ + +static void +guttman_split(GistEntryVector *entryvec, GIST_SPLITVEC *v) +{ + OffsetNumber i, + j, + nelems, + seed_1, + seed_2, + selected_offset; + NDBOX *cube_alpha, + *cube_beta, + *union_cube, + *inter_cube, + *cube_left, + *cube_right, + *current_cube, + *left_union, + *right_union, + *selected_cube, + *cube; + double size_waste, + waste, + volume_left, + volume_right, + current_increase_l, + current_increase_r, + current_diff_increase, + diff_increase, + increase_l, + increase_r, + sz1, + sz2; + int nbytes, + min_count, + remaining_count; + bool firsttime, + *inserted_cubes; + + nelems = entryvec->n - 1; + nbytes = nelems*sizeof(OffsetNumber) + 1; + v->spl_left = (OffsetNumber *) palloc(nbytes); + v->spl_right = (OffsetNumber *) palloc(nbytes); + inserted_cubes = palloc0(nelems + 1); + v->spl_nleft = v->spl_nright = 0; + + firsttime = true; + waste = 0.0; + + /* + * pick two seed elements + */ + for (i = FirstOffsetNumber; i <= nelems; i = OffsetNumberNext(i)) + { + cube_alpha = DatumGetNDBOX(entryvec->vector[i].key); + for (j = OffsetNumberNext(i); j <= nelems; j = OffsetNumberNext(j)) + { + cube_beta = DatumGetNDBOX(entryvec->vector[j].key); + + /* compute the wasted space by unioning these guys */ + /* size_waste = size_union - size_inter; */ + union_cube = cube_union_v0(cube_alpha, cube_beta); + inter_cube = _cube_inter(cube_alpha, cube_beta); + rt_cube_size(union_cube, &sz1); + rt_cube_size(inter_cube, &sz2); + size_waste = sz1 - sz2; + + /* + * are these a more promising split than what we've already seen? + */ + if (size_waste > waste || firsttime) + { + waste = size_waste; + seed_1 = i; + seed_2 = j; + firsttime = false; + } + } + } + + /* selected elements became first elements in groups */ + v->spl_left[v->spl_nleft++] = seed_1; + v->spl_right[v->spl_nright++] = seed_2; + inserted_cubes[seed_1] = inserted_cubes[seed_2] = true; + + /* bounding boxes for groups */ + cube = DatumGetNDBOX(entryvec->vector[seed_1].key); + cube_left = palloc(VARSIZE(cube)); + memcpy(cube_left, cube, VARSIZE(cube)); + + cube = DatumGetNDBOX(entryvec->vector[seed_2].key); + cube_right = palloc(VARSIZE(cube)); + memcpy(cube_right, cube, VARSIZE(cube)); + + rt_cube_size(cube_left, &volume_left); + rt_cube_size(cube_right, &volume_right); + + /* + * insert cubes + */ + remaining_count = nelems-2; + min_count = ceil(LIMIT_RATIO*(nelems - 1)); + + while (remaining_count > 0) + { + diff_increase = 0.0; + + /* check if there ehough cubes to satisfy mininum elements restriction */ + if ((v->spl_nleft + remaining_count) <= min_count || (v->spl_nright + remaining_count) <= min_count ) + break; + + /* find cube with biggest diff_increase to insert it */ + for (i = FirstOffsetNumber; i <= nelems; i = OffsetNumberNext(i)) + { + current_cube = DatumGetNDBOX(entryvec->vector[i].key); + + /* skip already inserted cubes */ + if (inserted_cubes[i]) + continue; + + left_union = cube_union_v0(cube_left, current_cube); + rt_cube_size(union_cube, &sz1); + current_increase_l = sz1 - volume_left; + + right_union = cube_union_v0(cube_right, current_cube); + rt_cube_size(union_cube, &sz2); + current_increase_r = sz2 - volume_right; + + current_diff_increase = Abs(current_increase_l-current_increase_r); + + if (current_diff_increase >= diff_increase) + { + diff_increase = current_diff_increase; + increase_l = current_increase_l; + increase_r = current_increase_r; + selected_offset = i; + } + } + + /* now insert selected cube on leaf with minimal increase */ + selected_cube = DatumGetNDBOX(entryvec->vector[selected_offset].key); + if (increase_l < increase_r) + { + v->spl_left[v->spl_nleft++] = selected_offset; + cube_left = adjust_box(cube_left, selected_cube); + rt_cube_size(cube_left, &volume_left); + } + else + { + v->spl_right[v->spl_nright++] = selected_offset; + cube_right = adjust_box(cube_right, selected_cube); + rt_cube_size(cube_right, &volume_right); + } + + inserted_cubes[selected_offset] = true; /* mark this cube as inserted */ + remaining_count--; + } + + /* insert all remaining entries (if any) to one branch */ + if (remaining_count > 0) + { + if (v->spl_nleft < v->spl_nright) + { + for (i = FirstOffsetNumber; i <= nelems; i = OffsetNumberNext(i)) + { + if (!inserted_cubes[i]){ + v->spl_left[v->spl_nleft++] = i; + cube_left = adjust_box(cube_left, DatumGetNDBOX(entryvec->vector[i].key)); + } + } + } + else + { + for (i = FirstOffsetNumber; i <= nelems; i = OffsetNumberNext(i)) + { + if (!inserted_cubes[i]){ + v->spl_right[v->spl_nright++] = i; + cube_right = adjust_box(cube_right, DatumGetNDBOX(entryvec->vector[i].key)); + } + } + } + } + + v->spl_ldatum = PointerGetDatum(cube_left); + v->spl_rdatum = PointerGetDatum(cube_right); + return; +} + +static void +korotkov_split(GistEntryVector *entryvec, GIST_SPLITVEC *v) +{ + ConsiderSplitContext context; + OffsetNumber i, + maxoff; + NDBOX *box, + *leftBox, + *rightBox; + int dim, + commonEntriesCount; + SplitInterval *intervalsLower, + *intervalsUpper; + CommonEntry *commonEntries; + int nentries; + + memset(&context, 0, sizeof(ConsiderSplitContext)); + + maxoff = entryvec->n - 1; + nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1; + + /* Allocate arrays for intervals along axes */ + intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval)); + intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval)); + + /* + * Calculate the overall minimum bounding box over all the entries. + */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + box = DatumGetNDBOX(entryvec->vector[i].key); + if (i == FirstOffsetNumber) + { + context.boundingBox = palloc(VARSIZE(box)); + memcpy(context.boundingBox, box, VARSIZE(box)); + } + else + context.boundingBox = adjust_box(context.boundingBox, box); + } + + /* + * Iterate over axes for optimal split searching. + */ + context.first = true; /* nothing selected yet */ + for (dim = 0; dim < box->dim; dim++) + { + double leftUpper, + rightLower; + int i1, + i2; + + /* Project each entry as an interval on the selected axis. */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + box = DatumGetNDBOX(entryvec->vector[i].key); + intervalsLower[i - FirstOffsetNumber].lower = box->x[dim]; + intervalsLower[i - FirstOffsetNumber].upper = box->x[dim+box->dim]; + } + + /* + * Make two arrays of intervals: one sorted by lower bound and another + * sorted by upper bound. + */ + memcpy(intervalsUpper, intervalsLower, + sizeof(SplitInterval) * nentries); + qsort(intervalsLower, nentries, sizeof(SplitInterval), + interval_cmp_lower); + qsort(intervalsUpper, nentries, sizeof(SplitInterval), + interval_cmp_upper); + + /*---- + * The goal is to form a left and right interval, so that every entry + * interval is contained by either left or right interval (or both). + * + * For example, with the intervals (0,1), (1,3), (2,3), (2,4): + * + * 0 1 2 3 4 + * +-+ + * +---+ + * +-+ + * +---+ + * + * The left and right intervals are of the form (0,a) and (b,4). + * We first consider splits where b is the lower bound of an entry. + * We iterate through all entries, and for each b, calculate the + * smallest possible a. Then we consider splits where a is the + * uppper bound of an entry, and for each a, calculate the greatest + * possible b. + * + * In the above example, the first loop would consider splits: + * b=0: (0,1)-(0,4) + * b=1: (0,1)-(1,4) + * b=2: (0,3)-(2,4) + * + * And the second loop: + * a=1: (0,1)-(1,4) + * a=3: (0,3)-(2,4) + * a=4: (0,4)-(2,4) + */ + + /* + * Iterate over lower bound of right group, finding smallest possible + * upper bound of left group. + */ + i1 = 0; + i2 = 0; + rightLower = intervalsLower[i1].lower; + leftUpper = intervalsUpper[i2].lower; + while (true) + { + /* + * Find next lower bound of right group. + */ + while (i1 < nentries && rightLower == intervalsLower[i1].lower) + { + leftUpper = Max(leftUpper, intervalsLower[i1].upper); + i1++; + } + if (i1 >= nentries) + break; + rightLower = intervalsLower[i1].lower; + + /* + * Find count of intervals which anyway should be placed to the + * left group. + */ + while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper) + i2++; + + /* + * Consider found split. + */ + cube_consider_split(&context, dim, rightLower, i1, leftUpper, i2); + } + + /* + * Iterate over upper bound of left group finding greates possible + * lower bound of right group. + */ + i1 = nentries - 1; + i2 = nentries - 1; + rightLower = intervalsLower[i1].upper; + leftUpper = intervalsUpper[i2].upper; + while (true) + { + /* + * Find next upper bound of left group. + */ + while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper) + { + rightLower = Min(rightLower, intervalsUpper[i2].lower); + i2--; + } + if (i2 < 0) + break; + leftUpper = intervalsUpper[i2].upper; + + /* + * Find count of intervals which anyway should be placed to the + * right group. + */ + while (i1 >= 0 && intervalsLower[i1].lower >= rightLower) + i1--; + + /* + * Consider found split. + */ + cube_consider_split(&context, dim, + rightLower, i1 + 1, leftUpper, i2 + 1); + } + } + + /* + * If we failed to find any acceptable splits, use trivial split. + */ + if (context.first) + { + fallback_split(entryvec, v); + return; + } + + /* + * Ok, we have now selected the split across one axis. + * + * While considering the splits, we already determined that there will be + * enough entries in both groups to reach the desired ratio, but we did + * not memorize which entries go to which group. So determine that now. + */ + + /* Allocate vectors for results */ + v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber)); + v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber)); + v->spl_nleft = 0; + v->spl_nright = 0; + + /* Allocate bounding boxes of left and right groups */ + leftBox = palloc0(VARSIZE(context.boundingBox)); + rightBox = palloc0(VARSIZE(context.boundingBox)); + + /* + * Allocate an array for "common entries" - entries which can be placed to + * either group without affecting overlap along selected axis. + */ + commonEntriesCount = 0; + commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry)); + + /* Helper macros to place an entry in the left or right group */ +#define PLACE_LEFT(box, off) \ + do { \ + if (v->spl_nleft > 0) \ + leftBox = adjust_box(leftBox, box); \ + else \ + *leftBox = *(box); \ + v->spl_left[v->spl_nleft++] = off; \ + } while(0) + +#define PLACE_RIGHT(box, off) \ + do { \ + if (v->spl_nright > 0) \ + rightBox = adjust_box(rightBox, box);\ + else \ + *rightBox = *(box); \ + v->spl_right[v->spl_nright++] = off; \ + } while(0) + + /* + * Distribute entries which can be distributed unambiguously, and collect + * common entries. + */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + double lower, + upper; + + /* + * Get upper and lower bounds along selected axis. + */ + box = DatumGetNDBOX(entryvec->vector[i].key); + lower = box->x[context.dim]; + upper = box->x[context.dim + box->dim]; + + if (upper <= context.leftUpper) + { + /* Fits to the left group */ + if (lower >= context.rightLower) + { + /* Fits also to the right group, so "common entry" */ + commonEntries[commonEntriesCount++].index = i; + } + else + { + /* Doesn't fit to the right group, so join to the left group */ + PLACE_LEFT(box, i); + } + } + else + { + /* + * Each entry should fit on either left or right group. Since this + * entry didn't fit on the left group, it better fit in the right + * group. + */ + Assert(lower >= context.rightLower); + + /* Doesn't fit to the left group, so join to the right group */ + PLACE_RIGHT(box, i); + } + } + + /* + * Distribute "common entries", if any. + */ + if (commonEntriesCount > 0) + { + /* + * Calculate minimum number of entries that must be placed in both + * groups, to reach LIMIT_RATIO. + */ + int m = ceil(LIMIT_RATIO * (double) nentries); + + /* + * Calculate delta between penalties of join "common entries" to + * different groups. + */ + for (i = 0; i < commonEntriesCount; i++) + { + box = DatumGetNDBOX(entryvec->vector[commonEntries[i].index].key); + commonEntries[i].delta = Abs(cube_penalty(leftBox, box) - cube_penalty(rightBox, box)); + } + + /* + * Sort "common entries" by calculated deltas in order to distribute + * the most ambiguous entries first. + */ + qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp); + + /* + * Distribute "common entries" between groups. + */ + for (i = 0; i < commonEntriesCount; i++) + { + box = DatumGetNDBOX(entryvec->vector[commonEntries[i].index].key); + + /* + * Check if we have to place this entry in either group to achieve + * LIMIT_RATIO. + */ + if (v->spl_nleft + (commonEntriesCount - i) <= m) + PLACE_LEFT(box, commonEntries[i].index); + else if (v->spl_nright + (commonEntriesCount - i) <= m) + PLACE_RIGHT(box, commonEntries[i].index); + else + { + /* Otherwise select the group by minimal penalty */ + if (cube_penalty(leftBox, box) < cube_penalty(rightBox, box)) + PLACE_LEFT(box, commonEntries[i].index); + else + PLACE_RIGHT(box, commonEntries[i].index); + } + } + } + + v->spl_ldatum = PointerGetDatum(leftBox); + v->spl_rdatum = PointerGetDatum(rightBox); + + return; +} + +static void +fallback_split(GistEntryVector *entryvec, GIST_SPLITVEC *v) +{ + OffsetNumber i, + maxoff; + NDBOX *unionL = NULL, + *unionR = NULL; + int nbytes; + + maxoff = entryvec->n - 1; + + nbytes = (maxoff + 2) * sizeof(OffsetNumber); + v->spl_left = (OffsetNumber *) palloc(nbytes); + v->spl_right = (OffsetNumber *) palloc(nbytes); + v->spl_nleft = v->spl_nright = 0; + + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + NDBOX *cur = DatumGetNDBOX(entryvec->vector[i].key); + + if (i <= (maxoff - FirstOffsetNumber + 1) / 2) + { + v->spl_left[v->spl_nleft] = i; + if (unionL == NULL) + { + unionL = (NDBOX *) palloc(VARSIZE(cur)); + *unionL = *cur; + } + else + unionL = adjust_box(unionL, cur); + + v->spl_nleft++; + } + else + { + v->spl_right[v->spl_nright] = i; + if (unionR == NULL) + { + unionR = (NDBOX *) palloc(VARSIZE(cur)); + *unionR = *cur; + } + else + unionR = adjust_box(unionR, cur); + + v->spl_nright++; + } + } + + v->spl_ldatum = PointerGetDatum(unionL); + v->spl_rdatum = PointerGetDatum(unionR); +} + +/* + * Consider replacement of currently selected split with the better one. + */ +static inline void +cube_consider_split(ConsiderSplitContext *context, int dimNum, + double rightLower, int minLeftCount, + double leftUpper, int maxLeftCount) +{ + int leftCount, + rightCount; + float4 ratio, + overlap; + double range; + + /* + * Calculate entries distribution ratio assuming most uniform distribution + * of common entries. + */ + if (minLeftCount >= (context->entriesCount + 1) / 2) + { + leftCount = minLeftCount; + } + else + { + if (maxLeftCount <= context->entriesCount / 2) + leftCount = maxLeftCount; + else + leftCount = context->entriesCount / 2; + } + rightCount = context->entriesCount - leftCount; + + /* + * Ratio of split - quotient between size of lesser group and total + * entries count. + */ + ratio = ((float4) Min(leftCount, rightCount)) / + ((float4) context->entriesCount); + + if (ratio > LIMIT_RATIO) + { + bool selectthis = false; + + /* + * The ratio is acceptable, so compare current split with previously + * selected one. Between splits of one dimension we search for minimal + * overlap (allowing negative values) and minimal ration (between same + * overlaps. We switch dimension if find less overlap (non-negative) + * or less range with same overlap. + */ + range = (context->boundingBox)->x[dimNum+(context->boundingBox)->dim] + - (context->boundingBox)->x[dimNum]; + + overlap = (leftUpper - rightLower) / range; + + /* If there is no previous selection, select this */ + if (context->first) + selectthis = true; + else if (context->dim == dimNum) + { + /* + * Within the same dimension, choose the new split if it has a + * smaller overlap, or same overlap but better ratio. + */ + if (overlap < context->overlap || + (overlap == context->overlap && ratio > context->ratio)) + selectthis = true; + } + else + { + /* + * Across dimensions, choose the new split if it has a smaller + * *non-negative* overlap, or same *non-negative* overlap but + * bigger range. This condition differs from the one described in + * the article. On the datasets where leaf MBRs don't overlap + * themselves, non-overlapping splits (i.e. splits which have zero + * *non-negative* overlap) are frequently possible. In this case + * splits tends to be along one dimension, because most distant + * non-overlapping splits (i.e. having lowest negative overlap) + * appears to be in the same dimension as in the previous split. + * Therefore MBRs appear to be very prolonged along another + * dimension, which leads to bad search performance. Using range + * as the second split criteria makes MBRs more quadratic. Using + * *non-negative* overlap instead of overlap as the first split + * criteria gives to range criteria a chance to matter, because + * non-overlapping splits are equivalent in this criteria. + */ + if (non_negative(overlap) < non_negative(context->overlap) || + (range > context->range && + non_negative(overlap) <= non_negative(context->overlap))) + selectthis = true; + } + + if (selectthis) + { + /* save information about selected split */ + context->first = false; + context->ratio = ratio; + context->range = range; + context->overlap = overlap; + context->rightLower = rightLower; + context->leftUpper = leftUpper; + context->dim = dimNum; + } + } +} diff --git a/contrib/cube/cubedata.h b/contrib/cube/cubedata.h index fd0c26a..7f5fe11 100644 --- a/contrib/cube/cubedata.h +++ b/contrib/cube/cubedata.h @@ -12,3 +12,57 @@ typedef struct NDBOX #define DatumGetNDBOX(x) ((NDBOX*)DatumGetPointer(x)) #define PG_GETARG_NDBOX(x) DatumGetNDBOX( PG_DETOAST_DATUM(PG_GETARG_DATUM(x)) ) #define PG_RETURN_NDBOX(x) PG_RETURN_POINTER(x) + +/* + * Represents information about an entry that can be placed to either group + * without affecting overlap over selected axis ("common entry"). + */ +typedef struct +{ + /* Index of entry in the initial array */ + int index; + /* Delta between penalties of entry insertion into different groups */ + double delta; +} CommonEntry; + +/* + * Context for g_box_consider_split. Contains information about currently + * selected split and some general information. + */ +typedef struct +{ + int entriesCount; /* total number of entries being split */ + NDBOX *boundingBox; /* minimum bounding box across all entries */ + + /* Information about currently selected split follows */ + + bool first; /* true if no split was selected yet */ + + double leftUpper; /* upper bound of left interval */ + double rightLower; /* lower bound of right interval */ + + float4 ratio; + float4 overlap; + int dim; /* axis of this split */ + double range; /* width of general MBR projection to the + * selected axis */ +} ConsiderSplitContext; + +/* + * Interval represents projection of box to axis. + */ +typedef struct +{ + double lower, + upper; +} SplitInterval; + +/* Minimum accepted ratio of split */ +#define LIMIT_RATIO 0.3 + +/* + * We use Guttman split when cube dim >= SPLIT_THRESHOLD + * and Korotkov split otherwise. + */ +#define SPLIT_THRESHOLD 3 +