Buffering in tuplesort.c for in-sort deduplication; nbtree edition
Hi,
In the GIN parallel build thread [0] I proposed adding a 'flush'
callback to the tuplesort API [1], to be called by the tuplesort
machinery after writing each run, so that users of tuplesort can
buffer write calls within each sorted run and use that buffer state to
merge/deduplicate sorttuples before writing them to disk (assuming
some invariants are held).
By deduplicating during the sort, the user can thus significantly
reduce the number of sortable tuples and the total working set size,
in turn reducing the time spent reading and processing those runs and
speeding up builds for tuplesorts with many duplicate sort keys but
mergeable output.
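
To sketch the shape of the new API: patch 0001 adds the optional
flushwrites callback shown below; the two buffering functions after it
are an illustrative outline of how a tuplesort variant could use it,
not code from the patches.

    /* Optional; called after the last writetup() of each run. */
    void (*flushwrites) (Tuplesortstate *state, LogicalTape *tape);

    static void
    writetup_buffered_sketch(Tuplesortstate *state, LogicalTape *tape,
                             SortTuple *stup)
    {
        /*
         * Hypothetical buffering writetup(): if stup's keys equal those
         * of the pending tuple, merge it into the pending tuple;
         * otherwise write the pending tuple out through the regular
         * write path and make stup the new pending tuple.  Nothing is
         * guaranteed to reach the tape during this call.
         */
    }

    static void
    flushwrites_buffered_sketch(Tuplesortstate *state, LogicalTape *tape)
    {
        /* End of the run: write out whatever is still pending. */
    }

A NULL flushwrites keeps the existing behaviour; the core only invokes
the callback (through the FLUSHWRITES() macro) once per run, after the
last WRITETUP() in dumptuples() and mergeonerun().
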
As the GIN parallel index patch was already large and complex enough,
it was suggested to split this feature out into a separate patch. I
said I'd try to build one, so here is a patch set that adds the
required API to the tuplesort internals and implements the 'buffer,
deduplicate, flush' idea for nbtree index builds that can benefit from
deduplication.

If deduplication is enabled for the btree index, we now merge tuples
up to the tuplesort buffer size while we're still extracting tuples
from the heap, thereby reducing the temporary storage requirement of
the index build.
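
For reference, the write buffer that patch 0002 merges into is
currently sized as follows (taken verbatim from the patch; the
constants may well change):

    /* Buffer for merging btree duplicates. */
    #define BT_TUPLE_WRITE_BUFFER_SIZE (1024)
    /* We should merge tuples only up to an allowed size for btree pages */
    #define TARGET_POSTING_TUPLE_SIZE \
        (Min(BT_TUPLE_WRITE_BUFFER_SIZE, BTMaxItemSizeForPageSize(BLCKSZ)))
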
One complication this patch needs to deal with is that parallel scans
(and syncscan wraparounds) can cause higher TIDs to appear before
lower TIDs across two posting lists (e.g. lists [1, 4] and [2, 3]).
Therefore, we must take some special care while loading the tree to
make sure we only write out TIDs up to the latest received tuple's
first TID; this is implemented with a growing TID buffer.
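
To illustrate that invariant with the example above (a worked example,
not code from the patches):

    /*
     * One duplicate key; posting tuples arrive sorted by their first TID:
     *   tuple A: [1, 4]
     *   tuple B: [2, 3]
     * After buffering A we may not emit TID 4 yet, because B still
     * sorts after A while containing TIDs below 4.  Once B has been
     * buffered, every TID up to and including B's first TID (here 1
     * and 2) is final: any later tuple for this key must have a first
     * TID above 2 and so cannot contain smaller TIDs.  The patch
     * tracks this boundary in BTDedupStateData.rcvdallupto.
     */
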
As part of these changes, the btree build behaviour changes somewhat.
Where previously we would never create posting lists larger than the
target posting list size (812 bytes), we can now emit posting tuples
with up to 10 [^3] TIDs and up to BTMaxItemSize in size, when the
non-deduplicated base tuple alone would otherwise consume most of that
812-byte posting list size limit.
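
Concretely, the TID limit per emitted posting tuple is clamped in
_bt_sort_dedup_output_pending() (patch 0002) as follows, where
optmaxhtids is derived from the maxpostingsize target (the 812 bytes
mentioned above) and physmaxhtids from BTMaxItemSize for the block
size:

    /*------
     * We:
     * - must be able to fit at least 1 TID in the tuple;
     * - cannot fit more than physmaxhtids TIDs in the posting list;
     * - want at least 10 TIDs in the posting list; and
     * - would otherwise fill the tuple up to maxpostingsize.
     */
    maxpostnhtids = Max(1, Min(Max(10, optmaxhtids), physmaxhtids));
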
Local testing shows that index builds can see 60%+ reduced temporary
disk usage when indexing values with high duplication factors
(temporary storage comparable in size to the resulting index), and
I've seen 50% reduced index build times for some text-based
deduplication workloads.

Note: this isn't yet very polished, but it works. I'm reasonably happy
with happy-path performance so far, but I've seen bad cases (a unique
dataset without a UNIQUE specifier) regress by as much as 30%, so this
is definitely not (yet?) a panacea.

It should be noted that UNIQUE btree builds still make use of the old
non-deduplicating tuplesort code, meaning they can be used to detect
regressions in this patch (vs a non-UNIQUE index build with otherwise
the same definition).

Kind regards,
Matthias van de Meent
Neon (https://neon.tech)
[0]: /messages/by-id/flat/6ab4003f-a8b8-4d75-a67f-f25ad98582dc@enterprisedb.com
[1]: /messages/by-id/CAEze2WjB1vpxtvKuWVEThSaB-v4+8H0EXsOB=yLAv8pLcrQuKw@mail.gmail.com
[2]: /messages/by-id/CAEze2WiTAeZe4t5wAeRN834xFBqROPmjeK2XTstNko6bbVPX=A@mail.gmail.com
[^3] Chosen by fair dice roll
Attachments:
v0-0001-Allow-tuplesort-implementations-to-buffer-writes.patch (application/x-patch)
From bf0f1b64932dfc206bb6f94a875abf92b44c7da0 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Wed, 28 Aug 2024 15:28:37 +0200
Subject: [PATCH v0 1/2] Allow tuplesort implementations to buffer writes
Before, all writes to the sort tapes would have to be completed during
the call to writetup(). That's sufficient when the user of tuplesort
isn't interested in merging sorted tuples, but btree (and in the future,
GIN) sorts tuples to later merge them during insertion into the index.
By merging those tuples before writing them to disk, we can save
significant disk space and IO.
As such, we allow WRITETUP to do whatever it wants when we're filling a
tape with tuples, and call FLUSHWRITES() at the end to mark the end of
that tape so that the tuplesort can flush any remaining buffers to disk.
By design, this does _not_ allow deduplication while the dataset is still
in memory. Writing data to disk is inherently expensive, so we're likely
to win time by spending some additional cycles on buffering the data in
the hope of not writing as much data. However, for in-memory sorts the
additional cycles would likely add too much overhead to be worthwhile.
Note that any implementation of tuple merging using the buffering
strategy enabled by this commit must also make sure that a merged
tuple is never larger than the sum of the sizes of the tuples it was
merged from.
---
src/include/utils/tuplesort.h | 9 +++++++++
src/backend/utils/sort/tuplesort.c | 5 +++++
src/backend/utils/sort/tuplesortvariants.c | 7 +++++++
3 files changed, 21 insertions(+)
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index cde83f6201..ed17ca00b6 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -194,6 +194,15 @@ typedef struct
void (*writetup) (Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
+ /*
+ * Flush any buffered writetup() writes.
+ *
+ * This is useful when writetup() buffers writes for more efficient
+ * use of the tape's resources, e.g. when deduplicating or merging
+ * sort tuples.
+ */
+ void (*flushwrites) (Tuplesortstate *state, LogicalTape *tape);
+
/*
* Function to read a stored tuple from tape back into memory. 'len' is
* the already-read length of the stored tuple. The tuple is allocated
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index c960cfa823..fd838a3b1b 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -395,6 +395,7 @@ struct Sharedsort
#define REMOVEABBREV(state,stup,count) ((*(state)->base.removeabbrev) (state, stup, count))
#define COMPARETUP(state,a,b) ((*(state)->base.comparetup) (a, b, state))
#define WRITETUP(state,tape,stup) ((*(state)->base.writetup) (state, tape, stup))
+#define FLUSHWRITES(state,tape) ((state)->base.flushwrites ? (*(state)->base.flushwrites) (state, tape) : (void) 0)
#define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
#define FREESTATE(state) ((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
@@ -2244,6 +2245,8 @@ mergeonerun(Tuplesortstate *state)
}
}
+ FLUSHWRITES(state, state->destTape);
+
/*
* When the heap empties, we're done. Write an end-of-run marker on the
* output tape.
@@ -2369,6 +2372,8 @@ dumptuples(Tuplesortstate *state, bool alltuples)
WRITETUP(state, state->destTape, stup);
}
+ FLUSHWRITES(state, state->destTape);
+
state->memtupcount = 0;
/*
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index e07ba4ea4b..b9e8b3943e 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -199,6 +199,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
base->comparetup = comparetup_heap;
base->comparetup_tiebreak = comparetup_heap_tiebreak;
base->writetup = writetup_heap;
+ base->flushwrites = NULL;
base->readtup = readtup_heap;
base->haveDatum1 = true;
base->arg = tupDesc; /* assume we need not copy tupDesc */
@@ -275,6 +276,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
base->comparetup = comparetup_cluster;
base->comparetup_tiebreak = comparetup_cluster_tiebreak;
base->writetup = writetup_cluster;
+ base->flushwrites = NULL;
base->readtup = readtup_cluster;
base->freestate = freestate_cluster;
base->arg = arg;
@@ -383,6 +385,7 @@ tuplesort_begin_index_btree(Relation heapRel,
base->comparetup = comparetup_index_btree;
base->comparetup_tiebreak = comparetup_index_btree_tiebreak;
base->writetup = writetup_index;
+ base->flushwrites = NULL;
base->readtup = readtup_index;
base->haveDatum1 = true;
base->arg = arg;
@@ -462,6 +465,7 @@ tuplesort_begin_index_hash(Relation heapRel,
base->comparetup = comparetup_index_hash;
base->comparetup_tiebreak = comparetup_index_hash_tiebreak;
base->writetup = writetup_index;
+ base->flushwrites = NULL;
base->readtup = readtup_index;
base->haveDatum1 = true;
base->arg = arg;
@@ -506,6 +510,7 @@ tuplesort_begin_index_gist(Relation heapRel,
base->comparetup = comparetup_index_btree;
base->comparetup_tiebreak = comparetup_index_btree_tiebreak;
base->writetup = writetup_index;
+ base->flushwrites = NULL;
base->readtup = readtup_index;
base->haveDatum1 = true;
base->arg = arg;
@@ -561,6 +566,7 @@ tuplesort_begin_index_brin(int workMem,
base->removeabbrev = removeabbrev_index_brin;
base->comparetup = comparetup_index_brin;
base->writetup = writetup_index_brin;
+ base->flushwrites = NULL;
base->readtup = readtup_index_brin;
base->haveDatum1 = true;
base->arg = NULL;
@@ -602,6 +608,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
base->comparetup = comparetup_datum;
base->comparetup_tiebreak = comparetup_datum_tiebreak;
base->writetup = writetup_datum;
+ base->flushwrites = NULL;
base->readtup = readtup_datum;
base->haveDatum1 = true;
base->arg = arg;
--
2.45.2
v0-0002-Add-tuplesort-level-deduplication-to-nbtree-build.patch (application/octet-stream)
From df5091a66d8b106cf7e423449ba11e982f6329f2 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Fri, 30 Aug 2024 18:41:25 +0200
Subject: [PATCH v0 2/2] Add tuplesort-level deduplication to nbtree builds
This can reduce the total IO requirement of btree tuple sorts, and also
reduces the deduplication work that the index build leader has to do by
moving a good part of this responsibility to the workers.
---
src/include/access/nbtree.h | 15 +-
src/include/utils/tuplesort.h | 6 +-
src/backend/access/nbtree/nbtdedup.c | 183 +++++-
src/backend/access/nbtree/nbtsort.c | 210 +++++--
src/backend/utils/sort/tuplesortvariants.c | 675 ++++++++++++++++++++-
5 files changed, 1014 insertions(+), 75 deletions(-)
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 123fba624d..2eaf4b242d 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -161,11 +161,13 @@ typedef struct BTMetaPageData
* a heap index tuple to make space for a tiebreaker heap TID
* attribute, which we account for here.
*/
-#define BTMaxItemSize(page) \
- (MAXALIGN_DOWN((PageGetPageSize(page) - \
+#define BTMaxItemSizeForPageSize(pagesize) \
+ (MAXALIGN_DOWN(((pagesize) - \
MAXALIGN(SizeOfPageHeaderData + 3*sizeof(ItemIdData)) - \
MAXALIGN(sizeof(BTPageOpaqueData))) / 3) - \
MAXALIGN(sizeof(ItemPointerData)))
+#define BTMaxItemSize(page) BTMaxItemSizeForPageSize(PageGetPageSize(page))
+
#define BTMaxItemSizeNoHeapTid(page) \
MAXALIGN_DOWN((PageGetPageSize(page) - \
MAXALIGN(SizeOfPageHeaderData + 3*sizeof(ItemIdData)) - \
@@ -880,6 +882,11 @@ typedef struct BTDedupStateData
int nitems; /* Number of existing tuples/line pointers */
Size phystupsize; /* Includes line pointer overhead */
+ /* Metadata used during index build */
+ int rcvdallupto; /* index of latest received min TID */
+ int nhtidssorted; /* number of sorted tids, starting at 0 */
+ int maxnhtids; /* current max entries of ntids allocation */
+
/*
* Array of tuples to go on new version of the page. Contains one entry
* for each group of consecutive items. Note that existing tuples that
@@ -1200,12 +1207,16 @@ extern bool _bt_bottomupdel_pass(Relation rel, Buffer buf, Relation heapRel,
extern void _bt_dedup_start_pending(BTDedupState state, IndexTuple base,
OffsetNumber baseoff);
extern bool _bt_dedup_save_htid(BTDedupState state, IndexTuple itup);
+extern bool _bt_sort_dedup_save_htids(BTDedupState state, IndexTuple itup);
extern Size _bt_dedup_finish_pending(Page newpage, BTDedupState state);
extern IndexTuple _bt_form_posting(IndexTuple base, ItemPointer htids,
int nhtids);
extern void _bt_update_posting(BTVacuumPosting vacposting);
extern IndexTuple _bt_swap_posting(IndexTuple newitem, IndexTuple oposting,
int postingoff);
+#ifdef USE_ASSERT_CHECKING
+extern bool _bt_posting_valid(IndexTuple posting);
+#endif
/*
* prototypes for functions in nbtinsert.c
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index ed17ca00b6..a52d58c5e0 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -437,8 +437,10 @@ extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel,
Relation indexRel,
bool enforceUnique,
bool uniqueNullsNotDistinct,
- int workMem, SortCoordinate coordinate,
- int sortopt);
+ int workMem,
+ SortCoordinate coordinate,
+ int sortopt,
+ bool deduplicate);
extern Tuplesortstate *tuplesort_begin_index_hash(Relation heapRel,
Relation indexRel,
uint32 high_mask,
diff --git a/src/backend/access/nbtree/nbtdedup.c b/src/backend/access/nbtree/nbtdedup.c
index 456d86b51c..6ef37496a5 100644
--- a/src/backend/access/nbtree/nbtdedup.c
+++ b/src/backend/access/nbtree/nbtdedup.c
@@ -26,9 +26,6 @@ static bool _bt_do_singleval(Relation rel, Page page, BTDedupState state,
OffsetNumber minoff, IndexTuple newitem);
static void _bt_singleval_fillfactor(Page page, BTDedupState state,
Size newitemsz);
-#ifdef USE_ASSERT_CHECKING
-static bool _bt_posting_valid(IndexTuple posting);
-#endif
/*
* Perform a deduplication pass.
@@ -97,6 +94,10 @@ _bt_dedup_pass(Relation rel, Buffer buf, IndexTuple newitem, Size newitemsz,
state->phystupsize = 0;
/* nintervals should be initialized to zero */
state->nintervals = 0;
+ /* unused here; initialize to 0 */
+ state->maxnhtids = 0;
+ state->nhtidssorted = 0;
+ state->rcvdallupto = 0;
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
@@ -333,6 +334,10 @@ _bt_bottomupdel_pass(Relation rel, Buffer buf, Relation heapRel,
state->nitems = 0;
state->phystupsize = 0;
state->nintervals = 0;
+ /* unused here; initialize to 0 */
+ state->maxnhtids = 0;
+ state->nhtidssorted = 0;
+ state->rcvdallupto = 0;
/*
* Initialize tableam state that describes bottom-up index deletion
@@ -452,13 +457,34 @@ _bt_dedup_start_pending(BTDedupState state, IndexTuple base,
int nposting;
nposting = BTreeTupleGetNPosting(base);
+
+ if (state->maxnhtids > 0 && nposting > state->maxnhtids)
+ {
+ while (nposting > state->maxnhtids)
+ state->maxnhtids *= 2;
+
+ state->htids = repalloc(state->htids, state->maxnhtids
+ * sizeof(ItemPointerData));
+ }
+
memcpy(state->htids, BTreeTupleGetPosting(base),
sizeof(ItemPointerData) * nposting);
+
state->nhtids = nposting;
/* basetupsize should not include existing posting list */
state->basetupsize = BTreeTupleGetPostingOffset(base);
}
+ state->nhtidssorted = state->nhtids;
+ state->rcvdallupto = 1;
+
+ Assert(state->nhtids >= 0);
+ Assert(state->maxnhtids <= 0 || (
+ state->maxnhtids >= state->nhtids &&
+ state->nhtids >= state->nhtidssorted &&
+ state->nhtidssorted >= state->rcvdallupto &&
+ state->rcvdallupto >= 0));
+
/*
* Save new base tuple itself -- it'll be needed if we actually create a
* new posting list from new pending posting list.
@@ -543,6 +569,155 @@ _bt_dedup_save_htid(BTDedupState state, IndexTuple itup)
return true;
}
+static int32
+tidcmp(const void *a, const void *b)
+{
+ ItemPointer iptr1 = ((const ItemPointer) a);
+ ItemPointer iptr2 = ((const ItemPointer) b);
+
+ return ItemPointerCompare(iptr1, iptr2);
+}
+
+/*
+ * Save itup heap TID(s) into pending posting list where possible.
+ *
+ * Returns bool indicating if the pending posting list managed by state now
+ * includes itup's heap TID(s).
+ */
+bool
+_bt_sort_dedup_save_htids(BTDedupState state, IndexTuple itup)
+{
+ int addnhtids;
+ ItemPointer htids;
+ Size mergedtupsz;
+
+ Assert(!BTreeTupleIsPivot(itup));
+ Assert(state->nhtids >= 0);
+
+ if (!BTreeTupleIsPosting(itup))
+ {
+ addnhtids = 1;
+ htids = &itup->t_tid;
+ }
+ else
+ {
+ Assert(_bt_posting_valid(itup));
+
+ addnhtids = BTreeTupleGetNPosting(itup);
+ htids = BTreeTupleGetPosting(itup);
+ }
+
+ /* increase TID buffer size if we need to */
+ if (state->nhtids + addnhtids > state->maxnhtids)
+ {
+ while (state->maxnhtids < (state->nhtids + addnhtids))
+ state->maxnhtids *= 2;
+
+ state->htids = repalloc(state->htids,
+ sizeof(ItemPointerData) * state->maxnhtids);
+ }
+
+ state->nitems++;
+ memcpy(&state->htids[state->nhtids], htids,
+ sizeof(ItemPointerData) * addnhtids);
+
+ if (state->nhtids > 0)
+ {
+ if (state->nhtidssorted == state->nhtids &&
+ ItemPointerCompare(&state->htids[state->nhtids - 1], htids) < 0)
+ {
+ int old_nhtids = state->nhtids;
+
+ state->nhtids += addnhtids;
+ state->nhtidssorted += addnhtids;
+ /* future tuples can only contain TIDs after this tuple's first TID */
+ state->rcvdallupto = old_nhtids + 1;
+ }
+ else
+ {
+ int old_nhtids = state->nhtids;
+
+ state->nhtids += addnhtids;
+
+ /*
+ * Only sort every doubling of number of TIDs, so that we don't
+ * spend too much time sorting.
+ */
+ if (state->nhtids > state->nhtidssorted * 2)
+ {
+ ItemPointer ptr;
+ int allreceived;
+
+ Assert(state->maxnhtids >= state->nhtids &&
+ state->nhtids >= state->nhtidssorted &&
+ state->nhtidssorted >= state->rcvdallupto &&
+ state->rcvdallupto >= 0);
+
+ /*
+ * Sort all TIDs. Note that contents haven't changed for the
+ * data < rcvdallupto, so we don't have to sort those again.
+ */
+ qsort((&state->htids[state->rcvdallupto]),
+ state->nhtids - state->rcvdallupto,
+ sizeof(ItemPointerData),
+ tidcmp);
+
+ state->nhtidssorted = state->nhtids;
+
+ ptr = bsearch(htids, state->htids, state->nhtids,
+ sizeof(ItemPointerData), tidcmp);
+
+ allreceived = (ptr - state->htids) + 1;
+
+ /*
+ * We added a tuple, so the number of known-storable TIDs must
+ * have increased.
+ */
+ Assert(allreceived > 0 && allreceived > state->rcvdallupto &&
+ allreceived <= old_nhtids + 1);
+ Assert(ItemPointerCompare(&state->htids[allreceived - 1],
+ htids) == 0);
+
+ state->rcvdallupto = allreceived;
+ }
+ }
+ }
+ else
+ {
+ /* all TIDs are new */
+ state->nhtids = addnhtids;
+ state->nhtidssorted = addnhtids;
+ state->rcvdallupto = 1;
+ }
+
+ Assert(state->maxnhtids >= state->nhtids &&
+ state->nhtids >= state->nhtidssorted &&
+ state->nhtidssorted >= state->rcvdallupto &&
+ state->rcvdallupto >= 0);
+
+ mergedtupsz = MAXALIGN(state->rcvdallupto * sizeof(ItemPointerData) +
+ state->basetupsize);
+
+ if (mergedtupsz > state->maxpostingsize)
+ {
+ /*
+ * Count this as an oversized item for single value strategy, though
+ * only when there are 50 TIDs in the final posting list tuple. This
+ * limit (which is fairly arbitrary) avoids confusion about how many
+ * 1/6 of a page tuples have been encountered/created by the current
+ * deduplication pass.
+ *
+ * Note: We deliberately don't consider which deduplication pass
+ * merged together tuples to create this item (could be a previous
+ * deduplication pass, or current pass). See _bt_do_singleval()
+ * comments.
+ */
+ return false;
+ }
+
+ return true;
+}
+
/*
* Finalize pending posting list tuple, and add it to the page. Final tuple
* is based on saved base tuple, and saved list of heap TIDs.
@@ -1074,7 +1249,7 @@ _bt_swap_posting(IndexTuple newitem, IndexTuple oposting, int postingoff)
* tuple. Used within assertions.
*/
#ifdef USE_ASSERT_CHECKING
-static bool
+bool
_bt_posting_valid(IndexTuple posting)
{
ItemPointerData last;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 17a352d040..8de423800d 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -101,6 +101,7 @@ typedef struct BTShared
Oid indexrelid;
bool isunique;
bool nulls_not_distinct;
+ bool deduplicate;
bool isconcurrent;
int scantuplesortstates;
@@ -205,6 +206,7 @@ typedef struct BTBuildState
{
bool isunique;
bool nulls_not_distinct;
+ bool deduplicate;
bool havedead;
Relation heap;
BTSpool *spool;
@@ -282,10 +284,8 @@ static Size _bt_parallel_estimate_shared(Relation heap, Snapshot snapshot);
static double _bt_parallel_heapscan(BTBuildState *buildstate,
bool *brokenhotchain);
static void _bt_leader_participate_as_worker(BTBuildState *buildstate);
-static void _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
- BTShared *btshared, Sharedsort *sharedsort,
- Sharedsort *sharedsort2, int sortmem,
- bool progress);
+static void _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, BTShared *btshared, Sharedsort *sharedsort,
+ Sharedsort *sharedsort2, int sortmem, bool deduplicate, bool progress);
/*
@@ -311,6 +311,8 @@ btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
buildstate.spool2 = NULL;
buildstate.indtuples = 0;
buildstate.btleader = NULL;
+ buildstate.deduplicate = _bt_allequalimage(index, true) &&
+ !indexInfo->ii_Unique && BTGetDeduplicateItems(index);
/*
* We expect to be called exactly once for any index relation. If that's
@@ -430,7 +432,7 @@ _bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate,
tuplesort_begin_index_btree(heap, index, buildstate->isunique,
buildstate->nulls_not_distinct,
maintenance_work_mem, coordinate,
- TUPLESORT_NONE);
+ TUPLESORT_NONE, buildstate->deduplicate);
/*
* If building a unique index, put dead tuples in a second spool to keep
@@ -469,7 +471,8 @@ _bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate,
*/
buildstate->spool2->sortstate =
tuplesort_begin_index_btree(heap, index, false, false, work_mem,
- coordinate2, TUPLESORT_NONE);
+ coordinate2, TUPLESORT_NONE,
+ buildstate->deduplicate);
}
/* Fill spool using either serial or parallel heap scan */
@@ -1020,6 +1023,87 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
state->btps_lastoff = last_off;
}
+/*
+ * Add pending posting list tuples
+ */
+static void
+_bt_sort_dedup_output_pending(BTWriteState *wstate, BTPageState *state,
+ BTDedupState dstate)
+{
+ IndexTuple postingtuple;
+ Size truncextra;
+ /* request TIDs/tuple based on tuple size */
+ int optmaxhtids = 0;
+ /* max TIDs/tuple possible based on tuple and page size */
+ int physmaxhtids;
+ /* actual limit of TIDs per tuple */
+ int maxpostnhtids;
+ ItemPointer htids;
+
+ physmaxhtids = (BTMaxItemSizeForPageSize(BLCKSZ) - dstate->basetupsize)
+ / sizeof(ItemPointerData);
+
+ if (dstate->basetupsize < dstate->maxpostingsize)
+ {
+ optmaxhtids = (dstate->maxpostingsize - dstate->basetupsize) /
+ sizeof(ItemPointerData);
+ }
+
+ Assert(dstate->maxnhtids >= dstate->nhtids);
+ Assert(dstate->nhtids >= dstate->nhtidssorted);
+ Assert(dstate->nhtidssorted >= dstate->rcvdallupto);
+
+ /*------
+ * We:
+ * - must be able to fit at least 1 TID in the tuple;
+ * - cannot fit more than physmaxhtids TIDs in the posting list;
+ * - want at least 10 TIDs in the posting list; and
+ * - would otherwise fill the tuple up to maxpostingsize.
+ */
+ maxpostnhtids = Max(1, Min(Max(10, optmaxhtids), physmaxhtids));
+
+ Assert(dstate->nhtids >= dstate->rcvdallupto);
+ Assert(maxpostnhtids > 0);
+ htids = dstate->htids;
+
+ /*
+ * We can keep writing as long as we have TIDs for which we know no
+ * earlier TIDs can still arrive.
+ */
+ while (dstate->rcvdallupto >= maxpostnhtids)
+ {
+ /* form a tuple with a posting list */
+ postingtuple = _bt_form_posting(dstate->base, htids, maxpostnhtids);
+ truncextra = IndexTupleSize(postingtuple) - dstate->basetupsize;
+
+ _bt_buildadd(wstate, state, postingtuple, truncextra);
+
+ htids += maxpostnhtids;
+ dstate->nhtids -= maxpostnhtids;
+ dstate->rcvdallupto -= maxpostnhtids;
+ dstate->nhtidssorted -= maxpostnhtids;
+ dstate->nmaxitems++;
+ }
+
+ /*
+ * Only memmove TIDs once per output call; that reduces overhead.
+ */
+ if (htids != dstate->htids)
+ {
+ memmove(dstate->htids, htids,
+ dstate->nhtids * sizeof(ItemPointerData));
+ }
+}
+
+static int32
+tidcmp(const void *a, const void *b)
+{
+ ItemPointer iptr1 = ((const ItemPointer) a);
+ ItemPointer iptr2 = ((const ItemPointer) b);
+
+ return ItemPointerCompare(iptr1, iptr2);
+}
+
/*
* Finalize pending posting list tuple, and add it to the index. Final tuple
* is based on saved base tuple, and saved list of heap TIDs.
@@ -1033,27 +1117,66 @@ _bt_sort_dedup_finish_pending(BTWriteState *wstate, BTPageState *state,
{
Assert(dstate->nitems > 0);
- if (dstate->nitems == 1)
- _bt_buildadd(wstate, state, dstate->base, 0);
- else
+ /*
+ * We're done with this combination of key values, so let's flush
+ * all max-sized tuples.
+ */
+ if (dstate->nhtids != dstate->nhtidssorted)
{
- IndexTuple postingtuple;
- Size truncextra;
+ int tosort = dstate->nhtids - dstate->rcvdallupto;
+ qsort(dstate->htids + dstate->rcvdallupto, tosort,
+ sizeof(ItemPointerData), tidcmp);
+ dstate->nhtidssorted = dstate->nhtids;
+ }
- /* form a tuple with a posting list */
- postingtuple = _bt_form_posting(dstate->base,
- dstate->htids,
- dstate->nhtids);
- /* Calculate posting list overhead */
- truncextra = IndexTupleSize(postingtuple) -
- BTreeTupleGetPostingOffset(postingtuple);
+ dstate->rcvdallupto = dstate->nhtids;
- _bt_buildadd(wstate, state, postingtuple, truncextra);
- pfree(postingtuple);
+ _bt_sort_dedup_output_pending(wstate, state, dstate);
+
+ if (dstate->nhtids > 0)
+ {
+ /*
+ * We haven't merged any data into the base tuple, so we can add
+ * it directly to the index.
+ */
+ if (dstate->nitems == 1)
+ _bt_buildadd(wstate, state, dstate->base, 0);
+ else
+ {
+ /*
+ * The index tuple was updated/merged, so we have to create new
+ * index tuples. Note that nhtids can be 1 when _output_pending
+ * left just 1 TID in the array.
+ */
+ IndexTuple postingtuple;
+ Size truncextra;
+
+ /* form a tuple with a posting list */
+ postingtuple = _bt_form_posting(dstate->base,
+ dstate->htids,
+ dstate->nhtids);
+
+ if (dstate->nhtids > 1)
+ {
+ /* Calculate posting list overhead */
+ truncextra = IndexTupleSize(postingtuple) -
+ BTreeTupleGetPostingOffset(postingtuple);
+ }
+ else
+ {
+ /* no truncatable overhead for single-value tuples */
+ truncextra = 0;
+ }
+
+ _bt_buildadd(wstate, state, postingtuple, truncextra);
+ pfree(postingtuple);
+ }
}
dstate->nmaxitems = 0;
dstate->nhtids = 0;
+ dstate->rcvdallupto = 0;
+ dstate->nhtidssorted = 0;
dstate->nitems = 0;
dstate->phystupsize = 0;
}
@@ -1282,6 +1405,9 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
dstate->nitems = 0;
dstate->phystupsize = 0; /* unused */
dstate->nintervals = 0; /* unused */
+ /* tuplesort-generated posting list tuple merging */
+ dstate->nhtidssorted = 0;
+ dstate->rcvdallupto = 0;
while ((itup = tuplesort_getindextuple(btspool->sortstate,
true)) != NULL)
@@ -1308,26 +1434,38 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
Assert(dstate->maxpostingsize <= BTMaxItemSize((Page) state->btps_buf) &&
dstate->maxpostingsize <= INDEX_SIZE_MASK);
dstate->htids = palloc(dstate->maxpostingsize);
+ dstate->maxnhtids =
+ dstate->maxpostingsize / sizeof(ItemPointerData);
/* start new pending posting list with itup copy */
_bt_dedup_start_pending(dstate, CopyIndexTuple(itup),
InvalidOffsetNumber);
}
else if (_bt_keep_natts_fast(wstate->index, dstate->base,
- itup) > keysz &&
- _bt_dedup_save_htid(dstate, itup))
+ itup) > keysz)
{
- /*
- * Tuple is equal to base tuple of pending posting list. Heap
- * TID from itup has been saved in state.
- */
+ /* tuple is equal to base tuple of pending posting list. */
+
+ if (_bt_sort_dedup_save_htids(dstate, itup))
+ {
+ /*
+ * Heap TIDs from itup have been saved in dstate; we don't
+ * yet have enough to fill a maxsized tuple.
+ */
+ }
+ else
+ {
+ /*
+ * _bt_sort_dedup_save_htids() noticed we have enough TIDs
+ * to emit a new max-sized index tuple.
+ */
+ _bt_sort_dedup_output_pending(wstate, state, dstate);
+ }
}
else
{
/*
- * Tuple is not equal to pending posting list tuple, or
- * _bt_dedup_save_htid() opted to not merge current item into
- * pending posting list.
+ * Tuple is not equal to pending posting list tuple.
*/
_bt_sort_dedup_finish_pending(wstate, state, dstate);
pfree(dstate->base);
@@ -1505,6 +1643,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
btshared->indexrelid = RelationGetRelid(btspool->index);
btshared->isunique = btspool->isunique;
btshared->nulls_not_distinct = btspool->nulls_not_distinct;
+ btshared->deduplicate = buildstate->deduplicate;
btshared->isconcurrent = isconcurrent;
btshared->scantuplesortstates = scantuplesortstates;
btshared->queryid = pgstat_get_my_query_id();
@@ -1725,7 +1864,7 @@ _bt_leader_participate_as_worker(BTBuildState *buildstate)
/* Perform work common to all participants */
_bt_parallel_scan_and_sort(leaderworker, leaderworker2, btleader->btshared,
btleader->sharedsort, btleader->sharedsort2,
- sortmem, true);
+ sortmem, buildstate->deduplicate, true);
#ifdef BTREE_BUILD_STATS
if (log_btree_build_stats)
@@ -1832,7 +1971,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
/* Perform sorting of spool, and possibly a spool2 */
sortmem = maintenance_work_mem / btshared->scantuplesortstates;
_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
- sharedsort2, sortmem, false);
+ sharedsort2, sortmem, btshared->deduplicate,
+ false);
/* Report WAL/buffer usage during parallel execution */
bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
@@ -1867,7 +2007,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
static void
_bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
BTShared *btshared, Sharedsort *sharedsort,
- Sharedsort *sharedsort2, int sortmem, bool progress)
+ Sharedsort *sharedsort2, int sortmem,
+ bool deduplicate, bool progress)
{
SortCoordinate coordinate;
BTBuildState buildstate;
@@ -1887,7 +2028,8 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
btspool->isunique,
btspool->nulls_not_distinct,
sortmem, coordinate,
- TUPLESORT_NONE);
+ TUPLESORT_NONE,
+ deduplicate);
/*
* Just as with serial case, there may be a second spool. If so, a
@@ -1910,7 +2052,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
btspool2->sortstate =
tuplesort_begin_index_btree(btspool->heap, btspool->index, false, false,
Min(sortmem, work_mem), coordinate2,
- false);
+ TUPLESORT_NONE, deduplicate);
}
/* Fill in buildstate for _bt_build_callback() */
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index b9e8b3943e..b4fd99bea0 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -49,43 +49,52 @@ static void removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups,
static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups,
int count);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static int comparetup_heap_tiebreak(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
LogicalTape *tape, unsigned int len);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static int comparetup_cluster_tiebreak(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
LogicalTape *tape, unsigned int tuplen);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static int comparetup_index_btree_tiebreak(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
+static int comparetup_index_btree_dedup(const SortTuple *a,
+ const SortTuple *b,
+ Tuplesortstate *state);
+static int comparetup_index_btree_dedup_tiebreak(const SortTuple *a,
+ const SortTuple *b,
+ Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static int comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static int comparetup_index_brin(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
LogicalTape *tape, unsigned int len);
+static void writetup_btree_dedup(Tuplesortstate *state, LogicalTape *tape,
+ SortTuple *stup);
+static void flushwrites_btree_dedup(Tuplesortstate *state, LogicalTape *tape);
static void writetup_index_brin(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
LogicalTape *tape, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static int comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
@@ -123,8 +132,33 @@ typedef struct
bool enforceUnique; /* complain if we find duplicate tuples */
bool uniqueNullsNotDistinct; /* unique constraint null treatment */
+ /* Used by btree deduplication */
+ bool clean;
+ int nkeyatts; /* number of key attributes of the index */
+ int nhtids;
+ int nhtidssorted;
+ int rcvallupto;
+ int postoff;
+ int maxposthtids;
+ SortTuple bufsorttup; /* buffered index tuple, in .tupleBuffer */
+ void *tupbuf; /* buffer for deduplication */
+ ItemPointer htids; /* start of posting array in buffer, if any */
+ Size dedupsaved;
} TuplesortIndexBTreeArg;
+/* Buffer for merging btree duplicates. */
+#define BT_TUPLE_WRITE_BUFFER_SIZE (1024)
+/* We should merge tuples only up to an allowed size for btree pages */
+#define TARGET_POSTING_TUPLE_SIZE \
+ (Min(BT_TUPLE_WRITE_BUFFER_SIZE, BTMaxItemSizeForPageSize(BLCKSZ)))
+
+/*
+ * We only buffer new incoming tuples if they are small enough to fit 2 more
+ * TIDs */
+#define CAN_BUFFER_TUPLE(tupsize) ( \
+ (tupsize) < (TARGET_POSTING_TUPLE_SIZE - 2 * sizeof(ItemPointerData)) \
+)
+
/*
* Data structure pointed by "TuplesortPublic.arg" for the index_hash subcase.
*/
@@ -334,7 +368,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
Assert(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
- BTGreaterStrategyNumber : BTLessStrategyNumber;
+ BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
@@ -353,7 +387,8 @@ tuplesort_begin_index_btree(Relation heapRel,
bool uniqueNullsNotDistinct,
int workMem,
SortCoordinate coordinate,
- int sortopt)
+ int sortopt,
+ bool deduplicate)
{
Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
sortopt);
@@ -382,10 +417,38 @@ tuplesort_begin_index_btree(Relation heapRel,
PARALLEL_SORT(coordinate));
base->removeabbrev = removeabbrev_index;
- base->comparetup = comparetup_index_btree;
- base->comparetup_tiebreak = comparetup_index_btree_tiebreak;
- base->writetup = writetup_index;
- base->flushwrites = NULL;
+
+ if (deduplicate)
+ {
+ base->writetup = writetup_btree_dedup;
+ base->flushwrites = flushwrites_btree_dedup;
+ base->comparetup = comparetup_index_btree_dedup;
+ base->comparetup_tiebreak = comparetup_index_btree_dedup_tiebreak;
+ /*
+ * This buffer is twice the size of tuplesort's SLAB_SLOT_SIZE, so
+ * that it can always fit 2 deduplicated tuples worth of TIDs to
+ * merge.
+ */
+ arg->dedupsaved = 0;
+ arg->nhtids = 0;
+ arg->nhtidssorted = 0;
+ arg->rcvallupto = 0;
+ arg->tupbuf = palloc(BT_TUPLE_WRITE_BUFFER_SIZE);
+ arg->bufsorttup.tuple = NULL;
+ arg->bufsorttup.datum1 = (Datum) 0;
+ arg->bufsorttup.isnull1 = false;
+ arg->bufsorttup.srctape = -1;
+ arg->nkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRel);
+ }
+ else
+ {
+ base->writetup = writetup_index;
+ base->flushwrites = NULL;
+ base->comparetup = comparetup_index_btree;
+ base->comparetup_tiebreak = comparetup_index_btree_tiebreak;
+ arg->tupbuf = NULL;
+ }
+
base->readtup = readtup_index;
base->haveDatum1 = true;
base->arg = arg;
@@ -418,7 +481,7 @@ tuplesort_begin_index_btree(Relation heapRel,
Assert(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
- BTGreaterStrategyNumber : BTLessStrategyNumber;
+ BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
@@ -764,7 +827,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
/* GetMemoryChunkSpace is not supported for bump contexts */
if (TupleSortUseBumpTupleCxt(base->sortopt))
- tuplen = MAXALIGN(tuple->t_info & INDEX_SIZE_MASK);
+ tuplen = MAXALIGN(IndexTupleSize(tuple));
else
tuplen = GetMemoryChunkSpace(tuple);
@@ -1064,7 +1127,7 @@ removeabbrev_heap(Tuplesortstate *state, SortTuple *stups, int count)
HeapTupleData htup;
htup.t_len = ((MinimalTuple) stups[i].tuple)->t_len +
- MINIMAL_TUPLE_OFFSET;
+ MINIMAL_TUPLE_OFFSET;
htup.t_data = (HeapTupleHeader) ((char *) stups[i].tuple -
MINIMAL_TUPLE_OFFSET);
stups[i].datum1 = heap_getattr(&htup,
@@ -1105,9 +1168,9 @@ comparetup_heap_tiebreak(const SortTuple *a, const SortTuple *b, Tuplesortstate
int32 compare;
AttrNumber attno;
Datum datum1,
- datum2;
+ datum2;
bool isnull1,
- isnull2;
+ isnull2;
ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
@@ -1250,9 +1313,9 @@ comparetup_cluster_tiebreak(const SortTuple *a, const SortTuple *b,
int nkey;
int32 compare = 0;
Datum datum1,
- datum2;
+ datum2;
bool isnull1,
- isnull2;
+ isnull2;
ltup = (HeapTuple) a->tuple;
rtup = (HeapTuple) b->tuple;
@@ -1367,7 +1430,7 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
TuplesortClusterArg *arg = (TuplesortClusterArg *) base->arg;
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
HeapTuple tuple = (HeapTuple) tuplesort_readtup_alloc(state,
- t_len + HEAPTUPLESIZE);
+ t_len + HEAPTUPLESIZE);
/* Reconstruct the HeapTupleData header */
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
@@ -1470,9 +1533,9 @@ comparetup_index_btree_tiebreak(const SortTuple *a, const SortTuple *b,
int nkey;
int32 compare;
Datum datum1,
- datum2;
+ datum2;
bool isnull1,
- isnull2;
+ isnull2;
tuple1 = (IndexTuple) a->tuple;
tuple2 = (IndexTuple) b->tuple;
@@ -1542,12 +1605,12 @@ comparetup_index_btree_tiebreak(const SortTuple *a, const SortTuple *b,
ereport(ERROR,
(errcode(ERRCODE_UNIQUE_VIOLATION),
- errmsg("could not create unique index \"%s\"",
- RelationGetRelationName(arg->index.indexRel)),
- key_desc ? errdetail("Key %s is duplicated.", key_desc) :
- errdetail("Duplicate keys exist."),
- errtableconstraint(arg->index.heapRel,
- RelationGetRelationName(arg->index.indexRel))));
+ errmsg("could not create unique index \"%s\"",
+ RelationGetRelationName(arg->index.indexRel)),
+ key_desc ? errdetail("Key %s is duplicated.", key_desc) :
+ errdetail("Duplicate keys exist."),
+ errtableconstraint(arg->index.heapRel,
+ RelationGetRelationName(arg->index.indexRel))));
}
/*
@@ -1577,6 +1640,168 @@ comparetup_index_btree_tiebreak(const SortTuple *a, const SortTuple *b,
return 0;
}
+static int
+comparetup_index_btree_dedup(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state)
+{
+ /*
+ * This is similar to comparetup_heap(), but expects index tuples. There
+ * is also special handling for enforcing uniqueness, and special
+ * treatment for equal keys at the end.
+ */
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ SortSupport sortKey = base->sortKeys;
+ int32 compare;
+
+ /* Compare the leading sort key */
+ compare = ApplySortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ sortKey);
+ if (compare != 0)
+ return compare;
+
+ /* Compare additional sort keys */
+ return comparetup_index_btree_dedup_tiebreak(a, b, state);
+}
+
+static int
+comparetup_index_btree_dedup_tiebreak(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state)
+{
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ TuplesortIndexBTreeArg *arg = (TuplesortIndexBTreeArg *) base->arg;
+ SortSupport sortKey = base->sortKeys;
+ IndexTuple tuple1;
+ IndexTuple tuple2;
+ ItemPointer pointer1;
+ ItemPointer pointer2;
+ int keysz;
+ TupleDesc tupDes;
+ bool equal_hasnull = false;
+ int nkey;
+ int32 compare;
+ Datum datum1,
+ datum2;
+ bool isnull1,
+ isnull2;
+
+ tuple1 = (IndexTuple) a->tuple;
+ tuple2 = (IndexTuple) b->tuple;
+ keysz = base->nKeys;
+ tupDes = RelationGetDescr(arg->index.indexRel);
+
+ if (sortKey->abbrev_converter)
+ {
+ datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
+ datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
+
+ compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare;
+ }
+
+ /* they are equal, so we only need to examine one null flag */
+ if (a->isnull1)
+ equal_hasnull = true;
+
+ sortKey++;
+ for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
+ {
+ datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
+ datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
+
+ compare = ApplySortComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare; /* done when we find unequal attributes */
+
+ /* they are equal, so we only need to examine one null flag */
+ if (isnull1)
+ equal_hasnull = true;
+ }
+
+ /*
+ * If btree has asked us to enforce uniqueness, complain if two equal
+ * tuples are detected (unless there was at least one NULL field and NULLS
+ * NOT DISTINCT was not set).
+ *
+ * It is sufficient to make the test here, because if two tuples are equal
+ * they *must* get compared at some stage of the sort --- otherwise the
+ * sort algorithm wouldn't have checked whether one must appear before the
+ * other.
+ */
+ if (arg->enforceUnique && !(!arg->uniqueNullsNotDistinct && equal_hasnull))
+ {
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ char *key_desc;
+
+ /*
+ * Some rather brain-dead implementations of qsort (such as the one in
+ * QNX 4) will sometimes call the comparison routine to compare a
+ * value to itself, but we always use our own implementation, which
+ * does not.
+ */
+ Assert(tuple1 != tuple2);
+
+ index_deform_tuple(tuple1, tupDes, values, isnull);
+
+ key_desc = BuildIndexValueDescription(arg->index.indexRel, values, isnull);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNIQUE_VIOLATION),
+ errmsg("could not create unique index \"%s\"",
+ RelationGetRelationName(arg->index.indexRel)),
+ key_desc ? errdetail("Key %s is duplicated.", key_desc) :
+ errdetail("Duplicate keys exist."),
+ errtableconstraint(arg->index.heapRel,
+ RelationGetRelationName(arg->index.indexRel))));
+ }
+
+ /*
+ * While BTreeTupleGetHeapTID would also serve this purpose, we don't
+ * need the IsPivot path therein, so we open-code a pivot-less path
+ * here.
+ */
+ if (BTreeTupleIsPosting(tuple1))
+ pointer1 = BTreeTupleGetPosting(tuple1);
+ else
+ pointer1 = &tuple1->t_tid;
+
+ if (BTreeTupleIsPosting(tuple2))
+ pointer2 = BTreeTupleGetPosting(tuple2);
+ else
+ pointer2 = &tuple2->t_tid;
+ /*
+ * If key values are equal, we sort on ItemPointer. This is required for
+ * btree indexes, since heap TID is treated as an implicit last key
+ * attribute in order to ensure that all keys in the index are physically
+ * unique.
+ */
+ {
+ BlockNumber blk1 = ItemPointerGetBlockNumber(pointer1);
+ BlockNumber blk2 = ItemPointerGetBlockNumber(pointer2);
+
+ if (blk1 != blk2)
+ return (blk1 < blk2) ? -1 : 1;
+ }
+ {
+ OffsetNumber pos1 = ItemPointerGetOffsetNumber(pointer1);
+ OffsetNumber pos2 = ItemPointerGetOffsetNumber(pointer2);
+
+ if (pos1 != pos2)
+ return (pos1 < pos2) ? -1 : 1;
+ }
+
+ /* ItemPointer values should never be equal */
+ Assert(false);
+
+ return 0;
+}
+
static int
comparetup_index_hash(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state)
@@ -1696,6 +1921,390 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
+static int32
+tidcmp(const void *a, const void *b)
+{
+ ItemPointer iptr1 = ((const ItemPointer) a);
+ ItemPointer iptr2 = ((const ItemPointer) b);
+
+ return ItemPointerCompare(iptr1, iptr2);
+}
+
+static void
+writetup_btree_dedup_writebuftup(TuplesortIndexBTreeArg *arg,
+ Tuplesortstate *state, LogicalTape *tape)
+{
+ IndexTuple buftup;
+ int newtupsize;
+ if (arg->nhtids == 0)
+ return;
+
+ if (arg->clean)
+ {
+ writetup_index(state, tape, &arg->bufsorttup);
+ return;
+ }
+
+ Assert(arg->nhtids >= arg->nhtidssorted &&
+ arg->nhtidssorted >= arg->rcvallupto &&
+ arg->rcvallupto >= 1);
+
+ if (arg->nhtids != arg->nhtidssorted)
+ {
+ Assert(arg->nhtids > 1);
+ Assert(arg->rcvallupto >= 0);
+ qsort(arg->htids + arg->rcvallupto, arg->nhtids - arg->rcvallupto,
+ sizeof(ItemPointerData), tidcmp);
+ arg->nhtidssorted = arg->nhtids;
+ }
+
+ buftup = arg->bufsorttup.tuple;
+
+ if (arg->nhtids == 1)
+ {
+ buftup->t_tid = arg->htids[0];
+ buftup->t_info &= ~(INDEX_SIZE_MASK | INDEX_ALT_TID_MASK);
+ buftup->t_info |= arg->postoff;
+ }
+ else
+ {
+ newtupsize = arg->postoff;
+ newtupsize += arg->nhtids * sizeof(ItemPointerData);
+
+ BTreeTupleSetPosting(buftup, arg->nhtids, arg->postoff);
+ buftup->t_info &= ~INDEX_SIZE_MASK;
+ buftup->t_info |= MAXALIGN(newtupsize);
+
+ /* zero any trailing bytes */
+ if (newtupsize != MAXALIGN(newtupsize))
+ {
+ memset((char *) buftup + newtupsize,
+ 0, MAXALIGN(newtupsize) - newtupsize);
+ }
+
+ Assert(_bt_posting_valid(buftup));
+ }
+
+ writetup_index(state, tape, &arg->bufsorttup);
+}
+
+/*
+ * Write nbtree tuples to disk, with deduplication.
+ *
+ * Notes:
+ * - This deduplication only creates tuples up to 1024 bytes in size.
+ */
+static void
+writetup_btree_dedup(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ ItemPointer htids;
+ IndexTuple tuple = (IndexTuple) stup->tuple;
+ TuplesortIndexBTreeArg *arg = base->arg;
+ IndexTuple buftup = (IndexTuple) arg->bufsorttup.tuple;
+ int newbuftupsize,
+ nhtids,
+ tupsize = IndexTupleSize(tuple);
+ bool bufoverflowed = false;
+ bool eqkeyatts;
+
+ Assert(IndexTupleSize((IndexTuple) stup->tuple)
+ == MAXALIGN(IndexTupleSize((IndexTuple) stup->tuple)));
+ Assert(tupsize == MAXALIGN(tupsize));
+
+ Assert(arg->nhtids >= arg->nhtidssorted);
+
+ /*
+ * If we don't have a buffered tuple, we can either buffer it (when it's
+ * small enough for that) or immediately write it out to disk.
+ */
+ if (!PointerIsValid(buftup))
+ {
+ if (CAN_BUFFER_TUPLE(tupsize))
+ {
+ goto store_incoming;
+ }
+ else
+ {
+ writetup_index(state, tape, stup);
+ }
+ return;
+ }
+
+ /* We now know we have a tuple in the buffer */
+
+ Assert(arg->nhtids >= arg->nhtidssorted &&
+ arg->nhtidssorted >= arg->rcvallupto &&
+ arg->rcvallupto >= 1);
+
+ /*
+ * If _keep_natts wants us to keep more than just the key attributes,
+ * we know those key attributes must be equal.
+ */
+ eqkeyatts = _bt_keep_natts_fast(arg->index.indexRel,
+ arg->bufsorttup.tuple,
+ stup->tuple) > arg->nkeyatts;
+
+ if (!eqkeyatts)
+ {
+ /*
+ * The current buffered tuple's attributes aren't equal to the
+ * incoming tuple's key attributes, so we flush the old tuple, and
+ * then try to buffer the incoming tuple.
+ */
+ writetup_btree_dedup_writebuftup(arg, state, tape);
+
+ /*
+ * We buffer and deduplicate tuples only up to a tuple size of 1024B,
+ * so that they fit in SLAB_SLOT_SIZE. If it doesn't fit, we
+ * immediately write the data to disk, as their memory is immediately
+ * freed.
+ */
+ if (CAN_BUFFER_TUPLE(tupsize))
+ {
+ goto store_incoming;
+ }
+ else
+ {
+ writetup_index(state, tape, stup);
+ goto clear_buffer;
+ }
+ return;
+ }
+
+ /*
+ * Now that we're here, the incoming tuple has key attributes equal to
+ * those of the buffered tuple.
+ */
+
+ /*
+ * We copy all new incoming TIDs into the posting list.
+ *
+ * We also have to keep track of the incoming tuple's minimum TID, as we
+ * can still receive any number of TIDs higher than that from future
+ * tuples, so we can only flush those TIDs before which no new TIDs
+ * can still arrive.
+ */
+ if (BTreeTupleIsPosting(tuple))
+ {
+ nhtids = BTreeTupleGetNPosting(tuple);
+ htids = BTreeTupleGetPosting(tuple);
+ }
+ else
+ {
+ nhtids = 1;
+ htids = &tuple->t_tid;
+ }
+
+ /*
+ * If there are more TIDs total than can fit in the buffer, we merge
+ * all TIDs that can fit, causing us to only partially merge the tuples.
+ *
+ * Note that, in the case of partial merge, we include the lowest TID of
+ * the incoming tuple, as we need to use that TID in the next index tuple
+ * (to make sure any new incoming TIDs do get correctly emitted).
+ */
+ if (nhtids + arg->nhtids > arg->maxposthtids)
+ {
+ /*
+ * We flush if there's not enough space for even 1 TID left, so
+ * incoming tuple must've had more than 1 TID
+ */
+ Assert(BTreeTupleIsPosting(tuple));
+ bufoverflowed = true;
+
+ htids += 1;
+ nhtids = arg->maxposthtids - arg->nhtids;
+ Assert(nhtids > 0);
+ }
+
+ /* Append the to-be-added TIDs to the buffer */
+ memcpy(&arg->htids[arg->nhtids],
+ htids,
+ sizeof(ItemPointerData) * nhtids);
+
+ /*
+ * Admin: If the incoming TIDs sort after the current tail (and all tids
+ * are already ordered) we won't have to re-sort the data later.
+ */
+ if (arg->nhtids == arg->nhtidssorted)
+ {
+ int old_nhtids = arg->nhtids;
+ arg->nhtids += nhtids;
+
+ if (ItemPointerCompare(&arg->htids[old_nhtids - 1], htids) < 0)
+ {
+ arg->rcvallupto = old_nhtids + 1;
+ arg->nhtidssorted = arg->nhtids;
+ }
+ }
+ else
+ {
+ arg->nhtids += nhtids;
+ }
+
+ /* changes were applied, header needs reconstruction */
+ arg->clean = false;
+
+ /* Calculate the buffered tuple's size */
+ newbuftupsize = arg->postoff;
+ newbuftupsize += arg->nhtids * sizeof(ItemPointerData);
+
+ if (MAXALIGN(newbuftupsize + sizeof(ItemPointerData)) <= BT_TUPLE_WRITE_BUFFER_SIZE)
+ {
+ /* there's space for another tuple */
+ Assert(!bufoverflowed);
+ if (BTreeTupleIsPosting(tuple))
+ arg->dedupsaved += arg->postoff;
+ else
+ arg->dedupsaved += arg->postoff - sizeof(ItemPointerData);
+
+ Assert(arg->nhtids >= arg->nhtidssorted &&
+ arg->nhtidssorted >= arg->rcvallupto &&
+ arg->rcvallupto >= 1);
+ return;
+ }
+
+ writetup_btree_dedup_writebuftup(arg, state, tape);
+
+ if (bufoverflowed)
+ {
+ /*
+ * We now have to move the new TIDs into the old tuple.
+ * Remember that we only put the incoming tuple's TIDs at indexes
+ * 1 through gapsz into the now-flushed tuple, so we have to copy
+ * the TIDs at index 0 and those after gapsz.
+ */
+ int gapsz = nhtids;
+ int newntids = BTreeTupleGetNPosting(tuple) - gapsz;
+
+ Assert(htids - 1 == BTreeTupleGetPosting(tuple));
+ Assert(newntids > 0);
+
+ htids = BTreeTupleGetPosting(tuple);
+ arg->htids[0] = htids[0];
+ memcpy(&arg->htids[1],
+ &htids[gapsz + 1],
+ (newntids - 1) * sizeof(ItemPointerData));
+ arg->nhtids = newntids;
+ arg->nhtidssorted = newntids;
+ arg->rcvallupto = 1;
+
+ newbuftupsize = arg->postoff;
+
+ Assert(newbuftupsize < INDEX_SIZE_MASK);
+
+ /* Update buffered tuple's info */
+ if (newntids == 1)
+ {
+ buftup->t_tid = arg->htids[0];
+ /* clear alt tid mask from previous iterations */
+ buftup->t_info &= ~INDEX_ALT_TID_MASK;
+ }
+ else
+ {
+ newbuftupsize += newntids * sizeof(ItemPointerData);
+ BTreeTupleSetPosting(buftup, newntids, arg->postoff);
+ }
+
+ buftup->t_info &= ~INDEX_SIZE_MASK;
+ buftup->t_info |= MAXALIGN(newbuftupsize);
+
+ Assert(IndexTupleSize(buftup) == MAXALIGN(newbuftupsize));
+
+ /* clear out final bytes of trailing maxalign quantum */
+ if (MAXALIGN(newbuftupsize) != newbuftupsize)
+ {
+ memset((char *) arg->tupbuf + newbuftupsize,
+ 0, MAXALIGN(newbuftupsize) - newbuftupsize);
+ }
+
+ if (newntids > 1)
+ Assert(_bt_posting_valid(arg->bufsorttup.tuple));
+ }
+ else
+ {
+ /*
+ * No TIDs are left in the buffered tuple, so clear the buffer state
+ * so that it isn't considered for writing to disk.
+ */
+ goto clear_buffer;
+ }
+ return;
+
+clear_buffer:
+ arg->bufsorttup.tuple = NULL;
+ arg->bufsorttup.datum1 = 0;
+ arg->bufsorttup.isnull1 = false;
+ arg->nhtids = 0;
+ arg->nhtidssorted = 0;
+ arg->rcvallupto = 0;
+ arg->htids = NULL;
+ arg->postoff = 0;
+ arg->maxposthtids = 0;
+ arg->clean = true;
+ return;
+
+store_incoming:
+ /* Consider the current state empty, and write the new tuple into it */
+ memcpy(arg->tupbuf, tuple, tupsize);
+
+ arg->bufsorttup.tuple = arg->tupbuf;
+ arg->bufsorttup.datum1 = index_getattr(arg->bufsorttup.tuple, 1,
+ arg->index.indexRel->rd_att,
+ &arg->bufsorttup.isnull1);
+
+ if (BTreeTupleIsPosting(tuple))
+ {
+ arg->postoff = BTreeTupleGetPostingOffset(arg->bufsorttup.tuple);
+ Assert(arg->postoff == MAXALIGN(arg->postoff));
+ arg->nhtids = BTreeTupleGetNPosting(arg->bufsorttup.tuple);
+ arg->htids = BTreeTupleGetPosting(arg->bufsorttup.tuple);
+ }
+ else
+ {
+ arg->nhtids = 1;
+ arg->htids = (ItemPointer) ((char *) arg->bufsorttup.tuple + tupsize);
+ arg->htids[0] = tuple->t_tid;
+ arg->postoff = tupsize;
+ }
+ arg->nhtidssorted = arg->nhtids;
+ arg->rcvallupto = 1;
+ arg->clean = true;
+
+ arg->maxposthtids = MAXALIGN_DOWN(BT_TUPLE_WRITE_BUFFER_SIZE -
+ arg->postoff) / sizeof(ItemPointerData);
+
+ Assert(arg->maxposthtids > 1);
+
+ return;
+}
+
+static void
+flushwrites_btree_dedup(Tuplesortstate *state, LogicalTape *tape)
+{
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ TuplesortIndexBTreeArg *arg = base->arg;
+
+ if (PointerIsValid(arg->bufsorttup.tuple))
+ {
+ if (arg->nhtids != arg->nhtidssorted)
+ qsort(arg->htids + arg->rcvallupto,
+ arg->nhtids - arg->rcvallupto, sizeof(ItemPointerData),
+ tidcmp);
+ writetup_index(state, tape, &arg->bufsorttup);
+ }
+
+ arg->bufsorttup.tuple = NULL;
+ arg->bufsorttup.datum1 = (Datum) 0;
+ arg->bufsorttup.isnull1 = false;
+ arg->bufsorttup.srctape = -1;
+ arg->nhtids = 0;
+ arg->nhtidssorted = 0;
+ arg->rcvallupto = 0;
+ arg->htids = NULL;
+}
+
/*
* Routines specialized for BrinTuple case
*/
--
2.45.2