From ac7f8aaa5b7257fb84f819b27525e00a5a6ced84 Mon Sep 17 00:00:00 2001
From: John Naylor <john.naylor@postgresql.org>
Date: Thu, 23 Jun 2022 16:07:10 +0700
Subject: [PATCH v1 2/2] Implement dual-pivot quicksort

Choose pivots by running insertion sort on five candidates and choosing
the 2nd and 4th, ("tertile of five"). If any of the five are equal, we
assume the input has many duplicates and fall back to B&M since it's
optimized for that case.
---
 src/include/lib/sort_template.h               | 191 +++++++++++++++++-
 src/test/modules/debug_qsort/Makefile         |  18 ++
 .../modules/debug_qsort/debug_qsort--1.0.sql  |   1 +
 src/test/modules/debug_qsort/debug_qsort.c    |  70 +++++++
 .../modules/debug_qsort/debug_qsort.control   |   4 +
 5 files changed, 282 insertions(+), 2 deletions(-)
 create mode 100644 src/test/modules/debug_qsort/Makefile
 create mode 100644 src/test/modules/debug_qsort/debug_qsort--1.0.sql
 create mode 100644 src/test/modules/debug_qsort/debug_qsort.c
 create mode 100644 src/test/modules/debug_qsort/debug_qsort.control

diff --git a/src/include/lib/sort_template.h b/src/include/lib/sort_template.h
index 54921de568..2e4ef2604d 100644
--- a/src/include/lib/sort_template.h
+++ b/src/include/lib/sort_template.h
@@ -240,6 +240,7 @@ ST_SCOPE void ST_SORT(ST_ELEMENT_TYPE * first, size_t n
 #define DO_SWAP(a_, b_) DO_SWAPN((a_), (b_), element_size)
 #endif
 
+#if 0
 /*
  * Find the median of three values.  Currently, performance seems to be best
  * if the comparator is inlined here, but the med3 function is not inlined
@@ -256,6 +257,7 @@ ST_MED3(ST_ELEMENT_TYPE * a,
 		(DO_COMPARE(b, c) < 0 ? b : (DO_COMPARE(a, c) < 0 ? c : a))
 		: (DO_COMPARE(b, c) > 0 ? b : (DO_COMPARE(a, c) < 0 ? a : c));
 }
+#endif
 
 static inline void
 ST_SWAP(ST_POINTER_TYPE * a, ST_POINTER_TYPE * b)
@@ -283,6 +285,18 @@ ST_SORT_INTERNAL(ST_POINTER_TYPE * data, size_t n
 		ST_SORT_PROTO_ARG)
 {
 	ST_POINTER_TYPE *a = data,
+				*left,
+				*right,
+				*e1,
+				*e2,
+				*e3,
+				*e4,
+				*e5,
+				*pivot1,
+				*pivot2,
+				*less,
+				*great,
+				*k,
 			   *pa,
 			   *pb,
 			   *pc,
@@ -291,9 +305,12 @@ ST_SORT_INTERNAL(ST_POINTER_TYPE * data, size_t n
 			   *pm,
 			   *pn;
 	size_t		d1,
-				d2;
+				d2,
+				d3,
+				seventh;
 	int			r,
 				presorted;
+	bool		unique_hint = true;
 
 loop:
 	DO_CHECK_FOR_INTERRUPTS();
@@ -319,6 +336,7 @@ loop:
 	}
 	if (presorted)
 		return;
+#if 0
 	pm = a + (n / 2) * ST_POINTER_STEP;
 	if (n > 7)
 	{
@@ -334,7 +352,175 @@ loop:
 		}
 		pm = DO_MED3(pl, pm, pn);
 	}
-	DO_SWAP(a, pm);
+#endif
+
+	left = a;
+	right = a + (n - 1) * ST_POINTER_STEP;
+
+#ifdef QSORT_DEBUG
+	elog(NOTICE, "start:");
+
+	for (ST_POINTER_TYPE *i = left; i <= right; i += ST_POINTER_STEP)
+	{
+			elog(NOTICE, "%d ", *i);
+	}
+#endif
+
+	// select five pivot candidates spaced equally around the midpoint
+	seventh = n / 7 * ST_POINTER_STEP;
+	e3 = a + (n / 2) * ST_POINTER_STEP;
+	e2 = e3 - seventh;
+	e1 = e2 - seventh;
+	e4 = e3 + seventh;
+	e5 = e4 + seventh;
+
+	// do insertion sort on pivot candidates
+	for (pm = e2; pm <= e5; pm += seventh)
+		for (pl = pm; pl > e1 && (r = DO_COMPARE(pl - seventh, pl)) >= 0;
+			 pl -= seventh)
+			{
+				if (r == 0)
+				{
+					/* found two equal candidates */
+					unique_hint = false;
+					break;
+				}
+				else
+					DO_SWAP(pl, pl - seventh);
+			}
+
+#ifdef QSORT_DEBUG
+	elog(NOTICE, "after sorting pivot candidates:");
+	for (ST_POINTER_TYPE *i = left; i <= right; i += ST_POINTER_STEP)
+	{
+		if (i == e1)
+			elog(NOTICE, "%d e1", *i);
+		else if (i == e2)
+			elog(NOTICE, "%d e2", *i);
+		else if (i == e3)
+			elog(NOTICE, "%d e3", *i);
+		else if (i == e4)
+			elog(NOTICE, "%d e4", *i);
+		else if (i == e5)
+			elog(NOTICE, "%d e5", *i);
+		else
+			elog(NOTICE, "%d ", *i);
+	}
+#endif
+
+	/* if the pivot candidates were all unique, use dual-pivot quicksort, otherwise use B&M quicksort since it is faster on inputs with many duplicates */
+	if (unique_hint)
+	{
+		DO_SWAP(e2, left);
+		DO_SWAP(e4, right);
+
+		pivot1 = left;
+		pivot2 = right;
+		less = left + ST_POINTER_STEP;
+		great = right - ST_POINTER_STEP;
+
+#ifdef QSORT_DEBUG
+		elog(NOTICE, "before swap loop:");
+		for (ST_POINTER_TYPE *i = left; i <= right; i += ST_POINTER_STEP)
+		{
+			if (i == pivot1)
+				elog(NOTICE, "%d pivot 1", *i);
+			else if (i == pivot2)
+				elog(NOTICE, "%d pivot 2", *i);
+			else if (i == less && i == great)
+				elog(NOTICE, "%d less/great", *i);
+			else if (i == less)
+				elog(NOTICE, "%d less", *i);
+			else if (i == great)
+				elog(NOTICE, "%d great", *i);
+			else
+				elog(NOTICE, "%d ", *i);
+		}
+#endif
+
+		/*
+		 * dual-pivot partitioning
+		 *
+		 *   left part           center part                   right part
+		 * +--------------------------------------------------------------+
+		 * |  < pivot1  |  pivot1 <= && <= pivot2  |    ?    |  > pivot2  |
+		 * +--------------------------------------------------------------+
+		 *               ^                          ^       ^
+		 *               |                          |       |
+		 *              less                        k     great
+		 *
+		 * Invariants:
+		 *
+		 *              all in (left, less)   < pivot1
+		 *    pivot1 <= all in [less, k)     <= pivot2
+		 *              all in (great, right) > pivot2
+		 *
+		 * k points to the first element in the "?" part
+		 */
+		for (k = less; k <= great; k += ST_POINTER_STEP)
+		{
+			if (DO_COMPARE(k, pivot1) < 0)
+			{
+				if (k > less)
+					DO_SWAP(k, less);
+				less += ST_POINTER_STEP;
+			}
+			else if (DO_COMPARE(k, pivot2) > 0)
+			{
+				while (k < great && DO_COMPARE(great, pivot2) > 0)
+				{
+					great-= ST_POINTER_STEP;
+					DO_CHECK_FOR_INTERRUPTS();
+				}
+
+				DO_SWAP(k, great);
+				great-= ST_POINTER_STEP;
+
+				if (DO_COMPARE(k, pivot1) < 0)
+				{
+					if (k > less)
+						DO_SWAP(k, less);
+					less += ST_POINTER_STEP;
+				}
+			}
+			DO_CHECK_FOR_INTERRUPTS();
+		}
+
+#ifdef QSORT_DEBUG
+		elog(NOTICE, "after swap loop:");
+		for (ST_POINTER_TYPE *i = left; i <= right; i += ST_POINTER_STEP)
+		{
+			if (i == pivot1)
+				elog(NOTICE, "%d pivot 1", *i);
+			else if (i == pivot2)
+				elog(NOTICE, "%d pivot 2", *i);
+			else if (i == less && i == great)
+				elog(NOTICE, "%d less/great", *i);
+			else if (i == less)
+				elog(NOTICE, "%d less", *i);
+			else if (i == great)
+				elog(NOTICE, "%d great", *i);
+			else
+				elog(NOTICE, "%d ", *i);
+		}
+#endif
+
+		DO_SWAP(less - ST_POINTER_STEP, pivot1);
+		DO_SWAP(great + ST_POINTER_STEP, pivot2);
+
+		d1 = (less - ST_POINTER_STEP) - left;
+		d2 = (great + ST_POINTER_STEP) - less;
+		d3 = right - (great + ST_POINTER_STEP);
+
+		// WIP: don't worry about choosing a subarray for iteration for now
+		DO_SORT(left, d1 / ST_POINTER_STEP);
+		DO_SORT(less, d2 / ST_POINTER_STEP);
+		DO_SORT(great + 2 * ST_POINTER_STEP, d3 / ST_POINTER_STEP);
+	}
+	else // B&M
+	{
+	/* use median of five for the pivot */
+	DO_SWAP(a, e3);
 	pa = pb = a + ST_POINTER_STEP;
 	pc = pd = a + (n - 1) * ST_POINTER_STEP;
 	for (;;)
@@ -399,6 +585,7 @@ loop:
 			goto loop;
 		}
 	}
+	}
 }
 
 /*
diff --git a/src/test/modules/debug_qsort/Makefile b/src/test/modules/debug_qsort/Makefile
new file mode 100644
index 0000000000..6cb7ecba84
--- /dev/null
+++ b/src/test/modules/debug_qsort/Makefile
@@ -0,0 +1,18 @@
+MODULE_big = debug_qsort
+OBJS = debug_qsort.o
+PGFILEDESC = "test"
+EXTENSION = debug_qsort
+DATA = debug_qsort--1.0.sql
+
+first: all
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/debug_qsort
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/debug_qsort/debug_qsort--1.0.sql b/src/test/modules/debug_qsort/debug_qsort--1.0.sql
new file mode 100644
index 0000000000..9d4399a11c
--- /dev/null
+++ b/src/test/modules/debug_qsort/debug_qsort--1.0.sql
@@ -0,0 +1 @@
+CREATE FUNCTION debug_qsort_int(int[]) RETURNS void AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/debug_qsort/debug_qsort.c b/src/test/modules/debug_qsort/debug_qsort.c
new file mode 100644
index 0000000000..0424df03cd
--- /dev/null
+++ b/src/test/modules/debug_qsort/debug_qsort.c
@@ -0,0 +1,70 @@
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+#include <stdlib.h>
+
+#include "utils/array.h"
+#include "utils/lsyscache.h"
+
+// comparator for qsort_arg()
+// XXX copied from the version in nbtutils.c
+static inline int
+btint4fastcmp(const void * x, const void * y)
+{
+	int32		*a = (int32 *) x;
+	int32		*b = (int32 *) y;
+
+	if (*a > *b)
+		return 1;
+	else if (*a == *b)
+		return 0;
+	else
+		return -1;
+}
+
+#define 	QSORT_DEBUG
+
+// qsort with inlined comparator
+// #define ST_SORT qsort_int32
+// #define ST_ELEMENT_TYPE int32
+// #define ST_COMPARE(a, b) (btint4fastcmp(a, b))
+// #define ST_SCOPE static
+// #define ST_DEFINE
+// #define ST_DECLARE
+// #include "lib/sort_template_dp.h"
+
+// like qsort.c
+// #define ST_THRESHOLD_INSERTION_SORT 7
+#define ST_SORT qsort_runtime
+#define ST_ELEMENT_TYPE_VOID
+#define ST_COMPARE_RUNTIME_POINTER
+#define ST_SCOPE
+#define ST_DECLARE
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
+/* from contrib/intarray */
+#define ARRPTR(x)  ( (int32 *) ARR_DATA_PTR(x) )
+#define ARRNELEMS(x)  ArrayGetNItems(ARR_NDIM(x), ARR_DIMS(x))
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(debug_qsort_int);
+Datum
+debug_qsort_int(PG_FUNCTION_ARGS)
+{
+
+	ArrayType *a = PG_GETARG_ARRAYTYPE_P(0);
+	int     count = ARRNELEMS(a);
+	int		*arr = ARRPTR(a);
+
+	qsort_runtime(arr, count, sizeof(int32), btint4fastcmp);
+	elog(NOTICE, "sorted:");
+	for (int i = 0; i < count; i++)
+	{
+		elog(NOTICE, "%d", arr[i]);
+	}
+
+	PG_RETURN_NULL();
+}
diff --git a/src/test/modules/debug_qsort/debug_qsort.control b/src/test/modules/debug_qsort/debug_qsort.control
new file mode 100644
index 0000000000..860b803aa2
--- /dev/null
+++ b/src/test/modules/debug_qsort/debug_qsort.control
@@ -0,0 +1,4 @@
+comment = 'test'
+default_version = '1.0'
+module_pathname = '$libdir/debug_qsort'
+relocatable = true
-- 
2.36.1

