*** ./pgsql/src/backend/utils/sort/tuplesort.c	2008-01-01 20:45:55.000000000 +0100
--- ./postgresql-8.2.5-REFINED/src/backend/utils/sort/tuplesort.c	2008-02-06 23:53:46.000000000 +0100
***************
*** 87,110 ****
   * above.  Nonetheless, with large workMem we can have many tapes.
   *
   *
!  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
   * IDENTIFICATION
!  *	  $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.81 2008/01/01 19:45:55 momjian Exp $
   *
   *-------------------------------------------------------------------------
   */
  
  #include "postgres.h"
  
- #include <limits.h>
- 
  #include "access/heapam.h"
  #include "access/nbtree.h"
  #include "catalog/pg_amop.h"
  #include "catalog/pg_operator.h"
- #include "commands/tablespace.h"
  #include "miscadmin.h"
  #include "utils/datum.h"
  #include "utils/logtape.h"
--- 87,107 ----
   * above.  Nonetheless, with large workMem we can have many tapes.
   *
   *
!  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
   * IDENTIFICATION
!  *	  $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.70 2006/10/04 00:30:04 momjian Exp $
   *
   *-------------------------------------------------------------------------
   */
  
  #include "postgres.h"
  
  #include "access/heapam.h"
  #include "access/nbtree.h"
  #include "catalog/pg_amop.h"
  #include "catalog/pg_operator.h"
  #include "miscadmin.h"
  #include "utils/datum.h"
  #include "utils/logtape.h"
***************
*** 115,127 ****
  #include "utils/tuplesort.h"
  
  
! /* GUC variables */
  #ifdef TRACE_SORT
  bool		trace_sort = false;
  #endif
- #ifdef DEBUG_BOUNDED_SORT
- bool		optimize_bounded_sort = true;
- #endif
  
  
  /*
--- 112,121 ----
  #include "utils/tuplesort.h"
  
  
! /* GUC variable */
  #ifdef TRACE_SORT
  bool		trace_sort = false;
  #endif
  
  
  /*
***************
*** 165,171 ****
  typedef enum
  {
  	TSS_INITIAL,				/* Loading tuples; still within memory limit */
- 	TSS_BOUNDED,				/* Loading tuples into bounded-size heap */
  	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
  	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
  	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
--- 159,164 ----
***************
*** 195,204 ****
  	TupSortStatus status;		/* enumerated value as shown above */
  	int			nKeys;			/* number of columns in sort key */
  	bool		randomAccess;	/* did caller request random access? */
- 	bool		bounded;		/* did caller specify a maximum number of
- 								 * tuples to return? */
- 	bool		boundUsed;		/* true if we made use of a bounded heap */
- 	int			bound;			/* if bounded, the maximum number of tuples */
  	long		availMem;		/* remaining memory available, in bytes */
  	long		allowedMem;		/* total memory allowed, in bytes */
  	int			maxTapes;		/* number of tapes (Knuth's T) */
--- 188,193 ----
***************
*** 246,258 ****
  										int tapenum, unsigned int len);
  
  	/*
- 	 * Function to reverse the sort direction from its current state. (We
- 	 * could dispense with this if we wanted to enforce that all variants
- 	 * represent the sort key information alike.)
- 	 */
- 	void		(*reversedirection) (Tuplesortstate *state);
- 
- 	/*
  	 * This array holds the tuples now in sort memory.	If we are in state
  	 * INITIAL, the tuples are in no particular order; if we are in state
  	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
--- 235,240 ----
***************
*** 261,276 ****
  	 * heap --- during merge passes, memtuples[] entries beyond tapeRange are
  	 * never in the heap and are used to hold pre-read tuples.)  In state
  	 * SORTEDONTAPE, the array is not used.
! 	 */
! 	SortTuple  *memtuples;		/* array of SortTuple structs */
! 	int			memtupcount;	/* number of tuples currently present */
! 	int			memtupsize;		/* allocated length of memtuples array */
  
  	/*
  	 * While building initial runs, this is the current output run number
  	 * (starting at 0).  Afterwards, it is the number of initial runs we made.
! 	 */
  	int			currentRun;
  
  	/*
  	 * Unless otherwise noted, all pointer variables below are pointers to
--- 243,292 ----
  	 * heap --- during merge passes, memtuples[] entries beyond tapeRange are
  	 * never in the heap and are used to hold pre-read tuples.)  In state
  	 * SORTEDONTAPE, the array is not used.
! 	 */
! 
!     /*
! 	 * The memtuples array contains all of the tuples currently present into
! 	 * memory and it will be arranged into two heaps respectively HeapUP and
! 	 * HeapDOWN. So HeapUP and HeapDOWN will actually point to the root of those
! 	 * two heaps. Consider for example divisionIndex = memtupsize/2 -1, then HeapUP
! 	 * will be initially arranged in indexes [0,divisionIndex] while HeapDOWN in
! 	 * [divisionIndex+1,memtupsize-1]. HeapDOWN is an ordinary heap with initially
! 	 * its root placed at index divisionIndex+1 and its leaves at indexes
! 	 * towards memtupsize-1. On the other hand, HeapUP is a sort of "upside down"
! 	 * heap, that is, its root initially lies at index divisionIndex
! 	 * and its leaves toward index 0 of the memtuples array.
! 	 */
! 	SortTuple  *memtuples;		/* array of SortTuple structs */
! 	int         HeapUP;	        /* index of HeapUP's root into memtuples */
! 	int         HeapDOWN;       /* index of HeapDOWN's root into memtuples */
! 
! 	int			memtupcount;    /* # of tuples currently present in 'memtuples'*/
! 	int         memtupcountUP;  /* # of tuples currently present in HeapUP*/
! 	int         memtupcountDOWN; /* # of tuples currently present in HeapDOWN*/
! 
! 	bool        HeapUPactive;   //Are we still using HeapUP to build the current logical run?
! 	bool        HeapDOWNactive; //Are we still using HeapDOWN to build the current logical run?
! 
! 	int			memtupsize;		/* allocated length of memtuples array */
! 	int			memtupsizeUP;		/* allocated length of HeapUP's part of memtuples */
! 	int			memtupsizeDOWN;		/* allocated length of HeapDOWN's part of memtuples */
  
  	/*
  	 * While building initial runs, this is the current output run number
  	 * (starting at 0).  Afterwards, it is the number of initial runs we made.
! 	 *
! 	 *  ++++++++++++++++++++++++++++++++++++++++++++++++
! 	 *                   MODIFIED
! 
  	int			currentRun;
+ 	 *
+         ++++++++++++++++++++++++++++++++++++++++++++++++
+      */
+ 
+      int currentRun; //left temporary here for 'mergeruns() use'
+      int currentRunUP;
+      int currentRunDOWN;
  
  	/*
  	 * Unless otherwise noted, all pointer variables below are pointers to
***************
*** 333,338 ****
--- 349,355 ----
  	 */
  	TupleDesc	tupDesc;
  	ScanKey		scanKeys;		/* array of length nKeys */
+ 	SortFunctionKind *sortFnKinds;		/* array of length nKeys */
  
  	/*
  	 * These variables are specific to the IndexTuple case; they are set by
***************
*** 347,354 ****
  	 * tuplesort_begin_datum and used only by the DatumTuple routines.
  	 */
  	Oid			datumType;
  	FmgrInfo	sortOpFn;		/* cached lookup data for sortOperator */
! 	int			sortFnFlags;	/* equivalent to sk_flags */
  	/* we need typelen and byval in order to know how to copy the Datums. */
  	int			datumTypeLen;
  	bool		datumTypeByVal;
--- 364,372 ----
  	 * tuplesort_begin_datum and used only by the DatumTuple routines.
  	 */
  	Oid			datumType;
+ 	Oid			sortOperator;
  	FmgrInfo	sortOpFn;		/* cached lookup data for sortOperator */
! 	SortFunctionKind sortFnKind;
  	/* we need typelen and byval in order to know how to copy the Datums. */
  	int			datumTypeLen;
  	bool		datumTypeByVal;
***************
*** 365,371 ****
  #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
  #define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
  #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
- #define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
  #define LACKMEM(state)		((state)->availMem < 0)
  #define USEMEM(state,amt)	((state)->availMem -= (amt))
  #define FREEMEM(state,amt)	((state)->availMem += (amt))
--- 383,388 ----
***************
*** 420,432 ****
  static void mergeonerun(Tuplesortstate *state);
  static void beginmerge(Tuplesortstate *state);
  static void mergepreread(Tuplesortstate *state);
! static void mergeprereadone(Tuplesortstate *state, int srcTape);
! static void dumptuples(Tuplesortstate *state, bool alltuples);
! static void make_bounded_heap(Tuplesortstate *state);
! static void sort_bounded_heap(Tuplesortstate *state);
  static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
! 					  int tupleindex, bool checkIndex);
! static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
  static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
  static void markrunend(Tuplesortstate *state, int tapenum);
  static int comparetup_heap(const SortTuple *a, const SortTuple *b,
--- 437,464 ----
  static void mergeonerun(Tuplesortstate *state);
  static void beginmerge(Tuplesortstate *state);
  static void mergepreread(Tuplesortstate *state);
! static void mergeprereadone(Tuplesortstate *state, int srcTape);
!     //DUMPTUPLES
! static void dumptuples(Tuplesortstate *state, bool alltuples);
! static void dumptuples_UP(Tuplesortstate *state, bool alltuples);
! static void dumptuples_DOWN(Tuplesortstate *state, bool alltuples);
!     //TUPLESORT_HEAP_INSERT
  static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
! 					  int tupleindex, bool checkIndex);
! static void tuplesort_heap_insert_UP(Tuplesortstate *state, SortTuple *tuple,
!                       int tupleindex, bool checkIndex);
! static void tuplesort_heap_insert_DOWN(Tuplesortstate *state, SortTuple *tuple,
!                       int tupleindex, bool checkIndex);
!     //TUPLESORT_HEAP_SIFTUP
! static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
! static void tuplesort_heap_siftup_UP(Tuplesortstate *state, bool checkIndex);
! static void tuplesort_heap_siftup_DOWN(Tuplesortstate *state, bool checkIndex);
!     //RIGHT/LEFT SON FUNCTIONS
! static int leftSonUP(int nodeIndex, int rootIndexUP);
! static int leftSonDOWN(int nodeIndex, int rootIndexUP);
! static int rightSonUP(int nodeIndex, int rootIndexUP);
! static int rightSonDOWN(int nodeIndex, int rootIndexUP);
! 
  static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
  static void markrunend(Tuplesortstate *state, int tapenum);
  static int comparetup_heap(const SortTuple *a, const SortTuple *b,
***************
*** 436,442 ****
  			  SortTuple *stup);
  static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
  			 int tapenum, unsigned int len);
- static void reversedirection_heap(Tuplesortstate *state);
  static int comparetup_index(const SortTuple *a, const SortTuple *b,
  				 Tuplesortstate *state);
  static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
--- 468,473 ----
***************
*** 444,450 ****
  			   SortTuple *stup);
  static void readtup_index(Tuplesortstate *state, SortTuple *stup,
  			  int tapenum, unsigned int len);
- static void reversedirection_index(Tuplesortstate *state);
  static int comparetup_datum(const SortTuple *a, const SortTuple *b,
  				 Tuplesortstate *state);
  static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
--- 475,480 ----
***************
*** 452,459 ****
  			   SortTuple *stup);
  static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
  			  int tapenum, unsigned int len);
- static void reversedirection_datum(Tuplesortstate *state);
- static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
  
  
  /*
--- 482,487 ----
***************
*** 507,514 ****
  
  	state->status = TSS_INITIAL;
  	state->randomAccess = randomAccess;
- 	state->bounded = false;
- 	state->boundUsed = false;
  	state->allowedMem = workMem * 1024L;
  	state->availMem = state->allowedMem;
  	state->sortcontext = sortcontext;
--- 535,540 ----
***************
*** 524,530 ****
--- 550,565 ----
  	if (LACKMEM(state))
  		elog(ERROR, "insufficient memory allowed for sort");
  
+     /*  ++++++++++++++++++++++++++++++++++++++++++++++++
+ 	 *                   MODIFIED
+ 	 *
  	state->currentRun = 0;
+ 	 *
+      *  ++++++++++++++++++++++++++++++++++++++++++++++++
+      */
+ 
+ 	state->currentRunUP = 0;
+ 	state->currentRunDOWN = 1;
  
  	/*
  	 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
***************
*** 540,547 ****
  
  Tuplesortstate *
  tuplesort_begin_heap(TupleDesc tupDesc,
! 					 int nkeys, AttrNumber *attNums,
! 					 Oid *sortOperators, bool *nullsFirstFlags,
  					 int workMem, bool randomAccess)
  {
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
--- 575,582 ----
  
  Tuplesortstate *
  tuplesort_begin_heap(TupleDesc tupDesc,
! 					 int nkeys,
! 					 Oid *sortOperators, AttrNumber *attNums,
  					 int workMem, bool randomAccess)
  {
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
***************
*** 565,587 ****
  	state->copytup = copytup_heap;
  	state->writetup = writetup_heap;
  	state->readtup = readtup_heap;
- 	state->reversedirection = reversedirection_heap;
  
  	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
  	state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
  
  	for (i = 0; i < nkeys; i++)
  	{
! 		Oid			sortFunction;
! 		bool		reverse;
  
- 		AssertArg(attNums[i] != 0);
  		AssertArg(sortOperators[i] != 0);
  
! 		if (!get_compare_function_for_ordering_op(sortOperators[i],
! 												  &sortFunction, &reverse))
! 			elog(ERROR, "operator %u is not a valid ordering operator",
! 				 sortOperators[i]);
  
  		/*
  		 * We needn't fill in sk_strategy or sk_subtype since these scankeys
--- 600,621 ----
  	state->copytup = copytup_heap;
  	state->writetup = writetup_heap;
  	state->readtup = readtup_heap;
  
  	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
  	state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
+ 	state->sortFnKinds = (SortFunctionKind *)
+ 		palloc0(nkeys * sizeof(SortFunctionKind));
  
  	for (i = 0; i < nkeys; i++)
  	{
! 		RegProcedure sortFunction;
  
  		AssertArg(sortOperators[i] != 0);
+ 		AssertArg(attNums[i] != 0);
  
! 		/* select a function that implements the sort operator */
! 		SelectSortFunction(sortOperators[i], &sortFunction,
! 						   &state->sortFnKinds[i]);
  
  		/*
  		 * We needn't fill in sk_strategy or sk_subtype since these scankeys
***************
*** 592,603 ****
  					InvalidStrategy,
  					sortFunction,
  					(Datum) 0);
- 
- 		/* However, we use btree's conventions for encoding directionality */
- 		if (reverse)
- 			state->scanKeys[i].sk_flags |= SK_BT_DESC;
- 		if (nullsFirstFlags[i])
- 			state->scanKeys[i].sk_flags |= SK_BT_NULLS_FIRST;
  	}
  
  	MemoryContextSwitchTo(oldcontext);
--- 626,631 ----
***************
*** 629,635 ****
  	state->copytup = copytup_index;
  	state->writetup = writetup_index;
  	state->readtup = readtup_index;
- 	state->reversedirection = reversedirection_index;
  
  	state->indexRel = indexRel;
  	/* see comments below about btree dependence of this code... */
--- 657,662 ----
***************
*** 643,655 ****
  
  Tuplesortstate *
  tuplesort_begin_datum(Oid datumType,
! 					  Oid sortOperator, bool nullsFirstFlag,
  					  int workMem, bool randomAccess)
  {
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
  	MemoryContext oldcontext;
! 	Oid			sortFunction;
! 	bool		reverse;
  	int16		typlen;
  	bool		typbyval;
  
--- 670,681 ----
  
  Tuplesortstate *
  tuplesort_begin_datum(Oid datumType,
! 					  Oid sortOperator,
  					  int workMem, bool randomAccess)
  {
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
  	MemoryContext oldcontext;
! 	RegProcedure sortFunction;
  	int16		typlen;
  	bool		typbyval;
  
***************
*** 668,689 ****
  	state->copytup = copytup_datum;
  	state->writetup = writetup_datum;
  	state->readtup = readtup_datum;
- 	state->reversedirection = reversedirection_datum;
  
  	state->datumType = datumType;
  
! 	/* lookup the ordering function */
! 	if (!get_compare_function_for_ordering_op(sortOperator,
! 											  &sortFunction, &reverse))
! 		elog(ERROR, "operator %u is not a valid ordering operator",
! 			 sortOperator);
  	fmgr_info(sortFunction, &state->sortOpFn);
  
- 	/* set ordering flags */
- 	state->sortFnFlags = reverse ? SK_BT_DESC : 0;
- 	if (nullsFirstFlag)
- 		state->sortFnFlags |= SK_BT_NULLS_FIRST;
- 
  	/* lookup necessary attributes of the datum type */
  	get_typlenbyval(datumType, &typlen, &typbyval);
  	state->datumTypeLen = typlen;
--- 694,708 ----
  	state->copytup = copytup_datum;
  	state->writetup = writetup_datum;
  	state->readtup = readtup_datum;
  
  	state->datumType = datumType;
+ 	state->sortOperator = sortOperator;
  
! 	/* select a function that implements the sort operator */
! 	SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
! 	/* and look up the function */
  	fmgr_info(sortFunction, &state->sortOpFn);
  
  	/* lookup necessary attributes of the datum type */
  	get_typlenbyval(datumType, &typlen, &typbyval);
  	state->datumTypeLen = typlen;
***************
*** 695,734 ****
  }
  
  /*
-  * tuplesort_set_bound
-  *
-  *	Advise tuplesort that at most the first N result tuples are required.
-  *
-  * Must be called before inserting any tuples.	(Actually, we could allow it
-  * as long as the sort hasn't spilled to disk, but there seems no need for
-  * delayed calls at the moment.)
-  *
-  * This is a hint only. The tuplesort may still return more tuples than
-  * requested.
-  */
- void
- tuplesort_set_bound(Tuplesortstate *state, int64 bound)
- {
- 	/* Assert we're called before loading any tuples */
- 	Assert(state->status == TSS_INITIAL);
- 	Assert(state->memtupcount == 0);
- 	Assert(!state->bounded);
- 
- #ifdef DEBUG_BOUNDED_SORT
- 	/* Honor GUC setting that disables the feature (for easy testing) */
- 	if (!optimize_bounded_sort)
- 		return;
- #endif
- 
- 	/* We want to be able to compute bound * 2, so limit the setting */
- 	if (bound > (int64) (INT_MAX / 2))
- 		return;
- 
- 	state->bounded = true;
- 	state->bound = (int) bound;
- }
- 
- /*
   * tuplesort_end
   *
   *	Release resources and clean up.
--- 714,719 ----
***************
*** 907,913 ****
   */
  static void
  puttuple_common(Tuplesortstate *state, SortTuple *tuple)
! {
  	switch (state->status)
  	{
  		case TSS_INITIAL:
--- 892,901 ----
   */
  static void
  puttuple_common(Tuplesortstate *state, SortTuple *tuple)
! {
!     bool inHeapUP = false;
!     bool inHeapDOWN = false;
! 
  	switch (state->status)
  	{
  		case TSS_INITIAL:
***************
*** 927,958 ****
  			state->memtuples[state->memtupcount++] = *tuple;
  
  			/*
- 			 * Check if it's time to switch over to a bounded heapsort. We do
- 			 * so if the input tuple count exceeds twice the desired tuple
- 			 * count (this is a heuristic for where heapsort becomes cheaper
- 			 * than a quicksort), or if we've just filled workMem and have
- 			 * enough tuples to meet the bound.
- 			 *
- 			 * Note that once we enter TSS_BOUNDED state we will always try to
- 			 * complete the sort that way.	In the worst case, if later input
- 			 * tuples are larger than earlier ones, this might cause us to
- 			 * exceed workMem significantly.
- 			 */
- 			if (state->bounded &&
- 				(state->memtupcount > state->bound * 2 ||
- 				 (state->memtupcount > state->bound && LACKMEM(state))))
- 			{
- #ifdef TRACE_SORT
- 				if (trace_sort)
- 					elog(LOG, "switching to bounded heapsort at %d tuples: %s",
- 						 state->memtupcount,
- 						 pg_rusage_show(&state->ru_start));
- #endif
- 				make_bounded_heap(state);
- 				return;
- 			}
- 
- 			/*
  			 * Done if we still fit in available memory and have array slots.
  			 */
  			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
--- 915,920 ----
***************
*** 965,998 ****
  
  			/*
  			 * Dump tuples until we are back under the limit.
! 			 */
! 			dumptuples(state, false);
! 			break;
  
! 		case TSS_BOUNDED:
  
- 			/*
- 			 * We don't want to grow the array here, so check whether the new
- 			 * tuple can be discarded before putting it in.  This should be a
- 			 * good speed optimization, too, since when there are many more
- 			 * input tuples than the bound, most input tuples can be discarded
- 			 * with just this one comparison.  Note that because we currently
- 			 * have the sort direction reversed, we must check for <= not >=.
- 			 */
- 			if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
- 			{
- 				/* new tuple <= top of the heap, so we can discard it */
- 				free_sort_tuple(state, tuple);
- 			}
- 			else
- 			{
- 				/* discard top of heap, sift up, insert new tuple */
- 				free_sort_tuple(state, &state->memtuples[0]);
- 				tuplesort_heap_siftup(state, false);
- 				tuplesort_heap_insert(state, tuple, 0, false);
- 			}
  			break;
- 
  		case TSS_BUILDRUNS:
  
  			/*
--- 927,939 ----
  
  			/*
  			 * Dump tuples until we are back under the limit.
! 			MODIFIED: the root of the heap that will host the new input
! 			tuple is removed instead.
  
! 			dumptuples(state, false);
! 			*/
  
  			break;
  		case TSS_BUILDRUNS:
  
  			/*
***************
*** 1008,1024 ****
  			 * Note there will always be at least one tuple in the heap at
  			 * this point; see dumptuples.
  			 */
! 			Assert(state->memtupcount > 0);
  			if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
  				tuplesort_heap_insert(state, tuple, state->currentRun, true);
  			else
  				tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
  
! 			/*
! 			 * If we are over the memory limit, dump tuples till we're under.
! 			 */
! 			dumptuples(state, false);
! 			break;
  
  		default:
  			elog(ERROR, "invalid tuplesort state");
--- 949,1008 ----
  			 * Note there will always be at least one tuple in the heap at
  			 * this point; see dumptuples.
  			 */
! 	/*  ++++++++++++++++++++++++++++++++++++++++++++++++
! 	 *                   MODIFIED
! 	 *
!             Assert(state->memtupcount > 0);
  			if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
  				tuplesort_heap_insert(state, tuple, state->currentRun, true);
  			else
  				tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
  
!              If we are over the memory limit, dump tuples till we're under.
! 
!             dumptuples(state, false);
! 	 *
!      *  ++++++++++++++++++++++++++++++++++++++++++++++++
!      */
!             Assert(state->memtupcount > 0);
! 
! 
!             if(state->HeapUPactive && COMPARETUP(state, tuple, &state->memtuples[state->HeapUP]) <= 0)
! 			{
! 			    dumptuples_UP(state, false);
! 				tuplesort_heap_insert_UP(state, tuple, state->currentRunUP, true);
! 				inHeapUP = true;
! 				break;
! 			}
! 
! 			if (state->HeapDOWNactive && COMPARETUP(state, tuple, &state->memtuples[state->HeapDOWN]) >= 0)
! 			{
! 			    dumptuples_DOWN(state, false);
! 				tuplesort_heap_insert_DOWN(state, tuple, state->currentRunDOWN, true);
!                 inHeapDOWN = true;
!                 break;
! 			}
! 
! 			/* IF WE GET TO THIS POINT IT MEANS THE INPUT TUPLE WON'T FORM PART OF THE CURRENT RUN */
!             // At the moment we just place it into HeapUP at first. Waiting for a more efficient
!             // way to manage that kind of element.
! 
! 			if(state->HeapUPactive)
! 			{
! 			    dumptuples_UP(state, false);
! 				tuplesort_heap_insert_UP(state, tuple, state->currentRunUP+2, true);
! 				inHeapUP = true;
! 				break;
! 			}
! 
! 			if(state->HeapDOWNactive)
! 			{
! 			    dumptuples_DOWN(state, false);
! 				tuplesort_heap_insert_DOWN(state, tuple, state->currentRunDOWN+2, true);
! 				inHeapDOWN = true;	/* fixed: was inHeapUP (copy/paste from UP branch) */
! 				break;
! 			}
! 
  
  		default:
  			elog(ERROR, "invalid tuplesort state");
***************
*** 1060,1081 ****
  			state->markpos_eof = false;
  			state->status = TSS_SORTEDINMEM;
  			break;
- 
- 		case TSS_BOUNDED:
- 
- 			/*
- 			 * We were able to accumulate all the tuples required for output
- 			 * in memory, using a heap to eliminate excess tuples.	Now we
- 			 * have to transform the heap to a properly-sorted array.
- 			 */
- 			sort_bounded_heap(state);
- 			state->current = 0;
- 			state->eof_reached = false;
- 			state->markpos_offset = 0;
- 			state->markpos_eof = false;
- 			state->status = TSS_SORTEDINMEM;
- 			break;
- 
  		case TSS_BUILDRUNS:
  
  			/*
--- 1044,1049 ----
***************
*** 1091,1097 ****
  			state->markpos_offset = 0;
  			state->markpos_eof = false;
  			break;
- 
  		default:
  			elog(ERROR, "invalid tuplesort state");
  			break;
--- 1059,1064 ----
***************
*** 1137,1151 ****
  					return true;
  				}
  				state->eof_reached = true;
- 
- 				/*
- 				 * Complain if caller tries to retrieve more tuples than
- 				 * originally asked for in a bounded sort.	This is because
- 				 * returning EOF here might be the wrong thing.
- 				 */
- 				if (state->bounded && state->current >= state->bound)
- 					elog(ERROR, "retrieved too many tuples in a bounded sort");
- 
  				return false;
  			}
  			else
--- 1104,1109 ----
***************
*** 1443,1449 ****
  inittapes(Tuplesortstate *state)
  {
  	int			maxTapes,
- 				ntuples,
  				j;
  	long		tapeSpace;
  
--- 1401,1406 ----
***************
*** 1480,1491 ****
  		USEMEM(state, tapeSpace);
  
  	/*
- 	 * Make sure that the temp file(s) underlying the tape set are created in
- 	 * suitable temp tablespaces.
- 	 */
- 	PrepareTempTablespaces();
- 
- 	/*
  	 * Create the tape set and allocate the per-tape data arrays.
  	 */
  	state->tapeset = LogicalTapeSetCreate(maxTapes);
--- 1437,1442 ----
***************
*** 1501,1525 ****
  	state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
  
  	/*
! 	 * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
  	 * marked as belonging to run number zero.
  	 *
  	 * NOTE: we pass false for checkIndex since there's no point in comparing
  	 * indexes in this step, even though we do intend the indexes to be part
  	 * of the sort key...
! 	 */
  	ntuples = state->memtupcount;
! 	state->memtupcount = 0;		/* make the heap empty */
  	for (j = 0; j < ntuples; j++)
  	{
! 		/* Must copy source tuple to avoid possible overwrite */
  		SortTuple	stup = state->memtuples[j];
  
  		tuplesort_heap_insert(state, &stup, 0, false);
  	}
  	Assert(state->memtupcount == ntuples);
  
- 	state->currentRun = 0;
  
  	/*
  	 * Initialize variables of Algorithm D (step D1).
--- 1452,1504 ----
  	state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
  
  	/*
! 	 * OLD: Convert the unsorted contents of memtuples[] into a heap. Each tuple is
  	 * marked as belonging to run number zero.
  	 *
  	 * NOTE: we pass false for checkIndex since there's no point in comparing
  	 * indexes in this step, even though we do intend the indexes to be part
  	 * of the sort key...
! 	 */
! 
! 	 /*         MODIFICATO
  	ntuples = state->memtupcount;
! 	state->memtupcount = 0;		// make the heap empty
  	for (j = 0; j < ntuples; j++)
  	{
! 		// Must copy source tuple to avoid possible overwrite
  		SortTuple	stup = state->memtuples[j];
  
  		tuplesort_heap_insert(state, &stup, 0, false);
  	}
  	Assert(state->memtupcount == ntuples);
+     */
+ 
+ 	// NEW: qsort the unsorted array
+ 	if (state->memtupcount > 1)
+         qsort_arg((void *) state->memtuples, state->memtupcount,
+                            sizeof(SortTuple),
+                            (qsort_arg_comparator) state->comparetup,
+                            (void *) state);
+ 
+ 
+     // dividing logically 'memtuples' into two heaps
+     state->memtupcountUP = state->memtupcount >> 1;
+     state->HeapUP = state->memtupcountUP - 1;
+     state->HeapDOWN = state->memtupcountUP;
+     state->memtupcountDOWN = state->memtupcount - state->memtupcountUP;
+     state->HeapUPactive = true;
+     state->HeapDOWNactive = true;
+ 
+     //initializing run number for each element into HeapUP
+     state->currentRunUP = 0;
+     for(j=0; j<state->memtupcountUP; j++)
+         state->memtuples[j].tupindex = state->currentRunUP;
+ 
+     //initializing run number for each element into HeapDOWN
+     state->currentRunDOWN = 1;
+     for(j=0; j<state->memtupcountDOWN; j++)
+         state->memtuples[state->memtupcountUP + j].tupindex = state->currentRunDOWN;
  
  
  	/*
  	 * Initialize variables of Algorithm D (step D1).
***************
*** 1555,1566 ****
  	/* Step D3: advance j (destTape) */
  	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
  	{
! 		state->destTape++;
! 		return;
  	}
  	if (state->tp_dummy[state->destTape] != 0)
  	{
! 		state->destTape = 0;
  		return;
  	}
  
--- 1534,1561 ----
  	/* Step D3: advance j (destTape) */
  	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
  	{
! 		state->destTape++;
! 
! 		if(state->destTape+2 < state->tapeRange)
! 		{
! 		    state->destTapeUP = state->destTape + 1;
! 		    state->destTapeDOWN = state->destTape + 2;
! 		    return;
!         }
!         if(state->destTape-2 >= 0)   /* >= 0: tapes destTape-1 and destTape-2 exist */
!         {
!             state->destTapeUP = state->destTape - 1;
! 		    state->destTapeDOWN = state->destTape - 2;
! 		    return;
!         }
! 
! 		elog(ERROR, "MANOLO -- NOT ENOUGH TAPES?");
  	}
  	if (state->tp_dummy[state->destTape] != 0)
  	{
! 		state->destTape = 0;
! 		state->destTapeUP = 1;
! 		state->destTapeDOWN = 2;	/* fixed: second assignment clobbered destTapeUP */
  		return;
  	}
  
***************
*** 1572,1578 ****
  		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
  		state->tp_fib[j] = a + state->tp_fib[j + 1];
  	}
! 	state->destTape = 0;
  }
  
  /*
--- 1567,1575 ----
  		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
  		state->tp_fib[j] = a + state->tp_fib[j + 1];
  	}
! 	state->destTape = 0;
! 	state->destTapeUP = 1;
! 	state->destTapeDOWN = 2;	/* fixed: second assignment clobbered destTapeUP */
  }
  
  /*
***************
*** 2022,2027 ****
--- 2019,2169 ----
  		}
  	}
  }
+ 
+ 
+ static void
+ dumptuples_UP(Tuplesortstate *state, bool alltuples)
+ {
+     int rootIndexUP = state->HeapUP;
+     bool randomAccess;
+ 
+     /* Storing the original value of randomAccess (field is 'randomAccess', not 'radomAccess') */
+ 	randomAccess = state->randomAccess;
+ 	/* That allows backward reading tuples */
+ 	state->randomAccess = true;
+ 
+ 	while (alltuples ||
+ 		   (LACKMEM(state) && state->memtupcountUP > 1) ||
+ 		   state->memtupcountUP >= state->memtupsizeUP)
+ 	{
+ 		/*
+ 		 * Dump the heap's frontmost entry, and sift up to remove it from the
+ 		 * heap.
+ 		 */
+ 		Assert(state->memtupcountUP > 0);
+ 		WRITETUP(state, state->tp_tapenum[state->destTapeUP],
+ 				 &state->memtuples[rootIndexUP]);
+ 		tuplesort_heap_siftup_UP(state, true);
+ 
+ 		/*
+ 		 * If the heap is empty *or* top run number has changed, we've
+ 		 * finished the current run.
+ 		 */
+ 		if (state->memtupcountUP == 0 ||
+ 			state->currentRunUP != state->memtuples[rootIndexUP].tupindex)
+ 		{
+ 			markrunend(state, state->tp_tapenum[state->destTapeUP]);
+ 
+ 			state->currentRunUP += 2;
+ 			state->HeapUPactive = false;
+ 
+ #ifdef TRACE_SORT
+ 			if (trace_sort)
+ 				elog(LOG, "finished writing%s run UP %d to tape %d: %s",
+ 					 (state->memtupcountUP == 0) ? " final" : "",
+ 					 state->currentRunUP, state->destTapeUP,
+ 					 pg_rusage_show(&state->ru_start));
+ #endif
+ 
+ 			/*
+ 			 * Done if heap is empty, else prepare for new LOGICAL run,
+ 			 * in case PHYSICAL run DOWN too is over.
+ 			 */
+ 			if (state->memtupcountUP == 0)
+ 				break;
+             if (!state->HeapDOWNactive)
+             {
+                 formLogicalRun(state);
+ 
+                 state->tp_runs[state->destTape]++;
+                 state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+                 state->HeapDOWNactive = true;
+                 state->HeapUPactive = true;
+ 
+                 Assert(state->currentRunUP == state->memtuples[rootIndexUP].tupindex);
+                 selectnewtape(state);
+             }
+ 		}
+ 	}
+ 
+     /* Restoring the original value of randomAccess */
+ 	state->randomAccess = randomAccess;
+ }
+ 
+ void formLogicalRun(Tuplesortstate *state)
+ {
+ 	/* TODO: not implemented yet -- dumptuples_UP relies on this to start a new logical run */
+ 
+ }
+ 
+ 
+ static void
+ dumptuples_DOWN(Tuplesortstate *state, bool alltuples)
+ {
+     int rootIndexDOWN = state->HeapDOWN;
+ 
+ 	while (alltuples ||
+ 		   (LACKMEM(state) && state->memtupcountDOWN > 1) ||
+ 		   state->memtupcountDOWN >= state->memtupsizeDOWN)
+ 	{
+ 		/*
+ 		 * Dump the heap's frontmost entry, and sift up to remove it from the
+ 		 * heap.
+ 		 */
+ 		Assert(state->memtupcountDOWN > 0);
+ 		WRITETUP(state, state->tp_tapenum[state->destTape],
+ 				 &state->memtuples[rootIndexDOWN]);
+ 		tuplesort_heap_siftup_DOWN(state, true);
+ 
+ 		/*
+ 		 * If the heap is empty *or* top run number has changed, we've
+ 		 * finished the current run.
+ 		 */
+ 
+ 		 /* IMPORTANT: see the correspondence run <--> tape */
+ 
+ 		if (state->memtupcountDOWN == 0 ||
+ 			state->currentRunDOWN != state->memtuples[rootIndexDOWN].tupindex)
+ 		{
+ 		    state->HeapDOWNactive = false;
+ 
+ 		    if(!state->HeapUPactive)
+             {
+                 state->HeapUPactive = true;
+                 state->HeapDOWNactive = true;
+ 
+                 markrunend(state, state->tp_tapenum[state->destTape]);
+ 
+                 state->currentRunDOWN += 2;
+                 state->currentRunUP += 2;
+ 
+                 state->tp_runs[state->destTape]++;
+                 state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+ 
+ #ifdef TRACE_SORT
+                 if (trace_sort)
+                     elog(LOG, "finished writing%s run %d to tape %d: %s",
+                         (state->memtupcountDOWN == 0) ? " final" : "",
+                         state->currentRunDOWN, state->destTape,
+                         pg_rusage_show(&state->ru_start));
+ #endif
+             }
+ 
+             /*
+              * Done if heap is empty, else prepare for new run.
+              */
+ 
+              /* XXX should this check be inside or outside of the preceding 'if'? */
+              if (state->memtupcountDOWN == 0)
+                     break;
+              Assert(state->currentRunDOWN == state->memtuples[rootIndexDOWN].tupindex);
+              selectnewtape(state);
+ 		}
+ 	}
+ }
+ 
+ 
+ 
  
  /*
   * tuplesort_rescan		- rewind and replay the scan
***************
*** 2122,2185 ****
  	MemoryContextSwitchTo(oldcontext);
  }
  
- /*
-  * tuplesort_explain - produce a line of information for EXPLAIN ANALYZE
-  *
-  * This can be called after tuplesort_performsort() finishes to obtain
-  * printable summary information about how the sort was performed.
-  *
-  * The result is a palloc'd string.
-  */
- char *
- tuplesort_explain(Tuplesortstate *state)
- {
- 	char	   *result = (char *) palloc(100);
- 	long		spaceUsed;
- 
- 	/*
- 	 * Note: it might seem we should print both memory and disk usage for a
- 	 * disk-based sort.  However, the current code doesn't track memory space
- 	 * accurately once we have begun to return tuples to the caller (since we
- 	 * don't account for pfree's the caller is expected to do), so we cannot
- 	 * rely on availMem in a disk sort.  This does not seem worth the overhead
- 	 * to fix.	Is it worth creating an API for the memory context code to
- 	 * tell us how much is actually used in sortcontext?
- 	 */
- 	if (state->tapeset)
- 		spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
- 	else
- 		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
- 
- 	switch (state->status)
- 	{
- 		case TSS_SORTEDINMEM:
- 			if (state->boundUsed)
- 				snprintf(result, 100,
- 						 "Sort Method:  top-N heapsort  Memory: %ldkB",
- 						 spaceUsed);
- 			else
- 				snprintf(result, 100,
- 						 "Sort Method:  quicksort  Memory: %ldkB",
- 						 spaceUsed);
- 			break;
- 		case TSS_SORTEDONTAPE:
- 			snprintf(result, 100,
- 					 "Sort Method:  external sort  Disk: %ldkB",
- 					 spaceUsed);
- 			break;
- 		case TSS_FINALMERGE:
- 			snprintf(result, 100,
- 					 "Sort Method:  external merge  Disk: %ldkB",
- 					 spaceUsed);
- 			break;
- 		default:
- 			snprintf(result, 100, "sort still in progress");
- 			break;
- 	}
- 
- 	return result;
- }
- 
  
  /*
   * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
--- 2264,2269 ----
***************
*** 2194,2307 ****
  	 COMPARETUP(state, tup1, tup2))
  
  /*
!  * Convert the existing unordered array of SortTuples to a bounded heap,
!  * discarding all but the smallest "state->bound" tuples.
!  *
!  * When working with a bounded heap, we want to keep the largest entry
!  * at the root (array entry zero), instead of the smallest as in the normal
!  * sort case.  This allows us to discard the largest entry cheaply.
!  * Therefore, we temporarily reverse the sort direction.
   *
!  * We assume that all entries in a bounded heap will always have tupindex
!  * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
!  * the direction of comparison for tupindexes.
   */
  static void
! make_bounded_heap(Tuplesortstate *state)
  {
! 	int			tupcount = state->memtupcount;
! 	int			i;
  
! 	Assert(state->status == TSS_INITIAL);
! 	Assert(state->bounded);
! 	Assert(tupcount >= state->bound);
  
! 	/* Reverse sort direction so largest entry will be at root */
! 	REVERSEDIRECTION(state);
  
! 	state->memtupcount = 0;		/* make the heap empty */
! 	for (i = 0; i < tupcount; i++)
  	{
! 		if (state->memtupcount >= state->bound &&
! 		  COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
! 		{
! 			/* New tuple would just get thrown out, so skip it */
! 			free_sort_tuple(state, &state->memtuples[i]);
! 		}
! 		else
! 		{
! 			/* Insert next tuple into heap */
! 			/* Must copy source tuple to avoid possible overwrite */
! 			SortTuple	stup = state->memtuples[i];
! 
! 			tuplesort_heap_insert(state, &stup, 0, false);
  
! 			/* If heap too full, discard largest entry */
! 			if (state->memtupcount > state->bound)
! 			{
! 				free_sort_tuple(state, &state->memtuples[0]);
! 				tuplesort_heap_siftup(state, false);
! 			}
! 		}
  	}
! 
! 	Assert(state->memtupcount == state->bound);
! 	state->status = TSS_BOUNDED;
  }
  
! /*
!  * Convert the bounded heap to a properly-sorted array
!  */
  static void
! sort_bounded_heap(Tuplesortstate *state)
  {
! 	int			tupcount = state->memtupcount;
! 
! 	Assert(state->status == TSS_BOUNDED);
! 	Assert(state->bounded);
! 	Assert(tupcount == state->bound);
  
  	/*
! 	 * We can unheapify in place because each sift-up will remove the largest
! 	 * entry, which we can promptly store in the newly freed slot at the end.
! 	 * Once we're down to a single-entry heap, we're done.
  	 */
! 	while (state->memtupcount > 1)
! 	{
! 		SortTuple	stup = state->memtuples[0];
  
! 		/* this sifts-up the next-largest entry and decreases memtupcount */
! 		tuplesort_heap_siftup(state, false);
! 		state->memtuples[state->memtupcount] = stup;
! 	}
! 	state->memtupcount = tupcount;
  
  	/*
! 	 * Reverse sort direction back to the original state.  This is not
! 	 * actually necessary but seems like a good idea for tidiness.
  	 */
! 	REVERSEDIRECTION(state);
  
! 	state->status = TSS_SORTEDINMEM;
! 	state->boundUsed = true;
  }
  
! /*
!  * Insert a new tuple into an empty or existing heap, maintaining the
!  * heap invariant.	Caller is responsible for ensuring there's room.
!  *
!  * Note: we assume *tuple is a temporary variable that can be scribbled on.
!  * For some callers, tuple actually points to a memtuples[] entry above the
!  * end of the heap.  This is safe as long as it's not immediately adjacent
!  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
!  * is, it might get overwritten before being moved into the heap!
!  */
  static void
! tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
! 					  int tupleindex, bool checkIndex)
  {
  	SortTuple  *memtuples;
! 	int			j;
  
  	/*
  	 * Save the tupleindex --- see notes above about writing on *tuple. It's a
--- 2278,2371 ----
  	 COMPARETUP(state, tup1, tup2))
  
  /*
!  * Insert a new tuple into an empty or existing heap, maintaining the
!  * heap invariant.	Caller is responsible for ensuring there's room.
   *
!  * Note: we assume *tuple is a temporary variable that can be scribbled on.
!  * For some callers, tuple actually points to a memtuples[] entry above the
!  * end of the heap.  This is safe as long as it's not immediately adjacent
!  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
!  * is, it might get overwritten before being moved into the heap!
   */
  static void
! tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
! 					  int tupleindex, bool checkIndex)
  {
! 	SortTuple  *memtuples;
! 	int			j;
  
! 	/*
! 	 * Save the tupleindex --- see notes above about writing on *tuple. It's a
! 	 * historical artifact that tupleindex is passed as a separate argument
! 	 * and not in *tuple, but it's notationally convenient so let's leave it
! 	 * that way.
! 	 */
! 	tuple->tupindex = tupleindex;
  
! 	memtuples = state->memtuples;
! 	Assert(state->memtupcount < state->memtupsize);
  
! 	/*
! 	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
! 	 * using 1-based array indexes, not 0-based.
! 	 */
! 	j = state->memtupcount++;
! 	while (j > 0)
  	{
! 		int			i = (j - 1) >> 1;
  
! 		if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
! 			break;
! 		memtuples[j] = memtuples[i];
! 		j = i;
  	}
! 	memtuples[j] = *tuple;
  }
  
! 
! 
! 
  static void
! tuplesort_heap_insert_DOWN(Tuplesortstate *state, SortTuple *tuple, int tupleindex, bool checkIndex)
  {
! 	SortTuple  *memtuples;
! 	int			j;
  
  	/*
! 	 * Save the tupleindex --- see notes above about writing on *tuple. It's a
! 	 * historical artifact that tupleindex is passed as a separate argument
! 	 * and not in *tuple, but it's notationally convenient so let's leave it
! 	 * that way.
  	 */
! 	tuple->tupindex = tupleindex;
  
! 	memtuples = state->memtuples;
! 	Assert(state->memtupcountDOWN < state->memtupsizeDOWN);
  
  	/*
! 	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
! 	 * using 1-based array indexes, not 0-based.
  	 */
! 	j = state->HeapDOWN + state->memtupcountDOWN++;
! 	while (j > state->HeapDOWN)
! 	{
! 		int	i = (j - 1) >> 1;
  
! 		if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
! 			break;
! 		memtuples[j] = memtuples[i];
! 		j = i;
! 	}
! 	memtuples[j] = *tuple;
  }
  
! 
  static void
! tuplesort_heap_insert_UP(Tuplesortstate *state, SortTuple *tuple, int tupleindex, bool checkIndex)
  {
  	SortTuple  *memtuples;
! 	int		j,
!             HeapUP;
  
  	/*
  	 * Save the tupleindex --- see notes above about writing on *tuple. It's a
***************
*** 2312,2327 ****
  	tuple->tupindex = tupleindex;
  
  	memtuples = state->memtuples;
! 	Assert(state->memtupcount < state->memtupsize);
  
  	/*
  	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
  	 * using 1-based array indexes, not 0-based.
! 	 */
! 	j = state->memtupcount++;
! 	while (j > 0)
  	{
! 		int			i = (j - 1) >> 1;
  
  		if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
  			break;
--- 2376,2392 ----
  	tuple->tupindex = tupleindex;
  
  	memtuples = state->memtuples;
! 	Assert(state->memtupcountUP < state->memtupsizeUP);
  
  	/*
  	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
  	 * using 1-based array indexes, not 0-based.
! 	 */
! 	HeapUP = state->HeapUP;
! 	j = HeapUP - state->memtupcountUP++;
! 	while (j < HeapUP)
  	{
! 		int	i = (j - 1) >> 1;
  
  		if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
  			break;
***************
*** 2331,2336 ****
--- 2396,2403 ----
  	memtuples[j] = *tuple;
  }
  
+ 
+ 
  /*
   * The tuple at state->memtuples[0] has been removed from the heap.
   * Decrement memtupcount, and sift up to maintain the heap invariant.
***************
*** 2364,2370 ****
  	}
  	memtuples[i] = *tuple;
  }
! 
  
  /*
   * Tape interface routines
--- 2431,2580 ----
  	}
  	memtuples[i] = *tuple;
  }
! 
! 
! 
! static int leftSonUP(int nodeIndex, int rootIndexUP)
! {    return (nodeIndex * 2 - 1 - rootIndexUP); }
! 
! static int rightSonUP(int nodeIndex, int rootIndexUP)
! {    return (nodeIndex * 2 - 2 - rootIndexUP); }
! 
! static int leftSonDOWN(int nodeIndex, int rootIndexDOWN)
! {    return (nodeIndex * 2 + 2 - rootIndexDOWN); }
! 
! static int rightSonDOWN(int nodeIndex, int rootIndexDOWN)
! {    return (nodeIndex * 2 + 1 - rootIndexDOWN); }
! 
! 
! static void
! tuplesort_heap_siftup_UP(Tuplesortstate *state, bool checkIndex)
! {
! 	SortTuple  *memtuples = state->memtuples;
! 	SortTuple  *tuple;
! 	int			i,
!                 n, /* index of the last leaf of HeapUP */
!                 memtupcount,
!                 memtupcountUP, memtupsizeUP, HeapUP;
!     bool        runningAfter;
! 
!     memtupcount = --state->memtupcount;
!     memtupcountUP = --state->memtupcountUP;
! 	if (memtupcountUP <= 0)
! 		return;
! 	memtupsizeUP = state->memtupsizeUP;
! 	HeapUP = state->HeapUP;
! 
! 	if(memtupsizeUP >= memtupcountUP)
! 	{   //index of the last element of the heap
! 	    n = HeapUP - memtupcountUP + 1;
! 	    tuple = &memtuples[n];		/* tuple that must be reinserted */
! 	    runningAfter = false;
!     }
!     else
!     {
!         n = memtupcount - (memtupcountUP - memtupsizeUP) -1 ;
! 	    runningAfter = true;
! 	    tuple = &memtuples[n];		/* tuple that must be reinserted */
!     }
! 
! 
! 	i = HeapUP;			/* i is where the "hole" is */
! 	for (;;)
! 	{
! 		int		j = leftSonUP(i, HeapUP);	/* left son of the hole i in the heap rooted at HeapUP (arguments were swapped) */
! 
!         if ( !runningAfter && j > n )
! 		{
! 		    if (j - 1 < n &&
!                     HEAPCOMPARE(&memtuples[j], &memtuples[j - 1]) < 0 )
!                 j--;
!             if (HEAPCOMPARE(tuple, &memtuples[j]) >= 0)
!                 break;
!             memtuples[i] = memtuples[j];
!             i = j;
! 		    continue;
!         }
!         /*HEAPS RUNNING AFTER THEM EACH OTHER
!         if (runningAfter && )
! 		if (j + 1 < n &&
! 			HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
! 			j++;
! 		if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
! 			break;
! 		memtuples[i] = memtuples[j];
! 		i = j;
! 		*/ /* XXX incomplete: when runningAfter is true no branch executes, so this loop never terminates */
! 	}
! 	memtuples[i] = *tuple;
! }
! 
! 
! 
! 
! static void
! tuplesort_heap_siftup_DOWN(Tuplesortstate *state, bool checkIndex)
! {
! 	SortTuple  *memtuples = state->memtuples;
! 	SortTuple  *tuple;
! 	int			i,
!                 n, /* index of the last leaf of HeapDOWN */
!                 HeapDOWN,
!                 memtupcountDOWN, memtupsizeDOWN, memtupcount;
!     bool        runningAfter;
! 
!     memtupcount = --state->memtupcount;
!     memtupcountDOWN = --state->memtupcountDOWN;
! 	if (memtupcountDOWN <= 0)
! 		return;
! 	memtupsizeDOWN = state->memtupsizeDOWN;
! 	HeapDOWN = state->HeapDOWN;
! 
! 	if(memtupsizeDOWN >= memtupcountDOWN)
! 	{
! 	    n = HeapDOWN + memtupcountDOWN - 1;
! 	    tuple = &memtuples[n];		/* tuple that must be reinserted */
! 	    runningAfter = false;
!     }
!     else
!     {
!         n = memtupcountDOWN - (memtupsizeDOWN - 1) ;
! 	    runningAfter = true;
! 	    tuple = &memtuples[n];		/* tuple that must be reinserted */
!     }
! 
! 
! 	i = HeapDOWN;			/* i is where the "hole" is */
! 	for (;;)
! 	{
! 		int		j = rightSonDOWN(i, HeapDOWN);	/* son of the hole i in the heap rooted at HeapDOWN (arguments were swapped) */
! 
!         if ( !runningAfter && j < n )
! 		{
! 		    if (j + 1 < n &&
!                     HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0 )
!                 j++;
!             if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
!                 break;
!             memtuples[i] = memtuples[j];
!             i = j;
! 		    continue;
!         }
! 
!         /*HEAPS RUNNING AFTER THEM EACH OTHER
!         if (runningAfter && )
! 		if (j + 1 < n &&
! 			HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
! 			j++;
! 		if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
! 			break;
! 		memtuples[i] = memtuples[j];
! 		i = j;
! 		*/
! 	}
! 	memtuples[i] = *tuple;
! }
! 
  
  /*
   * Tape interface routines
***************
*** 2393,2418 ****
  
  
  /*
!  * Set up for an external caller of ApplySortFunction.	This function
!  * basically just exists to localize knowledge of the encoding of sk_flags
!  * used in this module.
   */
  void
  SelectSortFunction(Oid sortOperator,
! 				   bool nulls_first,
! 				   Oid *sortFunction,
! 				   int *sortFlags)
! {
! 	bool		reverse;
! 
! 	if (!get_compare_function_for_ordering_op(sortOperator,
! 											  sortFunction, &reverse))
! 		elog(ERROR, "operator %u is not a valid ordering operator",
! 			 sortOperator);
! 
! 	*sortFlags = reverse ? SK_BT_DESC : 0;
! 	if (nulls_first)
! 		*sortFlags |= SK_BT_NULLS_FIRST;
  }
  
  /*
--- 2603,2701 ----
  
  
  /*
!  * This routine selects an appropriate sorting function to implement
!  * a sort operator as efficiently as possible.	The straightforward
!  * method is to use the operator's implementation proc --- ie, "<"
!  * comparison.	However, that way often requires two calls of the function
!  * per comparison.	If we can find a btree three-way comparator function
!  * associated with the operator, we can use it to do the comparisons
!  * more efficiently.  We also support the possibility that the operator
!  * is ">" (descending sort), in which case we have to reverse the output
!  * of the btree comparator.
!  *
!  * Possibly this should live somewhere else (backend/catalog/, maybe?).
   */
  void
  SelectSortFunction(Oid sortOperator,
! 				   RegProcedure *sortFunction,
! 				   SortFunctionKind *kind)
! {
! 	CatCList   *catlist;
! 	int			i;
! 	HeapTuple	tuple;
! 	Form_pg_operator optup;
! 	Oid			opclass = InvalidOid;
! 
! 	/*
! 	 * Search pg_amop to see if the target operator is registered as the "<"
! 	 * or ">" operator of any btree opclass.  It's possible that it might be
! 	 * registered both ways (eg, if someone were to build a "reverse sort"
! 	 * opclass for some reason); prefer the "<" case if so. If the operator is
! 	 * registered the same way in multiple opclasses, assume we can use the
! 	 * associated comparator function from any one.
! 	 */
! 	catlist = SearchSysCacheList(AMOPOPID, 1,
! 								 ObjectIdGetDatum(sortOperator),
! 								 0, 0, 0);
! 
! 	for (i = 0; i < catlist->n_members; i++)
! 	{
! 		Form_pg_amop aform;
! 
! 		tuple = &catlist->members[i]->tuple;
! 		aform = (Form_pg_amop) GETSTRUCT(tuple);
! 
! 		if (!opclass_is_btree(aform->amopclaid))
! 			continue;
! 		/* must be of default subtype, too */
! 		if (aform->amopsubtype != InvalidOid)
! 			continue;
! 
! 		if (aform->amopstrategy == BTLessStrategyNumber)
! 		{
! 			opclass = aform->amopclaid;
! 			*kind = SORTFUNC_CMP;
! 			break;				/* done looking */
! 		}
! 		else if (aform->amopstrategy == BTGreaterStrategyNumber)
! 		{
! 			opclass = aform->amopclaid;
! 			*kind = SORTFUNC_REVCMP;
! 			/* keep scanning in hopes of finding a BTLess entry */
! 		}
! 	}
! 
! 	ReleaseSysCacheList(catlist);
! 
! 	if (OidIsValid(opclass))
! 	{
! 		/* Found a suitable opclass, get its default comparator function */
! 		*sortFunction = get_opclass_proc(opclass, InvalidOid, BTORDER_PROC);
! 		Assert(RegProcedureIsValid(*sortFunction));
! 		return;
! 	}
! 
! 	/*
! 	 * Can't find a comparator, so use the operator as-is.  Decide whether it
! 	 * is forward or reverse sort by looking at its name (grotty, but this
! 	 * only matters for deciding which end NULLs should get sorted to).  XXX
! 	 * possibly better idea: see whether its selectivity function is
! 	 * scalargtcmp?
! 	 */
! 	tuple = SearchSysCache(OPEROID,
! 						   ObjectIdGetDatum(sortOperator),
! 						   0, 0, 0);
! 	if (!HeapTupleIsValid(tuple))
! 		elog(ERROR, "cache lookup failed for operator %u", sortOperator);
! 	optup = (Form_pg_operator) GETSTRUCT(tuple);
! 	if (strcmp(NameStr(optup->oprname), ">") == 0)
! 		*kind = SORTFUNC_REVLT;
! 	else
! 		*kind = SORTFUNC_LT;
! 	*sortFunction = optup->oprcode;
! 	ReleaseSysCache(tuple);
! 
! 	Assert(RegProcedureIsValid(*sortFunction));
  }
  
  /*
***************
*** 2443,2484 ****
  /*
   * Apply a sort function (by now converted to fmgr lookup form)
   * and return a 3-way comparison result.  This takes care of handling
!  * reverse-sort and NULLs-ordering properly.  We assume that DESC and
!  * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
   */
  static inline int32
! inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags,
  						Datum datum1, bool isNull1,
  						Datum datum2, bool isNull2)
  {
! 	int32		compare;
! 
! 	if (isNull1)
! 	{
! 		if (isNull2)
! 			compare = 0;		/* NULL "=" NULL */
! 		else if (sk_flags & SK_BT_NULLS_FIRST)
! 			compare = -1;		/* NULL "<" NOT_NULL */
! 		else
! 			compare = 1;		/* NULL ">" NOT_NULL */
! 	}
! 	else if (isNull2)
! 	{
! 		if (sk_flags & SK_BT_NULLS_FIRST)
! 			compare = 1;		/* NOT_NULL ">" NULL */
! 		else
! 			compare = -1;		/* NOT_NULL "<" NULL */
! 	}
! 	else
  	{
! 		compare = DatumGetInt32(myFunctionCall2(sortFunction,
! 												datum1, datum2));
  
! 		if (sk_flags & SK_BT_DESC)
! 			compare = -compare;
! 	}
  
! 	return compare;
  }
  
  /*
--- 2726,2799 ----
  /*
   * Apply a sort function (by now converted to fmgr lookup form)
   * and return a 3-way comparison result.  This takes care of handling
!  * NULLs and sort ordering direction properly.
   */
  static inline int32
! inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
  						Datum datum1, bool isNull1,
  						Datum datum2, bool isNull2)
  {
! 	switch (kind)
  	{
! 		case SORTFUNC_LT:
! 			if (isNull1)
! 			{
! 				if (isNull2)
! 					return 0;
! 				return 1;		/* NULL sorts after non-NULL */
! 			}
! 			if (isNull2)
! 				return -1;
! 			if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
! 				return -1;		/* a < b */
! 			if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
! 				return 1;		/* a > b */
! 			return 0;
! 
! 		case SORTFUNC_REVLT:
! 			/* We reverse the ordering of NULLs, but not the operator */
! 			if (isNull1)
! 			{
! 				if (isNull2)
! 					return 0;
! 				return -1;		/* NULL sorts before non-NULL */
! 			}
! 			if (isNull2)
! 				return 1;
! 			if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
! 				return -1;		/* a < b */
! 			if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
! 				return 1;		/* a > b */
! 			return 0;
  
! 		case SORTFUNC_CMP:
! 			if (isNull1)
! 			{
! 				if (isNull2)
! 					return 0;
! 				return 1;		/* NULL sorts after non-NULL */
! 			}
! 			if (isNull2)
! 				return -1;
! 			return DatumGetInt32(myFunctionCall2(sortFunction,
! 												 datum1, datum2));
  
! 		case SORTFUNC_REVCMP:
! 			if (isNull1)
! 			{
! 				if (isNull2)
! 					return 0;
! 				return -1;		/* NULL sorts before non-NULL */
! 			}
! 			if (isNull2)
! 				return 1;
! 			return -DatumGetInt32(myFunctionCall2(sortFunction,
! 												  datum1, datum2));
! 
! 		default:
! 			elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind);
! 			return 0;			/* can't get here, but keep compiler quiet */
! 	}
  }
  
  /*
***************
*** 2486,2496 ****
   * C99's brain-dead notions about how to implement inline functions...
   */
  int32
! ApplySortFunction(FmgrInfo *sortFunction, int sortFlags,
  				  Datum datum1, bool isNull1,
  				  Datum datum2, bool isNull2)
  {
! 	return inlineApplySortFunction(sortFunction, sortFlags,
  								   datum1, isNull1,
  								   datum2, isNull2);
  }
--- 2801,2811 ----
   * C99's brain-dead notions about how to implement inline functions...
   */
  int32
! ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
  				  Datum datum1, bool isNull1,
  				  Datum datum2, bool isNull2)
  {
! 	return inlineApplySortFunction(sortFunction, kind,
  								   datum1, isNull1,
  								   datum2, isNull2);
  }
***************
*** 2514,2520 ****
  	CHECK_FOR_INTERRUPTS();
  
  	/* Compare the leading sort key */
! 	compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
  									  a->datum1, a->isnull1,
  									  b->datum1, b->isnull1);
  	if (compare != 0)
--- 2829,2836 ----
  	CHECK_FOR_INTERRUPTS();
  
  	/* Compare the leading sort key */
! 	compare = inlineApplySortFunction(&scanKey->sk_func,
! 									  state->sortFnKinds[0],
  									  a->datum1, a->isnull1,
  									  b->datum1, b->isnull1);
  	if (compare != 0)
***************
*** 2538,2544 ****
  		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
  		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
  
! 		compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
  										  datum1, isnull1,
  										  datum2, isnull2);
  		if (compare != 0)
--- 2854,2861 ----
  		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
  		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
  
! 		compare = inlineApplySortFunction(&scanKey->sk_func,
! 										  state->sortFnKinds[nkey],
  										  datum1, isnull1,
  										  datum2, isnull2);
  		if (compare != 0)
***************
*** 2621,2638 ****
  								&stup->isnull1);
  }
  
- static void
- reversedirection_heap(Tuplesortstate *state)
- {
- 	ScanKey		scanKey = state->scanKeys;
- 	int			nkey;
- 
- 	for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
- 	{
- 		scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
- 	}
- }
- 
  
  /*
   * Routines specialized for IndexTuple case
--- 2938,2943 ----
***************
*** 2665,2671 ****
  	CHECK_FOR_INTERRUPTS();
  
  	/* Compare the leading sort key */
! 	compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
  									  a->datum1, a->isnull1,
  									  b->datum1, b->isnull1);
  	if (compare != 0)
--- 2970,2977 ----
  	CHECK_FOR_INTERRUPTS();
  
  	/* Compare the leading sort key */
! 	compare = inlineApplySortFunction(&scanKey->sk_func,
! 									  SORTFUNC_CMP,
  									  a->datum1, a->isnull1,
  									  b->datum1, b->isnull1);
  	if (compare != 0)
***************
*** 2691,2699 ****
  		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
  		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
  
! 		compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
  										  datum1, isnull1,
  										  datum2, isnull2);
  		if (compare != 0)
  			return compare;		/* done when we find unequal attributes */
  
--- 2997,3010 ----
  		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
  		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
  
! 		/* see comments about NULLs handling in btbuild */
! 
! 		/* the comparison function is always of CMP type */
! 		compare = inlineApplySortFunction(&scanKey->sk_func,
! 										  SORTFUNC_CMP,
  										  datum1, isnull1,
  										  datum2, isnull2);
+ 
  		if (compare != 0)
  			return compare;		/* done when we find unequal attributes */
  
***************
*** 2720,2727 ****
  	if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
  		ereport(ERROR,
  				(errcode(ERRCODE_UNIQUE_VIOLATION),
! 				 errmsg("could not create unique index \"%s\"",
! 						RelationGetRelationName(state->indexRel)),
  				 errdetail("Table contains duplicated values.")));
  
  	/*
--- 3031,3037 ----
  	if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
  		ereport(ERROR,
  				(errcode(ERRCODE_UNIQUE_VIOLATION),
! 				 errmsg("could not create unique index"),
  				 errdetail("Table contains duplicated values.")));
  
  	/*
***************
*** 2809,2826 ****
  								 &stup->isnull1);
  }
  
- static void
- reversedirection_index(Tuplesortstate *state)
- {
- 	ScanKey		scanKey = state->indexScanKey;
- 	int			nkey;
- 
- 	for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
- 	{
- 		scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
- 	}
- }
- 
  
  /*
   * Routines specialized for DatumTuple case
--- 3119,3124 ----
***************
*** 2832,2838 ****
  	/* Allow interrupting long sorts */
  	CHECK_FOR_INTERRUPTS();
  
! 	return inlineApplySortFunction(&state->sortOpFn, state->sortFnFlags,
  								   a->datum1, a->isnull1,
  								   b->datum1, b->isnull1);
  }
--- 3130,3136 ----
  	/* Allow interrupting long sorts */
  	CHECK_FOR_INTERRUPTS();
  
! 	return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
  								   a->datum1, a->isnull1,
  								   b->datum1, b->isnull1);
  }
***************
*** 2925,2943 ****
  							sizeof(tuplen)) != sizeof(tuplen))
  			elog(ERROR, "unexpected end of data");
  }
- 
- static void
- reversedirection_datum(Tuplesortstate *state)
- {
- 	state->sortFnFlags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
- }
- 
- /*
-  * Convenience routine to free a tuple previously loaded into sort memory
-  */
- static void
- free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
- {
- 	FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
- 	pfree(stup->tuple);
- }
--- 3223,3225 ----
