Index: src/backend/utils/sort/tuplesort.c =================================================================== RCS file: /home/postgres/cvsrepo/pgsql/src/backend/utils/sort/tuplesort.c,v retrieving revision 1.81 diff -c -r1.81 tuplesort.c *** src/backend/utils/sort/tuplesort.c 1 Jan 2008 19:45:55 -0000 1.81 --- src/backend/utils/sort/tuplesort.c 26 Feb 2008 00:57:35 -0000 *************** *** 262,276 **** * never in the heap and are used to hold pre-read tuples.) In state * SORTEDONTAPE, the array is not used. */ SortTuple *memtuples; /* array of SortTuple structs */ ! int memtupcount; /* number of tuples currently present */ int memtupsize; /* allocated length of memtuples array */ /* * While building initial runs, this is the current output run number * (starting at 0). Afterwards, it is the number of initial runs we made. */ ! int currentRun; /* * Unless otherwise noted, all pointer variables below are pointers to --- 262,301 ---- * never in the heap and are used to hold pre-read tuples.) In state * SORTEDONTAPE, the array is not used. */ + + /* + * The 'memtuples' array will be arranged into 2 heaps, respectively HeapUP and + * HeapDOWN. Actually the 'HeapUP' and 'HeapDOWN' variables will mark the root of those + * two heaps. Consider for example divisionIndex = memtupsize/2 -1, then HeapUP + * will be initially arranged in indexes [0,divisionIndex] while HeapDOWN in + * [divisionIndex+1,memtupsize-1]. HeapDOWN is an ordinary heap with + * its root placed at index divisionIndex+1 and its leaves at indexes + * towards memtupsize-1. On the other hand, HeapUP is a sort of "upside down" + * heap, that is its root lays at index divisionIndex + * and its leaves toward index 0 of the memtuples array. + */ SortTuple *memtuples; /* array of SortTuple structs */ ! int HeapUP; /* index of HeapUP's root into memtuples */ ! int HeapDOWN; /* index of HeapDOWN's root into memtuples */ ! ! int memtupcount; /* # of tuples currently present in 'memtuples'*/ ! int memtupcountUP; /* # of tuples currently present in HeapUP*/ ! int memtupcountDOWN; /* # of tuples currently present in HeapDOWN*/ ! ! bool HeapUPactive; //Are we still using HeapUP to build the current logical run? ! bool HeapDOWNactive; //Are we still using HeapDOWN to build the current logical run? ! int memtupsize; /* allocated length of memtuples array */ + int memtupsizeUP; /* allocated length of memtuples array */ + int memtupsizeDOWN; /* allocated length of memtuples array */ /* * While building initial runs, this is the current output run number * (starting at 0). Afterwards, it is the number of initial runs we made. */ ! int currentRun; //left temporary here for mergeruns() use ! int currentRunUP; ! int currentRunDOWN; /* * Unless otherwise noted, all pointer variables below are pointers to *************** *** 307,312 **** --- 332,339 ---- */ int Level; /* Knuth's l */ int destTape; /* current output tape (Knuth's j, less 1) */ + int destTapeUP; // temp tape to store the current runUP + int destTapeDOWN; // temp tape to store the current runDOWN int *tp_fib; /* Target Fibonacci run counts (A[]) */ int *tp_runs; /* # of real runs on each tape */ int *tp_dummy; /* # of dummy runs for each tape (D[]) */ *************** *** 422,432 **** --- 449,474 ---- static void mergepreread(Tuplesortstate *state); static void mergeprereadone(Tuplesortstate *state, int srcTape); static void dumptuples(Tuplesortstate *state, bool alltuples); + static void dumptuples_UP(Tuplesortstate *state, bool alltuples); + static void dumptuples_DOWN(Tuplesortstate *state, bool alltuples); static void make_bounded_heap(Tuplesortstate *state); static void sort_bounded_heap(Tuplesortstate *state); static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple, int tupleindex, bool checkIndex); + static void tuplesort_heap_insert_UP(Tuplesortstate *state, SortTuple *tuple, + int tupleindex, bool checkIndex); + static void tuplesort_heap_insert_DOWN(Tuplesortstate *state, SortTuple *tuple, + int tupleindex, bool checkIndex); static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex); + static void tuplesort_heap_siftup_UP(Tuplesortstate *state, bool checkIndex); + static void tuplesort_heap_siftup_DOWN(Tuplesortstate *state, bool checkIndex); + + static int leftSonUP(int nodeIndex, int rootIndexUP); + static int leftSonDOWN(int nodeIndex, int rootIndexUP); + static int rightSonUP(int nodeIndex, int rootIndexUP); + static int rightSonDOWN(int nodeIndex, int rootIndexUP); + + void formLogicalRun(Tuplesortstate *state); static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); static void markrunend(Tuplesortstate *state, int tapenum); static int comparetup_heap(const SortTuple *a, const SortTuple *b, *************** *** 524,530 **** if (LACKMEM(state)) elog(ERROR, "insufficient memory allowed for sort"); ! state->currentRun = 0; /* * maxTapes, tapeRange, and Algorithm D variables will be initialized by --- 566,575 ---- if (LACKMEM(state)) elog(ERROR, "insufficient memory allowed for sort"); ! // state->currentRun = 0; ! ! state->currentRunUP = 0; ! state->currentRunDOWN = 1; /* * maxTapes, tapeRange, and Algorithm D variables will be initialized by *************** *** 908,913 **** --- 953,960 ---- static void puttuple_common(Tuplesortstate *state, SortTuple *tuple) { + bool inHeapUP = false; + bool inHeapDOWN = false; switch (state->status) { case TSS_INITIAL: *************** *** 964,972 **** inittapes(state); /* ! * Dump tuples until we are back under the limit. */ ! dumptuples(state, false); break; case TSS_BOUNDED: --- 1011,1020 ---- inittapes(state); /* ! * No need to dumptuple() here. In TSS_BUILDRUNS we'll dump ! * tuples just from the heap hosting the next input tuple. */ ! //dumptuples(state, false); break; case TSS_BOUNDED: *************** *** 1009,1024 **** * this point; see dumptuples. */ Assert(state->memtupcount > 0); ! if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0) ! tuplesort_heap_insert(state, tuple, state->currentRun, true); ! else ! tuplesort_heap_insert(state, tuple, state->currentRun + 1, true); ! /* ! * If we are over the memory limit, dump tuples till we're under. ! */ ! dumptuples(state, false); ! break; default: elog(ERROR, "invalid tuplesort state"); --- 1057,1097 ---- * this point; see dumptuples. */ Assert(state->memtupcount > 0); ! if(state->HeapUPactive && COMPARETUP(state, tuple, &state->memtuples[state->HeapUP]) <= 0) ! { ! dumptuples_UP(state, false); ! tuplesort_heap_insert_UP(state, tuple, state->currentRunUP, true); ! inHeapUP = true; ! break; ! } ! if (state->HeapDOWNactive && COMPARETUP(state, tuple, &state->memtuples[state->HeapDOWN]) >= 0) ! { ! dumptuples_DOWN(state, false); ! tuplesort_heap_insert_DOWN(state, tuple, state->currentRunDOWN, true); ! inHeapDOWN = true; ! break; ! } ! ! /* IF WE GET TO THIS POINT IT MEANS THE INPUT TUPLE WON'T FORM PART OF THE CURRENT RUN */ ! // At the moment we just place it into HeapUP at first. Waiting for a more efficient ! // to manage that kind of element. ! ! if(state->HeapUPactive) ! { ! dumptuples_UP(state, false); ! tuplesort_heap_insert_UP(state, tuple, state->currentRunUP+2, true); ! inHeapUP = true; ! break; ! } ! ! if(state->HeapDOWNactive) ! { ! dumptuples_DOWN(state, false); ! tuplesort_heap_insert_DOWN(state, tuple, state->currentRunDOWN+2, true); ! inHeapUP = true; ! break; ! } default: elog(ERROR, "invalid tuplesort state"); *************** *** 1519,1524 **** --- 1592,1623 ---- } Assert(state->memtupcount == ntuples); + // NEW: qsort the unsorted array + if (state->memtupcount > 1) + qsort_arg((void *) state->memtuples, state->memtupcount, + sizeof(SortTuple), + (qsort_arg_comparator) state->comparetup, + (void *) state); + + + // dividing logically 'memtuples' into two heaps + state->memtupcountUP = state->memtupcount >> 1; + state->memtupcountDOWN = state->memtupcount - state->memtupcountUP; + state->HeapUP = state->memtupcountUP - 1; + state->HeapDOWN = state->memtupcountUP; + + state->HeapUPactive = true; + state->HeapDOWNactive = true; + + //initializing run number for each element into HeapUP + state->currentRunUP = 0; + for(j=0; j < state->memtupcountUP; j++) + state->memtuples[j].tupindex = state->currentRunUP; + + //initializing run number for each element into HeapDOWN + state->currentRunDOWN = 1; + for(j=0; jmemtupcountDOWN; j++) + state->memtuples[state->memtupcountUP + j].tupindex = state->currentRunDOWN; state->currentRun = 0; /* *************** *** 1557,1566 **** --- 1656,1681 ---- { state->destTape++; return; + if(state->destTape+2 < state->tapeRange) + { + state->destTapeUP = state->destTape + 1; + state->destTapeUP = state->destTape + 2; + return; + } + if(state->destTape-2 > 0) + { + state->destTapeUP = state->destTape - 1; + state->destTapeUP = state->destTape - 2; + return; + } + + elog(ERROR, "MANOLO -- NOT ENOUGH TAPES?"); } if (state->tp_dummy[state->destTape] != 0) { state->destTape = 0; + state->destTapeUP = 1; + state->destTapeUP = 2; return; } *************** *** 1573,1578 **** --- 1688,1695 ---- state->tp_fib[j] = a + state->tp_fib[j + 1]; } state->destTape = 0; + state->destTapeUP = 1; + state->destTapeUP = 2; } /* *************** *** 2023,2028 **** --- 2140,2295 ---- } } + + static void + dumptuples_UP(Tuplesortstate *state, bool alltuples) + { + int rootIndexUP = state->HeapUP; + bool randomAccess; + + /* Storing the original value of randomAccess */ + randomAccess = state->randomAccess; + /* That allows backward reading tuples */ + state->randomAccess = true; + + while (alltuples || + (LACKMEM(state) && state->memtupcountUP > 1) || + state->memtupcountUP >= state->memtupsizeUP) + { + /* + * Dump the heap's frontmost entry, and sift up to remove it from the + * heap. + */ + Assert(state->memtupcountUP > 0); + WRITETUP(state, state->tp_tapenum[state->destTapeUP], + &state->memtuples[rootIndexUP]); + tuplesort_heap_siftup_UP(state, true); + + /* + * If the heap is empty *or* top run number has changed, we've + * finished the current run. + */ + if (state->memtupcountUP == 0 || + state->currentRunUP != state->memtuples[rootIndexUP].tupindex) + { + markrunend(state, state->tp_tapenum[state->destTapeUP]); + + #ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "finished writing%s run UP %d to tape %d: %s", + (state->memtupcountUP == 0) ? " final" : "", + state->currentRunUP, state->destTapeUP, + pg_rusage_show(&state->ru_start)); + #endif + state->currentRunUP += 2; + state->HeapUPactive = false; + /* + * Done if heap is empty, else prepare for new LOGICAL run, + * in case PHYSICAL run DOWN is over too. + */ + if (state->memtupcountUP == 0) + break; + if (!state->HeapDOWNactive) + { + //do we really need to update 'currentRun'? + state->currentRun = state->currentRunUP - 2; + + formLogicalRun(state); + + state->tp_runs[state->destTape]++; + state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ + state->HeapDOWNactive = true; + state->HeapUPactive = true; + + Assert(state->currentRunUP == state->memtuples[rootIndexUP].tupindex); + selectnewtape(state); + } + } + } + + /* Restoring the original value of randomAccess */ + state->randomAccess = randomAccess; + } + + void formLogicalRun(Tuplesortstate *state) + { + + + #ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "finished writing%s LOGICAL run %d to tape %d: %s", + (state->memtupcount == 0) ? " final" : "", + state->currentRun, state->destTape, + pg_rusage_show(&state->ru_start)); + #endif + + } + + + static void + dumptuples_DOWN(Tuplesortstate *state, bool alltuples) + { + int rootIndexDOWN = state->HeapDOWN; + + while (alltuples || + (LACKMEM(state) && state->memtupcountDOWN > 1) || + state->memtupcountDOWN >= state->memtupsizeDOWN) + { + /* + * Dump the heap's frontmost entry, and sift up to remove it from the + * heap. + */ + Assert(state->memtupcountDOWN > 0); + WRITETUP(state, state->tp_tapenum[state->destTapeDOWN], + &state->memtuples[rootIndexDOWN]); + tuplesort_heap_siftup_DOWN(state, true); + + /* + * If the heap is empty *or* top run number has changed, we've + * finished the current run. + */ + + if (state->memtupcountDOWN == 0 || + state->currentRunDOWN != state->memtuples[rootIndexDOWN].tupindex) + { + markrunend(state, state->tp_tapenum[state->destTapeDOWN]); + + #ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "finished writing%s run DOWN %d to tape %d: %s", + (state->memtupcountDOWN == 0) ? " final" : "", + state->currentRunDOWN, state->destTape, + pg_rusage_show(&state->ru_start)); + #endif + + state->currentRunDOWN += 2; + state->HeapDOWNactive = false; + /* + * Done if heap is empty, else prepare for new LOGICAL run, + * in case PHYSICAL run UP is over too. + */ + if (state->memtupcountDOWN == 0) + break; + if(!state->HeapUPactive) + { + formLogicalRun(state); + + state->tp_runs[state->destTape]++; + state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ + state->HeapUPactive = true; + state->HeapDOWNactive = true; + + Assert(state->currentRunDOWN == state->memtuples[rootIndexDOWN].tupindex); + selectnewtape(state); + + } + } + } + } + + + + /* * tuplesort_rescan - rewind and replay the scan */ *************** *** 2331,2336 **** --- 2598,2673 ---- memtuples[j] = *tuple; } + static void + tuplesort_heap_insert_DOWN(Tuplesortstate *state, SortTuple *tuple, int tupleindex, bool checkIndex) + { + SortTuple *memtuples; + int j; + + /* + * Save the tupleindex --- see notes above about writing on *tuple. It's a + * historical artifact that tupleindex is passed as a separate argument + * and not in *tuple, but it's notationally convenient so let's leave it + * that way. + */ + tuple->tupindex = tupleindex; + + memtuples = state->memtuples; + Assert(state->memtupcountDOWN < state->memtupsizeDOWN); + + /* + * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is + * using 1-based array indexes, not 0-based. + */ + j = state->HeapDOWN + state->memtupcountDOWN++; + while (j > state->HeapDOWN) + { + int i = (j - 1) >> 1; + + if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0) + break; + memtuples[j] = memtuples[i]; + j = i; + } + memtuples[j] = *tuple; + } + + + static void + tuplesort_heap_insert_UP(Tuplesortstate *state, SortTuple *tuple, int tupleindex, bool checkIndex) + { + SortTuple *memtuples; + int j, + HeapUP; + + /* + * Save the tupleindex --- see notes above about writing on *tuple. It's a + * historical artifact that tupleindex is passed as a separate argument + * and not in *tuple, but it's notationally convenient so let's leave it + * that way. + */ + tuple->tupindex = tupleindex; + + memtuples = state->memtuples; + Assert(state->memtupcountUP < state->memtupsizeUP); + + /* + * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is + * using 1-based array indexes, not 0-based. + */ + HeapUP = state->HeapUP; + j = HeapUP - state->memtupcountUP++; + while (j < HeapUP) + { + int i = (j - 1) >> 1; + + if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0) + break; + memtuples[j] = memtuples[i]; + j = i; + } + memtuples[j] = *tuple; + } /* * The tuple at state->memtuples[0] has been removed from the heap. * Decrement memtupcount, and sift up to maintain the heap invariant. *************** *** 2364,2369 **** --- 2701,2846 ---- } memtuples[i] = *tuple; } + static int leftSonUP(int nodeIndex, int rootIndexUP) + { return (nodeIndex * 2 - 1 - rootIndexUP); } + + static int rightSonUP(int nodeIndex, int rootIndexUP) + { return (nodeIndex * 2 - 2 - rootIndexUP); } + + static int leftSonDOWN(int nodeIndex, int rootIndexDOWN) + { return (nodeIndex * 2 + 2 - rootIndexDOWN); } + + static int rightSonDOWN(int nodeIndex, int rootIndexDOWN) + { return (nodeIndex * 2 + 1 - rootIndexDOWN); } + + + static void + tuplesort_heap_siftup_UP(Tuplesortstate *state, bool checkIndex) + { + SortTuple *memtuples = state->memtuples; + SortTuple *tuple; + int i, + n, //index of the last leaf of HeapUP + memtupcount, + memtupcountUP, memtupsizeUP, HeapUP; + bool runningAfter; + + memtupcount = --state->memtupcount; + memtupcountUP = --state->memtupcountUP; + if (memtupcountUP <= 0) + return; + memtupsizeUP = state->memtupsizeUP; + HeapUP = state->HeapUP; + + if(memtupsizeUP >= memtupcountUP) + { //index of the last element of the heap + n = HeapUP - memtupcountUP + 1; + tuple = &memtuples[n]; /* tuple that must be reinserted */ + runningAfter = false; + } + else + { + n = memtupcount - (memtupcountUP - memtupsizeUP) -1 ; + runningAfter = true; + tuple = &memtuples[n]; /* tuple that must be reinserted */ + } + + + i = HeapUP; /* i is where the "hole" is */ + for (;;) + { + int j = leftSonUP(HeapUP, i); + + if ( !runningAfter && j > n ) + { + if (j - 1 < n && + HEAPCOMPARE(&memtuples[j], &memtuples[j - 1]) < 0 ) + j--; + if (HEAPCOMPARE(tuple, &memtuples[j]) >= 0) + break; + memtuples[i] = memtuples[j]; + i = j; + continue; + } + /*HEAPS RUNNING AFTER THEM EACH OTHER + if (runningAfter && ) + if (j + 1 < n && + HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0) + j++; + if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0) + break; + memtuples[i] = memtuples[j]; + i = j; + */ + } + memtuples[i] = *tuple; + } + + + + + static void + tuplesort_heap_siftup_DOWN(Tuplesortstate *state, bool checkIndex) + { + SortTuple *memtuples = state->memtuples; + SortTuple *tuple; + int i, + n, //index of the last leaf of HeapUP + HeapDOWN, + memtupcountDOWN, memtupsizeDOWN, memtupcount; + bool runningAfter; + + memtupcount = --state->memtupcount; + memtupcountDOWN = --state->memtupcountDOWN; + if (memtupcountDOWN <= 0) + return; + memtupsizeDOWN = state->memtupsizeDOWN; + HeapDOWN = state->HeapDOWN; + + if(memtupsizeDOWN >= memtupcountDOWN) + { + n = HeapDOWN + memtupcountDOWN - 1; + tuple = &memtuples[n]; /* tuple that must be reinserted */ + runningAfter = false; + } + else + { + n = memtupcountDOWN - (memtupsizeDOWN - 1) ; + runningAfter = true; + tuple = &memtuples[n]; /* tuple that must be reinserted */ + } + + + i = HeapDOWN; /* i is where the "hole" is */ + for (;;) + { + int j = rightSonDOWN(HeapDOWN,i); + + if ( !runningAfter && j < n ) + { + if (j + 1 < n && + HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0 ) + j++; + if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0) + break; + memtuples[i] = memtuples[j]; + i = j; + continue; + } + + /*HEAPS RUNNING AFTER THEM EACH OTHER + if (runningAfter && ) + if (j + 1 < n && + HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0) + j++; + if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0) + break; + memtuples[i] = memtuples[j]; + i = j; + */ + } + memtuples[i] = *tuple; + } /*