[WiP] GiST intrapage indexing
Hi, hackers!
Few years ago I've posted proposal [0]/messages/by-id/CAJEAwVE0rrr+OBT-P0gDCtXbVDkBBG_WcXwCBK=GHo4fewu3Yg@mail.gmail.com </messages/by-id/CAJEAwVE0rrr+OBT-P0gDCtXbVDkBBG_WcXwCBK=GHo4fewu3Yg@mail.gmail.com> to improve GiST performance by building small GiST inside each GiST page.
Currently each page is just an unordered vector of tuples. Scan is testing each tuple one by one. This causes suboptimal performance for both inserts and searches.
I've attached patch with intrapage indexing to this message.
==How it works==
The patch implements two-level scheme (thanks to Heikki for the suggestion to give up multilevel scheme - it'd take forever to debug). Among regular tuples, page contains so-called skip tuples. Skip tuple does not have tid, but instead has counter of tuples, whose keys this tuple covers (thanks to Alexander for this suggestions - earliest versions of the patch had to pg_upgrade).
When the page is scanned, if key of skip tuples does not match - all the group is skipped.
When GiST is choosing subtree - if given skiptuple has penalty more than already found - it is skipped. Here I rely on the assumption that adjusted (extended) key have penalty always less or equal to not adjusted tuple (each tuple in it's group).
When GiST is splitting page - if possible it splits by skipgroups.
Currently, skip tuples are placed only on intrapages. This is a tradeoff in favor of inserts and scans, searching for big amount of data.
Skiptuples are first formed when new root is created. Then, skiptuple count grows when group overgrowth threshold of 16 tuples - it is splitted by regular split algorithm.
==TODO==
1. Buffered build does not create skip tuples as for now
2. KNN searches do not take advantage of skiptuples. Current approach of KNN scan has node lookup overhead suitable for page-size nodes, but in case of skipgroups this overhead may be just too much. I've not tested though. I'm trying to compose some lean hacks for KNN with skipgroups.
3. More scrupulous performance testing needed
===Performance=
Currently I observe 30-40% gain in performance of inserts, independently of datatype and distribution.
On grid-alligned points I observe about 5-10% improvement of searches.
On randomized points I do not observe statistically significant improvement of searches.
I expect that this technology will protect GiST from degradation on big block sizes (thanks to Oleg for suggestion)
Currently, patch is stable, passes all my test and checkworlds, WAL seems to work. Though, it is "work in progress".
Best regards, Andrey Borodin.
[0]: /messages/by-id/CAJEAwVE0rrr+OBT-P0gDCtXbVDkBBG_WcXwCBK=GHo4fewu3Yg@mail.gmail.com </messages/by-id/CAJEAwVE0rrr+OBT-P0gDCtXbVDkBBG_WcXwCBK=GHo4fewu3Yg@mail.gmail.com>
Attachments:
0001-GiST-intrapage-indexing-v1.patchapplication/octet-stream; name=0001-GiST-intrapage-indexing-v1.patch; x-unix-mode=0644Download
From 06b880dbc24eef6ffd6ed0ddf313c36164e6efb4 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Mon, 5 Feb 2018 15:34:42 +0300
Subject: [PATCH] GiST intrapage indexing v1
---
src/backend/access/gist/gist.c | 384 +++++++++++++++++++++++++++++++----
src/backend/access/gist/gistbuild.c | 15 +-
src/backend/access/gist/gistget.c | 9 +
src/backend/access/gist/gistutil.c | 250 +++++++++++++++--------
src/backend/access/gist/gistvacuum.c | 5 +-
src/backend/access/gist/gistxlog.c | 36 +++-
src/include/access/gist_private.h | 31 ++-
src/include/access/gistxlog.h | 5 +-
8 files changed, 603 insertions(+), 132 deletions(-)
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 51c32e4afe..ec73b232f4 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -33,7 +33,7 @@ static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
GISTSTATE *giststate,
IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
Buffer leftchild, Buffer rightchild,
- bool unlockbuf, bool unlockleftchild);
+ bool unlockbuf, bool unlockleftchild, int ndeltup, OffsetNumber skipoffnum);
static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
GISTSTATE *giststate, List *splitinfo, bool releasebuf);
static void gistvacuumpage(Relation rel, Page page, Buffer buffer);
@@ -215,7 +215,9 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
BlockNumber *newblkno,
Buffer leftchildbuf,
List **splitinfo,
- bool markfollowright)
+ bool markfollowright,
+ int ndeltup,
+ OffsetNumber skipoffnum)
{
BlockNumber blkno = BufferGetBlockNumber(buffer);
Page page = BufferGetPage(buffer);
@@ -248,7 +250,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
* one-element todelete array; in the split case, it's handled implicitly
* because the tuple vector passed to gistSplit won't include this tuple.
*/
- is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
+ is_split = gistnospace(page, itup, ntup, oldoffnum, freespace, ndeltup);
/*
* If leaf page is full, try at first to delete dead tuples. And then
@@ -257,7 +259,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
{
gistvacuumpage(rel, page, buffer);
- is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
+ is_split = gistnospace(page, itup, ntup, oldoffnum, freespace, ndeltup);
}
if (is_split)
@@ -280,17 +282,38 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
* remove the old version from the vector.
*/
itvec = gistextractpage(page, &tlen);
- if (OffsetNumberIsValid(oldoffnum))
+
+ if (OffsetNumberIsValid(skipoffnum))
+ {
+ IndexTuple skiptuple = itvec[skipoffnum - FirstOffsetNumber];
+ int newskipgroupsize = GistTupleGetSkipCount(skiptuple) + ntup - ndeltup;
+
+ Assert(GistTupleIsSkip(skiptuple));
+ Assert(skipoffnum + newskipgroupsize <= PageGetMaxOffsetNumber(page) + ntup - ndeltup);
+ GistTupleSetSkipCount(skiptuple, newskipgroupsize);
+ }
+
+ if (OffsetNumberIsValid(oldoffnum) && ndeltup)
{
- /* on inner page we should remove old tuple */
+ /* on inner page we should remove old tuples */
int pos = oldoffnum - FirstOffsetNumber;
- tlen--;
+ tlen -= ndeltup;
if (pos != tlen)
- memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
+ memmove(itvec + pos, itvec + pos + ndeltup, sizeof(IndexTuple) * (tlen - pos));
+ }
+
+ itvec = gistjoinvector(itvec, &tlen, itup, ntup, oldoffnum);
+
+ if (GistTupleIsSkip(*itvec))
+ {
+ dist = gistSplitBySkipgroup(rel, page, itvec, tlen, giststate);
+ }
+ if (dist == NULL)
+ {
+ gistfiltervector(itvec, &tlen);
+ dist = gistSplit(rel, page, itvec, tlen, giststate);
}
- itvec = gistjoinvector(itvec, &tlen, itup, ntup);
- dist = gistSplit(rel, page, itvec, tlen, giststate);
/*
* Check that split didn't produce too many pages.
@@ -357,7 +380,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
if (is_rootsplit)
{
IndexTuple *downlinks;
- int ndownlinks = 0;
+ int ndownlinks = 1;
int i;
rootpg.buffer = buffer;
@@ -368,8 +391,11 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
for (ptr = dist; ptr; ptr = ptr->next)
ndownlinks++;
downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
- for (i = 0, ptr = dist; ptr; ptr = ptr->next)
+ for (i = 1, ptr = dist; ptr; ptr = ptr->next)
downlinks[i++] = ptr->itup;
+ downlinks[0] = gistunion(rel,downlinks + 1, ndownlinks - 1, giststate);
+ GistTupleMakeSkip(downlinks[0]);
+ GistTupleSetSkipCount(downlinks[0], ndownlinks - 1);
rootpg.block.blkno = GIST_ROOT_BLKNO;
rootpg.block.num = ndownlinks;
@@ -505,26 +531,37 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
*/
START_CRIT_SECTION();
+ if (OffsetNumberIsValid(skipoffnum))
+ {
+ IndexTuple skiptuple = (IndexTuple) PageGetItem(page, PageGetItemId(page, skipoffnum));
+ int newskipgroupsize = GistTupleGetSkipCount(skiptuple) + ntup - ndeltup;
+
+ Assert(GistTupleIsSkip(skiptuple));
+ Assert(skipoffnum + newskipgroupsize <= PageGetMaxOffsetNumber(page) + ntup - ndeltup);
+ GistTupleSetSkipCount(skiptuple, newskipgroupsize);
+ }
+
/*
* Delete old tuple if any, then insert new tuple(s) if any. If
* possible, use the fast path of PageIndexTupleOverwrite.
*/
if (OffsetNumberIsValid(oldoffnum))
{
- if (ntup == 1)
+ int noverwrite = Min(ntup,ndeltup);
+ for (i = 0; i < noverwrite; i++)
{
- /* One-for-one replacement, so use PageIndexTupleOverwrite */
- if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
- IndexTupleSize(*itup)))
+ if (!PageIndexTupleOverwrite(page, oldoffnum + i, (Item) itup[i],
+ IndexTupleSize(itup[i])))
elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(rel));
}
- else
+
+ for (i = noverwrite; i < ndeltup; i++)
{
- /* Delete old, then append new tuple(s) to page */
- PageIndexTupleDelete(page, oldoffnum);
- gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
+ PageIndexTupleDelete(page, oldoffnum + i);
}
+
+ gistfillbuffer(page, itup + noverwrite, ntup - noverwrite, oldoffnum + noverwrite);
}
else
{
@@ -540,17 +577,18 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
if (RelationNeedsWAL(rel))
{
OffsetNumber ndeloffs = 0,
- deloffs[1];
+ deloffs[BLCKSZ /sizeof(ItemIdData)];
if (OffsetNumberIsValid(oldoffnum))
{
- deloffs[0] = oldoffnum;
- ndeloffs = 1;
+ for (i = 0; i < ndeltup; i++)
+ deloffs[i] = oldoffnum + i;
+ ndeloffs = ndeltup;
}
recptr = gistXLogUpdate(buffer,
deloffs, ndeloffs, itup, ntup,
- leftchildbuf);
+ leftchildbuf, skipoffnum);
PageSetLSN(page, recptr);
}
@@ -685,6 +723,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
if (!GistPageIsLeaf(stack->page))
{
+ OffsetNumber skipoffnum;
/*
* This is an internal page so continue to walk down the tree.
* Find the child node that has the minimum insertion penalty.
@@ -694,9 +733,71 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
GISTInsertStack *item;
OffsetNumber downlinkoffnum;
- downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
+ downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate, &skipoffnum);
+
+ if (OffsetNumberIsValid(skipoffnum))
+ {
+ iid = PageGetItemId(stack->page, skipoffnum);
+ idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
+ Assert(GistTupleIsSkip(idxtuple));
+
+ /*
+ * Check that the key representing the target child node is
+ * consistent with the key we're inserting. Update it if it's not.
+ */
+ newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
+ if (newtup)
+ {
+ /*
+ * Swap shared lock for an exclusive one. Beware, the page may
+ * change while we unlock/lock the page...
+ */
+ if (!xlocked)
+ {
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+ xlocked = true;
+ stack->page = (Page) BufferGetPage(stack->buffer);
+
+ if (PageGetLSN(stack->page) != stack->lsn)
+ {
+ /* the page was changed while we unlocked it, retry */
+ continue;
+ }
+ }
+
+ /*
+ * Update the tuple.
+ *
+ * We still hold the lock after gistinserttuple(), but it
+ * might have to split the page to make the updated tuple fit.
+ * In that case the updated tuple might migrate to the other
+ * half of the split, so we have to go back to the parent and
+ * descend back to the half that's a better fit for the new
+ * tuple.
+ */
+ if (gistinserttuple(&state, stack, giststate, newtup,
+ skipoffnum))
+ {
+ /*
+ * If this was a root split, the root page continues to be
+ * the parent and the updated tuple went to one of the
+ * child pages, so we just need to retry from the root
+ * page.
+ */
+ if (stack->blkno != GIST_ROOT_BLKNO)
+ {
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
+ }
+ continue;
+ }
+ }
+ }
iid = PageGetItemId(stack->page, downlinkoffnum);
idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
+ Assert(!GistTupleIsSkip(idxtuple));
childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
/*
@@ -762,6 +863,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
continue;
}
}
+
LockBuffer(stack->buffer, GIST_UNLOCK);
xlocked = false;
@@ -770,6 +872,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
item->blkno = childblkno;
item->parent = stack;
item->downlinkoffnum = downlinkoffnum;
+ item->skipoffnum = skipoffnum;
state.stack = stack = item;
}
else
@@ -983,10 +1086,18 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
while (true)
{
maxoff = PageGetMaxOffsetNumber(parent->page);
+ child->skipoffnum = InvalidOffsetNumber;
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
iid = PageGetItemId(parent->page, i);
idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
+
+ if (GistTupleIsSkip(idxtuple))
+ {
+ child->skipoffnum = i;
+ continue;
+ }
+
if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
{
/* yes!!, found */
@@ -994,6 +1105,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
return;
}
}
+ child->skipoffnum = InvalidOffsetNumber;
parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
UnlockReleaseBuffer(parent->buffer);
@@ -1174,7 +1286,7 @@ gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
{
return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
- InvalidBuffer, InvalidBuffer, false, false);
+ InvalidBuffer, InvalidBuffer, false, false, 1, InvalidOffsetNumber);
}
/* ----------------
@@ -1208,7 +1320,7 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
GISTSTATE *giststate,
IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
Buffer leftchild, Buffer rightchild,
- bool unlockbuf, bool unlockleftchild)
+ bool unlockbuf, bool unlockleftchild, int ndeltup, OffsetNumber skipoffnum)
{
List *splitinfo;
bool is_split;
@@ -1220,8 +1332,9 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
oldoffnum, NULL,
leftchild,
&splitinfo,
- true);
-
+ true,
+ ndeltup,
+ skipoffnum);
/*
* Before recursing up in case the page was split, release locks on the
* child pages. We don't need to keep them locked when updating the
@@ -1246,6 +1359,85 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
return is_split;
}
+inline void
+gistcheckskippage(Page page)
+{
+ OffsetNumber i,
+ maxoff;
+ int skiplast = 0;
+ bool wasskip = false;
+ Assert(false);
+
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ IndexTuple tup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+
+ if (GistTupleIsSkip(tup))
+ {
+ if (skiplast)
+ {
+ elog(NOTICE, "GS: at % d expecting %d more tuples but have skipgroup %d",i, skiplast,GistTupleGetSkipCount(tup));
+ }
+ Assert(skiplast == 0);
+ skiplast = GistTupleGetSkipCount(tup);
+ wasskip = true;
+ }
+ else
+ {
+ if (!wasskip)
+ continue;
+ if (skiplast<=0)
+ {
+ elog(NOTICE, "GS: fail at %d expecting more skips", i);
+ }
+ Assert(skiplast>0);
+ skiplast--;
+ }
+ }
+}
+
+static void
+gisttestskipgroup(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate, OffsetNumber skipoffnum)
+{
+ Page page = BufferGetPage(stack->buffer);
+ IndexTuple skiptuple = (IndexTuple) PageGetItem(page, PageGetItemId(page, skipoffnum));
+ int skipsize;
+
+ Assert(!GistPageIsLeaf(page));
+ Assert(GistTupleIsSkip(skiptuple));
+ skipsize = GistTupleGetSkipCount(skiptuple);
+ Assert((skipsize > 0) && (skipsize < BLCKSZ / sizeof(ItemIdData)));
+ if (skipsize > GIST_SKIPGROUP_THRESHOLD)
+ {
+ IndexTuple* itvec = gistextractrange(page, skipoffnum+1, skipsize);
+ SplitedPageLayout *ptr;
+ SplitedPageLayout *dist = gistSplit(state->r, page, itvec, skipsize, giststate);
+ int totalsize = 0;
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ char *data;
+ int i;
+ itvec[totalsize++] = ptr->itup;
+ GistTupleMakeSkip(ptr->itup);
+ GistTupleSetSkipCount(ptr->itup, ptr->block.num);
+ data = (char *) (ptr->list);
+
+ for (i = 0; i < ptr->block.num; i++)
+ {
+ IndexTuple thistup = (IndexTuple) data;
+
+ itvec[totalsize++] = thistup;
+
+ data += IndexTupleSize(thistup);
+ }
+ }
+ gistinserttuples(state, stack, giststate, itvec, totalsize, skipoffnum,
+ InvalidBuffer, InvalidBuffer, false, false, skipsize + 1, InvalidOffsetNumber);
+ }
+}
+
/*
* Finish an incomplete split by inserting/updating the downlinks in parent
* page. 'splitinfo' contains all the child pages involved in the split,
@@ -1264,6 +1456,7 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
GISTPageSplitInfo *right;
GISTPageSplitInfo *left;
IndexTuple tuples[2];
+ bool lastmomentsplit;
/* A split always contains at least two halves */
Assert(list_length(splitinfo) >= 2);
@@ -1297,8 +1490,8 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
if (gistinserttuples(state, stack->parent, giststate,
&right->downlink, 1,
- InvalidOffsetNumber,
- left->buf, right->buf, false, false))
+ stack->downlinkoffnum,
+ left->buf, right->buf, false, false, 0, stack->skipoffnum))
{
/*
* If the parent page was split, need to relocate the original
@@ -1320,13 +1513,19 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
*/
tuples[0] = left->downlink;
tuples[1] = right->downlink;
- gistinserttuples(state, stack->parent, giststate,
+ lastmomentsplit = gistinserttuples(state, stack->parent, giststate,
tuples, 2,
stack->downlinkoffnum,
left->buf, right->buf,
- true, /* Unlock parent */
- unlockbuf /* Unlock stack->buffer if caller wants that */
+ false, /* Unlock parent */
+ unlockbuf, /* Unlock stack->buffer if caller wants that */
+ 1,
+ stack->skipoffnum
);
+ if (OffsetNumberIsValid(stack->skipoffnum) && !lastmomentsplit)
+ gisttestskipgroup(state,stack->parent, giststate, stack->skipoffnum);
+
+ LockBuffer(stack->parent->buffer, GIST_UNLOCK);
Assert(left->buf == stack->buffer);
}
@@ -1417,6 +1616,121 @@ gistSplit(Relation r,
return res;
}
+SplitedPageLayout *
+gistSplitBySkipgroup(Relation r,
+ Page page,
+ IndexTuple *itup, /* contains compressed entry */
+ int len,
+ GISTSTATE *giststate)
+{
+ IndexTuple *lvectup,
+ *rvectup,
+ *skiptuples;
+ OffsetNumber *skipoffsets;
+ GistSplitVector v;
+ int i, o, p;
+ int skipcount = 0;
+ SplitedPageLayout *res = NULL;
+
+ /* this should never recurse very deeply, but better safe than sorry */
+ check_stack_depth();
+
+ skiptuples = (IndexTuple *) palloc(sizeof(IndexTuple) * len);
+ skipoffsets = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
+ for (i = 0; i < len; i++)
+ {
+ if (GistTupleIsSkip(itup[i]))
+ {
+ skipoffsets[skipcount] = i;
+ skiptuples[skipcount++] = itup[i];
+ }
+ }
+
+ /*
+ * If a single skipgroup doesn't fit on a page, no amount of splitting will
+ * help.
+ */
+ if (skipcount <= 1)
+ return NULL;
+
+ memset(v.spl_lisnull, true, sizeof(bool) * giststate->tupdesc->natts);
+ memset(v.spl_risnull, true, sizeof(bool) * giststate->tupdesc->natts);
+ gistSplitByKey(r, page, skiptuples, skipcount, giststate, &v, 0);
+
+ /* form left and right vector */
+ lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+ rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+
+ o = 0;
+ for (i = 0; i < v.splitVector.spl_nleft; i++)
+ {
+ int index = skipoffsets[v.splitVector.spl_left[i] - 1];
+ Assert(GistTupleIsSkip(itup[index]));
+ int skipsize = GistTupleGetSkipCount(itup[index]);
+ for (p = 0; p <= skipsize; p++)
+ {
+ Assert(p==0 || !GistTupleIsSkip(itup[index + p]));
+ lvectup[o++] = itup[index + p];
+ }
+ }
+ v.splitVector.spl_nleft = o;
+
+ o = 0;
+ for (i = 0; i < v.splitVector.spl_nright; i++)
+ {
+ int index = skipoffsets[v.splitVector.spl_right[i] - 1];
+ Assert(GistTupleIsSkip(itup[index]));
+ int skipsize = GistTupleGetSkipCount(itup[index]);
+ for (p = 0; p <= skipsize; p++)
+ {
+ Assert(p==0 || !GistTupleIsSkip(itup[index + p]));
+ rvectup[o++] = itup[index + p];
+ }
+ }
+ v.splitVector.spl_nright = o;
+
+ /* finalize splitting (may need another split) */
+ if (!gistfitpage(rvectup, v.splitVector.spl_nright))
+ {
+ res = gistSplitBySkipgroup(r, page, rvectup, v.splitVector.spl_nright, giststate);
+ if (res == NULL)
+ return NULL;
+ }
+ else
+ {
+ ROTATEDIST(res);
+ res->block.num = v.splitVector.spl_nright;
+ res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
+ res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
+ }
+
+ if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
+ {
+ SplitedPageLayout *resptr,
+ *subres;
+
+ resptr = subres = gistSplitBySkipgroup(r, page, lvectup, v.splitVector.spl_nleft, giststate);
+ if (subres == NULL)
+ return NULL;
+
+ /* install on list's tail */
+ while (resptr->next)
+ resptr = resptr->next;
+
+ resptr->next = res;
+ res = subres;
+ }
+ else
+ {
+ ROTATEDIST(res);
+ res->block.num = v.splitVector.spl_nleft;
+ res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
+ res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
+ }
+
+ return res;
+}
+
/*
* Create a GISTSTATE and fill it with information about the index
*/
@@ -1579,7 +1893,7 @@ gistvacuumpage(Relation rel, Page page, Buffer buffer)
recptr = gistXLogUpdate(buffer,
deletable, ndeletable,
- NULL, 0, InvalidBuffer);
+ NULL, 0, InvalidBuffer, InvalidOffsetNumber);
PageSetLSN(page, recptr);
}
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index 434f15f014..a91776029c 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -550,6 +550,7 @@ gistProcessItup(GISTBuildState *buildstate, IndexTuple itup,
GISTBuildBuffers *gfbb = buildstate->gfbb;
Relation indexrel = buildstate->indexrel;
BlockNumber childblkno;
+ OffsetNumber skipoffnum;
Buffer buffer;
bool result = false;
BlockNumber blkno;
@@ -591,7 +592,7 @@ gistProcessItup(GISTBuildState *buildstate, IndexTuple itup,
LockBuffer(buffer, GIST_EXCLUSIVE);
page = (Page) BufferGetPage(buffer);
- childoffnum = gistchoose(indexrel, page, itup, giststate);
+ childoffnum = gistchoose(indexrel, page, itup, giststate, &skipoffnum);
iid = PageGetItemId(page, childoffnum);
idxtuple = (IndexTuple) PageGetItem(page, iid);
childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
@@ -690,7 +691,9 @@ gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
itup, ntup, oldoffnum, &placed_to_blk,
InvalidBuffer,
&splitinfo,
- false);
+ false,
+ 1,
+ InvalidOffsetNumber);
/*
* If this is a root split, update the root path item kept in memory. This
@@ -721,6 +724,8 @@ gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
{
ItemId iid = PageGetItemId(page, off);
IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
+ if (GistTupleIsSkip(idxtuple))
+ continue;
BlockNumber childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
Buffer childbuf = ReadBuffer(buildstate->indexrel, childblkno);
@@ -879,7 +884,7 @@ gistBufferingFindCorrectParent(GISTBuildState *buildstate,
ItemId iid = PageGetItemId(page, *downlinkoffnum);
IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
- if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == childblkno)
+ if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == childblkno && !GistTupleIsSkip(idxtuple))
{
/* Still there */
return buffer;
@@ -897,6 +902,8 @@ gistBufferingFindCorrectParent(GISTBuildState *buildstate,
{
ItemId iid = PageGetItemId(page, off);
IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
+ if (GistTupleIsSkip(idxtuple))
+ continue;
if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == childblkno)
{
@@ -1177,6 +1184,8 @@ gistMemorizeAllDownlinks(GISTBuildState *buildstate, Buffer parentbuf)
{
ItemId iid = PageGetItemId(page, off);
IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
+ if (GistTupleIsSkip(idxtuple))
+ continue;
BlockNumber childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
gistMemorizeParent(buildstate, childblkno, parentblkno);
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index b30b931c3b..a9406a0aa1 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -421,6 +421,15 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
/* Ignore tuple if it doesn't match */
if (!match)
+ {
+ if (GistTupleIsSkip(it))
+ {
+ i += GistTupleGetSkipCount(it);
+ }
+ continue;
+ }
+
+ if (GistTupleIsSkip(it))
continue;
if (tbm && GistPageIsLeaf(page))
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index 55cccd247a..87af0ad0f7 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -55,7 +55,7 @@ gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
* Check space for itup vector on page
*/
bool
-gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
+gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace, int ndeltup)
{
unsigned int size = freespace,
deleted = 0;
@@ -64,11 +64,14 @@ gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size f
for (i = 0; i < len; i++)
size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
- if (todelete != InvalidOffsetNumber)
+ if (OffsetNumberIsValid(todelete))
{
- IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
+ for (i = 0; i < ndeltup; i++)
+ {
+ IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete + i));
- deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
+ deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
+ }
}
return (PageGetFreeSpace(page) + deleted < size);
@@ -106,18 +109,71 @@ gistextractpage(Page page, int *len /* out */ )
return itvec;
}
+/*
+ * Read range into itup vector
+ */
+IndexTuple *
+gistextractrange(Page page, OffsetNumber start, int len)
+{
+ OffsetNumber i,
+ maxoff;
+ IndexTuple *itvec,
+ *itvecnext;
+
+ maxoff = PageGetMaxOffsetNumber(page);
+ Assert(maxoff >= start + len - 1);
+
+ /* caller will use this x2 allocation */
+ itvecnext = itvec = palloc(sizeof(IndexTuple) * len * 2);
+
+ for (i = start; i < start + len; i = OffsetNumberNext(i))
+ {
+ IndexTuple tup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+ Assert(!GistTupleIsSkip(tup));
+ *itvecnext = tup;
+ itvecnext++;
+ }
+
+ return itvec;
+}
+
/*
* join two vectors into one
*/
IndexTuple *
-gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
+gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen, OffsetNumber oldoffnum)
{
+ if (!OffsetNumberIsValid(oldoffnum))
+ oldoffnum = *len;
+ else
+ oldoffnum -= FirstOffsetNumber;
itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
- memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
+ if (oldoffnum < *len)
+ memmove(&itvec[oldoffnum + addlen], &itvec[oldoffnum], sizeof(IndexTuple) * (*len - oldoffnum));
+ memmove(&itvec[oldoffnum], additvec, sizeof(IndexTuple) * addlen);
*len += addlen;
return itvec;
}
+/*
+ * join two vectors into one
+ */
+void
+gistfiltervector(IndexTuple *itvec, int *len)
+{
+ int i;
+ int newlen = 0;
+ int oldlen = *len;
+
+ for (i = 0; i < oldlen; i++)
+ {
+ if (GistTupleIsSkip(itvec[i]))
+ continue;
+ itvec[newlen++] = itvec[i];
+ }
+ *len = newlen;
+}
+
/*
* make plain IndexTupleVector
*/
@@ -357,6 +413,11 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
/* need to update key */
newtup = gistFormTuple(giststate, r, attr, isnull, false);
newtup->t_tid = oldtup->t_tid;
+ if (GistTupleIsSkip(oldtup))
+ {
+ GistTupleMakeSkip(newtup);
+ GistTupleSetSkipCount(newtup, GistTupleGetSkipCount(oldtup));
+ }
}
return newtup;
@@ -370,10 +431,11 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
*/
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
- GISTSTATE *giststate)
+ GISTSTATE *giststate, OffsetNumber *skipoffnum)
{
OffsetNumber result;
OffsetNumber maxoff;
+ OffsetNumber prevskip = InvalidOffsetNumber;
OffsetNumber i;
float best_penalty[INDEX_MAX_KEYS];
GISTENTRY entry,
@@ -439,98 +501,121 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
bool zero_penalty;
int j;
- zero_penalty = true;
-
- /* Loop over index attributes. */
- for (j = 0; j < r->rd_att->natts; j++)
+ if (GistTupleIsSkip(itup))
{
Datum datum;
float usize;
bool IsNull;
- /* Compute penalty for this column. */
- datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
- gistdentryinit(giststate, j, &entry, datum, r, p, i,
- false, IsNull);
- usize = gistpenalty(giststate, j, &entry, IsNull,
- &identry[j], isnull[j]);
- if (usize > 0)
- zero_penalty = false;
+ prevskip = i;
- if (best_penalty[j] < 0 || usize < best_penalty[j])
- {
- /*
- * New best penalty for column. Tentatively select this tuple
- * as the target, and record the best penalty. Then reset the
- * next column's penalty to "unknown" (and indirectly, the
- * same for all the ones to its right). This will force us to
- * adopt this tuple's penalty values as the best for all the
- * remaining columns during subsequent loop iterations.
- */
- result = i;
- best_penalty[j] = usize;
-
- if (j < r->rd_att->natts - 1)
- best_penalty[j + 1] = -1;
-
- /* we have new best, so reset keep-it decision */
- keep_current_best = -1;
- }
- else if (best_penalty[j] == usize)
- {
- /*
- * The current tuple is exactly as good for this column as the
- * best tuple seen so far. The next iteration of this loop
- * will compare the next column.
- */
- }
- else
+ datum = index_getattr(itup, 1, giststate->tupdesc, &IsNull);
+ gistdentryinit(giststate, 0, &entry, datum, r, p, i,
+ false, IsNull);
+ usize = gistpenalty(giststate, 0, &entry, IsNull,
+ &identry[0], isnull[0]);
+ if (usize > best_penalty[0] && best_penalty[0] != -1)
{
- /*
- * The current tuple is worse for this column than the best
- * tuple seen so far. Skip the remaining columns and move on
- * to the next tuple, if any.
- */
- zero_penalty = false; /* so outer loop won't exit */
- break;
+ i += GistTupleGetSkipCount(itup);
}
}
-
- /*
- * If we looped past the last column, and did not update "result",
- * then this tuple is exactly as good as the prior best tuple.
- */
- if (j == r->rd_att->natts && result != i)
+ else
{
- if (keep_current_best == -1)
+ zero_penalty = true;
+
+ /* Loop over index attributes. */
+ for (j = 0; j < r->rd_att->natts; j++)
{
- /* we didn't make the random choice yet for this old best */
- keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
+ Datum datum;
+ float usize;
+ bool IsNull;
+
+ /* Compute penalty for this column. */
+ datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
+ gistdentryinit(giststate, j, &entry, datum, r, p, i,
+ false, IsNull);
+ usize = gistpenalty(giststate, j, &entry, IsNull,
+ &identry[j], isnull[j]);
+ if (usize > 0)
+ zero_penalty = false;
+
+ if (best_penalty[j] < 0 || usize < best_penalty[j])
+ {
+ /*
+ * New best penalty for column. Tentatively select this tuple
+ * as the target, and record the best penalty. Then reset the
+ * next column's penalty to "unknown" (and indirectly, the
+ * same for all the ones to its right). This will force us to
+ * adopt this tuple's penalty values as the best for all the
+ * remaining columns during subsequent loop iterations.
+ */
+ result = i;
+ *skipoffnum = prevskip;
+ best_penalty[j] = usize;
+
+ if (j < r->rd_att->natts - 1)
+ best_penalty[j + 1] = -1;
+
+ /* we have new best, so reset keep-it decision */
+ keep_current_best = -1;
+ }
+ else if (best_penalty[j] == usize)
+ {
+ /*
+ * The current tuple is exactly as good for this column as the
+ * best tuple seen so far. The next iteration of this loop
+ * will compare the next column.
+ */
+ }
+ else
+ {
+ /*
+ * The current tuple is worse for this column than the best
+ * tuple seen so far. Skip the remaining columns and move on
+ * to the next tuple, if any.
+ */
+ zero_penalty = false; /* so outer loop won't exit */
+ break;
+ }
}
- if (keep_current_best == 0)
+
+ /*
+ * If we looped past the last column, and did not update "result",
+ * then this tuple is exactly as good as the prior best tuple.
+ */
+ if (j == r->rd_att->natts && result != i)
{
- /* we choose to use the new tuple */
- result = i;
- /* choose again if there are even more exactly-as-good ones */
- keep_current_best = -1;
+ if (keep_current_best == -1)
+ {
+ /* we didn't make the random choice yet for this old best */
+ keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
+ }
+ if (keep_current_best == 0)
+ {
+ /* we choose to use the new tuple */
+ result = i;
+ *skipoffnum = prevskip;
+ /* choose again if there are even more exactly-as-good ones */
+ keep_current_best = -1;
+ }
}
- }
- /*
- * If we find a tuple with zero penalty for all columns, and we've
- * decided we don't want to search for another tuple with equal
- * penalty, there's no need to examine remaining tuples; just break
- * out of the loop and return it.
- */
- if (zero_penalty)
- {
- if (keep_current_best == -1)
+ /*
+ * If we find a tuple with zero penalty for all columns, and we've
+ * decided we don't want to search for another tuple with equal
+ * penalty, there's no need to examine remaining tuples; just break
+ * out of the loop and return it.
+ */
+ if (zero_penalty)
{
- /* we didn't make the random choice yet for this old best */
- keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
+ if (keep_current_best == -1)
+ {
+ /* we didn't make the random choice yet for this old best */
+ keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
+ }
+ if (keep_current_best == 1)
+ break;
}
- if (keep_current_best == 1)
- break;
}
}
@@ -643,6 +728,7 @@ gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
Datum fetchatt[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
int i;
+ Assert(!GistTupleIsSkip(tuple));
for (i = 0; i < r->rd_att->natts; i++)
{
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 22181c6299..9773ca9347 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -225,7 +225,7 @@ gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
recptr = gistXLogUpdate(buffer,
todelete, ntodelete,
- NULL, 0, InvalidBuffer);
+ NULL, 0, InvalidBuffer, InvalidOffsetNumber);
PageSetLSN(page, recptr);
}
else
@@ -247,6 +247,9 @@ gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
+ if (GistTupleIsSkip(idxtuple))
+ continue;
+
ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
ptr->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
ptr->parentlsn = BufferGetLSNAtomic(buffer);
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 1e09126978..e46ab6f0da 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -82,6 +82,15 @@ gistRedoPageUpdateRecord(XLogReaderState *record)
page = (Page) BufferGetPage(buffer);
+ if (xldata->skipoffnum != InvalidOffsetNumber)
+ {
+ IndexTuple skiptuple = (IndexTuple) PageGetItem(page, PageGetItemId(page, xldata->skipoffnum));
+ int newskipgroupsize = GistTupleGetSkipCount(skiptuple) + xldata->ntoinsert - xldata->ntodelete;
+
+ Assert(GistTupleIsSkip(skiptuple));
+ GistTupleSetSkipCount(skiptuple, newskipgroupsize);
+ }
+
if (xldata->ntodelete == 1 && xldata->ntoinsert == 1)
{
/*
@@ -457,14 +466,17 @@ XLogRecPtr
gistXLogUpdate(Buffer buffer,
OffsetNumber *todelete, int ntodelete,
IndexTuple *itup, int ituplen,
- Buffer leftchildbuf)
+ Buffer leftchildbuf, OffsetNumber skipoffnum)
{
gistxlogPageUpdate xlrec;
int i;
XLogRecPtr recptr;
+ char *start, *end;
+
xlrec.ntodelete = ntodelete;
xlrec.ntoinsert = ituplen;
+ xlrec.skipoffnum = skipoffnum;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageUpdate));
@@ -473,8 +485,26 @@ gistXLogUpdate(Buffer buffer,
XLogRegisterBufData(0, (char *) todelete, sizeof(OffsetNumber) * ntodelete);
/* new tuples */
- for (i = 0; i < ituplen; i++)
- XLogRegisterBufData(0, (char *) (itup[i]), IndexTupleSize(itup[i]));
+ if (ituplen > 0)
+ {
+ start = (char *) itup[0];
+ end = start + IndexTupleSize(itup[0]);
+ for (i = 1; i < ituplen; i++)
+ {
+ if (end == (char *) itup[i])
+ {
+ end += IndexTupleSize(itup[i]);
+ continue;
+ }
+ else
+ {
+ XLogRegisterBufData(0, start, end - start);
+ start = (char *) itup[i];
+ end = start + IndexTupleSize(itup[i]);
+ }
+ }
+ XLogRegisterBufData(0, start, end - start);
+ }
/*
* Include a full page image of the child buf. (only necessary if a
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 36ed7244ba..e01bfb1b2e 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -39,6 +39,8 @@
*/
#define GIST_MAX_SPLIT_PAGES 75
+#define GIST_SKIPGROUP_THRESHOLD 16
+
/* Buffer lock modes */
#define GIST_SHARE BUFFER_LOCK_SHARE
#define GIST_EXCLUSIVE BUFFER_LOCK_EXCLUSIVE
@@ -216,6 +218,8 @@ typedef struct GISTInsertStack
/* offset of the downlink in the parent page, that points to this page */
OffsetNumber downlinkoffnum;
+ OffsetNumber skipoffnum;
+
/* pointer to parent */
struct GISTInsertStack *parent;
} GISTInsertStack;
@@ -276,6 +280,12 @@ typedef struct
#define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
+#define INDEX_SKIP_MASK 0x2000
+
+#define GistTupleIsSkip(itup) ((((IndexTuple) (itup))->t_info & INDEX_SKIP_MASK))
+#define GistTupleMakeSkip(itup) ((((IndexTuple) (itup))->t_info |= INDEX_SKIP_MASK))
+#define GistTupleSetSkipCount(itup, skipTupleCnt) ((((IndexTuple) (itup))->t_tid.ip_posid = skipTupleCnt))
+#define GistTupleGetSkipCount(itup) ((((IndexTuple) (itup))->t_tid.ip_posid))
/*
@@ -404,15 +414,20 @@ extern bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
OffsetNumber oldoffnum, BlockNumber *newblkno,
Buffer leftchildbuf,
List **splitinfo,
- bool markleftchild);
+ bool markleftchild,
+ int ndeltup,
+ OffsetNumber skipoffnum);
extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
int len, GISTSTATE *giststate);
+extern SplitedPageLayout *gistSplitBySkipgroup(Relation r, Page page, IndexTuple *itup,
+ int len, GISTSTATE *giststate);
+
extern XLogRecPtr gistXLogUpdate(Buffer buffer,
OffsetNumber *todelete, int ntodelete,
IndexTuple *itup, int ntup,
- Buffer leftchild);
+ Buffer leftchild, OffsetNumber skipoffnum);
extern XLogRecPtr gistXLogSplit(bool page_is_leaf,
SplitedPageLayout *dist,
@@ -440,15 +455,18 @@ extern bool gistproperty(Oid index_oid, int attno,
IndexAMProperty prop, const char *propname,
bool *res, bool *isnull);
extern bool gistfitpage(IndexTuple *itvec, int len);
-extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
+extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace, int ndeltup);
extern void gistcheckpage(Relation rel, Buffer buf);
extern Buffer gistNewBuffer(Relation r);
extern void gistfillbuffer(Page page, IndexTuple *itup, int len,
OffsetNumber off);
-extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
+extern IndexTuple *gistextractpage(Page page, int *len /* out */);
+extern IndexTuple *gistextractrange(Page page, OffsetNumber start, int len);
extern IndexTuple *gistjoinvector(
IndexTuple *itvec, int *len,
- IndexTuple *additvec, int addlen);
+ IndexTuple *additvec, int addlen, OffsetNumber oldoffnum);
+extern void gistfiltervector(IndexTuple *itvec, int *len);
+extern inline void gistcheckskippage(Page page);
extern IndexTupleData *gistfillitupvec(IndexTuple *vec, int veclen, int *memlen);
extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
@@ -462,7 +480,8 @@ extern IndexTuple gistFormTuple(GISTSTATE *giststate,
extern OffsetNumber gistchoose(Relation r, Page p,
IndexTuple it,
- GISTSTATE *giststate);
+ GISTSTATE *giststate,
+ OffsetNumber *skipoffnum);
extern void GISTInitBuffer(Buffer b, uint32 f);
extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 1a2b9496d0..625a12ccd1 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -32,8 +32,9 @@
typedef struct gistxlogPageUpdate
{
/* number of deleted offsets */
- uint16 ntodelete;
- uint16 ntoinsert;
+ uint16 ntodelete;
+ uint16 ntoinsert;
+ OffsetNumber skipoffnum;
/*
* In payload of blk 0 : 1. todelete OffsetNumbers 2. tuples to insert
--
2.14.3 (Apple Git-98)