From 34a5807fca043023c48fb4810710ab4ed42ce728 Mon Sep 17 00:00:00 2001
From: Arseny Sher <sher-ars@ispras.ru>
Date: Sat, 11 Mar 2017 02:33:47 +0300
Subject: [PATCH 6/8] Reversed Limit implementation.

---
 src/backend/executor/execProcnode.c |  18 ++-
 src/backend/executor/nodeLimit.c    | 244 ++++++++----------------------------
 src/include/executor/nodeLimit.h    |   6 +-
 src/include/nodes/execnodes.h       |  10 +-
 4 files changed, 78 insertions(+), 200 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index f2275876d6..1ebb0da36f 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -197,6 +197,12 @@ ExecInitNode(Plan *node, EState *estate, int eflags, PlanState *parent)
 			result = (PlanState *) ExecInitHash((Hash *) node,
 												estate, eflags, parent);
 			break;
+
+		case T_Limit:
+			result = (PlanState *) ExecInitLimit((Limit *) node,
+												 estate, eflags, parent);
+			break;
+
 		default:
 			elog(ERROR, "unrecognized/unsupported node type: %d",
 				 (int) nodeTag(node));
@@ -274,7 +280,9 @@ ExecPushTuple(TupleTableSlot *slot, PlanState *pusher)
 		return SendReadyTuple(slot, pusher);
 	}
 
-	if (nodeTag(receiver) == T_HashState)
+	if (nodeTag(receiver) == T_LimitState)
+		return ExecPushTupleToLimit(slot, (LimitState *) receiver);
+	else if (nodeTag(receiver) == T_HashState)
 		return ExecPushTupleToHash(slot, (HashState *) receiver);
 
 	/* does push come from the outer side? */
@@ -314,7 +322,9 @@ ExecPushNull(TupleTableSlot *slot, PlanState *pusher)
 		return;
 	}
 
-	if (nodeTag(receiver) == T_HashState)
+	if (nodeTag(receiver) == T_LimitState)
+		return ExecPushNullToLimit(slot, (LimitState *) receiver);
+	else if (nodeTag(receiver) == T_HashState)
 		return ExecPushNullToHash(slot, (HashState *) receiver);
 
 	/* does push come from the outer side? */
@@ -391,6 +401,10 @@ ExecEndNode(PlanState *node)
 			ExecEndHash((HashState *) node);
 			break;
 
+		case T_LimitState:
+			ExecEndLimit((LimitState *) node);
+			break;
+
 		default:
 			elog(ERROR, "unrecognized/unsupported node type: %d",
 				 (int) nodeTag(node));
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index bcacbfc13b..ad3b6b436c 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -28,199 +28,66 @@
 static void recompute_limits(LimitState *node);
 static void pass_down_bound(LimitState *node, PlanState *child_node);
 
-
-/* ----------------------------------------------------------------
- *		ExecLimit
- *
- *		This is a very simple node which just performs LIMIT/OFFSET
- *		filtering on the stream of tuples returned by a subplan.
- * ----------------------------------------------------------------
- */
-TupleTableSlot *				/* return: a tuple or NULL */
-ExecLimit(LimitState *node)
+bool
+ExecPushTupleToLimit(TupleTableSlot *slot, LimitState *node)
 {
-	ScanDirection direction;
-	TupleTableSlot *slot;
-	PlanState  *outerPlan;
+	bool parent_accepts_tuples;
+	bool limit_accepts_tuples;
+	/* true if the tuple just pushed was the last one in the window */
+	bool last_tuple_pushed;
 
 	/*
-	 * get information from the node
+	 * Backward direction is not supported at the moment
 	 */
-	direction = node->ps.state->es_direction;
-	outerPlan = outerPlanState(node);
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+	/* guard against calling ExecPushTupleToLimit after it returned false */
+	Assert(node->lstate != LIMIT_DONE);
 
-	/*
-	 * The main logic is a simple state machine.
-	 */
-	switch (node->lstate)
+	if (node->lstate == LIMIT_INITIAL)
 	{
-		case LIMIT_INITIAL:
-
-			/*
-			 * First call for this node, so compute limit/offset. (We can't do
-			 * this any earlier, because parameters from upper nodes will not
-			 * be set during ExecInitLimit.)  This also sets position = 0 and
-			 * changes the state to LIMIT_RESCAN.
-			 */
-			recompute_limits(node);
-
-			/* FALL THRU */
-
-		case LIMIT_RESCAN:
-
-			/*
-			 * If backwards scan, just return NULL without changing state.
-			 */
-			if (!ScanDirectionIsForward(direction))
-				return NULL;
-
-			/*
-			 * Check for empty window; if so, treat like empty subplan.
-			 */
-			if (node->count <= 0 && !node->noCount)
-			{
-				node->lstate = LIMIT_EMPTY;
-				return NULL;
-			}
-
-			/*
-			 * Fetch rows from subplan until we reach position > offset.
-			 */
-			for (;;)
-			{
-				slot = ExecProcNode(outerPlan);
-				if (TupIsNull(slot))
-				{
-					/*
-					 * The subplan returns too few tuples for us to produce
-					 * any output at all.
-					 */
-					node->lstate = LIMIT_EMPTY;
-					return NULL;
-				}
-				node->subSlot = slot;
-				if (++node->position > node->offset)
-					break;
-			}
-
-			/*
-			 * Okay, we have the first tuple of the window.
-			 */
-			node->lstate = LIMIT_INWINDOW;
-			break;
-
-		case LIMIT_EMPTY:
-
-			/*
-			 * The subplan is known to return no tuples (or not more than
-			 * OFFSET tuples, in general).  So we return no tuples.
-			 */
-			return NULL;
-
-		case LIMIT_INWINDOW:
-			if (ScanDirectionIsForward(direction))
-			{
-				/*
-				 * Forwards scan, so check for stepping off end of window. If
-				 * we are at the end of the window, return NULL without
-				 * advancing the subplan or the position variable; but change
-				 * the state machine state to record having done so.
-				 */
-				if (!node->noCount &&
-					node->position - node->offset >= node->count)
-				{
-					node->lstate = LIMIT_WINDOWEND;
-					return NULL;
-				}
-
-				/*
-				 * Get next tuple from subplan, if any.
-				 */
-				slot = ExecProcNode(outerPlan);
-				if (TupIsNull(slot))
-				{
-					node->lstate = LIMIT_SUBPLANEOF;
-					return NULL;
-				}
-				node->subSlot = slot;
-				node->position++;
-			}
-			else
-			{
-				/*
-				 * Backwards scan, so check for stepping off start of window.
-				 * As above, change only state-machine status if so.
-				 */
-				if (node->position <= node->offset + 1)
-				{
-					node->lstate = LIMIT_WINDOWSTART;
-					return NULL;
-				}
-
-				/*
-				 * Get previous tuple from subplan; there should be one!
-				 */
-				slot = ExecProcNode(outerPlan);
-				if (TupIsNull(slot))
-					elog(ERROR, "LIMIT subplan failed to run backwards");
-				node->subSlot = slot;
-				node->position--;
-			}
-			break;
-
-		case LIMIT_SUBPLANEOF:
-			if (ScanDirectionIsForward(direction))
-				return NULL;
-
-			/*
-			 * Backing up from subplan EOF, so re-fetch previous tuple; there
-			 * should be one!  Note previous tuple must be in window.
-			 */
-			slot = ExecProcNode(outerPlan);
-			if (TupIsNull(slot))
-				elog(ERROR, "LIMIT subplan failed to run backwards");
-			node->subSlot = slot;
-			node->lstate = LIMIT_INWINDOW;
-			/* position does not change 'cause we didn't advance it before */
-			break;
-
-		case LIMIT_WINDOWEND:
-			if (ScanDirectionIsForward(direction))
-				return NULL;
-
-			/*
-			 * Backing up from window end: simply re-return the last tuple
-			 * fetched from the subplan.
-			 */
-			slot = node->subSlot;
-			node->lstate = LIMIT_INWINDOW;
-			/* position does not change 'cause we didn't advance it before */
-			break;
-
-		case LIMIT_WINDOWSTART:
-			if (!ScanDirectionIsForward(direction))
-				return NULL;
-
-			/*
-			 * Advancing after having backed off window start: simply
-			 * re-return the last tuple fetched from the subplan.
-			 */
-			slot = node->subSlot;
-			node->lstate = LIMIT_INWINDOW;
-			/* position does not change 'cause we didn't change it before */
-			break;
-
-		default:
-			elog(ERROR, "impossible LIMIT state: %d",
-				 (int) node->lstate);
-			slot = NULL;		/* keep compiler quiet */
-			break;
+		/*
+		 * First call for this node, so compute limit/offset. (We can't do
+		 * this any earlier, because parameters from upper nodes will not
+		 * be set during ExecInitLimit.) This also sets position = 0.
+		 */
+		recompute_limits(node);
+
+		/*
+		 * Check for empty window; if so, treat like empty subplan.
+		 */
+		if (!node->noCount && node->count <= 0)
+		{
+			node->lstate = LIMIT_DONE;
+			ExecPushNull(NULL, (PlanState *) node);
+			return false;
+		}
+
+		node->lstate = LIMIT_ACTIVE;
 	}
 
-	/* Return the current tuple */
-	Assert(!TupIsNull(slot));
+	if (++node->position <= node->offset)
+	{
+		/* we are not inside the window yet, wait for the next tuple */
+		return true;
+	}
+	/* We are inside the window now, so this tuple has to be pushed to the
+	 * parent node. */
+	parent_accepts_tuples = ExecPushTuple(slot, (PlanState *) node);
+	/* Check whether this was the last tuple allowed by the LIMIT count */
+	last_tuple_pushed = !node->noCount &&
+		node->position - node->offset >= node->count;
+	limit_accepts_tuples = parent_accepts_tuples && !last_tuple_pushed;
+	if (!limit_accepts_tuples)
+		node->lstate = LIMIT_DONE;
+	return limit_accepts_tuples;
+}
 
-	return slot;
+/* NULL came from below, so this LIMIT is done anyway */
+void
+ExecPushNullToLimit(TupleTableSlot *slot, LimitState *node)
+{
+	node->lstate = LIMIT_DONE;
+	ExecPushNull(slot, (PlanState *) node);
 }
 
 /*
@@ -290,9 +157,6 @@ recompute_limits(LimitState *node)
 	node->position = 0;
 	node->subSlot = NULL;
 
-	/* Set state-machine state */
-	node->lstate = LIMIT_RESCAN;
-
 	/* Notify child node about limit, if useful */
 	pass_down_bound(node, outerPlanState(node));
 }
@@ -361,7 +225,7 @@ pass_down_bound(LimitState *node, PlanState *child_node)
  * ----------------------------------------------------------------
  */
 LimitState *
-ExecInitLimit(Limit *node, EState *estate, int eflags)
+ExecInitLimit(Limit *node, EState *estate, int eflags, PlanState *parent)
 {
 	LimitState *limitstate;
 	Plan	   *outerPlan;
@@ -375,6 +239,7 @@ ExecInitLimit(Limit *node, EState *estate, int eflags)
 	limitstate = makeNode(LimitState);
 	limitstate->ps.plan = (Plan *) node;
 	limitstate->ps.state = estate;
+	limitstate->ps.parent = parent;
 
 	limitstate->lstate = LIMIT_INITIAL;
 
@@ -403,7 +268,8 @@ ExecInitLimit(Limit *node, EState *estate, int eflags)
 	 * then initialize outer plan
 	 */
 	outerPlan = outerPlan(node);
-	outerPlanState(limitstate) = ExecInitNode(outerPlan, estate, eflags, NULL);
+	outerPlanState(limitstate) = ExecInitNode(outerPlan, estate, eflags,
+											  (PlanState *) limitstate);
 
 	/*
 	 * limit nodes do no projections, so initialize projection info for this
diff --git a/src/include/executor/nodeLimit.h b/src/include/executor/nodeLimit.h
index 6e4084b46d..ceefca0bf2 100644
--- a/src/include/executor/nodeLimit.h
+++ b/src/include/executor/nodeLimit.h
@@ -16,8 +16,10 @@
 
 #include "nodes/execnodes.h"
 
-extern LimitState *ExecInitLimit(Limit *node, EState *estate, int eflags);
-extern TupleTableSlot *ExecLimit(LimitState *node);
+extern LimitState *ExecInitLimit(Limit *node, EState *estate, int eflags,
+								 PlanState *parent);
+extern bool ExecPushTupleToLimit(TupleTableSlot *slot, LimitState *node);
+extern void ExecPushNullToLimit(TupleTableSlot *slot, LimitState *node);
 extern void ExecEndLimit(LimitState *node);
 extern void ExecReScanLimit(LimitState *node);
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index abbe67ba0c..056db943b0 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2208,13 +2208,9 @@ typedef struct LockRowsState
  */
 typedef enum
 {
-	LIMIT_INITIAL,				/* initial state for LIMIT node */
-	LIMIT_RESCAN,				/* rescan after recomputing parameters */
-	LIMIT_EMPTY,				/* there are no returnable rows */
-	LIMIT_INWINDOW,				/* have returned a row in the window */
-	LIMIT_SUBPLANEOF,			/* at EOF of subplan (within window) */
-	LIMIT_WINDOWEND,			/* stepped off end of window */
-	LIMIT_WINDOWSTART			/* stepped off beginning of window */
+	LIMIT_INITIAL,		/* initial state for LIMIT node */
+	LIMIT_ACTIVE,		/* waiting for tuples */
+	LIMIT_DONE,			/* finished: no more tuples will be pushed */
 } LimitStateCond;
 
 typedef struct LimitState
-- 
2.11.0

