Re: PassDownLimitBound for ForeignScan/CustomScan [take-2]

Started by Kouhei Kaigai about 9 years ago (1 message)
#1 Kouhei Kaigai <kaigai@ak.jp.nec.com>
1 attachment

Hello,

The attached patch is a revised version of pass-down LIMIT to FDW/CSP.

Below are the updates since the last version.

'ps_numTuples' of PlanState is now declared as uint64 instead of long,
to avoid problems on 32-bit machines when a large LIMIT value is
supplied.

'ps_numTuples' is re-interpreted; 0 means that the upper node wants
to fetch all the tuples. This eliminates the tedious initialization
in the ExecInit handler of each executor node.
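
As a minimal sketch of this convention (not code from the patch), the
hint could be derived from LIMIT/OFFSET as below. The function name
tuples_needed_hint and its parameters are hypothetical; the overflow
guard mirrors the intent of the negative-sum check in the existing
pass_down_bound().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper: compute the "tuples needed" hint for a child node.
 * Returns 0 to mean "no bound, fetch everything".
 */
uint64_t
tuples_needed_hint(bool no_count, int64_t count, int64_t offset)
{
	/* LIMIT and OFFSET values are assumed to be validated as non-negative */
	assert(count >= 0 && offset >= 0);

	/* No LIMIT at all: the upper node wants every tuple */
	if (no_count)
		return 0;

	/* count + offset would overflow int64: fall back to "no bound" */
	if (count > INT64_MAX - offset)
		return 0;

	return (uint64_t) (count + offset);
}
```

Note that LIMIT 0 with no OFFSET also yields 0 under this convention.
Since the value is only a hint, that should merely skip the optimization
rather than change the query result.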

Although it was not requested, estimate_path_cost_size() of postgres_fdw
now adjusts the row estimate if the foreign path covers all the base
relations of the query (i.e. it is top-level) and the LIMIT clause takes
a constant value. This produces a more appropriate plan, as follows:

* WITHOUT this patch
--------------------
postgres=# explain verbose select * from t_a, t_b where t_a.id = t_b.id and t_a.x < t_b.x LIMIT 100;
                                       QUERY PLAN
----------------------------------------------------------------------------------------
 Limit  (cost=261.17..274.43 rows=100 width=88)
   Output: t_a.id, t_a.x, t_a.y, t_b.id, t_b.x, t_b.y
   ->  Hash Join  (cost=261.17..581.50 rows=2416 width=88)
         Output: t_a.id, t_a.x, t_a.y, t_b.id, t_b.x, t_b.y
         Hash Cond: (t_a.id = t_b.id)
         Join Filter: (t_a.x < t_b.x)
         ->  Foreign Scan on public.t_a  (cost=100.00..146.12 rows=1204 width=44)
               Output: t_a.id, t_a.x, t_a.y
               Remote SQL: SELECT id, x, y FROM public.t
         ->  Hash  (cost=146.12..146.12 rows=1204 width=44)
               Output: t_b.id, t_b.x, t_b.y
               ->  Foreign Scan on public.t_b  (cost=100.00..146.12 rows=1204 width=44)
                     Output: t_b.id, t_b.x, t_b.y
                     Remote SQL: SELECT id, x, y FROM public.t
(14 rows)

* WITH this patch
-----------------
postgres=# explain verbose select * from t_a, t_b where t_a.id = t_b.id and t_a.x < t_b.x LIMIT 100;
                                                  QUERY PLAN
--------------------------------------------------------------------------------------------------------------
 Limit  (cost=100.00..146.58 rows=100 width=88)
   Output: t_a.id, t_a.x, t_a.y, t_b.id, t_b.x, t_b.y
   ->  Foreign Scan  (cost=100.00..146.58 rows=100 width=88)
         Output: t_a.id, t_a.x, t_a.y, t_b.id, t_b.x, t_b.y
         Relations: (public.t_a) INNER JOIN (public.t_b)
         Remote SQL: SELECT r1.id, r1.x, r1.y, r2.id, r2.x, r2.y FROM (public.t r1 INNER JOIN public.t r2 ON (((r1.x < r2.x)) AND ((r1.id = r2.id))))
(6 rows)

On the other hand, I noticed it is not safe to attach the LIMIT clause at
the planner stage, because root->limit_tuples is declared as double.
Even if the LIMIT clause takes a constant value, that value may be larger
than 2^53, beyond which float64 can no longer represent every integer
exactly, whereas the LIMIT clause allows values up to 2^63-1.
So postgres_fdw now attaches the LIMIT clause to the remote query at
execution time only.
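
The precision problem can be illustrated with a small standalone check.
This is only a sketch; the function name limit_survives_double is
hypothetical and not part of the patch. It reports whether an int64
LIMIT value comes back unchanged after being stored in a double, as
root->limit_tuples would store it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper: true if 'limit' is unchanged by a round trip
 * through double. A double has a 53-bit significand, so integers above
 * 2^53 are not all exactly representable.
 */
bool
limit_survives_double(int64_t limit)
{
	double		as_double;

	/* LIMIT values are assumed to be non-negative */
	assert(limit >= 0);

	as_double = (double) limit;

	/* 2^63 does not fit in int64, so reject it before casting back */
	if (as_double >= 9223372036854775808.0)
		return false;

	return (int64_t) as_double == limit;
}
```

Under IEEE 754 double precision, limit_survives_double(100) and
limit_survives_double(INT64_C(1) << 53) hold, while
(INT64_C(1) << 53) + 1 and INT64_MAX do not survive the round trip.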

Thanks,
----
PG-Strom Project / NEC OSS Promotion Center
KaiGai Kohei <kaigai@ak.jp.nec.com>

-----Original Message-----
From: Robert Haas [mailto:robertmhaas@gmail.com]
Sent: Thursday, November 10, 2016 3:08 AM
To: Kaigai Kouhei(海外 浩平) <kaigai@ak.jp.nec.com>
Cc: pgsql-hackers@postgresql.org; Jeevan Chalke
<jeevan.chalke@enterprisedb.com>; Etsuro Fujita
<fujita.etsuro@lab.ntt.co.jp>; Andres Freund <andres@anarazel.de>
Subject: Re: PassDownLimitBound for ForeignScan/CustomScan [take-2]

On Mon, Oct 31, 2016 at 10:20 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com>
wrote:

As an example, I enhanced postgres_fdw to understand ps_numTuples
if it is set. When a remote ORDER BY is pushed down, the current
code sorts the entire remote table because it does not know
how many rows will be returned, so it takes longer to execute.
The patched version, on the other hand, runs the remote query with a
LIMIT clause derived from ps_numTuples, which is set by the Limit
node on top of the ForeignScan node.
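
A rough standalone sketch of that execution-time step follows. It does
not use the PostgreSQL StringInfo API; the function name
append_remote_limit and the fixed-size destination buffer are
assumptions made for illustration only.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: write the remote query into 'dst', appending
 * " LIMIT n" only when a non-zero tuple count was passed down.
 * Returns what snprintf() returns: the length the full string needs.
 */
int
append_remote_limit(char *dst, size_t dstlen,
					const char *query, uint64_t num_tuples)
{
	assert(dst != NULL && query != NULL);

	/* 0 means "fetch all tuples", so the query is sent unchanged */
	if (num_tuples == 0)
		return snprintf(dst, dstlen, "%s", query);

	/* PRIu64 prints a uint64_t portably on 32-bit and 64-bit platforms */
	return snprintf(dst, dstlen, "%s LIMIT %" PRIu64, query, num_tuples);
}
```

The portable format macro matters here: printing a 64-bit value with a
plain long specifier is not correct on platforms where long is 32 bits
wide.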

So there are two cases here. If the user says LIMIT 12, we could in theory
know that at planner time and optimize accordingly. If the user says LIMIT
twelve(), however, we will need to wait until execution time unless twelve()
happens to be capable of being simplified to a constant by the planner.

Therefore, it's possible to imagine having two mechanisms here. In the
simple case where the LIMIT and OFFSET values are constants, we could
implement a system to get hold of that information during planning and
use it for whatever we like. In addition, we can have an
execution-time system that optimizes based on values available at execution
(regardless of whether those values were also available during planning).
Those are, basically, two separate things, and this patch has enough to
do just focusing on one of them.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL
Company

Attachments:

passdown-limit-fdw.v2.patch (application/octet-stream)
 contrib/postgres_fdw/postgres_fdw.c    | 46 ++++++++++++++++++++++-
 src/backend/executor/nodeLimit.c       | 68 ++--------------------------------
 src/backend/executor/nodeMergeAppend.c |  6 ++-
 src/backend/executor/nodeResult.c      | 25 +++++++++++++
 src/backend/executor/nodeSort.c        | 10 +++++
 src/include/nodes/execnodes.h          |  3 ++
 6 files changed, 90 insertions(+), 68 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index fbe6929..721a631 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -2428,6 +2428,17 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
 	if (es->verbose)
 	{
 		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
+
+		/* LIMIT clause is attached at execution time */
+		if (node->ss.ps.ps_numTuples > 0)
+		{
+			StringInfoData	buf;
+
+			initStringInfo(&buf);
+			appendStringInfo(&buf, "%s LIMIT " UINT64_FORMAT,
+							 sql, node->ss.ps.ps_numTuples);
+			sql = buf.data;
+		}
 		ExplainPropertyText("Remote SQL", sql, es);
 	}
 }
@@ -2553,6 +2564,13 @@ estimate_path_cost_size(PlannerInfo *root,
 		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
 								remote_conds, pathkeys, &retrieved_attrs,
 								NULL);
+		/*
+		 * Attach LIMIT if this path is top-level and constant value is
+		 * specified.
+		 */
+		if (root->limit_tuples >= 0.0 &&
+			bms_equal(root->all_baserels, foreignrel->relids))
+			appendStringInfo(&sql, " LIMIT %.0f", root->limit_tuples);
 
 		/* Get the remote estimate */
 		conn = GetConnection(fpinfo->user, false);
@@ -2627,6 +2645,13 @@ estimate_path_cost_size(PlannerInfo *root,
 
 			/* Estimate of number of rows in cross product */
 			nrows = fpinfo_i->rows * fpinfo_o->rows;
+			if (root->limit_tuples >= 0.0 &&
+				bms_equal(root->all_baserels, foreignrel->relids))
+			{
+				nrows = Min(nrows, root->limit_tuples);
+				rows = Min(rows, root->limit_tuples);
+			}
+
 			/* Clamp retrieved rows estimate to at most size of cross product */
 			retrieved_rows = Min(retrieved_rows, nrows);
 
@@ -2724,6 +2749,10 @@ estimate_path_cost_size(PlannerInfo *root,
 			/*
 			 * Number of rows expected from foreign server will be same as
 			 * that of number of groups.
+			 *
+			 * MEMO: root->limit_tuples is not attached when query contains
+			 * grouping-clause or aggregate functions. So, we don't adjust
+			 * rows even if LIMIT <const> is supplied.
 			 */
 			rows = retrieved_rows = numGroups;
 
@@ -2754,8 +2783,17 @@ estimate_path_cost_size(PlannerInfo *root,
 		}
 		else
 		{
+			double		nrows = foreignrel->tuples;
+
+			if (root->limit_tuples >= 0.0 &&
+				bms_equal(root->all_baserels, foreignrel->relids))
+			{
+				nrows = Min(nrows, root->limit_tuples);
+				rows = Min(rows, root->limit_tuples);
+			}
+
 			/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
-			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
+			retrieved_rows = Min(retrieved_rows, nrows);
 
 			/*
 			 * Cost as though this were a seqscan, which is pessimistic.  We
@@ -2768,7 +2806,7 @@ estimate_path_cost_size(PlannerInfo *root,
 
 			startup_cost += foreignrel->baserestrictcost.startup;
 			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
-			run_cost += cpu_per_tuple * foreignrel->tuples;
+			run_cost += cpu_per_tuple * nrows;
 		}
 
 		/*
@@ -2942,6 +2980,10 @@ create_cursor(ForeignScanState *node)
 	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
 					 fsstate->cursor_number, fsstate->query);
 
+	/* Append LIMIT if numTuples were passed-down */
+	if (node->ss.ps.ps_numTuples > 0)
+		appendStringInfo(&buf, " LIMIT " UINT64_FORMAT, node->ss.ps.ps_numTuples);
+
 	/*
 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
 	 * to infer types for all parameters.  Since we explicitly cast every
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index faf32e1..df3fe01 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -26,7 +26,6 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
 
 
 /* ----------------------------------------------------------------
@@ -232,6 +231,7 @@ static void
 recompute_limits(LimitState *node)
 {
 	ExprContext *econtext = node->ps.ps_ExprContext;
+	uint64		tuples_needed;
 	Datum		val;
 	bool		isNull;
 
@@ -296,70 +296,8 @@ recompute_limits(LimitState *node)
 	node->lstate = LIMIT_RESCAN;
 
 	/* Notify child node about limit, if useful */
-	pass_down_bound(node, outerPlanState(node));
-}
-
-/*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  Also, if our input is a MergeAppend, we can apply the
- * same bound to any Sorts that are direct children of the MergeAppend,
- * since the MergeAppend surely need read no more than that many tuples from
- * any one input.  We also have to be prepared to look through a Result,
- * since the planner might stick one atop MergeAppend for projection purposes.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
- */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
-{
-	if (IsA(child_node, SortState))
-	{
-		SortState  *sortState = (SortState *) child_node;
-		int64		tuples_needed = node->count + node->offset;
-
-		/* negative test checks for overflow in sum */
-		if (node->noCount || tuples_needed < 0)
-		{
-			/* make sure flag gets reset if needed upon rescan */
-			sortState->bounded = false;
-		}
-		else
-		{
-			sortState->bounded = true;
-			sortState->bound = tuples_needed;
-		}
-	}
-	else if (IsA(child_node, MergeAppendState))
-	{
-		MergeAppendState *maState = (MergeAppendState *) child_node;
-		int			i;
-
-		for (i = 0; i < maState->ms_nplans; i++)
-			pass_down_bound(node, maState->mergeplans[i]);
-	}
-	else if (IsA(child_node, ResultState))
-	{
-		/*
-		 * An extra consideration here is that if the Result is projecting a
-		 * targetlist that contains any SRFs, we can't assume that every input
-		 * tuple generates an output tuple, so a Sort underneath might need to
-		 * return more than N tuples to satisfy LIMIT N. So we cannot use
-		 * bounded sort.
-		 *
-		 * If Result supported qual checking, we'd have to punt on seeing a
-		 * qual, too.  Note that having a resconstantqual is not a
-		 * showstopper: if that fails we're not getting any rows at all.
-		 */
-		if (outerPlanState(child_node) &&
-			!expression_returns_set((Node *) child_node->plan->targetlist))
-			pass_down_bound(node, outerPlanState(child_node));
-	}
+	tuples_needed = (node->noCount ? 0 : node->count + node->offset);
+	outerPlanState(node)->ps_numTuples = tuples_needed;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..91707c6 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -174,10 +174,14 @@ ExecMergeAppend(MergeAppendState *node)
 		/*
 		 * First time through: pull the first tuple from each subplan, and set
 		 * up the heap.
+		 * Also, pass down the required number of tuples
 		 */
 		for (i = 0; i < node->ms_nplans; i++)
 		{
-			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+			PlanState  *pstate = node->mergeplans[i];
+
+			pstate->ps_numTuples = node->ps.ps_numTuples;
+			node->ms_slots[i] = ExecProcNode(pstate);
 			if (!TupIsNull(node->ms_slots[i]))
 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
 		}
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index 4007b76..9e83eb2 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -47,6 +47,7 @@
 
 #include "executor/executor.h"
 #include "executor/nodeResult.h"
+#include "nodes/nodeFuncs.h"
 #include "utils/memutils.h"
 
 
@@ -75,6 +76,28 @@ ExecResult(ResultState *node)
 	econtext = node->ps.ps_ExprContext;
 
 	/*
+	 * Pass down the number of required tuples by the upper node
+	 *
+	 * An extra consideration here is that if the Result is projecting a
+	 * targetlist that contains any SRFs, we can't assume that every input
+	 * tuple generates an output tuple, so a Sort underneath might need to
+	 * return more than N tuples to satisfy LIMIT N. So we cannot use
+	 * bounded sort.
+	 *
+	 * If Result supported qual checking, we'd have to punt on seeing a
+	 * qual, too.  Note that having a resconstantqual is not a
+	 * showstopper: if that fails we're not getting any rows at all.
+	 */
+	if (!node->rs_started)
+	{
+		if (outerPlanState(node) &&
+			!expression_returns_set((Node *) node->ps.plan->targetlist))
+			outerPlanState(node)->ps_numTuples = node->ps.ps_numTuples;
+
+		node->rs_started = true;
+	}
+
+	/*
 	 * check constant qualifications like (2 > 1), if not already done
 	 */
 	if (node->rs_checkqual)
@@ -218,6 +241,7 @@ ExecInitResult(Result *node, EState *estate, int eflags)
 	resstate->ps.plan = (Plan *) node;
 	resstate->ps.state = estate;
 
+	resstate->rs_started = false;
 	resstate->rs_done = false;
 	resstate->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
 
@@ -294,6 +318,7 @@ ExecEndResult(ResultState *node)
 void
 ExecReScanResult(ResultState *node)
 {
+	node->rs_started = false;
 	node->rs_done = false;
 	node->ps.ps_TupFromTlist = false;
 	node->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a34dcc5..6ec7abb 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -66,6 +66,16 @@ ExecSort(SortState *node)
 
 		SO1_printf("ExecSort: %s\n",
 				   "sorting subplan");
+		/*
+		 * Check bounds according to the required number of tuples
+		 */
+		if (node->ss.ps.ps_numTuples == 0)
+			node->bounded = false;
+		else
+		{
+			node->bounded = true;
+			node->bound = node->ss.ps.ps_numTuples;
+		}
 
 		/*
 		 * Want to scan subplan in the forward direction while creating the
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index f6f73f3..3b84959 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1066,6 +1066,8 @@ typedef struct PlanState
 	ProjectionInfo *ps_ProjInfo;	/* info for doing tuple projection */
 	bool		ps_TupFromTlist;/* state flag for processing set-valued
 								 * functions in targetlist */
+	uint64		ps_numTuples;	/* number of tuples required by the upper
+								 * node if any. 0 means all the tuples */
 } PlanState;
 
 /* ----------------
@@ -1114,6 +1116,7 @@ typedef struct ResultState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	ExprState  *resconstantqual;
+	bool		rs_started;		/* are we already called? */
 	bool		rs_done;		/* are we done? */
 	bool		rs_checkqual;	/* do we need to check the qual? */
 } ResultState;