parallel quicksort

Started by Mark Wong over 15 years ago, 2 messages
#1 Mark Wong
markwkm@gmail.com
1 attachment(s)

Hi everyone,

I've been playing around with a process based parallel quicksort
(http://github.com/markwkm/quicksort) and I tried to shoehorn it into
postgres because I wanted to see if I could sort more than integers.
I've attached a patch that creates a new GUC to control the degree of
parallelism and only modified the quicksort algorithm in quicksort.c.
Trying to 'make install' quickly shows me the patch breaks zic and
Andrew Gierth further pointed out on irc (a couple months back now)
that user defined comparison functions won't work as expected in the
forked processes (if I remember that correctly).
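
As a standalone illustration of the process-based idea described above (a minimal sketch, not the attached patch): a child created with fork() works on a copy-on-write copy of the parent's memory, so a child can only sort in place on behalf of the parent if the array lives in memory that stays shared across fork(). The names alloc_shared_ints, partition and fork_qsort, the depth parameter (standing in for the degree-of-parallelism limit), and the anonymous MAP_SHARED mapping are all assumptions made for this example. The sketch also calls waitpid() only for a child that was actually forked, because waitpid() with a pid of -1 waits for any child.

```c
#define _DEFAULT_SOURCE			/* for MAP_ANONYMOUS on glibc */
#include <assert.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: n ints in memory that remains shared after fork(). */
int *
alloc_shared_ints(size_t n)
{
	void	   *p = mmap(NULL, n * sizeof(int), PROT_READ | PROT_WRITE,
						 MAP_SHARED | MAP_ANONYMOUS, -1, 0);

	return p == MAP_FAILED ? NULL : (int *) p;
}

/* Lomuto partition around the last element; returns the pivot's final index. */
size_t
partition(int *a, size_t n)
{
	int			pivot = a[n - 1];
	size_t		i = 0;

	for (size_t j = 0; j + 1 < n; j++)
	{
		if (a[j] < pivot)
		{
			int			tmp = a[i];

			a[i] = a[j];
			a[j] = tmp;
			i++;
		}
	}
	a[n - 1] = a[i];
	a[i] = pivot;
	return i;
}

/*
 * Sort a[0..n-1] in place.  While depth > 0 the left partition is handed to
 * a forked child; the array must come from shared memory for that to work.
 */
void
fork_qsort(int *a, size_t n, int depth)
{
	size_t		p;
	pid_t		child = -1;		/* -1: no child was forked */

	if (n < 2)
		return;
	p = partition(a, n);

	if (depth > 0)
		child = fork();
	if (child == 0)
	{
		/* Child: sort the left partition, then leave without running atexit. */
		fork_qsort(a, p, depth - 1);
		_exit(0);
	}
	if (child < 0)
		fork_qsort(a, p, 0);	/* no fork attempted, or fork() failed */

	/* The parent always sorts the right partition itself. */
	fork_qsort(a + p + 1, n - p - 1, depth > 0 ? depth - 1 : 0);

	/* Wait only for a real child; waitpid(-1, ...) would wait for any child. */
	if (child > 0)
		waitpid(child, NULL, 0);
}
```

Calling fork_qsort(a, n, 2) on an array obtained from alloc_shared_ints uses at most four processes at once; with depth 0 it is an ordinary recursive quicksort. Passing an array from malloc() instead would leave the left partitions unsorted in the parent, since the children would only sort their own copies.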

Hoping this could be useful, I wanted to put out what I had so far and
see how far away this is from something workable. Not to mention that
there are probably some improvements that could be made to the
parallel quicksort algorithm.

In case anyone is interested in a parallel merge sort algorithm, I
have started something fairly basic here:
http://github.com/markwkm/mergesort

Regards,
Mark

Attachments:

pqs-1.patch (application/octet-stream)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 6af19bd..0170df3 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2607,6 +2607,21 @@ SELECT * FROM parent WHERE key = 2400;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-degree-of-parallelism" xreflabel="degree-of-parallelism">
+      <term><varname>degree-of-parallelism</varname> (<type>integer</type>)</term>
+      <indexterm>
+       <primary><varname>degree-of-parallelism</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Sets the maximum degree of parallelism (DOP) to be used by each backend
+        server.  This is the maximum number of processes that can be created in
+        order to do work in parallel.  Currently this is only used by
+        quicksort.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-cursor-tuple-fraction" xreflabel="cursor_tuple_fraction">
       <term><varname>cursor_tuple_fraction</varname> (<type>floating point</type>)</term>
       <indexterm>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index db61569..1ef6289 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -205,6 +205,8 @@ bool		enable_bonjour = false;
 char	   *bonjour_name;
 bool		restart_after_crash = true;
 
+extern int	degree_of_parallelism = 1;
+
 /* PIDs of special child processes; 0 when not running */
 static pid_t StartupPID = 0,
 			BgWriterPID = 0,
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index bd29c5d..ee13110 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -34,6 +34,8 @@
 #include <signal.h>
 #include <unistd.h>
 #include <sys/time.h>
+#include <sys/sem.h>
+#include <sys/shm.h>
 
 #include "access/transam.h"
 #include "access/xact.h"
@@ -57,6 +59,10 @@ bool		log_lock_waits = false;
 /* Pointer to this process's PGPROC struct, if any */
 PGPROC	   *MyProc = NULL;
 
+/* These fields are for inter-process parallelism. */
+extern int sem_id_dop; /* Semaphore just to access the DOP counter. */
+extern int *shm_dop; /* Current degree of parallelism, the DOP counter. */
+
 /*
  * This spinlock protects the freelist of recycled PGPROC structures.
  * We cannot use an LWLock because the LWLock manager depends on already
@@ -242,6 +248,13 @@ InitProcess(void)
 	volatile PROC_HDR *procglobal = ProcGlobal;
 	int			i;
 
+	/* A structure for initializing the DOP semaphore. */
+	union semun {
+		int val;
+		struct semid_ds *buf;
+		ushort *array;
+	} argument;
+
 	/*
 	 * ProcGlobal should be set up already (if we are a backend, we inherit
 	 * this by fork() or EXEC_BACKEND mechanism from the postmaster).
@@ -332,6 +345,17 @@ InitProcess(void)
 		SHMQueueInit(&(MyProc->myProcLocks[i]));
 	MyProc->recoveryConflictPending = false;
 
+	/* FIXME: handle error */
+	sem_id_dop = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
+	argument.val = 1;
+	semctl(sem_id_dop, 0, SETVAL, argument); /* FIXME: handle error */
+	shm_dop = shmat(sem_id_dop, NULL, 0); /* FIXME: handle error */
+	/*
+	 * Initialize the DOP to 1 because there is always 1 process active, which
+	 * is the backend that was started to handle the connection.
+	 */
+	*shm_dop = 1;
+
 	/*
 	 * We might be reusing a semaphore that belonged to a failed process. So
 	 * be careful and reinitialize its value here.	(This is not strictly
@@ -712,6 +736,10 @@ ProcKill(int code, Datum arg)
 	/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
 	if (AutovacuumLauncherPid != 0)
 		kill(AutovacuumLauncherPid, SIGUSR2);
+
+	/* Clean up the semaphore and shared memory used for the DOP variables. */
+	shmdt(shm_dop); /* FIXME: handle error */
+	shmctl(sem_id_dop, IPC_RMID, NULL); /* FIXME: handle error */
 }
 
 /*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b209128..0d23b1c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -118,6 +118,7 @@ extern char *temp_tablespaces;
 extern bool synchronize_seqscans;
 extern bool fullPageWrites;
 extern int	ssl_renegotiation_limit;
+extern int	degree_of_parallelism;
 
 #ifdef TRACE_SORT
 extern bool trace_sort;
@@ -2118,6 +2119,15 @@ static struct config_int ConfigureNamesInt[] =
 		1024, 100, 102400, NULL, NULL
 	},
 
+	{
+		{"degree_of_parallelism", PGC_USERSET, QUERY_TUNING_OTHER,
+			gettext_noop("Sets the maximum number of processes to be forked per backend."),
+			NULL,
+		},
+		&degree_of_parallelism,
+		1, 1, 1024, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index d31f1a1..bf58f5f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -246,6 +246,8 @@
 #join_collapse_limit = 8		# 1 disables collapsing of explicit 
 					# JOIN clauses
 
+#degree_of_parallelism = 1		# range 1-1024
+
 
 #------------------------------------------------------------------------------
 # ERROR REPORTING AND LOGGING
diff --git a/src/port/qsort.c b/src/port/qsort.c
index 7d50fb8..605c51d 100644
--- a/src/port/qsort.c
+++ b/src/port/qsort.c
@@ -1,5 +1,7 @@
 /*
- *	qsort.c: standard quicksort algorithm
+ *	qsort.c: a modified recursive parallel quicksort algorithm
+ *
+ * FIXME: Do we need to set up function for auxiliary processes for sorting?
  *
  *	Modifications from vanilla NetBSD source:
  *	  Add do ... while() macro fix
@@ -43,9 +45,26 @@
  * SUCH DAMAGE.
  */
 
+#include <unistd.h>
+#include <sys/sem.h>
+
 #include "c.h"
 
 
+/*
+ * FIXME:
+ * The only reason for defining these three variables here is because the
+ * quicksort code is built into libpgport and other files, such as
+ * postmaster.c and proc.c, are not.  I do not know what other scenarios would
+ * be more appropriate.
+ */
+int sem_id_dop;
+int *shm_dop;
+int degree_of_parallelism;
+
+static void get_lock();
+static void release_lock();
+
 static char *med3(char *a, char *b, char *c,
 	 int (*cmp) (const void *, const void *));
 static void swapfunc(char *, char *, size_t, int);
@@ -91,6 +110,30 @@ swapfunc(char *a, char *b, size_t n, int swaptype)
 
 #define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
 
+static void
+get_lock()
+{
+	struct sembuf operations[1];
+
+	operations[0].sem_num = 0;
+	operations[0].sem_op = -1;
+	operations[0].sem_flg = 0;
+	/* FIXME: handle error */
+	semop(sem_id_dop, operations, 1);
+}
+
+static void
+release_lock()
+{
+	struct sembuf operations[1];
+
+	operations[0].sem_num = 0;
+	operations[0].sem_op = 1;
+	operations[0].sem_flg = 0;
+	/* FIXME: handle error */
+	semop(sem_id_dop, operations, 1);
+}
+
 static char *
 med3(char *a, char *b, char *c, int (*cmp) (const void *, const void *))
 {
@@ -114,9 +157,18 @@ pg_qsort(void *a, size_t n, size_t es, int (*cmp) (const void *, const void *))
 				swaptype,
 				presorted;
 
-loop:SWAPINIT(a, es);
+	/*
+	 * Use -1 to initialize because fork() uses 0 to identify a process as a
+	 * child.
+	 */
+	int lchild = -1;
+	int rchild = -1;
+	int status; /* For waitpid() only. */
+
+	SWAPINIT(a, es);
 	if (n < 7)
 	{
+		/* Insertion sort if less than 7 elements. */
 		for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
 			for (pl = pm; pl > (char *) a && cmp(pl - es, pl) > 0;
 				 pl -= es)
@@ -124,6 +176,7 @@ loop:SWAPINIT(a, es);
 		return;
 	}
 	presorted = 1;
+	/* Check if sorted. */
 	for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
 	{
 		if (cmp(pm - es, pm) > 0)
@@ -148,6 +201,7 @@ loop:SWAPINIT(a, es);
 		}
 		pm = med3(pl, pm, pn, cmp);
 	}
+	/* Begin "partition" logic. */
 	swap(a, pm);
 	pa = pb = (char *) a + es;
 	pc = pd = (char *) a + (n - 1) * es;
@@ -177,19 +231,64 @@ loop:SWAPINIT(a, es);
 		pb += es;
 		pc -= es;
 	}
+	/* End "partition" logic. */
 	pn = (char *) a + n * es;
 	r = Min(pa - (char *) a, pb - pa);
 	vecswap(a, pb - r, r);
 	r = Min(pd - pc, pn - pd - es);
 	vecswap(pb, pn - r, r);
 	if ((r = pb - pa) > es)
-		qsort(a, r / es, es, cmp);
+	{
+		get_lock();
+		if (*shm_dop < degree_of_parallelism)
+		{
+			/* Under the degree limit, fork. */
+			++*shm_dop;
+			release_lock();
+
+			lchild = fork(); /* FIXME: handle error */
+			if (lchild == 0) {
+				/* The 'left' child starts processing. */
+				qsort(a, r / es, es, cmp);
+
+				get_lock();
+				--*shm_dop;
+				release_lock();
+				exit(0);
+			}
+		}
+		else
+		{
+			release_lock();
+			qsort(a, r / es, es, cmp);
+		}
+	}
 	if ((r = pd - pc) > es)
 	{
-		/* Iterate rather than recurse to save stack space */
-		a = pn - r;
-		n = r / es;
-		goto loop;
+		get_lock();
+		if (*shm_dop < degree_of_parallelism)
+		{
+			/* Under the degree limit, fork. */
+			++*shm_dop;
+			release_lock();
+
+			rchild = fork(); /* FIXME: handle error */
+			if (rchild == 0) {
+				/* The 'right' child starts processing. */
+				qsort(pn - r, r / es, es, cmp);
+
+				get_lock();
+				--*shm_dop;
+				release_lock();
+				exit(0);
+			}
+		}
+		else
+		{
+			release_lock();
+			qsort(pn - r, r / es, es, cmp);
+		}
 	}
-/*		qsort(pn - r, r / es, es, cmp);*/
+	waitpid(lchild, &status, 0);
+	waitpid(rchild, &status, 0);
 }
diff --git a/src/port/qsort_arg.c b/src/port/qsort_arg.c
index 586f2f6..ca26e93 100644
--- a/src/port/qsort_arg.c
+++ b/src/port/qsort_arg.c
@@ -43,9 +43,19 @@
  * SUCH DAMAGE.
  */
 
+#include <unistd.h>
+#include <sys/sem.h>
+
 #include "c.h"
 
 
+int sem_id_dop;
+int *shm_dop;
+int degree_of_parallelism;
+
+static void get_lock();
+static void release_lock();
+
 static char *med3(char *a, char *b, char *c,
 	 qsort_arg_comparator cmp, void *arg);
 static void swapfunc(char *, char *, size_t, int);
@@ -91,6 +101,30 @@ swapfunc(char *a, char *b, size_t n, int swaptype)
 
 #define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
 
+static void
+get_lock()
+{
+	struct sembuf operations[1];
+
+	operations[0].sem_num = 0;
+	operations[0].sem_op = -1;
+	operations[0].sem_flg = 0;
+	/* FIXME: handle error */
+	semop(sem_id_dop, operations, 1);
+}
+
+static void
+release_lock()
+{
+	struct sembuf operations[1];
+
+	operations[0].sem_num = 0;
+	operations[0].sem_op = 1;
+	operations[0].sem_flg = 0;
+	/* FIXME: handle error */
+	semop(sem_id_dop, operations, 1);
+}
+
 static char *
 med3(char *a, char *b, char *c, qsort_arg_comparator cmp, void *arg)
 {
@@ -114,7 +148,11 @@ qsort_arg(void *a, size_t n, size_t es, qsort_arg_comparator cmp, void *arg)
 				swaptype,
 				presorted;
 
-loop:SWAPINIT(a, es);
+	int lchild = -1;
+	int rchild = -1;
+	int status; /* For waitpid() only. */
+
+	SWAPINIT(a, es);
 	if (n < 7)
 	{
 		for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
@@ -183,13 +221,57 @@ loop:SWAPINIT(a, es);
 	r = Min(pd - pc, pn - pd - es);
 	vecswap(pb, pn - r, r);
 	if ((r = pb - pa) > es)
-		qsort_arg(a, r / es, es, cmp, arg);
+	{
+		get_lock();
+		if (*shm_dop < degree_of_parallelism)
+		{
+			/* Under the degree limit, fork. */
+			++*shm_dop;
+			release_lock();
+
+			lchild = fork(); /* FIXME: handle error */
+			if (lchild == 0) {
+				/* The 'left' child starts processing. */
+				qsort_arg(a, r / es, es, cmp, arg);
+
+				get_lock();
+				--*shm_dop;
+				release_lock();
+				exit(0);
+			}
+		}
+		else
+		{
+			release_lock();
+			qsort_arg(a, r / es, es, cmp, arg);
+		}
+	}
 	if ((r = pd - pc) > es)
 	{
-		/* Iterate rather than recurse to save stack space */
-		a = pn - r;
-		n = r / es;
-		goto loop;
+		get_lock();
+		if (*shm_dop < degree_of_parallelism)
+		{
+			/* Under the degree limit, fork. */
+			++*shm_dop;
+			release_lock();
+
+			rchild = fork(); /* FIXME: handle error */
+			if (rchild == 0) {
+				/* The 'right' child starts processing. */
+				qsort_arg(pn - r, r / es, es, cmp, arg);
+
+				get_lock();
+				--*shm_dop;
+				release_lock();
+				exit(0);
+			}
+		}
+		else
+		{
+			release_lock();
+			qsort_arg(pn - r, r / es, es, cmp, arg);
+		}
 	}
-/*		qsort_arg(pn - r, r / es, es, cmp, arg);*/
+	waitpid(lchild, &status, 0);
+	waitpid(rchild, &status, 0);
 }
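
A note on the DOP counter in the patch above: InitProcess() attaches shm_dop with shmat(sem_id_dop, NULL, 0), which passes a semaphore id where a shared memory id is expected; no shmget() call appears anywhere in the patch, and ProcKill() likewise hands sem_id_dop to shmctl(). The following is a minimal sketch of how such a counter could be set up with separate ids and the error handling that the FIXME comments ask for. The DopCounter struct, the dop_* function names, union dop_semun, the 0600 permissions and the use of SEM_UNDO are all assumptions for illustration and are not part of the patch.

```c
#define _DEFAULT_SOURCE
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>

/* Own union for semctl(); Linux leaves this definition to the caller. */
union dop_semun
{
	int			val;
	struct semid_ds *buf;
	unsigned short *array;
};

typedef struct DopCounter
{
	int			sem_id;			/* one-semaphore set used as a mutex */
	int			shm_id;			/* segment holding the counter */
	int		   *count;			/* attached address of the counter */
} DopCounter;

/* Add delta to the semaphore, retrying if a signal interrupts the wait. */
static int
dop_semop(const DopCounter *d, short delta)
{
	struct sembuf op = {.sem_num = 0, .sem_op = delta, .sem_flg = SEM_UNDO};
	int			rc;

	do
		rc = semop(d->sem_id, &op, 1);
	while (rc < 0 && errno == EINTR);
	return rc;
}

/* Create both IPC objects and start the counter at 1; returns 0 or -1. */
int
dop_init(DopCounter *d)
{
	union dop_semun arg;

	d->count = NULL;
	d->sem_id = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600);
	if (d->sem_id < 0)
		return -1;
	arg.val = 1;
	if (semctl(d->sem_id, 0, SETVAL, arg) < 0)
		goto fail_sem;
	d->shm_id = shmget(IPC_PRIVATE, sizeof(int), IPC_CREAT | 0600);
	if (d->shm_id < 0)
		goto fail_sem;
	d->count = shmat(d->shm_id, NULL, 0);
	if (d->count == (void *) -1)
		goto fail_shm;
	*d->count = 1;
	return 0;

fail_shm:
	d->count = NULL;
	shmctl(d->shm_id, IPC_RMID, NULL);
fail_sem:
	semctl(d->sem_id, 0, IPC_RMID);
	return -1;
}

/* Returns 1 if a slot was taken, 0 if the limit is reached, -1 on error. */
int
dop_try_acquire(DopCounter *d, int limit)
{
	int			granted = 0;

	if (dop_semop(d, -1) < 0)
		return -1;
	if (*d->count < limit)
	{
		++*d->count;
		granted = 1;
	}
	if (dop_semop(d, 1) < 0)
		return -1;
	return granted;
}

/* Give a slot back; returns 0 or -1. */
int
dop_release(DopCounter *d)
{
	if (dop_semop(d, -1) < 0)
		return -1;
	--*d->count;
	return dop_semop(d, 1);
}

/* Detach and remove the segment, then remove the semaphore set. */
void
dop_destroy(DopCounter *d)
{
	assert(d->count != NULL);
	shmdt(d->count);
	shmctl(d->shm_id, IPC_RMID, NULL);
	semctl(d->sem_id, 0, IPC_RMID);
}
```

In terms of the patch, dop_try_acquire(&d, degree_of_parallelism) would stand in for the get_lock() / compare / increment / release_lock() sequence before each fork(), and dop_release(&d) for the decrement a child performs before it exits.
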
#2 Markus Wanner
markus@bluegap.ch
In reply to: Mark Wong (#1)
Re: parallel quicksort

Hi,

On 08/09/2010 12:04 AM, Mark Wong wrote:

> I've been playing around with a process based parallel quicksort
> (http://github.com/markwkm/quicksort) and I tried to shoehorn it into
> postgres because I wanted to see if I could sort more than integers.
> I've attached a patch that creates a new GUC to control the degree of
> parallelism and only modified the quicksort algorithm in quicksort.c.
> Trying to 'make install' quickly shows me the patch breaks zic and
> Andrew Gierth further pointed out on irc (a couple months back now)
> that user defined comparison functions won't work as expected in the
> forked processes (if I remember that correctly).

I'm not sure what the problems are, but the background worker
infrastructure I recently posted could possibly solve this problem, as
those are more like normal backends. (Assuming you were forking from the
backend).

> Hoping this could be useful, I wanted to put out what I had so far and
> see how far away this is from something workable. Not to mention that
> there are probably some improvements that could be made to the
> parallel quicksort algorithm.

Thanks for sharing.

Regards

Markus Wanner