>From fb77b5fb53c5aa86fd227c00d7bdd06bdce66dc1 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 30 Nov 2015 17:46:19 +0900
Subject: [PATCH 5/5] Temporary implementation of merge join with parallel sort

This patch enables asynchronous, parallel execution of both subtrees
of a merge join if both sides are explicitly sorted. This is quite
artificial behavior, but it is convenient for seeing the difference
in performance.
---
 src/backend/executor/execAmi.c           |  8 ++++++
 src/backend/executor/nodeGather.c        | 35 ++++++++++++++++++++++++-
 src/backend/optimizer/plan/createplan.c  | 45 ++++++++++++++++++++++++++++++++
 src/backend/utils/misc/guc.c             |  9 +++++++
 src/include/executor/nodeGather.h        |  2 ++
 src/include/optimizer/cost.h             |  1 +
 src/test/regress/expected/rangefuncs.out |  4 ++-
 7 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index b969fc0..8289a26 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -315,6 +315,10 @@ ExecMarkPos(PlanState *node)
 			ExecSortMarkPos((SortState *) node);
 			break;
 
+		case T_GatherState:
+			ExecGatherMarkPos((GatherState *) node);
+			break;
+
 		case T_ResultState:
 			ExecResultMarkPos((ResultState *) node);
 			break;
@@ -364,6 +368,10 @@ ExecRestrPos(PlanState *node)
 			ExecSortRestrPos((SortState *) node);
 			break;
 
+		case T_GatherState:
+			ExecGatherRestrPos((GatherState *) node);
+			break;
+
 		case T_ResultState:
 			ExecResultRestrPos((ResultState *) node);
 			break;
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index b642d51..ec45622 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -8,7 +8,7 @@
  *
  * A Gather executor launches parallel workers to run multiple copies of a
  * plan.  It can also run the plan itself, if the workers are not available
- * or have not started up yet.  It then merges all of the results it produces
+ * or have not started up yet.  It then merges all of the results it produces
  * and the results from the workers into a single output stream.  Therefore,
  * it will normally be used with a plan where running multiple copies of the
  * same plan does not produce duplicate output, such as PartialSeqScan.
@@ -127,6 +127,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
  * Returns true if any of underlying nodes started asynchronously
  * ----------------------------------------------------------------
  */
+/* for test */
+static bool from_execgather = false;
 bool
 StartGather(GatherState *node)
 {
@@ -138,6 +140,10 @@ StartGather(GatherState *node)
 	if (!ExecNode_is_inited(node))
 		return false;
 
+	elog(DEBUG1, "nodeGather executed %s",
+		 from_execgather ? "synchronously" : "asynchronously");
+	from_execgather = false;
+
 	SetNodeRunState(node, Started);
 
 	/*
@@ -220,6 +226,7 @@ ExecGather(GatherState *node)
 	ExprContext *econtext;
 
 	/* Execute childs asynchronously if possible */
+	from_execgather = true;
 	if (ExecNode_is_inited(node))
 		StartGather(node);
 
@@ -468,6 +475,32 @@ ExecShutdownGather(GatherState *node)
  */
 
 /* ----------------------------------------------------------------
+ *		ExecGatherMarkPos
+ *
+ *		Placeholder: the mark is not passed on to the worker; only logs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecGatherMarkPos(GatherState *node)
+{
+	/* Mark is not forwarded to the worker (no mechanism yet?); just log it. */
+	elog(DEBUG1, "MarkPos on Gather is not implemented");
+}
+
+/* ----------------------------------------------------------------
+ *		ExecGatherRestrPos
+ *
+ *		Placeholder: restoring a marked position is not implemented for Gather.
+ * ----------------------------------------------------------------
+ */
+void
+ExecGatherRestrPos(GatherState *node)
+{
+	/* Restore is not forwarded to the worker (no mechanism yet?); raise ERROR. */
+	elog(ERROR, "RestrPos on Gather is not implemented");
+}
+
+/* ----------------------------------------------------------------
  *		ExecReScanGather
  *
  *		Re-initialize the workers and rescans a relation via them.
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 411b36c..e3d3456 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -844,10 +844,24 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
 
 		/* Now, insert a Sort node if subplan isn't sufficiently ordered */
 		if (!pathkeys_contained_in(pathkeys, subpath->pathkeys))
+		{
 			subplan = (Plan *) make_sort(root, subplan, numsortkeys,
 										 sortColIdx, sortOperators,
 										 collations, nullsFirst,
 										 best_path->limit_tuples);
+			if (enable_parasortmerge)
+			{
+				Gather *gather;
+
+				gather = make_gather(subplan->targetlist,
+									 NIL,
+									 1, /* num_workers */
+									 true, /* single_copy */
+									 subplan);
+				subplan = (Plan *)gather;
+				root->glob->parallelModeNeeded = true;
+			}
+		}
 
 		subplans = lappend(subplans, subplan);
 	}
@@ -2360,6 +2374,9 @@ create_nestloop_plan(PlannerInfo *root,
 	return join_plan;
 }
 
+bool enable_parasortmerge = false;
+extern bool enable_asyncexec;
+
 static MergeJoin *
 create_mergejoin_plan(PlannerInfo *root,
 					  MergePath *best_path,
@@ -2637,6 +2654,34 @@ create_mergejoin_plan(PlannerInfo *root,
 	/*
 	 * Now we can build the mergejoin node.
 	 */
+
+	/* Put a Gather on top of each side that is a Sort, even if only one is */
+	if (enable_parasortmerge)
+	{
+		Gather *gather;
+
+		if (IsA(outer_plan, Sort))
+		{
+			gather = make_gather(outer_plan->targetlist,
+								 NIL,
+								 1, /* num_workers */
+								 true, /* single_copy */
+								 outer_plan);
+			outer_plan = (Plan *)gather;
+			root->glob->parallelModeNeeded = true;
+		}
+
+		if (IsA(inner_plan, Sort))
+		{
+			gather = make_gather(inner_plan->targetlist,
+								 NIL,
+								 1, /* num_workers */
+								 true, /* single_copy */
+								 inner_plan);
+			inner_plan = (Plan *)gather;
+			root->glob->parallelModeNeeded = true;
+		}
+	}
 	join_plan = make_mergejoin(tlist,
 							   joinclauses,
 							   otherclauses,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 16e04d2..61afd1e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -855,6 +855,15 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"enable_parasortmerge", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables parallel sort if both side of mergejoin need to be sorted."),
+			NULL
+		},
+		&enable_parasortmerge,
+		false,
+		NULL, NULL, NULL
+	},
+	{
 		{"enable_asyncexec", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enable early execution."),
 			NULL
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index e7cbe21..d724a2e 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -21,6 +21,8 @@ extern bool StartGather(GatherState *node);
 extern TupleTableSlot *ExecGather(GatherState *node);
 extern void ExecEndGather(GatherState *node);
 extern void ExecShutdownGather(GatherState *node);
+extern void ExecGatherMarkPos(GatherState *node);
+extern void ExecGatherRestrPos(GatherState *node);
 extern void ExecReScanGather(GatherState *node);
 
 #endif   /* NODEGATHER_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index e9b7595..4f9de21 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern bool enable_hashagg;
 extern bool enable_nestloop;
 extern bool enable_material;
 extern bool enable_mergejoin;
+extern bool enable_parasortmerge;
 extern bool enable_asyncexec;
 extern bool enable_hashjoin;
 extern int	constraint_exclusion;
diff --git a/src/test/regress/expected/rangefuncs.out b/src/test/regress/expected/rangefuncs.out
index 00ef421..73b5aaa 100644
--- a/src/test/regress/expected/rangefuncs.out
+++ b/src/test/regress/expected/rangefuncs.out
@@ -1,6 +1,7 @@
 SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
          name         | setting 
 ----------------------+---------
+ enable_asyncexec     | off
  enable_bitmapscan    | on
  enable_hashagg       | on
  enable_hashjoin      | on
@@ -9,10 +10,11 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
  enable_material      | on
  enable_mergejoin     | on
  enable_nestloop      | on
+ enable_parasortmerge | off
  enable_seqscan       | on
  enable_sort          | on
  enable_tidscan       | on
-(11 rows)
+(13 rows)
 
 CREATE TABLE foo2(fooid int, f2 int);
 INSERT INTO foo2 VALUES(1, 11);
-- 
1.8.3.1

