Double sorting split patch

Started by Alexander Korotkovover 14 years ago24 messages
#1Alexander Korotkov
aekorotkov@gmail.com
1 attachment(s)

Hackers,

I've got my patch with double sorting picksplit impementation for GiST into
more acceptable form. A little of testing is below. Index creation time is
slightly higher, but search is much faster. The testing datasets were
following:
1) uniform dataset - 10M rows
2) geonames points - 7.6M rows

test=# create index uniform_new_linear_idx on uniform using gist (point);
CREATE INDEX
Time: 397362,915 ms

test=# explain (analyze, buffers) select * from uniform where point <@
box(point(0.5,0.5),point(0.501,0.501));
QUERY PLAN

-------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on uniform (cost=433.27..25873.19 rows=10000 width=16)
(actual time=1.407..1.448
rows=8 loops=1)
Recheck Cond: (point <@ '(0.501,0.501),(0.5,0.5)'::box)
Buffers: shared hit=39
-> Bitmap Index Scan on uniform_new_linear_idx (cost=0.00..430.77
rows=10000 width=0) (actual time=1.388..1.388 rows=8 loops=1)
Index Cond: (point <@ '(0.501,0.501),(0.5,0.5)'::box)
Buffers: shared hit=31
Total runtime: 1.527 ms
(7 rows)

test=# explain (analyze, buffers) select * from uniform where point <@
box(point(0.3,0.4),point(0.301,0.401));
QUERY PLAN

--------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on uniform (cost=433.27..25873.19 rows=10000 width=16)
(actual time=0.715..0.795
rows=15 loops=1)
Recheck Cond: (point <@ '(0.301,0.401),(0.3,0.4)'::box)
Buffers: shared hit=30
-> Bitmap Index Scan on uniform_new_linear_idx (cost=0.00..430.77
rows=10000 width=0) (actual time=0.695..0.695 rows=15 loops=1)
Index Cond: (point <@ '(0.301,0.401),(0.3,0.4)'::box)
Buffers: shared hit=15
Total runtime: 0.892 ms
(7 rows)

test=# create index uniform_double_sorting_idx on uniform using gist
(point);
CREATE INDEX
Time: 492796,671 ms

test=# explain (analyze, buffers) select * from uniform where point <@
box(point(0.5,0.5),point(0.501,0.501));
QUERY PLAN

-----------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on uniform (cost=445.39..25885.31 rows=10000 width=16)
(actual time=0.376..0.417
rows=8 loops=1)
Recheck Cond: (point <@ '(0.501,0.501),(0.5,0.5)'::box)
Buffers: shared hit=15
-> Bitmap Index Scan on uniform_double_sorting_idx (cost=0.00..442.89
rows=10000 width=0) (actual time=0.357..0.357 rows=8 loops=1)
Index Cond: (point <@ '(0.501,0.501),(0.5,0.5)'::box)
Buffers: shared hit=7
Total runtime: 0.490 ms
(7 rows)

test=# explain (analyze, buffers) select * from uniform where point <@
box(point(0.3,0.4),point(0.301,0.401));
QUERY PLAN

------------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on uniform (cost=445.39..25885.31 rows=10000 width=16)
(actual time=0.189..0.270
rows=15 loops=1)
Recheck Cond: (point <@ '(0.301,0.401),(0.3,0.4)'::box)
Buffers: shared hit=19
-> Bitmap Index Scan on uniform_double_sorting_idx (cost=0.00..442.89
rows=10000 width=0) (actual time=0.168..0.168 rows=15 loops=1)
Index Cond: (point <@ '(0.301,0.401),(0.3,0.4)'::box)
Buffers: shared hit=4
Total runtime: 0.358 ms
(7 rows)

test=# create index geonames_new_linear_idx on geonames using gist (point);
CREATE INDEX
Time: 279922,518 ms

test=# explain (analyze, buffers) select * from geonames where point <@
box(point(34.4671,126.631),point(34.5023,126.667));
QUERY PLAN

--------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on geonames (cost=341.98..19686.88 rows=7604 width=16)
(actual time=0.905..0.948
rows=11 loops=1)
Recheck Cond: (point <@ '(34.5023,126.667),(34.4671,126.631)'::box)
Buffers: shared hit=25
-> Bitmap Index Scan on geonames_new_linear_idx (cost=0.00..340.07
rows=7604 width=0) (actual time=0.889..0.889 rows=11 loops=1)
Index Cond: (point <@ '(34.5023,126.667),(34.4671,126.631)'::box)
Buffers: shared hit=20
Total runtime: 1.029 ms
(7 rows)

test=# explain (analyze, buffers) select * from geonames where point <@
box(point(46.1384,-104.72), point(46.2088,-104.65));
QUERY PLAN

--------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on geonames (cost=341.98..19686.88 rows=7604 width=16)
(actual time=0.644..0.776
rows=10 loops=1)
Recheck Cond: (point <@ '(46.2088,-104.65),(46.1384,-104.72)'::box)
Buffers: shared hit=13 read=6
-> Bitmap Index Scan on geonames_new_linear_idx (cost=0.00..340.07
rows=7604 width=0) (actual time=0.595..0.595 rows=10 loops=1)
Index Cond: (point <@ '(46.2088,-104.65),(46.1384,-104.72)'::box)
Buffers: shared hit=13
Total runtime: 0.857 ms
(7 rows)

test=# create index geonames_double_sorting_idx on geonames using gist
(point);
CREATE INDEX
Time: 294580,774 ms

test=# explain (analyze, buffers) select * from geonames where point <@
box(point(34.4671,126.631),point(34.5023,126.667));
QUERY PLAN

------------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on geonames (cost=346.01..19690.92 rows=7604 width=16)
(actual time=0.240..0.282
rows=11 loops=1)
Recheck Cond: (point <@ '(34.5023,126.667),(34.4671,126.631)'::box)
Buffers: shared hit=11
-> Bitmap Index Scan on geonames_double_sorting_idx (cost=0.00..344.11
rows=7604 width=0) (actual time=0.209..0.209 rows=11 loops=1)
Index Cond: (point <@ '(34.5023,126.667),(34.4671,126.631)'::box)
Buffers: shared hit=6
Total runtime: 0.372 ms
(7 rows)

test=# explain (analyze, buffers) select * from geonames where point <@
box(point(46.1384,-104.72), point(46.2088,-104.65));
QUERY PLAN

------------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on geonames (cost=346.01..19690.92 rows=7604 width=16)
(actual time=0.311..0.352
rows=10 loops=1)
Recheck Cond: (point <@ '(46.2088,-104.65),(46.1384,-104.72)'::box)
Buffers: shared hit=13
-> Bitmap Index Scan on geonames_double_sorting_idx (cost=0.00..344.11
rows=7604 width=0) (actual time=0.293..0.293 rows=10 loops=1)
Index Cond: (point <@ '(46.2088,-104.65),(46.1384,-104.72)'::box)
Buffers: shared hit=7
Total runtime: 0.429 ms
(7 rows)

------
With best regards,
Alexander Korotkov.

Attachments:

double-sorting-split-0.1.patch.gzapplication/x-gzip; name=double-sorting-split-0.1.patch.gzDownload
#2Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#1)
Re: Double sorting split patch

On 11.09.2011 22:30, Alexander Korotkov wrote:

Hackers,

I've got my patch with double sorting picksplit impementation for GiST into
more acceptable form. A little of testing is below. Index creation time is
slightly higher, but search is much faster. The testing datasets were
following:
1) uniform dataset - 10M rows
2) geonames points - 7.6M rows

I've looked at the patch, and took a brief look at the paper - but I
still don't understand the algorithm. I just can't get my head around
the concepts of split pairs and left/right groups. Can you explain that
in layman's terms? Perhaps an example would help?

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#3Alexander Korotkov
aekorotkov@gmail.com
In reply to: Heikki Linnakangas (#2)
Re: Double sorting split patch

On Thu, Sep 15, 2011 at 7:27 PM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

I've looked at the patch, and took a brief look at the paper - but I still
don't understand the algorithm. I just can't get my head around the concepts
of split pairs and left/right groups. Can you explain that in layman's
terms? Perhaps an example would help?

In short algorithm works as following:
1) Each box can be projected to the axis as an interval. Box (x1,y1)-(x2,y2)
are projected to X axis as (x1,x2) interval and to the Y axis as (y1,y2)
interval. At the first step we search for splits of those intervals and
select the best one.
2) At the second step produced split are converting into terms of boxes
and ambiguities are solving.

Let's see a little deeper how intervals split search are performed by
considering an example. We've intervals (0,1), (1,3), (2,3), (2,4). We
assume intervals of the groups to be (0,a), (b,4). So, "a" can be some upper
bound of interval: {1,3,4}, and "b" can be some lower bound of inteval:
{0,1,2}.
We consider following splits: each "a" with greatest possible "b"
(0,1), (1,4)
(0,3), (2,4)
(0,4), (2,4)
and each "b" with least possible "a". In this example splits will be:
(0,1), (0,4)
(0,1), (1,4)
(0,3), (2,4)
By removing the duplicates we've following splits:
(0,1), (0,4)
(0,1), (1,4)
(0,3), (2,4)
(0,4), (2,4)
Proposed algorithm finds following splits by single pass on two arrays: one
sorted by lower bound of interval and another sorted by upper bound of
interval.

------
With best regards,
Alexander Korotkov.

#4Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#3)
Re: Double sorting split patch

On 15.09.2011 21:46, Alexander Korotkov wrote:

On Thu, Sep 15, 2011 at 7:27 PM, Heikki Linnakangas<
heikki.linnakangas@enterprisedb.com> wrote:

I've looked at the patch, and took a brief look at the paper - but I still
don't understand the algorithm. I just can't get my head around the concepts
of split pairs and left/right groups. Can you explain that in layman's
terms? Perhaps an example would help?

In short algorithm works as following:
1) Each box can be projected to the axis as an interval. Box (x1,y1)-(x2,y2)
are projected to X axis as (x1,x2) interval and to the Y axis as (y1,y2)
interval. At the first step we search for splits of those intervals and
select the best one.
2) At the second step produced split are converting into terms of boxes
and ambiguities are solving.

Let's see a little deeper how intervals split search are performed by
considering an example. We've intervals (0,1), (1,3), (2,3), (2,4). We
assume intervals of the groups to be (0,a), (b,4). So, "a" can be some upper
bound of interval: {1,3,4}, and "b" can be some lower bound of inteval:
{0,1,2}.
We consider following splits: each "a" with greatest possible "b"
(0,1), (1,4)
(0,3), (2,4)
(0,4), (2,4)
and each "b" with least possible "a". In this example splits will be:
(0,1), (0,4)
(0,1), (1,4)
(0,3), (2,4)
By removing the duplicates we've following splits:
(0,1), (0,4)
(0,1), (1,4)
(0,3), (2,4)
(0,4), (2,4)

Ok, thanks, I understand that now.

Proposed algorithm finds following splits by single pass on two arrays: one
sorted by lower bound of interval and another sorted by upper bound of
interval.

That looks awfully complicated. I don't understand how that works. I
wonder if two passes would be simpler?

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#5Alexander Korotkov
aekorotkov@gmail.com
In reply to: Heikki Linnakangas (#4)
1 attachment(s)
Re: Double sorting split patch

On Fri, Sep 16, 2011 at 3:07 PM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

That looks awfully complicated. I don't understand how that works. I wonder
if two passes would be simpler?

I doubt it becomes much simpler, but here it is.

------
With best regards,
Alexander Korotkov.

Attachments:

double-sorting-split-0.2.patch.gzapplication/x-gzip; name=double-sorting-split-0.2.patch.gzDownload
#6Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#5)
Re: Double sorting split patch

On 17.09.2011 17:36, Alexander Korotkov wrote:

On Fri, Sep 16, 2011 at 3:07 PM, Heikki Linnakangas<
heikki.linnakangas@enterprisedb.com> wrote:

That looks awfully complicated. I don't understand how that works. I wonder
if two passes would be simpler?

I doubt it becomes much simpler, but here it is.

Thanks! It does look a lot simpler that way, IMHO. I presume this didn't
change the performance characteristics?

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#7Alexander Korotkov
aekorotkov@gmail.com
In reply to: Heikki Linnakangas (#6)
Re: Double sorting split patch

On Sat, Sep 17, 2011 at 9:00 PM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

Thanks! It does look a lot simpler that way, IMHO. I presume this didn't
change the performance characteristics?

On the tests I provided in the first letter difference seems to be
insignificant.

------
With best regards,
Alexander Korotkov.

#8Peter Geoghegan
peter@2ndquadrant.com
In reply to: Alexander Korotkov (#3)
Re: Double sorting split patch

On 15 September 2011 19:46, Alexander Korotkov <aekorotkov@gmail.com> wrote:

Proposed algorithm finds following splits by single pass on two arrays: one
sorted by lower bound of interval and another sorted by upper bound of
interval.

Have you considered if further performance improvements could come
from providing a qsort implementation that allows for the inlining of
the comparator? I find it curious that we go to the trouble of
providing a custom qsort implementation in qsort.c, pg_qsort, but
haven't gone one step further and considered inlining effects. Here's
a macro-based inlining implementation:

http://www.corpit.ru/mjt/qsort.html

I wondered in passing if compiler vendors had got around to figuring
out a way of solving the "inlining function pointer" problem that I
first read about years ago, so I ran this code, which benchmarks the
macro-based qsort above:

http://www.lemoda.net/c/inline-qsort-example/index.html

It's actually C++, but that isn't significant (c stdlib qsort() is
called). When I built this code with GCC 4.5, the results were:

C quick-sort time elapsed: 3.24
Inline C quick-sort time elapsed: 2.01

It looks like this is something that could be revisited.

--
Peter Geoghegan       http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training and Services

#9Greg Stark
stark@mit.edu
In reply to: Peter Geoghegan (#8)
Re: Double sorting split patch

On Sat, Sep 17, 2011 at 9:09 PM, Peter Geoghegan <peter@2ndquadrant.com> wrote:

I find it curious that we go to the trouble of
providing a custom qsort implementation in qsort.c, pg_qsort, but
haven't gone one step further and considered inlining effects.

I think we provided the qsort implementation for the benefit of
platforms that didn't have a decent one and then ended up switching to
use it always for some reason I don't recall. It wasn't because we
thought we were better at writing qsort implementations than OS
vendors.

I wondered in passing if compiler vendors had got around to figuring
out a way of solving the "inlining function pointer" problem that I
first read about years ago, so I ran this code, which benchmarks the
macro-based qsort above:

You might need -fipa-pta or some other option. Or maybe LLVM would
have a better chance of pulling this off. Compilers are usually pretty
loath to generate specializations for every call site for fear of
bloating the code.

In any case I don't see how you're going to inline random database
functions. Those are the cases where large amounts of data are being
sorted. It's possible we sort small sets of data for internal reasons
very frequently but I don't recall it turning up at the top of the
profiles posted in recent days.

Now a JIT that actually inlined random database functions into qsort
and optimized the result would be pretty cool. But it doesn't seem
like it's going to happen tomorrow...

--
greg

#10Peter Geoghegan
peter@2ndquadrant.com
In reply to: Greg Stark (#9)
Re: Double sorting split patch

On 18 September 2011 01:51, Greg Stark <stark@mit.edu> wrote:

I think we provided the qsort implementation for the benefit of
platforms that didn't have a decent one and then ended up switching to
use it always for some reason I don't recall.  It wasn't because we
thought we were better at writing qsort implementations than OS
vendors.

The comment:

* We have modified their original by adding a check for already-sorted input,
* which seems to be a win per discussions on pgsql-hackers around 2006-03-21.

sort of suggests that it *was* at least in part because we thought we
could do a better job, but that isn't a significant detail.

I wondered in passing if compiler vendors had got around to figuring
out a way of solving the "inlining function pointer" problem that I
first read about years ago, so I ran this code, which benchmarks the
macro-based qsort above:

You might need -fipa-pta or some other option. Or maybe LLVM would
have a better chance of pulling this off. Compilers are usually pretty
loath to generate specializations for every call site for fear of
bloating the code.

Compilers do a fairly good job of generating the specialisations
appropriately, which leads to the observation that the inline keyword
is kind of at the wrong level of granularity, as individual call sites
are inlined, not functions.

I built this program with a recent build of clang from SVN (left over
from my work on Clang a few weeks back, as it happens) on a more
powerful machine, and the difference narrowed:

C quick-sort time elapsed: 1.89
Inline C quick-sort time elapsed: 1.54

This still seems significant though.

In any case I don't see how you're going to inline random database
functions. Those are the cases where large amounts of data are being
sorted. It's possible we sort small sets of data for internal reasons
very frequently but I don't recall it turning up at the top of the
profiles posted in recent days.

Well, this point was raised in relation to the OP's patch, which does
have a couple of simple comparators that look like good candidates for
inlining. There are other really simple comparators around like that -
one that I'm aware of off-hand is the one used to sort
pg_stat_statements entries to find victims. It doesn't have to have
major benefits to be worth undertaking - it only has to be worth the
effort of its original implementation and ongoing maintenance.

Now a JIT that actually inlined random database functions into qsort
and optimized the result would be pretty cool. But it doesn't seem
like it's going to happen tomorrow...

Yeah, that would be extremely cool. I'd guess that the main reason it
isn't going to happen tomorrow is not so much technical, but because
there would be about a dozen controversies involved in integrating an
available, suitable JIT - how does it integrate with the build system,
licensing, choice of implementation language, support on less popular
platforms, how do package managers handle the dependency, can we trust
the developers/steering committee of the JIT (Okay, so I'm really
thinking about LLVM here), and so on. That's just off the top of my
head.

--
Peter Geoghegan       http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training and Services

#11Alexander Korotkov
aekorotkov@gmail.com
In reply to: Alexander Korotkov (#7)
1 attachment(s)
Re: Double sorting split patch

I've processed the results of the tests with double sorting split which I've
sheduled for buffering build. I've updated wiki page with them:
http://wiki.postgresql.org/wiki/Fast_GiST_index_build_GSoC_2011#Testing_results
Raw results of query speed measues are in the attachment. There number of
pages accesse is presented in dependency of table, buffering build and split
method.
Note, that tests were run with not last version of fast build patch
("gist_fast_build-heikki-0.14.1.1.patch" was used). Therefore, build time
with buffering can be better.

------
With best regards,
Alexander Korotkov.

Attachments:

search_time.txttext/plain; charset=US-ASCII; name=search_time.txtDownload
#12Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#11)
Re: Double sorting split patch

! /*
! * Calculate delta between penalties of join "common entries" to
! * different groups.
! */
! for (i = 0; i < commonEntriesCount; i++)
{
! double lower,
! upper;
!
! box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! if (context.dim == 0)
! {
! lower = box->low.x;
! upper = box->high.x;
! }
! else
! {
! lower = box->low.y;
! upper = box->high.y;
! }
! commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
! box_penalty(rightBox, box));
}

'lower' and 'upper' are not used for anything in the above. Is that just
dead code that can be removed, or is there something missing that should
be using them?

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#13Alexander Korotkov
aekorotkov@gmail.com
In reply to: Heikki Linnakangas (#12)
Re: Double sorting split patch

On Thu, Sep 22, 2011 at 3:22 PM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

! /*

! * Calculate delta between penalties of join "common
entries" to
! * different groups.
! */
! for (i = 0; i < commonEntriesCount; i++)
{
! double lower,
! upper;
!
! box = DatumGetBoxP(entryvec->vector[**
commonEntries[i].index].key);
! if (context.dim == 0)
! {
! lower = box->low.x;
! upper = box->high.x;
! }
! else
! {
! lower = box->low.y;
! upper = box->high.y;
! }
! commonEntries[i].delta = Abs(box_penalty(leftBox,
box) -
!
box_penalty(rightBox, box));
}

'lower' and 'upper' are not used for anything in the above. Is that just
dead code that can be removed, or is there something missing that should be
using them?

Yes, it's just dead code.

------
With best regards,
Alexander Korotkov.

#14Alexander Korotkov
aekorotkov@gmail.com
In reply to: Alexander Korotkov (#13)
1 attachment(s)
Re: Double sorting split patch

On Thu, Sep 22, 2011 at 3:31 PM, Alexander Korotkov <aekorotkov@gmail.com>wrote:

On Thu, Sep 22, 2011 at 3:22 PM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

'lower' and 'upper' are not used for anything in the above. Is that just

dead code that can be removed, or is there something missing that should be
using them?

Yes, it's just dead code.

Patch without that dead code is attached.

------
With best regards,
Alexander Korotkov.

Attachments:

double-sorting-split-0.3.patch.gzapplication/x-gzip; name=double-sorting-split-0.3.patch.gzDownload
#15Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#14)
Re: Double sorting split patch

On 22.09.2011 22:12, Alexander Korotkov wrote:

Patch without that dead code is attached.

Thanks.

Can you elaborate the consider-split algorithm? The criteria to select
the new split over the previously selected one is this:

! /*
! * If ratio is acceptable, we should compare current split with
! * previously selected one. If no split was selected then we select
! * current anyway. Between splits of one dimension we search for
! * minimal overlap (allowing negative values) and minimal ration
! * (between same overlaps. We switch dimension if find less overlap
! * (non-negative) or less range with same overlap.
! */
! range = diminfo->upper - diminfo->lower;
! overlap = ((leftUpper) - (rightLower)) / range;
! if (context->first ||
! (context->dim == dimNum &&
! (overlap < context->overlap ||
! (overlap == context->overlap && ratio > context->ratio))) ||
! (context->dim != dimNum &&
! ((range > context->range &&
! non_negative(overlap) <= non_negative(context->overlap)) ||
! non_negative(overlap) < non_negative(context->overlap)))
! )
! {

Why are negative overlaps handled differently across dimensions and
within the same dimension? Your considerSplit algorithm in the SYRCoSE
2011 paper doesn't seem to make that distinction.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#16Alexander Korotkov
aekorotkov@gmail.com
In reply to: Heikki Linnakangas (#15)
Re: Double sorting split patch

On Tue, Oct 4, 2011 at 12:12 PM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

Can you elaborate the consider-split algorithm? The criteria to select the
new split over the previously selected one is this:

! /*
! * If ratio is acceptable, we should compare current split
with
! * previously selected one. If no split was selected then
we select
! * current anyway. Between splits of one dimension we
search for
! * minimal overlap (allowing negative values) and minimal
ration
! * (between same overlaps. We switch dimension if find
less overlap
! * (non-negative) or less range with same overlap.
! */
! range = diminfo->upper - diminfo->lower;
! overlap = ((leftUpper) - (rightLower)) / range;
! if (context->first ||
! (context->dim == dimNum &&
! (overlap < context->overlap ||
! (overlap == context->overlap && ratio >
context->ratio))) ||
! (context->dim != dimNum &&
! ((range > context->range &&
! non_negative(overlap) <=
non_negative(context->overlap)**) ||
! non_negative(overlap) <
non_negative(context->overlap)**))
! )
! {

Why are negative overlaps handled differently across dimensions and within
the same dimension? Your considerSplit algorithm in the SYRCoSE 2011 paper
doesn't seem to make that distinction.

Yes, I've changed this behaviour after experiments on real-life datasets. On
the datasets where data don't overlap themselfs (points are always such
datasets), non-overlapping splits are frequently possible. In this case
splits tends to be along one dimension, because most distant non-overlapping
splits (i.e. having lowest negative overlap) appears to be in the same
dimension as previous split. Therefore MBRs appear to be very prolonged
along another dimension, that's bad for search. In order to prevent this
behavour I've made transition to another dimension by non-nagative part of
overlap and range. Using range as the split criteria makes MBRs more
quadratic, and using non-negative part of overlap as priority criteria give
to range chance to matter.

------
With best regards,
Alexander Korotkov.

#17Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#16)
1 attachment(s)
Re: Double sorting split patch

On 04.10.2011 11:51, Alexander Korotkov wrote:

On Tue, Oct 4, 2011 at 12:12 PM, Heikki Linnakangas<
heikki.linnakangas@enterprisedb.com> wrote:

Can you elaborate the consider-split algorithm? The criteria to select the
new split over the previously selected one is this:

! /*
! * If ratio is acceptable, we should compare current split
with
! * previously selected one. If no split was selected then
we select
! * current anyway. Between splits of one dimension we
search for
! * minimal overlap (allowing negative values) and minimal
ration
! * (between same overlaps. We switch dimension if find
less overlap
! * (non-negative) or less range with same overlap.
! */
! range = diminfo->upper - diminfo->lower;
! overlap = ((leftUpper) - (rightLower)) / range;
! if (context->first ||
! (context->dim == dimNum&&
! (overlap< context->overlap ||
! (overlap == context->overlap&& ratio>
context->ratio))) ||
! (context->dim != dimNum&&
! ((range> context->range&&
! non_negative(overlap)<=
non_negative(context->overlap)**) ||
! non_negative(overlap)<
non_negative(context->overlap)**))
! )
! {

Why are negative overlaps handled differently across dimensions and within
the same dimension? Your considerSplit algorithm in the SYRCoSE 2011 paper
doesn't seem to make that distinction.

Yes, I've changed this behaviour after experiments on real-life datasets. On
the datasets where data don't overlap themselfs (points are always such
datasets), non-overlapping splits are frequently possible. In this case
splits tends to be along one dimension, because most distant non-overlapping
splits (i.e. having lowest negative overlap) appears to be in the same
dimension as previous split. Therefore MBRs appear to be very prolonged
along another dimension, that's bad for search. In order to prevent this
behavour I've made transition to another dimension by non-nagative part of
overlap and range. Using range as the split criteria makes MBRs more
quadratic, and using non-negative part of overlap as priority criteria give
to range chance to matter.

Ok. Could you phrase that as a code comment?

Here's a version of the patch I've been working on. There's no
functional changes, just a lot of moving things around, comment changes,
etc. to hopefully make it more readable.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

Attachments:

double-sorting-split-0.4-heikki.patchtext/x-diff; name=double-sorting-split-0.4-heikki.patchDownload
diff --git a/src/backend/access/gist/gistproc.c b/src/backend/access/gist/gistproc.c
index 43c4b12..af1a42d 100644
--- a/src/backend/access/gist/gistproc.c
+++ b/src/backend/access/gist/gistproc.c
@@ -26,6 +26,10 @@ static bool gist_box_leaf_consistent(BOX *key, BOX *query,
 static double size_box(Datum dbox);
 static bool rtree_internal_consistent(BOX *key, BOX *query,
 						  StrategyNumber strategy);
+static BOX *empty_box(void);
+
+/* Minimal possible ratio of split */
+#define LIMIT_RATIO 0.3
 
 
 /**************************************************
@@ -49,30 +53,6 @@ rt_box_union(PG_FUNCTION_ARGS)
 	PG_RETURN_BOX_P(n);
 }
 
-static Datum
-rt_box_inter(PG_FUNCTION_ARGS)
-{
-	BOX		   *a = PG_GETARG_BOX_P(0);
-	BOX		   *b = PG_GETARG_BOX_P(1);
-	BOX		   *n;
-
-	n = (BOX *) palloc(sizeof(BOX));
-
-	n->high.x = Min(a->high.x, b->high.x);
-	n->high.y = Min(a->high.y, b->high.y);
-	n->low.x = Max(a->low.x, b->low.x);
-	n->low.y = Max(a->low.y, b->low.y);
-
-	if (n->high.x < n->low.x || n->high.y < n->low.y)
-	{
-		pfree(n);
-		/* Indicate "no intersection" by returning NULL pointer */
-		n = NULL;
-	}
-
-	PG_RETURN_BOX_P(n);
-}
-
 /*
  * The GiST Consistent method for boxes
  *
@@ -194,86 +174,6 @@ gist_box_penalty(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(result);
 }
 
-static void
-chooseLR(GIST_SPLITVEC *v,
-		 OffsetNumber *list1, int nlist1, BOX *union1,
-		 OffsetNumber *list2, int nlist2, BOX *union2)
-{
-	bool		firstToLeft = true;
-
-	if (v->spl_ldatum_exists || v->spl_rdatum_exists)
-	{
-		if (v->spl_ldatum_exists && v->spl_rdatum_exists)
-		{
-			BOX			LRl = *union1,
-						LRr = *union2;
-			BOX			RLl = *union2,
-						RLr = *union1;
-			double		sizeLR,
-						sizeRL;
-
-			adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
-			adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
-			adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
-			adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
-
-			sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
-			sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
-
-			if (sizeLR > sizeRL)
-				firstToLeft = false;
-
-		}
-		else
-		{
-			float		p1,
-						p2;
-			GISTENTRY	oldUnion,
-						addon;
-
-			gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
-						  NULL, NULL, InvalidOffsetNumber, FALSE);
-
-			gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
-			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
-			gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
-			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
-
-			if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
-				firstToLeft = false;
-		}
-	}
-
-	if (firstToLeft)
-	{
-		v->spl_left = list1;
-		v->spl_right = list2;
-		v->spl_nleft = nlist1;
-		v->spl_nright = nlist2;
-		if (v->spl_ldatum_exists)
-			adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
-		v->spl_ldatum = BoxPGetDatum(union1);
-		if (v->spl_rdatum_exists)
-			adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
-		v->spl_rdatum = BoxPGetDatum(union2);
-	}
-	else
-	{
-		v->spl_left = list2;
-		v->spl_right = list1;
-		v->spl_nleft = nlist2;
-		v->spl_nright = nlist1;
-		if (v->spl_ldatum_exists)
-			adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
-		v->spl_ldatum = BoxPGetDatum(union2);
-		if (v->spl_rdatum_exists)
-			adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
-		v->spl_rdatum = BoxPGetDatum(union1);
-	}
-
-	v->spl_ldatum_exists = v->spl_rdatum_exists = false;
-}
-
 /*
  * Trivial split: half of entries will be placed on one page
  * and another half - to another
@@ -338,199 +238,576 @@ fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
 }
 
 /*
- * The GiST PickSplit method
+ * Represents information about entry which can be present to any group without
+ * affecting overlap over selected axis ("common entry").
+ */
+typedef struct
+{
+	/* Index of entry in the initial array */
+	int			index;
+	/* Delta between penalties of entry insertion into different groups */
+	double		delta;
+}	CommonEntry;
+
+/*
+ * Context for g_box_consider_split. Contains information about currently
+ * selected split and some general information.
+ */
+typedef struct
+{
+	/* number of splitting entries */
+	int			entriesCount;
+	/* minimum bounding box across all entries */
+	BOX			boundingBox;
+
+	/* Information about currently selected split follows */
+
+	bool		first;	/* true if no split was selected yet */
+
+	double		leftUpper;		/* upper bound of left interval */
+	double		rightLower;		/* lower bound of right interval */
+
+	float4		ratio;
+	float4		overlap;
+	int			dim;			/* axis of this split */
+	double		range;			/* width of general MBR projection to the selected axis */
+}	ConsiderSplitContext;
+
+/*
+ * Interval represents projection of box to axis.
+ */
+typedef struct
+{
+	double		lower,
+				upper;
+}	SplitInterval;
+
+/*
+ * Interval comparison function by lower bound of the interval;
+ */
+static int
+interval_cmp_lower(const void *i1, const void *i2)
+{
+	double		lower1 = ((SplitInterval *) i1)->lower,
+				lower2 = ((SplitInterval *) i2)->lower;
+
+	if (lower1 < lower2)
+		return -1;
+	else if (lower1 > lower2)
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * Interval comparison function by upper bound of the interval;
+ */
+static int
+interval_cmp_upper(const void *i1, const void *i2)
+{
+	double		upper1 = ((SplitInterval *) i1)->upper,
+				upper2 = ((SplitInterval *) i2)->upper;
+
+	if (upper1 < upper2)
+		return -1;
+	else if (upper1 > upper2)
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * Replace negative value with zero.
+ */
+static inline float
+non_negative(float val)
+{
+	if (val >= 0.0f)
+		return val;
+	else
+		return 0.0f;
+}
+
+/*
+ * Consider replacement of currently selected split with the better one.
+ */
+static void inline
+g_box_consider_split(ConsiderSplitContext *context, int dimNum,
+					 double rightLower, int minLeftCount,
+					 double leftUpper, int maxLeftCount)
+{
+	int			leftCount,
+				rightCount;
+	float4		ratio,
+				overlap;
+	double		range;
+
+	/*
+	 * Calculate entries distribution ration assuming most uniform
+	 * distribution of common entries.
+	 */
+	if (minLeftCount >= (context->entriesCount + 1) / 2)
+	{
+		leftCount = minLeftCount;
+	}
+	else
+	{
+		if (maxLeftCount <= context->entriesCount / 2)
+			leftCount = maxLeftCount;
+		else
+			leftCount = context->entriesCount / 2;
+	}
+	rightCount = context->entriesCount - leftCount;
+
+	/*
+	 * Ratio of split - quotient between size of lesser group and total entries
+	 * count.
+	 */
+	ratio = ((float4) Min(leftCount, rightCount)) /
+		((float4) context->entriesCount);
+
+	if (ratio > LIMIT_RATIO)
+	{
+		bool selectthis = false;
+
+		/*
+		 * The ratio is acceptable, so compare current split with previously
+		 * selected one. Between splits of one dimension we search for
+		 * minimal overlap (allowing negative values) and minimal ration
+		 * (between same overlaps. We switch dimension if find less overlap
+		 * (non-negative) or less range with same overlap.
+		 */
+		if (dimNum == 0)
+			range = context->boundingBox.high.x - context->boundingBox.low.x;
+		else
+			range = context->boundingBox.high.y - context->boundingBox.low.y;
+
+		overlap = (leftUpper - rightLower) / range;
+
+		/* If there is no previous selection, select this */
+		if (context->first)
+			selectthis = true;
+		else if (context->dim == dimNum)
+		{
+			/*
+			 * Within the same dimension, choose the new split if it has a
+			 * smaller overlap, or same overlap but better ratio.
+			 */
+			if (overlap < context->overlap ||
+				(overlap == context->overlap && ratio > context->ratio))
+				selectthis = true;
+		}
+		else
+		{
+			/*
+			 * Across dimensions, choose the new split if it has a
+			 * smaller *non-negative* overlap, or same overlap but bigger
+			 * range.
+			 */
+			if (non_negative(overlap) < non_negative(context->overlap) ||
+				(range > context->range &&
+				 non_negative(overlap) <= non_negative(context->overlap)))
+				selectthis = true;
+		}
+
+		if (selectthis)
+		{
+			/* save information about selected split */
+			context->first = false;
+			context->ratio = ratio;
+			context->range = range;
+			context->overlap = overlap;
+			context->rightLower = rightLower;
+			context->leftUpper = leftUpper;
+			context->dim = dimNum;
+		}
+	}
+}
+
+/*
+ * Create new empty BOX
+ */
+static BOX *
+empty_box(void)
+{
+	BOX		   *result;
+
+	result = (BOX *) palloc(sizeof(BOX));
+	result->low.x = 0.0f;
+	result->low.y = 0.0f;
+	result->high.x = 0.0f;
+	result->high.y = 0.0f;
+	return result;
+}
+
+/*
+ * Return increase of original BOX area by new BOX area insertion.
+ */
+static double
+box_penalty(BOX *original, BOX *new)
+{
+	double		union_width,
+				union_height;
+
+	union_width = Max(original->high.x, new->high.x) -
+		Min(original->low.x, new->low.x);
+	union_height = Max(original->high.y, new->high.y) -
+		Min(original->low.y, new->low.y);
+	return union_width * union_height - (original->high.x - original->low.x) *
+		(original->high.y - original->low.y);
+}
+
+/*
+ * Compare common entries by their deltas.
+ */
+static int
+common_entry_cmp(const void *i1, const void *i2)
+{
+	double		delta1 = ((CommonEntry *) i1)->delta,
+				delta2 = ((CommonEntry *) i2)->delta;
+
+	if (delta1 < delta2)
+		return -1;
+	else if (delta1 > delta2)
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * --------------------------------------------------------------------------
+ * Double sorting split algorithm. Finds split of boxes by considering splits
+ * along each axis. Searching splits along axes are based on the notions of
+ * split pair and corner split pair.
  *
- * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
- * C.H.Ang and T.C.Tan
+ * Split pair is a pair <a, b> where 'a' is upper bound of left group and 'b'
+ * is lower bound of right group. Lower bound or left group and upper bound of
+ * right group are assumed to be the general bounds along axis.
  *
- * This is used for both boxes and points.
+ * Corner split pair is a split pair <a, b> where 'a' is upper bound of some
+ * interval, 'b' is lower bound of some, 'a' can't be lower or 'b' can't be
+ * higher with split pair property preservation.
+ *
+ * Split along axis divides entries to the following groups:
+ * 1) Entries which should be placed to the left group
+ * 2) Entries which should be placed to the right group
+ * 3) "Common entries" which can be placed to any of groups without affecting
+ *	  of overlap along selected axis.
+ *
+ * Split along axis is selected by overlap along that axis and some other
+ * criteria (see g_box_consider_split). When split along axis is selected,
+ * "common entries" are distributed by penalty with group boxes.
+ *
+ * For details see:
+ * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
+ * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
+ * --------------------------------------------------------------------------
  */
 Datum
 gist_box_picksplit(PG_FUNCTION_ARGS)
 {
 	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
 	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
-	OffsetNumber i;
-	OffsetNumber *listL,
-			   *listR,
-			   *listB,
-			   *listT;
-	BOX		   *unionL,
-			   *unionR,
-			   *unionB,
-			   *unionT;
-	int			posL,
-				posR,
-				posB,
-				posT;
-	BOX			pageunion;
-	BOX		   *cur;
-	char		direction = ' ';
-	bool		allisequal = true;
-	OffsetNumber maxoff;
-	int			nbytes;
+	OffsetNumber i,
+				maxoff;
+	ConsiderSplitContext context;
+	BOX		   *box,
+			   *leftBox,
+			   *rightBox;
+	int			dim,
+				commonEntriesCount,
+				nbytes;
+	SplitInterval *intervalsLower,
+			   *intervalsUpper;
+	CommonEntry *commonEntries;
+	int			nentries;
+
+#define PLACE_LEFT(box, off)					\
+	do {										\
+		if (v->spl_nleft > 0)					\
+			adjustBox(leftBox, box);			\
+		else									\
+			*leftBox = *(box);					\
+		v->spl_left[v->spl_nleft++] = off;		\
+	} while(0)
+
+#define PLACE_RIGHT(box, off)					\
+	do {										\
+		if (v->spl_nright > 0)					\
+			adjustBox(rightBox, box);			\
+		else									\
+			*rightBox = *(box);					\
+		v->spl_right[v->spl_nright++] = off;	\
+	} while(0)
+
+	memset(&context, 0, sizeof(ConsiderSplitContext));
 
-	posL = posR = posB = posT = 0;
 	maxoff = entryvec->n - 1;
+	nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
 
-	cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
-	memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
+	/* Allocate arrays for intervals along axes */
+	intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
+	intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
 
-	/* find MBR */
-	for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
+	/*
+	 * Calculate the overall minimum bounding box over all the entries.
+	 */
+	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
 	{
-		cur = DatumGetBoxP(entryvec->vector[i].key);
-		if (allisequal && (
-						   pageunion.high.x != cur->high.x ||
-						   pageunion.high.y != cur->high.y ||
-						   pageunion.low.x != cur->low.x ||
-						   pageunion.low.y != cur->low.y
-						   ))
-			allisequal = false;
-
-		adjustBox(&pageunion, cur);
+		box = DatumGetBoxP(entryvec->vector[i].key);
+		if (i == FirstOffsetNumber)
+			context.boundingBox = *box;
+		else
+			adjustBox(&context.boundingBox, box);
 	}
 
-	if (allisequal)
+	/*
+	 * Iterate over axes for optimal split searching.
+	 */
+	context.first = true;		/* nothing selected yet */
+	for (dim = 0; dim < 2; dim++)
 	{
+		double		leftUpper,
+					rightLower;
+		int			i1,
+					i2;
+
+		/* Prepare array of intervals along axis */
+		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+		{
+			box = DatumGetBoxP(entryvec->vector[i].key);
+			if (dim == 0)
+			{
+				intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+				intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+			}
+			else
+			{
+				intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+				intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+			}
+		}
+
 		/*
-		 * All entries are the same
+		 * Make two arrays of intervals: sorted by lower bound and sorted by
+		 * upper bound.
 		 */
+		memcpy(intervalsUpper, intervalsLower,
+			   sizeof(SplitInterval) * nentries);
+		qsort(intervalsLower, nentries, sizeof(SplitInterval),
+			  interval_cmp_lower);
+		qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+			  interval_cmp_upper);
+
+		/*
+		 * Iterate over lower bound of right group finding corresponding
+		 * lowest upper bound of left group.
+		 */
+		i1 = 0;
+		i2 = 0;
+		rightLower = intervalsLower[i1].lower;
+		leftUpper = intervalsUpper[i2].lower;
+
+		while (true)
+		{
+			/*
+			 * Find next lower bound of right group.
+			 */
+			while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+			{
+				leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+				i1++;
+			}
+			if (i1 >= nentries)
+				break;
+			rightLower = intervalsLower[i1].lower;
+
+			/*
+			 * Find count of intervals which anyway should be placed to the
+			 * left group.
+			 */
+			while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+				i2++;
+
+			/*
+			 * Consider found split.
+			 */
+			g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+		}
+
+		/*
+		 * Iterate over upper bound of left group finding corresponding
+		 * highest lower bound of right group.
+		 */
+		i1 = nentries - 1;
+		i2 = nentries - 1;
+		rightLower = intervalsLower[i1].upper;
+		leftUpper = intervalsUpper[i2].upper;
+
+		while (true)
+		{
+			/*
+			 * Find next upper bound of left group.
+			 */
+			while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+			{
+				rightLower = Min(rightLower, intervalsUpper[i2].lower);
+				i2--;
+			}
+			if (i2 < 0)
+				break;
+			leftUpper = intervalsUpper[i2].upper;
+
+			/*
+			 * Find count of intervals which anyway should be placed to the
+			 * right group.
+			 */
+			while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+				i1--;
+
+			/*
+			 * Consider found split.
+			 */
+			g_box_consider_split(&context, dim,
+								 rightLower, i1 + 1, leftUpper, i2 + 1);
+		}
+	}
+
+	/*
+	 * If we failed to find any acceptable splits, use trivial split.
+	 */
+	if (context.first)
+	{
 		fallbackSplit(entryvec, v);
 		PG_RETURN_POINTER(v);
 	}
 
+	/* Allocate vectors for results */
 	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
-	listL = (OffsetNumber *) palloc(nbytes);
-	listR = (OffsetNumber *) palloc(nbytes);
-	listB = (OffsetNumber *) palloc(nbytes);
-	listT = (OffsetNumber *) palloc(nbytes);
-	unionL = (BOX *) palloc(sizeof(BOX));
-	unionR = (BOX *) palloc(sizeof(BOX));
-	unionB = (BOX *) palloc(sizeof(BOX));
-	unionT = (BOX *) palloc(sizeof(BOX));
-
-#define ADDLIST( list, unionD, pos, num ) do { \
-	if ( pos ) { \
-		if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x	= cur->high.x; \
-		if ( (unionD)->low.x  > cur->low.x	) (unionD)->low.x	= cur->low.x; \
-		if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y	= cur->high.y; \
-		if ( (unionD)->low.y  > cur->low.y	) (unionD)->low.y	= cur->low.y; \
-	} else { \
-			memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) );	\
-	} \
-	(list)[pos] = num; \
-	(pos)++; \
-} while(0)
+	v->spl_left = (OffsetNumber *) palloc(nbytes);
+	v->spl_right = (OffsetNumber *) palloc(nbytes);
+	v->spl_nleft = 0;
+	v->spl_nright = 0;
 
-	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-	{
-		cur = DatumGetBoxP(entryvec->vector[i].key);
-		if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
-			ADDLIST(listL, unionL, posL, i);
-		else
-			ADDLIST(listR, unionR, posR, i);
-		if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
-			ADDLIST(listB, unionB, posB, i);
-		else
-			ADDLIST(listT, unionT, posT, i);
-	}
+	/* Allocate boxes of left and right groups */
+	leftBox = empty_box();
+	rightBox = empty_box();
 
-#define LIMIT_RATIO 0.1
-#define _IS_BADRATIO(x,y)	( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
-#define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
-	/* bad disposition, try to split by centers of boxes  */
-	if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+	/*
+	 * Allocate an array for "Common entries" - entries which can be placed
+	 * to either group without affecting overlap along selected axis.
+	 */
+	commonEntriesCount = 0;
+	commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
+
+	/*
+	 * Distribute entries which can be distributed unambiguously, and collect
+	 * "common entries".
+	 */
+	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
 	{
-		double		avgCenterX = 0.0,
-					avgCenterY = 0.0;
-		double		CenterX,
-					CenterY;
+		double		lower,
+					upper;
 
-		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+		/*
+		 * Get upper and lower bounds along selected axis.
+		 */
+		box = DatumGetBoxP(entryvec->vector[i].key);
+		if (context.dim == 0)
 		{
-			cur = DatumGetBoxP(entryvec->vector[i].key);
-			avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
-			avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
+			lower = box->low.x;
+			upper = box->high.x;
 		}
-
-		avgCenterX /= maxoff;
-		avgCenterY /= maxoff;
-
-		posL = posR = posB = posT = 0;
-		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+		else
 		{
-			cur = DatumGetBoxP(entryvec->vector[i].key);
-
-			CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
-			CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
+			lower = box->low.y;
+			upper = box->high.y;
+		}
 
-			if (CenterX < avgCenterX)
-				ADDLIST(listL, unionL, posL, i);
-			else if (CenterX == avgCenterX)
+		if (upper <= context.leftUpper)
+		{
+			/* Fits to the left group */
+			if (lower >= context.rightLower)
 			{
-				if (posL > posR)
-					ADDLIST(listR, unionR, posR, i);
-				else
-					ADDLIST(listL, unionL, posL, i);
+				/* Fits also to the right group, so "common entry" */
+				commonEntries[commonEntriesCount++].index = i;
 			}
 			else
-				ADDLIST(listR, unionR, posR, i);
-
-			if (CenterY < avgCenterY)
-				ADDLIST(listB, unionB, posB, i);
-			else if (CenterY == avgCenterY)
 			{
-				if (posB > posT)
-					ADDLIST(listT, unionT, posT, i);
-				else
-					ADDLIST(listB, unionB, posB, i);
+				/* Doesn't fit to the right group, so join to the left group */
+				PLACE_LEFT(box, i);
 			}
-			else
-				ADDLIST(listT, unionT, posT, i);
 		}
-
-		if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+		else
 		{
-			fallbackSplit(entryvec, v);
-			PG_RETURN_POINTER(v);
+			/*
+			 * Each entry should fit on either left or right group, so since
+			 * this didn't fit on the left group, it better fit in the right
+			 * group.
+			 */
+			Assert(lower >= context.rightLower);
+
+			/* Doesn't fit to the left group, so join to the right group */
+			PLACE_RIGHT(box, i);
 		}
 	}
 
-	/* which split more optimal? */
-	if (Max(posL, posR) < Max(posB, posT))
-		direction = 'x';
-	else if (Max(posL, posR) > Max(posB, posT))
-		direction = 'y';
-	else
+	/*
+	 * Distribute "common entries", if any
+	 */
+	if (commonEntriesCount > 0)
 	{
-		Datum		interLR = DirectFunctionCall2(rt_box_inter,
-												  BoxPGetDatum(unionL),
-												  BoxPGetDatum(unionR));
-		Datum		interBT = DirectFunctionCall2(rt_box_inter,
-												  BoxPGetDatum(unionB),
-												  BoxPGetDatum(unionT));
-		double		sizeLR,
-					sizeBT;
-
-		sizeLR = size_box(interLR);
-		sizeBT = size_box(interBT);
-
-		if (sizeLR < sizeBT)
-			direction = 'x';
-		else
-			direction = 'y';
-	}
+		/*
+		 * Calculate minimum number of entries that must be placed in both
+		 * groups, to reach LIMIT_RATIO.
+		 */
+		int		m = ceil(LIMIT_RATIO * (double) nentries);
 
-	if (direction == 'x')
-		chooseLR(v,
-				 listL, posL, unionL,
-				 listR, posR, unionR);
-	else
-		chooseLR(v,
-				 listB, posB, unionB,
-				 listT, posT, unionT);
+		/*
+		 * Calculate delta between penalties of join "common entries" to
+		 * different groups.
+		 */
+		for (i = 0; i < commonEntriesCount; i++)
+		{
+			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
+			commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
+										 box_penalty(rightBox, box));
+		}
+
+		/*
+		 * Sort "common entries" by calculated deltas in order to distribute
+		 * the most ambiguous entries first.
+		 */
+		qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
+
+		/*
+		 * Distribute "common entries" between groups.
+		 */
+		for (i = 0; i < commonEntriesCount; i++)
+		{
+			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
+
+			/*
+			 * Check if we have to place this entry in either group to achieve 
+			 * LIMIT_RATIO.
+			 */
+			if (v->spl_nleft + (commonEntriesCount - i) <= m)
+				PLACE_LEFT(box, commonEntries[i].index);
+			else if (v->spl_nright + (commonEntriesCount - i) <= m)
+				PLACE_RIGHT(box, commonEntries[i].index);
+			else
+			{
+				/* Otherwise select the group by minimal penalty */
+				if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
+					PLACE_LEFT(box, commonEntries[i].index);
+				else
+					PLACE_RIGHT(box, commonEntries[i].index);
+			}
+		}
+	}
 
+	v->spl_ldatum = PointerGetDatum(leftBox);
+	v->spl_rdatum = PointerGetDatum(rightBox);
 	PG_RETURN_POINTER(v);
 }
 
#18Alexander Korotkov
aekorotkov@gmail.com
In reply to: Heikki Linnakangas (#17)
1 attachment(s)
Re: Double sorting split patch

On Tue, Oct 4, 2011 at 1:46 PM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

Ok. Could you phrase that as a code comment?

Here's a version of the patch I've been working on. There's no functional
changes, just a lot of moving things around, comment changes, etc. to
hopefully make it more readable.

Thanks for your work on this patch. Patch with comment is attached.

------
With best regards,
Alexander Korotkov.

Attachments:

double-sorting-split-0.5.patchtext/x-patch; charset=US-ASCII; name=double-sorting-split-0.5.patchDownload
*** a/src/backend/access/gist/gistproc.c
--- b/src/backend/access/gist/gistproc.c
***************
*** 26,31 **** static bool gist_box_leaf_consistent(BOX *key, BOX *query,
--- 26,35 ----
  static double size_box(Datum dbox);
  static bool rtree_internal_consistent(BOX *key, BOX *query,
  						  StrategyNumber strategy);
+ static BOX *empty_box(void);
+ 
+ /* Minimal possible ratio of split */
+ #define LIMIT_RATIO 0.3
  
  
  /**************************************************
***************
*** 49,78 **** rt_box_union(PG_FUNCTION_ARGS)
  	PG_RETURN_BOX_P(n);
  }
  
- static Datum
- rt_box_inter(PG_FUNCTION_ARGS)
- {
- 	BOX		   *a = PG_GETARG_BOX_P(0);
- 	BOX		   *b = PG_GETARG_BOX_P(1);
- 	BOX		   *n;
- 
- 	n = (BOX *) palloc(sizeof(BOX));
- 
- 	n->high.x = Min(a->high.x, b->high.x);
- 	n->high.y = Min(a->high.y, b->high.y);
- 	n->low.x = Max(a->low.x, b->low.x);
- 	n->low.y = Max(a->low.y, b->low.y);
- 
- 	if (n->high.x < n->low.x || n->high.y < n->low.y)
- 	{
- 		pfree(n);
- 		/* Indicate "no intersection" by returning NULL pointer */
- 		n = NULL;
- 	}
- 
- 	PG_RETURN_BOX_P(n);
- }
- 
  /*
   * The GiST Consistent method for boxes
   *
--- 53,58 ----
***************
*** 194,279 **** gist_box_penalty(PG_FUNCTION_ARGS)
  	PG_RETURN_POINTER(result);
  }
  
- static void
- chooseLR(GIST_SPLITVEC *v,
- 		 OffsetNumber *list1, int nlist1, BOX *union1,
- 		 OffsetNumber *list2, int nlist2, BOX *union2)
- {
- 	bool		firstToLeft = true;
- 
- 	if (v->spl_ldatum_exists || v->spl_rdatum_exists)
- 	{
- 		if (v->spl_ldatum_exists && v->spl_rdatum_exists)
- 		{
- 			BOX			LRl = *union1,
- 						LRr = *union2;
- 			BOX			RLl = *union2,
- 						RLr = *union1;
- 			double		sizeLR,
- 						sizeRL;
- 
- 			adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
- 			adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
- 			adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
- 			adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
- 
- 			sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
- 			sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
- 
- 			if (sizeLR > sizeRL)
- 				firstToLeft = false;
- 
- 		}
- 		else
- 		{
- 			float		p1,
- 						p2;
- 			GISTENTRY	oldUnion,
- 						addon;
- 
- 			gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
- 						  NULL, NULL, InvalidOffsetNumber, FALSE);
- 
- 			gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
- 			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
- 			gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
- 			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
- 
- 			if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
- 				firstToLeft = false;
- 		}
- 	}
- 
- 	if (firstToLeft)
- 	{
- 		v->spl_left = list1;
- 		v->spl_right = list2;
- 		v->spl_nleft = nlist1;
- 		v->spl_nright = nlist2;
- 		if (v->spl_ldatum_exists)
- 			adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
- 		v->spl_ldatum = BoxPGetDatum(union1);
- 		if (v->spl_rdatum_exists)
- 			adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
- 		v->spl_rdatum = BoxPGetDatum(union2);
- 	}
- 	else
- 	{
- 		v->spl_left = list2;
- 		v->spl_right = list1;
- 		v->spl_nleft = nlist2;
- 		v->spl_nright = nlist1;
- 		if (v->spl_ldatum_exists)
- 			adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
- 		v->spl_ldatum = BoxPGetDatum(union2);
- 		if (v->spl_rdatum_exists)
- 			adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
- 		v->spl_rdatum = BoxPGetDatum(union1);
- 	}
- 
- 	v->spl_ldatum_exists = v->spl_rdatum_exists = false;
- }
- 
  /*
   * Trivial split: half of entries will be placed on one page
   * and another half - to another
--- 174,179 ----
***************
*** 338,536 **** fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
  }
  
  /*
!  * The GiST PickSplit method
   *
!  * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
!  * C.H.Ang and T.C.Tan
   *
!  * This is used for both boxes and points.
   */
  Datum
  gist_box_picksplit(PG_FUNCTION_ARGS)
  {
  	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
  	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! 	OffsetNumber i;
! 	OffsetNumber *listL,
! 			   *listR,
! 			   *listB,
! 			   *listT;
! 	BOX		   *unionL,
! 			   *unionR,
! 			   *unionB,
! 			   *unionT;
! 	int			posL,
! 				posR,
! 				posB,
! 				posT;
! 	BOX			pageunion;
! 	BOX		   *cur;
! 	char		direction = ' ';
! 	bool		allisequal = true;
! 	OffsetNumber maxoff;
! 	int			nbytes;
  
- 	posL = posR = posB = posT = 0;
  	maxoff = entryvec->n - 1;
  
! 	cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
! 	memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
  
! 	/* find MBR */
! 	for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		cur = DatumGetBoxP(entryvec->vector[i].key);
! 		if (allisequal && (
! 						   pageunion.high.x != cur->high.x ||
! 						   pageunion.high.y != cur->high.y ||
! 						   pageunion.low.x != cur->low.x ||
! 						   pageunion.low.y != cur->low.y
! 						   ))
! 			allisequal = false;
! 
! 		adjustBox(&pageunion, cur);
  	}
  
! 	if (allisequal)
  	{
  		/*
! 		 * All entries are the same
  		 */
  		fallbackSplit(entryvec, v);
  		PG_RETURN_POINTER(v);
  	}
  
  	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! 	listL = (OffsetNumber *) palloc(nbytes);
! 	listR = (OffsetNumber *) palloc(nbytes);
! 	listB = (OffsetNumber *) palloc(nbytes);
! 	listT = (OffsetNumber *) palloc(nbytes);
! 	unionL = (BOX *) palloc(sizeof(BOX));
! 	unionR = (BOX *) palloc(sizeof(BOX));
! 	unionB = (BOX *) palloc(sizeof(BOX));
! 	unionT = (BOX *) palloc(sizeof(BOX));
! 
! #define ADDLIST( list, unionD, pos, num ) do { \
! 	if ( pos ) { \
! 		if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x	= cur->high.x; \
! 		if ( (unionD)->low.x  > cur->low.x	) (unionD)->low.x	= cur->low.x; \
! 		if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y	= cur->high.y; \
! 		if ( (unionD)->low.y  > cur->low.y	) (unionD)->low.y	= cur->low.y; \
! 	} else { \
! 			memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) );	\
! 	} \
! 	(list)[pos] = num; \
! 	(pos)++; \
! } while(0)
  
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
! 	{
! 		cur = DatumGetBoxP(entryvec->vector[i].key);
! 		if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
! 			ADDLIST(listL, unionL, posL, i);
! 		else
! 			ADDLIST(listR, unionR, posR, i);
! 		if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
! 			ADDLIST(listB, unionB, posB, i);
! 		else
! 			ADDLIST(listT, unionT, posT, i);
! 	}
  
! #define LIMIT_RATIO 0.1
! #define _IS_BADRATIO(x,y)	( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
! #define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
! 	/* bad disposition, try to split by centers of boxes  */
! 	if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
  	{
! 		double		avgCenterX = 0.0,
! 					avgCenterY = 0.0;
! 		double		CenterX,
! 					CenterY;
  
! 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  		{
! 			cur = DatumGetBoxP(entryvec->vector[i].key);
! 			avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
! 			avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
  		}
! 
! 		avgCenterX /= maxoff;
! 		avgCenterY /= maxoff;
! 
! 		posL = posR = posB = posT = 0;
! 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  		{
! 			cur = DatumGetBoxP(entryvec->vector[i].key);
! 
! 			CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
! 			CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
  
! 			if (CenterX < avgCenterX)
! 				ADDLIST(listL, unionL, posL, i);
! 			else if (CenterX == avgCenterX)
  			{
! 				if (posL > posR)
! 					ADDLIST(listR, unionR, posR, i);
! 				else
! 					ADDLIST(listL, unionL, posL, i);
  			}
  			else
- 				ADDLIST(listR, unionR, posR, i);
- 
- 			if (CenterY < avgCenterY)
- 				ADDLIST(listB, unionB, posB, i);
- 			else if (CenterY == avgCenterY)
  			{
! 				if (posB > posT)
! 					ADDLIST(listT, unionT, posT, i);
! 				else
! 					ADDLIST(listB, unionB, posB, i);
  			}
- 			else
- 				ADDLIST(listT, unionT, posT, i);
  		}
! 
! 		if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
  		{
! 			fallbackSplit(entryvec, v);
! 			PG_RETURN_POINTER(v);
  		}
  	}
  
! 	/* which split more optimal? */
! 	if (Max(posL, posR) < Max(posB, posT))
! 		direction = 'x';
! 	else if (Max(posL, posR) > Max(posB, posT))
! 		direction = 'y';
! 	else
  	{
! 		Datum		interLR = DirectFunctionCall2(rt_box_inter,
! 												  BoxPGetDatum(unionL),
! 												  BoxPGetDatum(unionR));
! 		Datum		interBT = DirectFunctionCall2(rt_box_inter,
! 												  BoxPGetDatum(unionB),
! 												  BoxPGetDatum(unionT));
! 		double		sizeLR,
! 					sizeBT;
! 
! 		sizeLR = size_box(interLR);
! 		sizeBT = size_box(interBT);
! 
! 		if (sizeLR < sizeBT)
! 			direction = 'x';
! 		else
! 			direction = 'y';
! 	}
  
! 	if (direction == 'x')
! 		chooseLR(v,
! 				 listL, posL, unionL,
! 				 listR, posR, unionR);
! 	else
! 		chooseLR(v,
! 				 listB, posB, unionB,
! 				 listT, posT, unionT);
  
  	PG_RETURN_POINTER(v);
  }
  
--- 238,825 ----
  }
  
  /*
!  * Represents information about entry which can be present to any group without
!  * affecting overlap over selected axis ("common entry").
!  */
! typedef struct
! {
! 	/* Index of entry in the initial array */
! 	int			index;
! 	/* Delta between penalties of entry insertion into different groups */
! 	double		delta;
! }	CommonEntry;
! 
! /*
!  * Context for g_box_consider_split. Contains information about currently
!  * selected split and some general information.
!  */
! typedef struct
! {
! 	/* number of splitting entries */
! 	int			entriesCount;
! 	/* minimum bounding box across all entries */
! 	BOX			boundingBox;
! 
! 	/* Information about currently selected split follows */
! 
! 	bool		first;	/* true if no split was selected yet */
! 
! 	double		leftUpper;		/* upper bound of left interval */
! 	double		rightLower;		/* lower bound of right interval */
! 
! 	float4		ratio;
! 	float4		overlap;
! 	int			dim;			/* axis of this split */
! 	double		range;			/* width of general MBR projection to the selected axis */
! }	ConsiderSplitContext;
! 
! /*
!  * Interval represents projection of box to axis.
!  */
! typedef struct
! {
! 	double		lower,
! 				upper;
! }	SplitInterval;
! 
! /*
!  * Interval comparison function by lower bound of the interval;
!  */
! static int
! interval_cmp_lower(const void *i1, const void *i2)
! {
! 	double		lower1 = ((SplitInterval *) i1)->lower,
! 				lower2 = ((SplitInterval *) i2)->lower;
! 
! 	if (lower1 < lower2)
! 		return -1;
! 	else if (lower1 > lower2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * Interval comparison function by upper bound of the interval;
!  */
! static int
! interval_cmp_upper(const void *i1, const void *i2)
! {
! 	double		upper1 = ((SplitInterval *) i1)->upper,
! 				upper2 = ((SplitInterval *) i2)->upper;
! 
! 	if (upper1 < upper2)
! 		return -1;
! 	else if (upper1 > upper2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * Replace negative value with zero.
!  */
! static inline float
! non_negative(float val)
! {
! 	if (val >= 0.0f)
! 		return val;
! 	else
! 		return 0.0f;
! }
! 
! /*
!  * Consider replacement of currently selected split with the better one.
!  */
! static void inline
! g_box_consider_split(ConsiderSplitContext *context, int dimNum,
! 					 double rightLower, int minLeftCount,
! 					 double leftUpper, int maxLeftCount)
! {
! 	int			leftCount,
! 				rightCount;
! 	float4		ratio,
! 				overlap;
! 	double		range;
! 
! 	/*
! 	 * Calculate entries distribution ration assuming most uniform
! 	 * distribution of common entries.
! 	 */
! 	if (minLeftCount >= (context->entriesCount + 1) / 2)
! 	{
! 		leftCount = minLeftCount;
! 	}
! 	else
! 	{
! 		if (maxLeftCount <= context->entriesCount / 2)
! 			leftCount = maxLeftCount;
! 		else
! 			leftCount = context->entriesCount / 2;
! 	}
! 	rightCount = context->entriesCount - leftCount;
! 
! 	/*
! 	 * Ratio of split - quotient between size of lesser group and total entries
! 	 * count.
! 	 */
! 	ratio = ((float4) Min(leftCount, rightCount)) /
! 		((float4) context->entriesCount);
! 
! 	if (ratio > LIMIT_RATIO)
! 	{
! 		bool selectthis = false;
! 
! 		/*
! 		 * The ratio is acceptable, so compare current split with previously
! 		 * selected one. Between splits of one dimension we search for
! 		 * minimal overlap (allowing negative values) and minimal ration
! 		 * (between same overlaps. We switch dimension if find less overlap
! 		 * (non-negative) or less range with same overlap.
! 		 */
! 		if (dimNum == 0)
! 			range = context->boundingBox.high.x - context->boundingBox.low.x;
! 		else
! 			range = context->boundingBox.high.y - context->boundingBox.low.y;
! 
! 		overlap = (leftUpper - rightLower) / range;
! 
! 		/* If there is no previous selection, select this */
! 		if (context->first)
! 			selectthis = true;
! 		else if (context->dim == dimNum)
! 		{
! 			/*
! 			 * Within the same dimension, choose the new split if it has a
! 			 * smaller overlap, or same overlap but better ratio.
! 			 */
! 			if (overlap < context->overlap ||
! 				(overlap == context->overlap && ratio > context->ratio))
! 				selectthis = true;
! 		}
! 		else
! 		{
! 			/*
! 			 * Across dimensions, choose the new split if it has a
! 			 * smaller *non-negative* overlap, or same *non-negative* overlap
! 			 * but bigger range. This condition differs from described in the
! 			 * article. On the datasets where leaf MBRs don't overlap
! 			 * themselves, non-overlapping splits (i.e. splits which have zero
! 			 * *non-negative* overlap) are frequently possible. In this case
! 			 * splits tends to be along one dimension, because most distant
! 			 * non-overlapping splits (i.e. having lowest negative overlap)
! 			 * appears to be in the same dimension as in the previous split.
! 			 * Therefore MBRs appear to be very prolonged along another
! 			 * dimension, that leads to bad search performance. Using range
! 			 * as the second split criteria makes MBRs more quadratic. Using
! 			 * *non-negative* overlap instead of overlap as the first split
! 			 * criteria gives to range criteria chance to matter, because
! 			 * non-overlapping splits are equivalent in this criteria.
! 			 */
! 			if (non_negative(overlap) < non_negative(context->overlap) ||
! 				(range > context->range &&
! 				 non_negative(overlap) <= non_negative(context->overlap)))
! 				selectthis = true;
! 		}
! 
! 		if (selectthis)
! 		{
! 			/* save information about selected split */
! 			context->first = false;
! 			context->ratio = ratio;
! 			context->range = range;
! 			context->overlap = overlap;
! 			context->rightLower = rightLower;
! 			context->leftUpper = leftUpper;
! 			context->dim = dimNum;
! 		}
! 	}
! }
! 
! /*
!  * Create new empty BOX
!  */
! static BOX *
! empty_box(void)
! {
! 	BOX		   *result;
! 
! 	result = (BOX *) palloc(sizeof(BOX));
! 	result->low.x = 0.0f;
! 	result->low.y = 0.0f;
! 	result->high.x = 0.0f;
! 	result->high.y = 0.0f;
! 	return result;
! }
! 
! /*
!  * Return increase of original BOX area by new BOX area insertion.
!  */
! static double
! box_penalty(BOX *original, BOX *new)
! {
! 	double		union_width,
! 				union_height;
! 
! 	union_width = Max(original->high.x, new->high.x) -
! 		Min(original->low.x, new->low.x);
! 	union_height = Max(original->high.y, new->high.y) -
! 		Min(original->low.y, new->low.y);
! 	return union_width * union_height - (original->high.x - original->low.x) *
! 		(original->high.y - original->low.y);
! }
! 
! /*
!  * Compare common entries by their deltas.
!  */
! static int
! common_entry_cmp(const void *i1, const void *i2)
! {
! 	double		delta1 = ((CommonEntry *) i1)->delta,
! 				delta2 = ((CommonEntry *) i2)->delta;
! 
! 	if (delta1 < delta2)
! 		return -1;
! 	else if (delta1 > delta2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * --------------------------------------------------------------------------
!  * Double sorting split algorithm. Finds split of boxes by considering splits
!  * along each axis. Searching splits along axes are based on the notions of
!  * split pair and corner split pair.
   *
!  * Split pair is a pair <a, b> where 'a' is upper bound of left group and 'b'
!  * is lower bound of right group. Lower bound or left group and upper bound of
!  * right group are assumed to be the general bounds along axis.
   *
!  * Corner split pair is a split pair <a, b> where 'a' is upper bound of some
!  * interval, 'b' is lower bound of some, 'a' can't be lower or 'b' can't be
!  * higher with split pair property preservation.
!  *
!  * Split along axis divides entries to the following groups:
!  * 1) Entries which should be placed to the left group
!  * 2) Entries which should be placed to the right group
!  * 3) "Common entries" which can be placed to any of groups without affecting
!  *	  of overlap along selected axis.
!  *
!  * Split along axis is selected by overlap along that axis and some other
!  * criteria (see g_box_consider_split). When split along axis is selected,
!  * "common entries" are distributed by penalty with group boxes.
!  *
!  * For details see:
!  * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
!  * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
!  * --------------------------------------------------------------------------
   */
  Datum
  gist_box_picksplit(PG_FUNCTION_ARGS)
  {
  	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
  	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! 	OffsetNumber i,
! 				maxoff;
! 	ConsiderSplitContext context;
! 	BOX		   *box,
! 			   *leftBox,
! 			   *rightBox;
! 	int			dim,
! 				commonEntriesCount,
! 				nbytes;
! 	SplitInterval *intervalsLower,
! 			   *intervalsUpper;
! 	CommonEntry *commonEntries;
! 	int			nentries;
! 
! #define PLACE_LEFT(box, off)					\
! 	do {										\
! 		if (v->spl_nleft > 0)					\
! 			adjustBox(leftBox, box);			\
! 		else									\
! 			*leftBox = *(box);					\
! 		v->spl_left[v->spl_nleft++] = off;		\
! 	} while(0)
! 
! #define PLACE_RIGHT(box, off)					\
! 	do {										\
! 		if (v->spl_nright > 0)					\
! 			adjustBox(rightBox, box);			\
! 		else									\
! 			*rightBox = *(box);					\
! 		v->spl_right[v->spl_nright++] = off;	\
! 	} while(0)
! 
! 	memset(&context, 0, sizeof(ConsiderSplitContext));
  
  	maxoff = entryvec->n - 1;
+ 	nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
  
! 	/* Allocate arrays for intervals along axes */
! 	intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
! 	intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
  
! 	/*
! 	 * Calculate the overall minimum bounding box over all the entries.
! 	 */
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		box = DatumGetBoxP(entryvec->vector[i].key);
! 		if (i == FirstOffsetNumber)
! 			context.boundingBox = *box;
! 		else
! 			adjustBox(&context.boundingBox, box);
  	}
  
! 	/*
! 	 * Iterate over axes for optimal split searching.
! 	 */
! 	context.first = true;		/* nothing selected yet */
! 	for (dim = 0; dim < 2; dim++)
  	{
+ 		double		leftUpper,
+ 					rightLower;
+ 		int			i1,
+ 					i2;
+ 
+ 		/* Prepare array of intervals along axis */
+ 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ 		{
+ 			box = DatumGetBoxP(entryvec->vector[i].key);
+ 			if (dim == 0)
+ 			{
+ 				intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+ 				intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+ 			}
+ 			else
+ 			{
+ 				intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+ 				intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+ 			}
+ 		}
+ 
  		/*
! 		 * Make two arrays of intervals: sorted by lower bound and sorted by
! 		 * upper bound.
  		 */
+ 		memcpy(intervalsUpper, intervalsLower,
+ 			   sizeof(SplitInterval) * nentries);
+ 		qsort(intervalsLower, nentries, sizeof(SplitInterval),
+ 			  interval_cmp_lower);
+ 		qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+ 			  interval_cmp_upper);
+ 
+ 		/*
+ 		 * Iterate over lower bound of right group finding corresponding
+ 		 * lowest upper bound of left group.
+ 		 */
+ 		i1 = 0;
+ 		i2 = 0;
+ 		rightLower = intervalsLower[i1].lower;
+ 		leftUpper = intervalsUpper[i2].lower;
+ 
+ 		while (true)
+ 		{
+ 			/*
+ 			 * Find next lower bound of right group.
+ 			 */
+ 			while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+ 			{
+ 				leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+ 				i1++;
+ 			}
+ 			if (i1 >= nentries)
+ 				break;
+ 			rightLower = intervalsLower[i1].lower;
+ 
+ 			/*
+ 			 * Find count of intervals which anyway should be placed to the
+ 			 * left group.
+ 			 */
+ 			while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+ 				i2++;
+ 
+ 			/*
+ 			 * Consider found split.
+ 			 */
+ 			g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+ 		}
+ 
+ 		/*
+ 		 * Iterate over upper bound of left group finding corresponding
+ 		 * highest lower bound of right group.
+ 		 */
+ 		i1 = nentries - 1;
+ 		i2 = nentries - 1;
+ 		rightLower = intervalsLower[i1].upper;
+ 		leftUpper = intervalsUpper[i2].upper;
+ 
+ 		while (true)
+ 		{
+ 			/*
+ 			 * Find next upper bound of left group.
+ 			 */
+ 			while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+ 			{
+ 				rightLower = Min(rightLower, intervalsUpper[i2].lower);
+ 				i2--;
+ 			}
+ 			if (i2 < 0)
+ 				break;
+ 			leftUpper = intervalsUpper[i2].upper;
+ 
+ 			/*
+ 			 * Find count of intervals which anyway should be placed to the
+ 			 * right group.
+ 			 */
+ 			while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+ 				i1--;
+ 
+ 			/*
+ 			 * Consider found split.
+ 			 */
+ 			g_box_consider_split(&context, dim,
+ 								 rightLower, i1 + 1, leftUpper, i2 + 1);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * If we failed to find any acceptable splits, use trivial split.
+ 	 */
+ 	if (context.first)
+ 	{
  		fallbackSplit(entryvec, v);
  		PG_RETURN_POINTER(v);
  	}
  
+ 	/* Allocate vectors for results */
  	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! 	v->spl_left = (OffsetNumber *) palloc(nbytes);
! 	v->spl_right = (OffsetNumber *) palloc(nbytes);
! 	v->spl_nleft = 0;
! 	v->spl_nright = 0;
  
! 	/* Allocate boxes of left and right groups */
! 	leftBox = empty_box();
! 	rightBox = empty_box();
  
! 	/*
! 	 * Allocate an array for "Common entries" - entries which can be placed
! 	 * to either group without affecting overlap along selected axis.
! 	 */
! 	commonEntriesCount = 0;
! 	commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
! 
! 	/*
! 	 * Distribute entries which can be distributed unambiguously, and collect
! 	 * "common entries".
! 	 */
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		double		lower,
! 					upper;
  
! 		/*
! 		 * Get upper and lower bounds along selected axis.
! 		 */
! 		box = DatumGetBoxP(entryvec->vector[i].key);
! 		if (context.dim == 0)
  		{
! 			lower = box->low.x;
! 			upper = box->high.x;
  		}
! 		else
  		{
! 			lower = box->low.y;
! 			upper = box->high.y;
! 		}
  
! 		if (upper <= context.leftUpper)
! 		{
! 			/* Fits to the left group */
! 			if (lower >= context.rightLower)
  			{
! 				/* Fits also to the right group, so "common entry" */
! 				commonEntries[commonEntriesCount++].index = i;
  			}
  			else
  			{
! 				/* Doesn't fit to the right group, so join to the left group */
! 				PLACE_LEFT(box, i);
  			}
  		}
! 		else
  		{
! 			/*
! 			 * Each entry should fit on either left or right group, so since
! 			 * this didn't fit on the left group, it better fit in the right
! 			 * group.
! 			 */
! 			Assert(lower >= context.rightLower);
! 
! 			/* Doesn't fit to the left group, so join to the right group */
! 			PLACE_RIGHT(box, i);
  		}
  	}
  
! 	/*
! 	 * Distribute "common entries", if any
! 	 */
! 	if (commonEntriesCount > 0)
  	{
! 		/*
! 		 * Calculate minimum number of entries that must be placed in both
! 		 * groups, to reach LIMIT_RATIO.
! 		 */
! 		int		m = ceil(LIMIT_RATIO * (double) nentries);
  
! 		/*
! 		 * Calculate delta between penalties of join "common entries" to
! 		 * different groups.
! 		 */
! 		for (i = 0; i < commonEntriesCount; i++)
! 		{
! 			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! 			commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
! 										 box_penalty(rightBox, box));
! 		}
! 
! 		/*
! 		 * Sort "common entries" by calculated deltas in order to distribute
! 		 * the most ambiguous entries first.
! 		 */
! 		qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
! 
! 		/*
! 		 * Distribute "common entries" between groups.
! 		 */
! 		for (i = 0; i < commonEntriesCount; i++)
! 		{
! 			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! 
! 			/*
! 			 * Check if we have to place this entry in either group to achieve 
! 			 * LIMIT_RATIO.
! 			 */
! 			if (v->spl_nleft + (commonEntriesCount - i) <= m)
! 				PLACE_LEFT(box, commonEntries[i].index);
! 			else if (v->spl_nright + (commonEntriesCount - i) <= m)
! 				PLACE_RIGHT(box, commonEntries[i].index);
! 			else
! 			{
! 				/* Otherwise select the group by minimal penalty */
! 				if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
! 					PLACE_LEFT(box, commonEntries[i].index);
! 				else
! 					PLACE_RIGHT(box, commonEntries[i].index);
! 			}
! 		}
! 	}
  
+ 	v->spl_ldatum = PointerGetDatum(leftBox);
+ 	v->spl_rdatum = PointerGetDatum(rightBox);
  	PG_RETURN_POINTER(v);
  }
  
#19Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#18)
1 attachment(s)
Re: Double sorting split patch

On 04.10.2011 15:10, Alexander Korotkov wrote:

On Tue, Oct 4, 2011 at 1:46 PM, Heikki Linnakangas<
heikki.linnakangas@enterprisedb.com> wrote:

Ok. Could you phrase that as a code comment?

Here's a version of the patch I've been working on. There's no functional
changes, just a lot of moving things around, comment changes, etc. to
hopefully make it more readable.

Thanks for your work on this patch. Patch with comment is attached.

Thanks, I incorporated that, and did a lot of other comment changes. I
included the example you gave earlier on how the first phase of the
algorithm works, in a comment. Please review, and if you have some test
cases at hand, run them. I think this is ready for commit now.

One more thing:

/* Allocate vectors for results */
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);

Why "maxoff + 2" ? Allocating a few extra bytes is obviously harmless,
but I wonder if it was just a leftover from something.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

Attachments:

double-sorting-split-0.6-heikki.patchtext/x-diff; name=double-sorting-split-0.6-heikki.patchDownload
*** a/src/backend/access/gist/gistproc.c
--- b/src/backend/access/gist/gistproc.c
***************
*** 27,32 **** static double size_box(Datum dbox);
--- 27,35 ----
  static bool rtree_internal_consistent(BOX *key, BOX *query,
  						  StrategyNumber strategy);
  
+ /* Minimal possible ratio of split */
+ #define LIMIT_RATIO 0.3
+ 
  
  /**************************************************
   * Box ops
***************
*** 49,78 **** rt_box_union(PG_FUNCTION_ARGS)
  	PG_RETURN_BOX_P(n);
  }
  
- static Datum
- rt_box_inter(PG_FUNCTION_ARGS)
- {
- 	BOX		   *a = PG_GETARG_BOX_P(0);
- 	BOX		   *b = PG_GETARG_BOX_P(1);
- 	BOX		   *n;
- 
- 	n = (BOX *) palloc(sizeof(BOX));
- 
- 	n->high.x = Min(a->high.x, b->high.x);
- 	n->high.y = Min(a->high.y, b->high.y);
- 	n->low.x = Max(a->low.x, b->low.x);
- 	n->low.y = Max(a->low.y, b->low.y);
- 
- 	if (n->high.x < n->low.x || n->high.y < n->low.y)
- 	{
- 		pfree(n);
- 		/* Indicate "no intersection" by returning NULL pointer */
- 		n = NULL;
- 	}
- 
- 	PG_RETURN_BOX_P(n);
- }
- 
  /*
   * The GiST Consistent method for boxes
   *
--- 52,57 ----
***************
*** 194,279 **** gist_box_penalty(PG_FUNCTION_ARGS)
  	PG_RETURN_POINTER(result);
  }
  
- static void
- chooseLR(GIST_SPLITVEC *v,
- 		 OffsetNumber *list1, int nlist1, BOX *union1,
- 		 OffsetNumber *list2, int nlist2, BOX *union2)
- {
- 	bool		firstToLeft = true;
- 
- 	if (v->spl_ldatum_exists || v->spl_rdatum_exists)
- 	{
- 		if (v->spl_ldatum_exists && v->spl_rdatum_exists)
- 		{
- 			BOX			LRl = *union1,
- 						LRr = *union2;
- 			BOX			RLl = *union2,
- 						RLr = *union1;
- 			double		sizeLR,
- 						sizeRL;
- 
- 			adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
- 			adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
- 			adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
- 			adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
- 
- 			sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
- 			sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
- 
- 			if (sizeLR > sizeRL)
- 				firstToLeft = false;
- 
- 		}
- 		else
- 		{
- 			float		p1,
- 						p2;
- 			GISTENTRY	oldUnion,
- 						addon;
- 
- 			gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
- 						  NULL, NULL, InvalidOffsetNumber, FALSE);
- 
- 			gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
- 			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
- 			gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
- 			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
- 
- 			if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
- 				firstToLeft = false;
- 		}
- 	}
- 
- 	if (firstToLeft)
- 	{
- 		v->spl_left = list1;
- 		v->spl_right = list2;
- 		v->spl_nleft = nlist1;
- 		v->spl_nright = nlist2;
- 		if (v->spl_ldatum_exists)
- 			adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
- 		v->spl_ldatum = BoxPGetDatum(union1);
- 		if (v->spl_rdatum_exists)
- 			adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
- 		v->spl_rdatum = BoxPGetDatum(union2);
- 	}
- 	else
- 	{
- 		v->spl_left = list2;
- 		v->spl_right = list1;
- 		v->spl_nleft = nlist2;
- 		v->spl_nright = nlist1;
- 		if (v->spl_ldatum_exists)
- 			adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
- 		v->spl_ldatum = BoxPGetDatum(union2);
- 		if (v->spl_rdatum_exists)
- 			adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
- 		v->spl_rdatum = BoxPGetDatum(union1);
- 	}
- 
- 	v->spl_ldatum_exists = v->spl_rdatum_exists = false;
- }
- 
  /*
   * Trivial split: half of entries will be placed on one page
   * and another half - to another
--- 173,178 ----
***************
*** 338,536 **** fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
  }
  
  /*
!  * The GiST PickSplit method
   *
!  * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
!  * C.H.Ang and T.C.Tan
   *
!  * This is used for both boxes and points.
   */
  Datum
  gist_box_picksplit(PG_FUNCTION_ARGS)
  {
  	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
  	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! 	OffsetNumber i;
! 	OffsetNumber *listL,
! 			   *listR,
! 			   *listB,
! 			   *listT;
! 	BOX		   *unionL,
! 			   *unionR,
! 			   *unionB,
! 			   *unionT;
! 	int			posL,
! 				posR,
! 				posB,
! 				posT;
! 	BOX			pageunion;
! 	BOX		   *cur;
! 	char		direction = ' ';
! 	bool		allisequal = true;
! 	OffsetNumber maxoff;
! 	int			nbytes;
  
- 	posL = posR = posB = posT = 0;
  	maxoff = entryvec->n - 1;
  
! 	cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
! 	memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
  
! 	/* find MBR */
! 	for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		cur = DatumGetBoxP(entryvec->vector[i].key);
! 		if (allisequal && (
! 						   pageunion.high.x != cur->high.x ||
! 						   pageunion.high.y != cur->high.y ||
! 						   pageunion.low.x != cur->low.x ||
! 						   pageunion.low.y != cur->low.y
! 						   ))
! 			allisequal = false;
! 
! 		adjustBox(&pageunion, cur);
  	}
  
! 	if (allisequal)
  	{
  		/*
! 		 * All entries are the same
  		 */
  		fallbackSplit(entryvec, v);
  		PG_RETURN_POINTER(v);
  	}
  
  	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! 	listL = (OffsetNumber *) palloc(nbytes);
! 	listR = (OffsetNumber *) palloc(nbytes);
! 	listB = (OffsetNumber *) palloc(nbytes);
! 	listT = (OffsetNumber *) palloc(nbytes);
! 	unionL = (BOX *) palloc(sizeof(BOX));
! 	unionR = (BOX *) palloc(sizeof(BOX));
! 	unionB = (BOX *) palloc(sizeof(BOX));
! 	unionT = (BOX *) palloc(sizeof(BOX));
! 
! #define ADDLIST( list, unionD, pos, num ) do { \
! 	if ( pos ) { \
! 		if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x	= cur->high.x; \
! 		if ( (unionD)->low.x  > cur->low.x	) (unionD)->low.x	= cur->low.x; \
! 		if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y	= cur->high.y; \
! 		if ( (unionD)->low.y  > cur->low.y	) (unionD)->low.y	= cur->low.y; \
! 	} else { \
! 			memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) );	\
! 	} \
! 	(list)[pos] = num; \
! 	(pos)++; \
! } while(0)
  
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
! 	{
! 		cur = DatumGetBoxP(entryvec->vector[i].key);
! 		if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
! 			ADDLIST(listL, unionL, posL, i);
! 		else
! 			ADDLIST(listR, unionR, posR, i);
! 		if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
! 			ADDLIST(listB, unionB, posB, i);
! 		else
! 			ADDLIST(listT, unionT, posT, i);
! 	}
  
! #define LIMIT_RATIO 0.1
! #define _IS_BADRATIO(x,y)	( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
! #define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
! 	/* bad disposition, try to split by centers of boxes  */
! 	if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
  	{
! 		double		avgCenterX = 0.0,
! 					avgCenterY = 0.0;
! 		double		CenterX,
! 					CenterY;
  
! 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  		{
! 			cur = DatumGetBoxP(entryvec->vector[i].key);
! 			avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
! 			avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
  		}
! 
! 		avgCenterX /= maxoff;
! 		avgCenterY /= maxoff;
! 
! 		posL = posR = posB = posT = 0;
! 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  		{
! 			cur = DatumGetBoxP(entryvec->vector[i].key);
! 
! 			CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
! 			CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
  
! 			if (CenterX < avgCenterX)
! 				ADDLIST(listL, unionL, posL, i);
! 			else if (CenterX == avgCenterX)
  			{
! 				if (posL > posR)
! 					ADDLIST(listR, unionR, posR, i);
! 				else
! 					ADDLIST(listL, unionL, posL, i);
  			}
  			else
- 				ADDLIST(listR, unionR, posR, i);
- 
- 			if (CenterY < avgCenterY)
- 				ADDLIST(listB, unionB, posB, i);
- 			else if (CenterY == avgCenterY)
  			{
! 				if (posB > posT)
! 					ADDLIST(listT, unionT, posT, i);
! 				else
! 					ADDLIST(listB, unionB, posB, i);
  			}
- 			else
- 				ADDLIST(listT, unionT, posT, i);
  		}
! 
! 		if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
  		{
! 			fallbackSplit(entryvec, v);
! 			PG_RETURN_POINTER(v);
  		}
  	}
  
! 	/* which split more optimal? */
! 	if (Max(posL, posR) < Max(posB, posT))
! 		direction = 'x';
! 	else if (Max(posL, posR) > Max(posB, posT))
! 		direction = 'y';
! 	else
  	{
! 		Datum		interLR = DirectFunctionCall2(rt_box_inter,
! 												  BoxPGetDatum(unionL),
! 												  BoxPGetDatum(unionR));
! 		Datum		interBT = DirectFunctionCall2(rt_box_inter,
! 												  BoxPGetDatum(unionB),
! 												  BoxPGetDatum(unionT));
! 		double		sizeLR,
! 					sizeBT;
! 
! 		sizeLR = size_box(interLR);
! 		sizeBT = size_box(interBT);
! 
! 		if (sizeLR < sizeBT)
! 			direction = 'x';
! 		else
! 			direction = 'y';
! 	}
  
! 	if (direction == 'x')
! 		chooseLR(v,
! 				 listL, posL, unionL,
! 				 listR, posR, unionR);
! 	else
! 		chooseLR(v,
! 				 listB, posB, unionB,
! 				 listT, posT, unionT);
  
  	PG_RETURN_POINTER(v);
  }
  
--- 237,841 ----
  }
  
  /*
!  * Represents information about an entry that can be placed to either group
!  * without affecting overlap over selected axis ("common entry").
!  */
! typedef struct
! {
! 	/* Index of entry in the initial array */
! 	int			index;
! 	/* Delta between penalties of entry insertion into different groups */
! 	double		delta;
! }	CommonEntry;
! 
! /*
!  * Context for g_box_consider_split. Contains information about currently
!  * selected split and some general information.
!  */
! typedef struct
! {
! 	int			entriesCount;	/* total number of entries being split */
! 	BOX			boundingBox;	/* minimum bounding box across all entries */
! 
! 	/* Information about currently selected split follows */
! 
! 	bool		first;			/* true if no split was selected yet */
! 
! 	double		leftUpper;		/* upper bound of left interval */
! 	double		rightLower;		/* lower bound of right interval */
! 
! 	float4		ratio;
! 	float4		overlap;
! 	int			dim;			/* axis of this split */
! 	double		range;			/* width of general MBR projection to the
! 								 * selected axis */
! }	ConsiderSplitContext;
! 
! /*
!  * Interval represents projection of box to axis.
!  */
! typedef struct
! {
! 	double		lower,
! 				upper;
! }	SplitInterval;
! 
! /*
!  * Interval comparison function by lower bound of the interval;
!  */
! static int
! interval_cmp_lower(const void *i1, const void *i2)
! {
! 	double		lower1 = ((SplitInterval *) i1)->lower,
! 				lower2 = ((SplitInterval *) i2)->lower;
! 
! 	if (lower1 < lower2)
! 		return -1;
! 	else if (lower1 > lower2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * Interval comparison function by upper bound of the interval;
!  */
! static int
! interval_cmp_upper(const void *i1, const void *i2)
! {
! 	double		upper1 = ((SplitInterval *) i1)->upper,
! 				upper2 = ((SplitInterval *) i2)->upper;
! 
! 	if (upper1 < upper2)
! 		return -1;
! 	else if (upper1 > upper2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * Replace negative value with zero.
!  */
! static inline float
! non_negative(float val)
! {
! 	if (val >= 0.0f)
! 		return val;
! 	else
! 		return 0.0f;
! }
! 
! /*
!  * Consider replacement of currently selected split with the better one.
!  */
! static void inline
! g_box_consider_split(ConsiderSplitContext *context, int dimNum,
! 					 double rightLower, int minLeftCount,
! 					 double leftUpper, int maxLeftCount)
! {
! 	int			leftCount,
! 				rightCount;
! 	float4		ratio,
! 				overlap;
! 	double		range;
! 
! 	/*
! 	 * Calculate entries distribution ratio assuming most uniform distribution
! 	 * of common entries.
! 	 */
! 	if (minLeftCount >= (context->entriesCount + 1) / 2)
! 	{
! 		leftCount = minLeftCount;
! 	}
! 	else
! 	{
! 		if (maxLeftCount <= context->entriesCount / 2)
! 			leftCount = maxLeftCount;
! 		else
! 			leftCount = context->entriesCount / 2;
! 	}
! 	rightCount = context->entriesCount - leftCount;
! 
! 	/*
! 	 * Ratio of split - quotient between size of lesser group and total
! 	 * entries count.
! 	 */
! 	ratio = ((float4) Min(leftCount, rightCount)) /
! 		((float4) context->entriesCount);
! 
! 	if (ratio > LIMIT_RATIO)
! 	{
! 		bool		selectthis = false;
! 
! 		/*
! 		 * The ratio is acceptable, so compare current split with previously
! 		 * selected one. Between splits of one dimension we search for minimal
! 		 * overlap (allowing negative values) and minimal ration (between same
! 		 * overlaps. We switch dimension if find less overlap (non-negative)
! 		 * or less range with same overlap.
! 		 */
! 		if (dimNum == 0)
! 			range = context->boundingBox.high.x - context->boundingBox.low.x;
! 		else
! 			range = context->boundingBox.high.y - context->boundingBox.low.y;
! 
! 		overlap = (leftUpper - rightLower) / range;
! 
! 		/* If there is no previous selection, select this */
! 		if (context->first)
! 			selectthis = true;
! 		else if (context->dim == dimNum)
! 		{
! 			/*
! 			 * Within the same dimension, choose the new split if it has a
! 			 * smaller overlap, or same overlap but better ratio.
! 			 */
! 			if (overlap < context->overlap ||
! 				(overlap == context->overlap && ratio > context->ratio))
! 				selectthis = true;
! 		}
! 		else
! 		{
! 			/*
! 			 * Across dimensions, choose the new split if it has a smaller
! 			 * *non-negative* overlap, or same *non-negative* overlap but
! 			 * bigger range. This condition differs from the one described in
! 			 * the article. On the datasets where leaf MBRs don't overlap
! 			 * themselves, non-overlapping splits (i.e. splits which have zero
! 			 * *non-negative* overlap) are frequently possible. In this case
! 			 * splits tends to be along one dimension, because most distant
! 			 * non-overlapping splits (i.e. having lowest negative overlap)
! 			 * appears to be in the same dimension as in the previous split.
! 			 * Therefore MBRs appear to be very prolonged along another
! 			 * dimension, which leads to bad search performance. Using range
! 			 * as the second split criteria makes MBRs more quadratic. Using
! 			 * *non-negative* overlap instead of overlap as the first split
! 			 * criteria gives to range criteria a chance to matter, because
! 			 * non-overlapping splits are equivalent in this criteria.
! 			 */
! 			if (non_negative(overlap) < non_negative(context->overlap) ||
! 				(range > context->range &&
! 				 non_negative(overlap) <= non_negative(context->overlap)))
! 				selectthis = true;
! 		}
! 
! 		if (selectthis)
! 		{
! 			/* save information about selected split */
! 			context->first = false;
! 			context->ratio = ratio;
! 			context->range = range;
! 			context->overlap = overlap;
! 			context->rightLower = rightLower;
! 			context->leftUpper = leftUpper;
! 			context->dim = dimNum;
! 		}
! 	}
! }
! 
! /*
!  * Return increase of original BOX area by new BOX area insertion.
!  */
! static double
! box_penalty(BOX *original, BOX *new)
! {
! 	double		union_width,
! 				union_height;
! 
! 	union_width = Max(original->high.x, new->high.x) -
! 		Min(original->low.x, new->low.x);
! 	union_height = Max(original->high.y, new->high.y) -
! 		Min(original->low.y, new->low.y);
! 	return union_width * union_height - (original->high.x - original->low.x) *
! 		(original->high.y - original->low.y);
! }
! 
! /*
!  * Compare common entries by their deltas.
!  */
! static int
! common_entry_cmp(const void *i1, const void *i2)
! {
! 	double		delta1 = ((CommonEntry *) i1)->delta,
! 				delta2 = ((CommonEntry *) i2)->delta;
! 
! 	if (delta1 < delta2)
! 		return -1;
! 	else if (delta1 > delta2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * --------------------------------------------------------------------------
!  * Double sorting split algorithm. This is used for both boxes and points.
   *
!  * The algorithm finds split of boxes by considering splits along each axis.
!  * Each entry is first projected as an interval on the X-axis, and different
!  * ways to split the intervals into two groups are considered, trying to
!  * minimize the overlap of the groups. Then the same is repeated for the
!  * Y-axis, and the overall best split is chosen. The quality of a split is
!  * determined by overlap along that axis and some other criteria (see
!  * g_box_consider_split).
   *
!  * After that, all the entries are divided into three groups:
!  *
!  * 1) Entries which should be placed to the left group
!  * 2) Entries which should be placed to the right group
!  * 3) "Common entries" which can be placed to any of groups without affecting
!  *	  of overlap along selected axis.
!  *
!  * The common entries are distributed by minimizing penalty.
!  *
!  * For details see:
!  * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
!  * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
!  * --------------------------------------------------------------------------
   */
  Datum
  gist_box_picksplit(PG_FUNCTION_ARGS)
  {
  	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
  	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! 	OffsetNumber i,
! 				maxoff;
! 	ConsiderSplitContext context;
! 	BOX		   *box,
! 			   *leftBox,
! 			   *rightBox;
! 	int			dim,
! 				commonEntriesCount,
! 				nbytes;
! 	SplitInterval *intervalsLower,
! 			   *intervalsUpper;
! 	CommonEntry *commonEntries;
! 	int			nentries;
! 
! 	memset(&context, 0, sizeof(ConsiderSplitContext));
  
  	maxoff = entryvec->n - 1;
+ 	nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
  
! 	/* Allocate arrays for intervals along axes */
! 	intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
! 	intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
  
! 	/*
! 	 * Calculate the overall minimum bounding box over all the entries.
! 	 */
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		box = DatumGetBoxP(entryvec->vector[i].key);
! 		if (i == FirstOffsetNumber)
! 			context.boundingBox = *box;
! 		else
! 			adjustBox(&context.boundingBox, box);
  	}
  
! 	/*
! 	 * Iterate over axes for optimal split searching.
! 	 */
! 	context.first = true;		/* nothing selected yet */
! 	for (dim = 0; dim < 2; dim++)
  	{
+ 		double		leftUpper,
+ 					rightLower;
+ 		int			i1,
+ 					i2;
+ 
+ 		/* Project each entry as an interval on the selected axis. */
+ 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ 		{
+ 			box = DatumGetBoxP(entryvec->vector[i].key);
+ 			if (dim == 0)
+ 			{
+ 				intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+ 				intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+ 			}
+ 			else
+ 			{
+ 				intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+ 				intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+ 			}
+ 		}
+ 
+ 		/*
+ 		 * Make two arrays of intervals: one sorted by lower bound and another
+ 		 * sorted by upper bound.
+ 		 */
+ 		memcpy(intervalsUpper, intervalsLower,
+ 			   sizeof(SplitInterval) * nentries);
+ 		qsort(intervalsLower, nentries, sizeof(SplitInterval),
+ 			  interval_cmp_lower);
+ 		qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+ 			  interval_cmp_upper);
+ 
+ 		/*----
+ 		 * The goal is to form a left and right interval, so that every entry
+ 		 * interval is contained by either left or right interval (or both).
+ 		 *
+ 		 * For example, with the intervals (0,1), (1,3), (2,3), (2,4):
+ 		 *
+ 		 * 0 1 2 3 4
+ 		 * +-+
+ 		 *	 +---+
+ 		 *	   +-+
+ 		 *	   +---+
+ 		 *
+ 		 * The left and right intervals are of the form (0,a) and (b,4).
+ 		 * We first consider splits where b is the lower bound of an entry.
+ 		 * We iterate through all entries, and for each b, calculate the
+ 		 * smallest possible a. Then we consider splits where a is the
+ 		 * uppper bound of an entry, and for each a, calculate the greatest
+ 		 * possible b.
+ 		 *
+ 		 * In the above example, the first loop would consider splits:
+ 		 * b=0: (0,1)-(0,4)
+ 		 * b=1: (0,1)-(1,4)
+ 		 * b=2: (0,3)-(2,4)
+ 		 *
+ 		 * And the second loop:
+ 		 * a=1: (0,1)-(1,4)
+ 		 * a=3: (0,3)-(2,4)
+ 		 * a=4: (0,4)-(2,4)
+ 		 */
+ 
+ 		/*
+ 		 * Iterate over lower bound of right group, finding smallest possible
+ 		 * upper bound of left group.
+ 		 */
+ 		i1 = 0;
+ 		i2 = 0;
+ 		rightLower = intervalsLower[i1].lower;
+ 		leftUpper = intervalsUpper[i2].lower;
+ 		while (true)
+ 		{
+ 			/*
+ 			 * Find next lower bound of right group.
+ 			 */
+ 			while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+ 			{
+ 				leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+ 				i1++;
+ 			}
+ 			if (i1 >= nentries)
+ 				break;
+ 			rightLower = intervalsLower[i1].lower;
+ 
+ 			/*
+ 			 * Find count of intervals which anyway should be placed to the
+ 			 * left group.
+ 			 */
+ 			while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+ 				i2++;
+ 
+ 			/*
+ 			 * Consider found split.
+ 			 */
+ 			g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+ 		}
+ 
  		/*
! 		 * Iterate over upper bound of left group finding greates possible
! 		 * lower bound of right group.
  		 */
+ 		i1 = nentries - 1;
+ 		i2 = nentries - 1;
+ 		rightLower = intervalsLower[i1].upper;
+ 		leftUpper = intervalsUpper[i2].upper;
+ 		while (true)
+ 		{
+ 			/*
+ 			 * Find next upper bound of left group.
+ 			 */
+ 			while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+ 			{
+ 				rightLower = Min(rightLower, intervalsUpper[i2].lower);
+ 				i2--;
+ 			}
+ 			if (i2 < 0)
+ 				break;
+ 			leftUpper = intervalsUpper[i2].upper;
+ 
+ 			/*
+ 			 * Find count of intervals which anyway should be placed to the
+ 			 * right group.
+ 			 */
+ 			while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+ 				i1--;
+ 
+ 			/*
+ 			 * Consider found split.
+ 			 */
+ 			g_box_consider_split(&context, dim,
+ 								 rightLower, i1 + 1, leftUpper, i2 + 1);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * If we failed to find any acceptable splits, use trivial split.
+ 	 */
+ 	if (context.first)
+ 	{
  		fallbackSplit(entryvec, v);
  		PG_RETURN_POINTER(v);
  	}
  
+ 	/*
+ 	 * Ok, we have now selected the split across one axis.
+ 	 *
+ 	 * While considering the splits, we already determined that there will be
+ 	 * enough entries in both groups to reach the desired ratio, but we did
+ 	 * not memorize which entries go to which group. So determine that now.
+ 	 */
+ 
+ 	/* Allocate vectors for results */
  	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! 	v->spl_left = (OffsetNumber *) palloc(nbytes);
! 	v->spl_right = (OffsetNumber *) palloc(nbytes);
! 	v->spl_nleft = 0;
! 	v->spl_nright = 0;
  
! 	/* Allocate bounding boxes of left and right groups */
! 	leftBox = palloc0(sizeof(BOX));
! 	rightBox = palloc0(sizeof(BOX));
  
! 	/*
! 	 * Allocate an array for "common entries" - entries which can be placed to
! 	 * either group without affecting overlap along selected axis.
! 	 */
! 	commonEntriesCount = 0;
! 	commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
! 
! 	/* Helper macros to place an entry in the left or right group */
! #define PLACE_LEFT(box, off)					\
! 	do {										\
! 		if (v->spl_nleft > 0)					\
! 			adjustBox(leftBox, box);			\
! 		else									\
! 			*leftBox = *(box);					\
! 		v->spl_left[v->spl_nleft++] = off;		\
! 	} while(0)
! 
! #define PLACE_RIGHT(box, off)					\
! 	do {										\
! 		if (v->spl_nright > 0)					\
! 			adjustBox(rightBox, box);			\
! 		else									\
! 			*rightBox = *(box);					\
! 		v->spl_right[v->spl_nright++] = off;	\
! 	} while(0)
! 
! 	/*
! 	 * Distribute entries which can be distributed unambiguously, and collect
! 	 * common entries.
! 	 */
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		double		lower,
! 					upper;
  
! 		/*
! 		 * Get upper and lower bounds along selected axis.
! 		 */
! 		box = DatumGetBoxP(entryvec->vector[i].key);
! 		if (context.dim == 0)
  		{
! 			lower = box->low.x;
! 			upper = box->high.x;
  		}
! 		else
  		{
! 			lower = box->low.y;
! 			upper = box->high.y;
! 		}
  
! 		if (upper <= context.leftUpper)
! 		{
! 			/* Fits to the left group */
! 			if (lower >= context.rightLower)
  			{
! 				/* Fits also to the right group, so "common entry" */
! 				commonEntries[commonEntriesCount++].index = i;
  			}
  			else
  			{
! 				/* Doesn't fit to the right group, so join to the left group */
! 				PLACE_LEFT(box, i);
  			}
  		}
! 		else
  		{
! 			/*
! 			 * Each entry should fit on either left or right group. Since this
! 			 * entry didn't fit on the left group, it better fit in the right
! 			 * group.
! 			 */
! 			Assert(lower >= context.rightLower);
! 
! 			/* Doesn't fit to the left group, so join to the right group */
! 			PLACE_RIGHT(box, i);
  		}
  	}
  
! 	/*
! 	 * Distribute "common entries", if any.
! 	 */
! 	if (commonEntriesCount > 0)
  	{
! 		/*
! 		 * Calculate minimum number of entries that must be placed in both
! 		 * groups, to reach LIMIT_RATIO.
! 		 */
! 		int			m = ceil(LIMIT_RATIO * (double) nentries);
  
! 		/*
! 		 * Calculate delta between penalties of join "common entries" to
! 		 * different groups.
! 		 */
! 		for (i = 0; i < commonEntriesCount; i++)
! 		{
! 			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! 			commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
! 										 box_penalty(rightBox, box));
! 		}
! 
! 		/*
! 		 * Sort "common entries" by calculated deltas in order to distribute
! 		 * the most ambiguous entries first.
! 		 */
! 		qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
! 
! 		/*
! 		 * Distribute "common entries" between groups.
! 		 */
! 		for (i = 0; i < commonEntriesCount; i++)
! 		{
! 			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! 
! 			/*
! 			 * Check if we have to place this entry in either group to achieve
! 			 * LIMIT_RATIO.
! 			 */
! 			if (v->spl_nleft + (commonEntriesCount - i) <= m)
! 				PLACE_LEFT(box, commonEntries[i].index);
! 			else if (v->spl_nright + (commonEntriesCount - i) <= m)
! 				PLACE_RIGHT(box, commonEntries[i].index);
! 			else
! 			{
! 				/* Otherwise select the group by minimal penalty */
! 				if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
! 					PLACE_LEFT(box, commonEntries[i].index);
! 				else
! 					PLACE_RIGHT(box, commonEntries[i].index);
! 			}
! 		}
! 	}
  
+ 	v->spl_ldatum = PointerGetDatum(leftBox);
+ 	v->spl_rdatum = PointerGetDatum(rightBox);
  	PG_RETURN_POINTER(v);
  }
  
#20Alexander Korotkov
aekorotkov@gmail.com
In reply to: Heikki Linnakangas (#19)
Re: Double sorting split patch

On Wed, Oct 5, 2011 at 11:37 AM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

On 04.10.2011 15:10, Alexander Korotkov wrote:

On Tue, Oct 4, 2011 at 1:46 PM, Heikki Linnakangas<
heikki.linnakangas@**enterprisedb.com<heikki.linnakangas@enterprisedb.com>>
wrote:

Ok. Could you phrase that as a code comment?

Here's a version of the patch I've been working on. There's no functional
changes, just a lot of moving things around, comment changes, etc. to
hopefully make it more readable.

Thanks for your work on this patch. Patch with comment is attached.

Thanks, I incorporated that, and did a lot of other comment changes. I
included the example you gave earlier on how the first phase of the
algorithm works, in a comment. Please review, and if you have some test
cases at hand, run them. I think this is ready for commit now.

Comments looks good, thanks. I'm going to try also some datasets from
rtreeportal.org <http://www.rtreeportal.org/&gt;

One more thing:

/* Allocate vectors for results */
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);

Why "maxoff + 2" ? Allocating a few extra bytes is obviously harmless, but
I wonder if it was just a leftover from something.

It was nested from old code. This extra bytes are useless in modern versions
of PostgreSQL as we found while seg picksplit patch discussion. Modern
version of seg picksplit doesn't contain them:
http://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=2a6ebe70fb2f7ec97a08dc07214fe2ca571d2780;hp=290f1603b4208ca6a13776f744b586a958e98d74

------
With best regards,
Alexander Korotkov.

#21Alexander Korotkov
aekorotkov@gmail.com
In reply to: Alexander Korotkov (#20)
3 attachment(s)
Re: Double sorting split patch

Path without allocating extra bytes is attached.
I run some more detailed tests on geonames and two smaller datasets from
rtreeportal.org.
Test tables are following:
1) test1 - geonames
2) test2 - California Roads, containing the MBRs of 2,249,727 streets
3) test3 - Tiger Streams, containing the MBRs of 194,971 streams
Scripts for test queries generation and execution are attached
(scripts.tar.gz).

Build times are given in the following table:
New linear Double sorting
test1 277.889630 273.766355
test2 73.262561 75.114079
test3 4.251563 4.425139
As we can see, build times differ insignificantly.

Index sizes are given in the following table:
New linear Double sorting
test1 572383232 578977792
test2 179929088 178569216
test3 15409152 15532032
As we can see, index sizes differ insignificantly.

Data about index quality testing are in the table test_results of the
following structure:
tablename - name of the queried table
count - actual count of rows matching query
nominal_count - count of rows matching query which test generator tried to
provide (test generator not always can create test query which return
exactly same amount of rows as it was expected)
strategy - split strategy: 1 - new liner split(current), 2 - double
sorting(this patch)
hits - number of pages hits for query execution.

Raw data are in the attachment (test_result.sql.gz). Report is below.
test=# select tablename, nominal_count, round(avg(count),2) as avg_count,
strategy, round(avg(hits),2) as avg_hits from test_results group by
tablename, nominal_count, strategy order by tablename, nominal_count,
strategy;
tablename | nominal_count | avg_count | strategy | avg_hits
-----------+---------------+-----------+----------+----------
test1 | 1 | 4.87 | 1 | 19.94
test1 | 1 | 4.87 | 2 | 6.46
test1 | 10 | 11.07 | 1 | 23.95
test1 | 10 | 11.07 | 2 | 7.36
test1 | 100 | 101.36 | 1 | 43.30
test1 | 100 | 101.36 | 2 | 10.19
test1 | 1000 | 998.70 | 1 | 86.48
test1 | 1000 | 998.70 | 2 | 24.21
test2 | 1 | 1.32 | 1 | 8.67
test2 | 1 | 1.32 | 2 | 5.99
test2 | 10 | 11.32 | 1 | 9.40
test2 | 10 | 11.32 | 2 | 6.71
test2 | 100 | 102.93 | 1 | 13.10
test2 | 100 | 102.93 | 2 | 9.02
test2 | 1000 | 999.67 | 1 | 32.16
test2 | 1000 | 999.67 | 2 | 23.51
test3 | 1 | 0.99 | 1 | 6.03
test3 | 1 | 0.99 | 2 | 4.32
test3 | 10 | 9.95 | 1 | 7.52
test3 | 10 | 9.95 | 2 | 5.09
test3 | 100 | 99.92 | 1 | 10.73
test3 | 100 | 99.92 | 2 | 7.73
test3 | 1000 | 999.75 | 1 | 27.47
test3 | 1000 | 999.75 | 2 | 22.44
(24 rows)

As we can see, double sorting split outperfoms new linear split in all the
presented cases. Difference in test2 and test3 is not so big as it is in
test1. It can be explained by fact that test2 and test3 are smaller than
test1.

------
With best regards,
Alexander Korotkov.

Attachments:

scripts.tar.gzapplication/x-gzip; name=scripts.tar.gzDownload
double-sorting-split-0.7.patchtext/x-patch; charset=US-ASCII; name=double-sorting-split-0.7.patchDownload
*** a/src/backend/access/gist/gistproc.c
--- b/src/backend/access/gist/gistproc.c
***************
*** 27,32 **** static double size_box(Datum dbox);
--- 27,35 ----
  static bool rtree_internal_consistent(BOX *key, BOX *query,
  						  StrategyNumber strategy);
  
+ /* Minimal possible ratio of split */
+ #define LIMIT_RATIO 0.3
+ 
  
  /**************************************************
   * Box ops
***************
*** 49,78 **** rt_box_union(PG_FUNCTION_ARGS)
  	PG_RETURN_BOX_P(n);
  }
  
- static Datum
- rt_box_inter(PG_FUNCTION_ARGS)
- {
- 	BOX		   *a = PG_GETARG_BOX_P(0);
- 	BOX		   *b = PG_GETARG_BOX_P(1);
- 	BOX		   *n;
- 
- 	n = (BOX *) palloc(sizeof(BOX));
- 
- 	n->high.x = Min(a->high.x, b->high.x);
- 	n->high.y = Min(a->high.y, b->high.y);
- 	n->low.x = Max(a->low.x, b->low.x);
- 	n->low.y = Max(a->low.y, b->low.y);
- 
- 	if (n->high.x < n->low.x || n->high.y < n->low.y)
- 	{
- 		pfree(n);
- 		/* Indicate "no intersection" by returning NULL pointer */
- 		n = NULL;
- 	}
- 
- 	PG_RETURN_BOX_P(n);
- }
- 
  /*
   * The GiST Consistent method for boxes
   *
--- 52,57 ----
***************
*** 194,279 **** gist_box_penalty(PG_FUNCTION_ARGS)
  	PG_RETURN_POINTER(result);
  }
  
- static void
- chooseLR(GIST_SPLITVEC *v,
- 		 OffsetNumber *list1, int nlist1, BOX *union1,
- 		 OffsetNumber *list2, int nlist2, BOX *union2)
- {
- 	bool		firstToLeft = true;
- 
- 	if (v->spl_ldatum_exists || v->spl_rdatum_exists)
- 	{
- 		if (v->spl_ldatum_exists && v->spl_rdatum_exists)
- 		{
- 			BOX			LRl = *union1,
- 						LRr = *union2;
- 			BOX			RLl = *union2,
- 						RLr = *union1;
- 			double		sizeLR,
- 						sizeRL;
- 
- 			adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
- 			adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
- 			adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
- 			adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
- 
- 			sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
- 			sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
- 
- 			if (sizeLR > sizeRL)
- 				firstToLeft = false;
- 
- 		}
- 		else
- 		{
- 			float		p1,
- 						p2;
- 			GISTENTRY	oldUnion,
- 						addon;
- 
- 			gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
- 						  NULL, NULL, InvalidOffsetNumber, FALSE);
- 
- 			gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
- 			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
- 			gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
- 			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
- 
- 			if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
- 				firstToLeft = false;
- 		}
- 	}
- 
- 	if (firstToLeft)
- 	{
- 		v->spl_left = list1;
- 		v->spl_right = list2;
- 		v->spl_nleft = nlist1;
- 		v->spl_nright = nlist2;
- 		if (v->spl_ldatum_exists)
- 			adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
- 		v->spl_ldatum = BoxPGetDatum(union1);
- 		if (v->spl_rdatum_exists)
- 			adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
- 		v->spl_rdatum = BoxPGetDatum(union2);
- 	}
- 	else
- 	{
- 		v->spl_left = list2;
- 		v->spl_right = list1;
- 		v->spl_nleft = nlist2;
- 		v->spl_nright = nlist1;
- 		if (v->spl_ldatum_exists)
- 			adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
- 		v->spl_ldatum = BoxPGetDatum(union2);
- 		if (v->spl_rdatum_exists)
- 			adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
- 		v->spl_rdatum = BoxPGetDatum(union1);
- 	}
- 
- 	v->spl_ldatum_exists = v->spl_rdatum_exists = false;
- }
- 
  /*
   * Trivial split: half of entries will be placed on one page
   * and another half - to another
--- 173,178 ----
***************
*** 338,536 **** fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
  }
  
  /*
!  * The GiST PickSplit method
   *
!  * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
!  * C.H.Ang and T.C.Tan
   *
!  * This is used for both boxes and points.
   */
  Datum
  gist_box_picksplit(PG_FUNCTION_ARGS)
  {
  	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
  	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! 	OffsetNumber i;
! 	OffsetNumber *listL,
! 			   *listR,
! 			   *listB,
! 			   *listT;
! 	BOX		   *unionL,
! 			   *unionR,
! 			   *unionB,
! 			   *unionT;
! 	int			posL,
! 				posR,
! 				posB,
! 				posT;
! 	BOX			pageunion;
! 	BOX		   *cur;
! 	char		direction = ' ';
! 	bool		allisequal = true;
! 	OffsetNumber maxoff;
! 	int			nbytes;
  
- 	posL = posR = posB = posT = 0;
  	maxoff = entryvec->n - 1;
  
! 	cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
! 	memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
  
! 	/* find MBR */
! 	for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		cur = DatumGetBoxP(entryvec->vector[i].key);
! 		if (allisequal && (
! 						   pageunion.high.x != cur->high.x ||
! 						   pageunion.high.y != cur->high.y ||
! 						   pageunion.low.x != cur->low.x ||
! 						   pageunion.low.y != cur->low.y
! 						   ))
! 			allisequal = false;
! 
! 		adjustBox(&pageunion, cur);
  	}
  
! 	if (allisequal)
  	{
  		/*
! 		 * All entries are the same
  		 */
  		fallbackSplit(entryvec, v);
  		PG_RETURN_POINTER(v);
  	}
  
! 	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! 	listL = (OffsetNumber *) palloc(nbytes);
! 	listR = (OffsetNumber *) palloc(nbytes);
! 	listB = (OffsetNumber *) palloc(nbytes);
! 	listT = (OffsetNumber *) palloc(nbytes);
! 	unionL = (BOX *) palloc(sizeof(BOX));
! 	unionR = (BOX *) palloc(sizeof(BOX));
! 	unionB = (BOX *) palloc(sizeof(BOX));
! 	unionT = (BOX *) palloc(sizeof(BOX));
! 
! #define ADDLIST( list, unionD, pos, num ) do { \
! 	if ( pos ) { \
! 		if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x	= cur->high.x; \
! 		if ( (unionD)->low.x  > cur->low.x	) (unionD)->low.x	= cur->low.x; \
! 		if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y	= cur->high.y; \
! 		if ( (unionD)->low.y  > cur->low.y	) (unionD)->low.y	= cur->low.y; \
! 	} else { \
! 			memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) );	\
! 	} \
! 	(list)[pos] = num; \
! 	(pos)++; \
! } while(0)
  
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
! 	{
! 		cur = DatumGetBoxP(entryvec->vector[i].key);
! 		if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
! 			ADDLIST(listL, unionL, posL, i);
! 		else
! 			ADDLIST(listR, unionR, posR, i);
! 		if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
! 			ADDLIST(listB, unionB, posB, i);
! 		else
! 			ADDLIST(listT, unionT, posT, i);
! 	}
  
! #define LIMIT_RATIO 0.1
! #define _IS_BADRATIO(x,y)	( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
! #define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
! 	/* bad disposition, try to split by centers of boxes  */
! 	if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
  	{
! 		double		avgCenterX = 0.0,
! 					avgCenterY = 0.0;
! 		double		CenterX,
! 					CenterY;
  
! 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  		{
! 			cur = DatumGetBoxP(entryvec->vector[i].key);
! 			avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
! 			avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
  		}
! 
! 		avgCenterX /= maxoff;
! 		avgCenterY /= maxoff;
! 
! 		posL = posR = posB = posT = 0;
! 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  		{
! 			cur = DatumGetBoxP(entryvec->vector[i].key);
! 
! 			CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
! 			CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
  
! 			if (CenterX < avgCenterX)
! 				ADDLIST(listL, unionL, posL, i);
! 			else if (CenterX == avgCenterX)
  			{
! 				if (posL > posR)
! 					ADDLIST(listR, unionR, posR, i);
! 				else
! 					ADDLIST(listL, unionL, posL, i);
  			}
  			else
- 				ADDLIST(listR, unionR, posR, i);
- 
- 			if (CenterY < avgCenterY)
- 				ADDLIST(listB, unionB, posB, i);
- 			else if (CenterY == avgCenterY)
  			{
! 				if (posB > posT)
! 					ADDLIST(listT, unionT, posT, i);
! 				else
! 					ADDLIST(listB, unionB, posB, i);
  			}
- 			else
- 				ADDLIST(listT, unionT, posT, i);
  		}
! 
! 		if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
  		{
! 			fallbackSplit(entryvec, v);
! 			PG_RETURN_POINTER(v);
  		}
  	}
  
! 	/* which split more optimal? */
! 	if (Max(posL, posR) < Max(posB, posT))
! 		direction = 'x';
! 	else if (Max(posL, posR) > Max(posB, posT))
! 		direction = 'y';
! 	else
  	{
! 		Datum		interLR = DirectFunctionCall2(rt_box_inter,
! 												  BoxPGetDatum(unionL),
! 												  BoxPGetDatum(unionR));
! 		Datum		interBT = DirectFunctionCall2(rt_box_inter,
! 												  BoxPGetDatum(unionB),
! 												  BoxPGetDatum(unionT));
! 		double		sizeLR,
! 					sizeBT;
! 
! 		sizeLR = size_box(interLR);
! 		sizeBT = size_box(interBT);
! 
! 		if (sizeLR < sizeBT)
! 			direction = 'x';
! 		else
! 			direction = 'y';
! 	}
  
! 	if (direction == 'x')
! 		chooseLR(v,
! 				 listL, posL, unionL,
! 				 listR, posR, unionR);
! 	else
! 		chooseLR(v,
! 				 listB, posB, unionB,
! 				 listT, posT, unionT);
  
  	PG_RETURN_POINTER(v);
  }
  
--- 237,839 ----
  }
  
  /*
!  * Represents information about an entry that can be placed to either group
!  * without affecting overlap over selected axis ("common entry").
!  */
! typedef struct
! {
! 	/* Index of entry in the initial array */
! 	int			index;
! 	/* Delta between penalties of entry insertion into different groups */
! 	double		delta;
! }	CommonEntry;
! 
! /*
!  * Context for g_box_consider_split. Contains information about currently
!  * selected split and some general information.
!  */
! typedef struct
! {
! 	int			entriesCount;	/* total number of entries being split */
! 	BOX			boundingBox;	/* minimum bounding box across all entries */
! 
! 	/* Information about currently selected split follows */
! 
! 	bool		first;			/* true if no split was selected yet */
! 
! 	double		leftUpper;		/* upper bound of left interval */
! 	double		rightLower;		/* lower bound of right interval */
! 
! 	float4		ratio;
! 	float4		overlap;
! 	int			dim;			/* axis of this split */
! 	double		range;			/* width of general MBR projection to the
! 								 * selected axis */
! }	ConsiderSplitContext;
! 
! /*
!  * Interval represents projection of box to axis.
!  */
! typedef struct
! {
! 	double		lower,
! 				upper;
! }	SplitInterval;
! 
! /*
!  * Interval comparison function by lower bound of the interval;
!  */
! static int
! interval_cmp_lower(const void *i1, const void *i2)
! {
! 	double		lower1 = ((SplitInterval *) i1)->lower,
! 				lower2 = ((SplitInterval *) i2)->lower;
! 
! 	if (lower1 < lower2)
! 		return -1;
! 	else if (lower1 > lower2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * Interval comparison function by upper bound of the interval;
!  */
! static int
! interval_cmp_upper(const void *i1, const void *i2)
! {
! 	double		upper1 = ((SplitInterval *) i1)->upper,
! 				upper2 = ((SplitInterval *) i2)->upper;
! 
! 	if (upper1 < upper2)
! 		return -1;
! 	else if (upper1 > upper2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * Replace negative value with zero.
!  */
! static inline float
! non_negative(float val)
! {
! 	if (val >= 0.0f)
! 		return val;
! 	else
! 		return 0.0f;
! }
! 
! /*
!  * Consider replacement of currently selected split with the better one.
!  */
! static void inline
! g_box_consider_split(ConsiderSplitContext *context, int dimNum,
! 					 double rightLower, int minLeftCount,
! 					 double leftUpper, int maxLeftCount)
! {
! 	int			leftCount,
! 				rightCount;
! 	float4		ratio,
! 				overlap;
! 	double		range;
! 
! 	/*
! 	 * Calculate entries distribution ratio assuming most uniform distribution
! 	 * of common entries.
! 	 */
! 	if (minLeftCount >= (context->entriesCount + 1) / 2)
! 	{
! 		leftCount = minLeftCount;
! 	}
! 	else
! 	{
! 		if (maxLeftCount <= context->entriesCount / 2)
! 			leftCount = maxLeftCount;
! 		else
! 			leftCount = context->entriesCount / 2;
! 	}
! 	rightCount = context->entriesCount - leftCount;
! 
! 	/*
! 	 * Ratio of split - quotient between size of lesser group and total
! 	 * entries count.
! 	 */
! 	ratio = ((float4) Min(leftCount, rightCount)) /
! 		((float4) context->entriesCount);
! 
! 	if (ratio > LIMIT_RATIO)
! 	{
! 		bool		selectthis = false;
! 
! 		/*
! 		 * The ratio is acceptable, so compare current split with previously
! 		 * selected one. Between splits of one dimension we search for minimal
! 		 * overlap (allowing negative values) and minimal ration (between same
! 		 * overlaps. We switch dimension if find less overlap (non-negative)
! 		 * or less range with same overlap.
! 		 */
! 		if (dimNum == 0)
! 			range = context->boundingBox.high.x - context->boundingBox.low.x;
! 		else
! 			range = context->boundingBox.high.y - context->boundingBox.low.y;
! 
! 		overlap = (leftUpper - rightLower) / range;
! 
! 		/* If there is no previous selection, select this */
! 		if (context->first)
! 			selectthis = true;
! 		else if (context->dim == dimNum)
! 		{
! 			/*
! 			 * Within the same dimension, choose the new split if it has a
! 			 * smaller overlap, or same overlap but better ratio.
! 			 */
! 			if (overlap < context->overlap ||
! 				(overlap == context->overlap && ratio > context->ratio))
! 				selectthis = true;
! 		}
! 		else
! 		{
! 			/*
! 			 * Across dimensions, choose the new split if it has a smaller
! 			 * *non-negative* overlap, or same *non-negative* overlap but
! 			 * bigger range. This condition differs from the one described in
! 			 * the article. On the datasets where leaf MBRs don't overlap
! 			 * themselves, non-overlapping splits (i.e. splits which have zero
! 			 * *non-negative* overlap) are frequently possible. In this case
! 			 * splits tends to be along one dimension, because most distant
! 			 * non-overlapping splits (i.e. having lowest negative overlap)
! 			 * appears to be in the same dimension as in the previous split.
! 			 * Therefore MBRs appear to be very prolonged along another
! 			 * dimension, which leads to bad search performance. Using range
! 			 * as the second split criteria makes MBRs more quadratic. Using
! 			 * *non-negative* overlap instead of overlap as the first split
! 			 * criteria gives to range criteria a chance to matter, because
! 			 * non-overlapping splits are equivalent in this criteria.
! 			 */
! 			if (non_negative(overlap) < non_negative(context->overlap) ||
! 				(range > context->range &&
! 				 non_negative(overlap) <= non_negative(context->overlap)))
! 				selectthis = true;
! 		}
! 
! 		if (selectthis)
! 		{
! 			/* save information about selected split */
! 			context->first = false;
! 			context->ratio = ratio;
! 			context->range = range;
! 			context->overlap = overlap;
! 			context->rightLower = rightLower;
! 			context->leftUpper = leftUpper;
! 			context->dim = dimNum;
! 		}
! 	}
! }
! 
! /*
!  * Return increase of original BOX area by new BOX area insertion.
!  */
! static double
! box_penalty(BOX *original, BOX *new)
! {
! 	double		union_width,
! 				union_height;
! 
! 	union_width = Max(original->high.x, new->high.x) -
! 		Min(original->low.x, new->low.x);
! 	union_height = Max(original->high.y, new->high.y) -
! 		Min(original->low.y, new->low.y);
! 	return union_width * union_height - (original->high.x - original->low.x) *
! 		(original->high.y - original->low.y);
! }
! 
! /*
!  * Compare common entries by their deltas.
!  */
! static int
! common_entry_cmp(const void *i1, const void *i2)
! {
! 	double		delta1 = ((CommonEntry *) i1)->delta,
! 				delta2 = ((CommonEntry *) i2)->delta;
! 
! 	if (delta1 < delta2)
! 		return -1;
! 	else if (delta1 > delta2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * --------------------------------------------------------------------------
!  * Double sorting split algorithm. This is used for both boxes and points.
   *
!  * The algorithm finds split of boxes by considering splits along each axis.
!  * Each entry is first projected as an interval on the X-axis, and different
!  * ways to split the intervals into two groups are considered, trying to
!  * minimize the overlap of the groups. Then the same is repeated for the
!  * Y-axis, and the overall best split is chosen. The quality of a split is
!  * determined by overlap along that axis and some other criteria (see
!  * g_box_consider_split).
   *
!  * After that, all the entries are divided into three groups:
!  *
!  * 1) Entries which should be placed to the left group
!  * 2) Entries which should be placed to the right group
!  * 3) "Common entries" which can be placed to any of groups without affecting
!  *	  of overlap along selected axis.
!  *
!  * The common entries are distributed by minimizing penalty.
!  *
!  * For details see:
!  * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
!  * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
!  * --------------------------------------------------------------------------
   */
  Datum
  gist_box_picksplit(PG_FUNCTION_ARGS)
  {
  	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
  	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! 	OffsetNumber i,
! 				maxoff;
! 	ConsiderSplitContext context;
! 	BOX		   *box,
! 			   *leftBox,
! 			   *rightBox;
! 	int			dim,
! 				commonEntriesCount;
! 	SplitInterval *intervalsLower,
! 			   *intervalsUpper;
! 	CommonEntry *commonEntries;
! 	int			nentries;
! 
! 	memset(&context, 0, sizeof(ConsiderSplitContext));
  
  	maxoff = entryvec->n - 1;
+ 	nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
  
! 	/* Allocate arrays for intervals along axes */
! 	intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
! 	intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
  
! 	/*
! 	 * Calculate the overall minimum bounding box over all the entries.
! 	 */
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		box = DatumGetBoxP(entryvec->vector[i].key);
! 		if (i == FirstOffsetNumber)
! 			context.boundingBox = *box;
! 		else
! 			adjustBox(&context.boundingBox, box);
  	}
  
! 	/*
! 	 * Iterate over axes for optimal split searching.
! 	 */
! 	context.first = true;		/* nothing selected yet */
! 	for (dim = 0; dim < 2; dim++)
  	{
+ 		double		leftUpper,
+ 					rightLower;
+ 		int			i1,
+ 					i2;
+ 
+ 		/* Project each entry as an interval on the selected axis. */
+ 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ 		{
+ 			box = DatumGetBoxP(entryvec->vector[i].key);
+ 			if (dim == 0)
+ 			{
+ 				intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+ 				intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+ 			}
+ 			else
+ 			{
+ 				intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+ 				intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+ 			}
+ 		}
+ 
+ 		/*
+ 		 * Make two arrays of intervals: one sorted by lower bound and another
+ 		 * sorted by upper bound.
+ 		 */
+ 		memcpy(intervalsUpper, intervalsLower,
+ 			   sizeof(SplitInterval) * nentries);
+ 		qsort(intervalsLower, nentries, sizeof(SplitInterval),
+ 			  interval_cmp_lower);
+ 		qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+ 			  interval_cmp_upper);
+ 
+ 		/*----
+ 		 * The goal is to form a left and right interval, so that every entry
+ 		 * interval is contained by either left or right interval (or both).
+ 		 *
+ 		 * For example, with the intervals (0,1), (1,3), (2,3), (2,4):
+ 		 *
+ 		 * 0 1 2 3 4
+ 		 * +-+
+ 		 *	 +---+
+ 		 *	   +-+
+ 		 *	   +---+
+ 		 *
+ 		 * The left and right intervals are of the form (0,a) and (b,4).
+ 		 * We first consider splits where b is the lower bound of an entry.
+ 		 * We iterate through all entries, and for each b, calculate the
+ 		 * smallest possible a. Then we consider splits where a is the
+ 		 * uppper bound of an entry, and for each a, calculate the greatest
+ 		 * possible b.
+ 		 *
+ 		 * In the above example, the first loop would consider splits:
+ 		 * b=0: (0,1)-(0,4)
+ 		 * b=1: (0,1)-(1,4)
+ 		 * b=2: (0,3)-(2,4)
+ 		 *
+ 		 * And the second loop:
+ 		 * a=1: (0,1)-(1,4)
+ 		 * a=3: (0,3)-(2,4)
+ 		 * a=4: (0,4)-(2,4)
+ 		 */
+ 
+ 		/*
+ 		 * Iterate over lower bound of right group, finding smallest possible
+ 		 * upper bound of left group.
+ 		 */
+ 		i1 = 0;
+ 		i2 = 0;
+ 		rightLower = intervalsLower[i1].lower;
+ 		leftUpper = intervalsUpper[i2].lower;
+ 		while (true)
+ 		{
+ 			/*
+ 			 * Find next lower bound of right group.
+ 			 */
+ 			while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+ 			{
+ 				leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+ 				i1++;
+ 			}
+ 			if (i1 >= nentries)
+ 				break;
+ 			rightLower = intervalsLower[i1].lower;
+ 
+ 			/*
+ 			 * Find count of intervals which anyway should be placed to the
+ 			 * left group.
+ 			 */
+ 			while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+ 				i2++;
+ 
+ 			/*
+ 			 * Consider found split.
+ 			 */
+ 			g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+ 		}
+ 
  		/*
! 		 * Iterate over upper bound of left group finding greates possible
! 		 * lower bound of right group.
  		 */
+ 		i1 = nentries - 1;
+ 		i2 = nentries - 1;
+ 		rightLower = intervalsLower[i1].upper;
+ 		leftUpper = intervalsUpper[i2].upper;
+ 		while (true)
+ 		{
+ 			/*
+ 			 * Find next upper bound of left group.
+ 			 */
+ 			while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+ 			{
+ 				rightLower = Min(rightLower, intervalsUpper[i2].lower);
+ 				i2--;
+ 			}
+ 			if (i2 < 0)
+ 				break;
+ 			leftUpper = intervalsUpper[i2].upper;
+ 
+ 			/*
+ 			 * Find count of intervals which anyway should be placed to the
+ 			 * right group.
+ 			 */
+ 			while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+ 				i1--;
+ 
+ 			/*
+ 			 * Consider found split.
+ 			 */
+ 			g_box_consider_split(&context, dim,
+ 								 rightLower, i1 + 1, leftUpper, i2 + 1);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * If we failed to find any acceptable splits, use trivial split.
+ 	 */
+ 	if (context.first)
+ 	{
  		fallbackSplit(entryvec, v);
  		PG_RETURN_POINTER(v);
  	}
  
! 	/*
! 	 * Ok, we have now selected the split across one axis.
! 	 *
! 	 * While considering the splits, we already determined that there will be
! 	 * enough entries in both groups to reach the desired ratio, but we did
! 	 * not memorize which entries go to which group. So determine that now.
! 	 */
  
! 	/* Allocate vectors for results */
! 	v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
! 	v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
! 	v->spl_nleft = 0;
! 	v->spl_nright = 0;
! 
! 	/* Allocate bounding boxes of left and right groups */
! 	leftBox = palloc0(sizeof(BOX));
! 	rightBox = palloc0(sizeof(BOX));
  
! 	/*
! 	 * Allocate an array for "common entries" - entries which can be placed to
! 	 * either group without affecting overlap along selected axis.
! 	 */
! 	commonEntriesCount = 0;
! 	commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
! 
! 	/* Helper macros to place an entry in the left or right group */
! #define PLACE_LEFT(box, off)					\
! 	do {										\
! 		if (v->spl_nleft > 0)					\
! 			adjustBox(leftBox, box);			\
! 		else									\
! 			*leftBox = *(box);					\
! 		v->spl_left[v->spl_nleft++] = off;		\
! 	} while(0)
! 
! #define PLACE_RIGHT(box, off)					\
! 	do {										\
! 		if (v->spl_nright > 0)					\
! 			adjustBox(rightBox, box);			\
! 		else									\
! 			*rightBox = *(box);					\
! 		v->spl_right[v->spl_nright++] = off;	\
! 	} while(0)
! 
! 	/*
! 	 * Distribute entries which can be distributed unambiguously, and collect
! 	 * common entries.
! 	 */
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		double		lower,
! 					upper;
  
! 		/*
! 		 * Get upper and lower bounds along selected axis.
! 		 */
! 		box = DatumGetBoxP(entryvec->vector[i].key);
! 		if (context.dim == 0)
  		{
! 			lower = box->low.x;
! 			upper = box->high.x;
  		}
! 		else
  		{
! 			lower = box->low.y;
! 			upper = box->high.y;
! 		}
  
! 		if (upper <= context.leftUpper)
! 		{
! 			/* Fits to the left group */
! 			if (lower >= context.rightLower)
  			{
! 				/* Fits also to the right group, so "common entry" */
! 				commonEntries[commonEntriesCount++].index = i;
  			}
  			else
  			{
! 				/* Doesn't fit to the right group, so join to the left group */
! 				PLACE_LEFT(box, i);
  			}
  		}
! 		else
  		{
! 			/*
! 			 * Each entry should fit on either left or right group. Since this
! 			 * entry didn't fit on the left group, it better fit in the right
! 			 * group.
! 			 */
! 			Assert(lower >= context.rightLower);
! 
! 			/* Doesn't fit to the left group, so join to the right group */
! 			PLACE_RIGHT(box, i);
  		}
  	}
  
! 	/*
! 	 * Distribute "common entries", if any.
! 	 */
! 	if (commonEntriesCount > 0)
  	{
! 		/*
! 		 * Calculate minimum number of entries that must be placed in both
! 		 * groups, to reach LIMIT_RATIO.
! 		 */
! 		int			m = ceil(LIMIT_RATIO * (double) nentries);
  
! 		/*
! 		 * Calculate delta between penalties of join "common entries" to
! 		 * different groups.
! 		 */
! 		for (i = 0; i < commonEntriesCount; i++)
! 		{
! 			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! 			commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
! 										 box_penalty(rightBox, box));
! 		}
! 
! 		/*
! 		 * Sort "common entries" by calculated deltas in order to distribute
! 		 * the most ambiguous entries first.
! 		 */
! 		qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
! 
! 		/*
! 		 * Distribute "common entries" between groups.
! 		 */
! 		for (i = 0; i < commonEntriesCount; i++)
! 		{
! 			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! 
! 			/*
! 			 * Check if we have to place this entry in either group to achieve
! 			 * LIMIT_RATIO.
! 			 */
! 			if (v->spl_nleft + (commonEntriesCount - i) <= m)
! 				PLACE_LEFT(box, commonEntries[i].index);
! 			else if (v->spl_nright + (commonEntriesCount - i) <= m)
! 				PLACE_RIGHT(box, commonEntries[i].index);
! 			else
! 			{
! 				/* Otherwise select the group by minimal penalty */
! 				if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
! 					PLACE_LEFT(box, commonEntries[i].index);
! 				else
! 					PLACE_RIGHT(box, commonEntries[i].index);
! 			}
! 		}
! 	}
  
+ 	v->spl_ldatum = PointerGetDatum(leftBox);
+ 	v->spl_rdatum = PointerGetDatum(rightBox);
  	PG_RETURN_POINTER(v);
  }
  
test_result.sql.gzapplication/x-gzip; name=test_result.sql.gzDownload
#22Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#21)
Re: Double sorting split patch

On 05.10.2011 15:59, Alexander Korotkov wrote:

Path without allocating extra bytes is attached.
I run some more detailed tests on geonames and two smaller datasets from
rtreeportal.org.

Ok, thanks. Looks good to me now, so committed.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#23Alexander Korotkov
aekorotkov@gmail.com
In reply to: Heikki Linnakangas (#22)
Re: Double sorting split patch

On Thu, Oct 6, 2011 at 11:06 AM, Heikki Linnakangas <
heikki.linnakangas@enterprisedb.com> wrote:

On 05.10.2011 15:59, Alexander Korotkov wrote:

Path without allocating extra bytes is attached.
I run some more detailed tests on geonames and two smaller datasets from
rtreeportal.org.

Ok, thanks. Looks good to me now, so committed.

Thanks. I'm going to continue work on application of this split method in
following areas:
1) range types
2) seg contrib module
3) cube contrib module (there situation is not so easy, probably some
heuristic of split method selection would be required)
Do you think that separation of some common parts of algorithm implemetation
is resonable in order to avoid code duplication? For example, different
application of algorithm could share function of splits enumeration along
axis which takes pointer to consider_split as an argument.

------
With best regards,
Alexander Korotkov.

#24Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alexander Korotkov (#23)
Re: Double sorting split patch

On 06.10.2011 11:22, Alexander Korotkov wrote:

Thanks. I'm going to continue work on application of this split method in
following areas:
1) range types
2) seg contrib module
3) cube contrib module (there situation is not so easy, probably some
heuristic of split method selection would be required)
Do you think that separation of some common parts of algorithm implemetation
is resonable in order to avoid code duplication? For example, different
application of algorithm could share function of splits enumeration along
axis which takes pointer to consider_split as an argument.

Dunno. I'd suggest that you just copy-paste the code for now, and we'll
see how much duplication there is in the end.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com