From d8b1533955e3471fb2eb6a030619dcbc258955a8 Mon Sep 17 00:00:00 2001
From: Daniel Bausch <bausch@dvs.tu-darmstadt.de>
Date: Mon, 28 Oct 2013 10:43:16 +0100
Subject: [PATCH 3/4] First try on tuple look-ahead in nestloop

Similarly to the prefetching logic just added to the index scan, look
ahead at tuples in the outer loop of a nested loop scan.  For every tuple
looked ahead, issue an explicit request for prefetching to the inner
plan.  Modify the index scan to react to this request.
---
 src/backend/access/index/indexam.c   |  81 +++++++++-----
 src/backend/executor/execProcnode.c  |  36 +++++++
 src/backend/executor/nodeIndexscan.c |  16 +++
 src/backend/executor/nodeNestloop.c  | 200 ++++++++++++++++++++++++++++++++++-
 src/include/access/genam.h           |   4 +
 src/include/executor/executor.h      |   3 +
 src/include/executor/nodeIndexscan.h |   1 +
 src/include/nodes/execnodes.h        |  12 +++
 8 files changed, 323 insertions(+), 30 deletions(-)

diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index d8a4622..5f44dec 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -493,6 +493,57 @@ index_prefetch_queue_pop(IndexScanDesc scan)
 	return res;
 }
 
+#ifdef USE_PREFETCH
+/*
+ * index_prefetch - queue upcoming TIDs and prefetch their heap blocks.
+ * Returns the number of prefetch requests issued (at most maxPrefetch).
+ */
+int
+index_prefetch(IndexScanDesc scan, int maxPrefetch, ScanDirection direction)
+{
+	FmgrInfo   *procedure;
+	int			numPrefetched = 0;
+	bool		found;
+	BlockNumber	pf_block;
+
+	GET_SCAN_PROCEDURE(amgettuple);
+
+	while (numPrefetched < maxPrefetch && !scan->xs_done &&
+		   index_prefetch_queue_space(scan) > 0)
+	{
+		/*
+		 * amgettuple puts the next matching TID into scan->xs_ctup.t_self;
+		 * xs_recheck and possibly xs_itup are set too, but ignored here.
+		 */
+		found = DatumGetBool(FunctionCall2(procedure,
+										   PointerGetDatum(scan),
+										   Int32GetDatum(direction)));
+		if (found)
+		{
+			index_prefetch_queue_push(scan, &scan->xs_ctup.t_self);
+			pf_block = ItemPointerGetBlockNumber(&scan->xs_ctup.t_self);
+
+			/*
+			 * Skip the current buffer and a repeat of the last prefetched
+			 * block: such requests are redundant (crude randomness check).
+			 */
+			if ((!BufferIsValid(scan->xs_cbuf) ||
+				 pf_block != BufferGetBlockNumber(scan->xs_cbuf)) &&
+				pf_block != scan->xs_last_prefetch)
+			{
+				PrefetchBuffer(scan->heapRelation, MAIN_FORKNUM, pf_block);
+				scan->xs_last_prefetch = pf_block;
+				numPrefetched++;
+			}
+		}
+		else
+			scan->xs_done = true;
+	}
+
+	return numPrefetched;
+}
+#endif
+
 /* ----------------
  * index_getnext_tid - get the next TID from a scan
  *
@@ -506,7 +557,6 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 	FmgrInfo   *procedure;
 	bool		found;
 	ItemPointer	from_queue;
-	BlockNumber	pf_block;
 
 	SCAN_CHECKS;
 	GET_SCAN_PROCEDURE(amgettuple);
@@ -516,34 +566,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 #ifdef USE_PREFETCH
 	if (!scan->xs_want_itup)
 	{
-		while (!scan->xs_done && index_prefetch_queue_space(scan) > 0) {
-			/*
-			 * The AM's amgettuple proc finds the next index entry matching
-			 * the scan keys, and puts the TID into scan->xs_ctup.t_self.  It
-			 * should also set scan->xs_recheck and possibly scan->xs_itup,
-			 * though we pay no attention to those fields here.
-			 */
-			found = DatumGetBool(FunctionCall2(procedure,
-											   PointerGetDatum(scan),
-											   Int32GetDatum(direction)));
-			if (found)
-			{
-				index_prefetch_queue_push(scan, &scan->xs_ctup.t_self);
-				pf_block = ItemPointerGetBlockNumber(&scan->xs_ctup.t_self);
-				/* prefetch only if not the current buffer and not exactly the
-				 * previously prefetched buffer (heuristic random detection)
-				 * because sequential read-ahead would be redundant */
-				if ((!BufferIsValid(scan->xs_cbuf) ||
-					 pf_block != BufferGetBlockNumber(scan->xs_cbuf)) &&
-					pf_block != scan->xs_last_prefetch)
-				{
-					PrefetchBuffer(scan->heapRelation, MAIN_FORKNUM, pf_block);
-					scan->xs_last_prefetch = pf_block;
-				}
-			}
-			else
-				scan->xs_done = true;
-		}
+		index_prefetch(scan, INDEXSCAN_PREFETCH_COUNT, direction);
 		from_queue = index_prefetch_queue_pop(scan);
 		if (from_queue)
 		{
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 76dd62f..a8f2c90 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -741,3 +741,39 @@ ExecEndNode(PlanState *node)
 			break;
 	}
 }
+
+
+#ifdef USE_PREFETCH
+/* ----------------------------------------------------------------
+ *		ExecPrefetchNode
+ *
+ *		Ask a plan node to issue prefetch requests for at most
+ *		'maxPrefetch' pages without forming a tuple.
+ *
+ *		Returns the number of pages that have been requested.  A
+ *		NULL node or a node type without prefetch support is not an
+ *		error; the result is then simply 0.
+ * ----------------------------------------------------------------
+ */
+int
+ExecPrefetchNode(PlanState *node, int maxPrefetch)
+{
+	int			numRequested = 0;
+
+	if (node != NULL)
+	{
+		switch (nodeTag(node))
+		{
+			case T_IndexScanState:
+				numRequested = ExecPrefetchIndexScan((IndexScanState *) node,
+													 maxPrefetch);
+				break;
+
+			default:
+				break;			/* no prefetch support */
+		}
+	}
+
+	return numRequested;
+}
+#endif
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index f1062f1..bab0e7a 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -192,6 +192,22 @@ ExecReScanIndexScan(IndexScanState *node)
 	ExecScanReScan(&node->ss);
 }
 
+#ifdef USE_PREFETCH
+/* ----------------------------------------------------------------
+ *		ExecPrefetchIndexScan(node, maxPrefetch)
+ *
+ *		Ask the index scan to prefetch without fetching a tuple.
+ *		NOTE(review): es_direction is used as is; confirm vs. IndexNext.
+ * ----------------------------------------------------------------
+ */
+int
+ExecPrefetchIndexScan(IndexScanState *node, int maxPrefetch)
+{
+	ScanDirection scandir = node->ss.ps.state->es_direction;
+	return index_prefetch(node->iss_ScanDesc, maxPrefetch, scandir);
+}
+#endif
+
 
 /*
  * ExecIndexEvalRuntimeKeys
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index c7a08ed..21ad5f8 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -25,6 +25,90 @@
 #include "executor/nodeNestloop.h"
 #include "utils/memutils.h"
 
+#ifdef USE_PREFETCH
+/* returns the number of free entries in the look-ahead ring buffer */
+static int
+NestLoopLookAheadQueueSpace(NestLoopState *node)
+{
+	int			span;
+
+	if (node->nl_lookAheadQueueTail < 0)
+		return NESTLOOP_PREFETCH_COUNT;		/* nothing queued */
+	Assert(node->nl_lookAheadQueueHead >= 0);
+	span = node->nl_lookAheadQueueTail - node->nl_lookAheadQueueHead + 1;
+	return (NESTLOOP_PREFETCH_COUNT - span) % NESTLOOP_PREFETCH_COUNT;
+}
+
+/* appends a materialized copy of 'tuple' to the look-ahead queue */
+static bool
+NestLoopLookAheadQueuePush(NestLoopState *node, TupleTableSlot *tuple)
+{
+	TupleTableSlot *entry;
+	int			pos;
+
+	Assert(NestLoopLookAheadQueueSpace(node) > 0);
+
+	/* an empty queue restarts at index 0, otherwise the tail advances */
+	if (node->nl_lookAheadQueueTail != -1)
+		pos = (node->nl_lookAheadQueueTail + 1) % NESTLOOP_PREFETCH_COUNT;
+	else
+		pos = node->nl_lookAheadQueueHead = 0;
+	node->nl_lookAheadQueueTail = pos;
+
+	/* slots are created on first use of an entry and reused afterwards */
+	entry = node->nl_lookAheadQueue[pos];
+	if (entry == NULL)
+	{
+		entry = ExecInitExtraTupleSlot(node->js.ps.state);
+		node->nl_lookAheadQueue[pos] = entry;
+		ExecSetSlotDescriptor(entry, ExecGetResultType(outerPlanState(node)));
+	}
+	ExecCopySlot(entry, tuple);
+	return true;
+}
+
+/* removes and returns the oldest queued slot, or NULL if the queue is empty */
+static TupleTableSlot *
+NestLoopLookAheadQueuePop(NestLoopState *node)
+{
+	int			head = node->nl_lookAheadQueueHead;
+
+	if (head < 0)
+		return NULL;
+
+	/* popping the last entry resets the queue to its empty state */
+	if (head != node->nl_lookAheadQueueTail)
+		node->nl_lookAheadQueueHead = (head + 1) % NESTLOOP_PREFETCH_COUNT;
+	else
+		node->nl_lookAheadQueueHead = node->nl_lookAheadQueueTail = -1;
+
+	/* the slot is handed out as is; its tuple is not cleared here */
+	return node->nl_lookAheadQueue[head];
+}
+
+/*
+ * Empty the look-ahead queue and mark the ring buffer as empty.
+ *
+ * Slots are not cleared on pop, so every allocated slot is cleared here,
+ * whatever the current fill.  The slots themselves cannot be freed here;
+ * they will be freed when the query ends.
+ */
+static void
+NestLoopLookAheadQueueClear(NestLoopState *node)
+{
+	int		i;
+
+	for (i = 0; i < NESTLOOP_PREFETCH_COUNT; i++)
+	{
+		/* entries are NULL until a slot has been created for them */
+		if (node->nl_lookAheadQueue[i])
+			ExecClearTuple(node->nl_lookAheadQueue[i]);
+	}
+
+	/* otherwise a rescan would pop stale, already emptied slots */
+	node->nl_lookAheadQueueHead = node->nl_lookAheadQueueTail = -1;
+}
+#endif /* USE_PREFETCH */
 
 /* ----------------------------------------------------------------
  *		ExecNestLoop(node)
@@ -120,7 +204,87 @@ ExecNestLoop(NestLoopState *node)
 		if (node->nl_NeedNewOuter)
 		{
 			ENL1_printf("getting new outer tuple");
-			outerTupleSlot = ExecProcNode(outerPlan);
+
+#ifdef USE_PREFETCH
+			/*
+			 * While we have outer tuples and were not able to request enough
+			 * prefetching from the inner plan to properly load the system,
+			 * request more outer tuples and inner prefetching for them.
+			 *
+			 * Unfortunately we can do outer look-ahead directed prefetching
+			 * only when we are rescanning the inner plan anyway; otherwise we
+			 * would break the inner scan.  Only an independent copy of the
+			 * inner plan state would allow us to prefetch across inner loops
+			 * regardless of inner scan position.
+			 */
+			while (!node->nl_lookAheadDone &&
+				   node->nl_numInnerPrefetched < NESTLOOP_PREFETCH_COUNT &&
+				   NestLoopLookAheadQueueSpace(node) > 0)
+			{
+				TupleTableSlot *lookAheadTupleSlot = ExecProcNode(outerPlan);
+
+				if (!TupIsNull(lookAheadTupleSlot))
+				{
+					NestLoopLookAheadQueuePush(node, lookAheadTupleSlot);
+
+					/*
+					 * Set inner params according to look-ahead tuple.
+					 *
+					 * Fetch the values of any outer Vars that must be passed
+					 * to the inner scan, and store them in the appropriate
+					 * PARAM_EXEC slots.
+					 */
+					foreach(lc, nl->nestParams)
+					{
+						NestLoopParam *nlp = (NestLoopParam *) lfirst(lc);
+						int			paramno = nlp->paramno;
+						ParamExecData *prm;
+
+						prm = &(econtext->ecxt_param_exec_vals[paramno]);
+						/* Param value should be an OUTER_VAR var */
+						Assert(IsA(nlp->paramval, Var));
+						Assert(nlp->paramval->varno == OUTER_VAR);
+						Assert(nlp->paramval->varattno > 0);
+						prm->value = slot_getattr(lookAheadTupleSlot,
+												  nlp->paramval->varattno,
+												  &(prm->isnull));
+						/* Flag parameter value as changed */
+						innerPlan->chgParam =
+							bms_add_member(innerPlan->chgParam, paramno);
+					}
+
+					/*
+					 * Rescan inner plan with changed parameters and request
+					 * explicit prefetch.  Limit the inner prefetch amount
+					 * according to our own bookkeeping.
+					 *
+					 * When the so processed outer tuple gets finally active
+					 * in the inner loop, the inner plan will autonomously
+					 * prefetch the same tuples again.  This is redundant but
+					 * avoiding that seems too complicated for now.  It should
+					 * not hurt too much and may even help in case the
+					 * prefetched blocks have been evicted again in the
+					 * meantime.
+					 */
+					ExecReScan(innerPlan);
+					node->nl_numInnerPrefetched +=
+						ExecPrefetchNode(innerPlan,
+										 NESTLOOP_PREFETCH_COUNT -
+										 node->nl_numInnerPrefetched);
+				}
+				else
+					node->nl_lookAheadDone = true; /* outer plan exhausted */
+			}
+
+			/*
+			 * If there is already the next outerPlan in our look-ahead queue,
+			 * get the next outer tuple from there, otherwise execute the
+			 * outer plan.
+			 */
+			outerTupleSlot = NestLoopLookAheadQueuePop(node);
+			if (TupIsNull(outerTupleSlot) && !node->nl_lookAheadDone)
+#endif /* USE_PREFETCH */
+				outerTupleSlot = ExecProcNode(outerPlan);
 
 			/*
 			 * if there are no more outer tuples, then the join is complete..
@@ -174,6 +338,18 @@ ExecNestLoop(NestLoopState *node)
 		innerTupleSlot = ExecProcNode(innerPlan);
 		econtext->ecxt_innertuple = innerTupleSlot;
 
+#ifdef USE_PREFETCH
+		/*
+		 * Decrement prefetch counter as we consume inner tuples.  We need to
+		 * check for >0 because prefetching might not have happened for the
+		 * consumed tuple, maybe because explicit prefetching is not supported
+		 * by the inner plan or because the explicit prefetching requested by
+		 * us is exhausted and the inner plan is doing it on its own now.
+		 */
+		if (node->nl_numInnerPrefetched > 0)
+			node->nl_numInnerPrefetched--;
+#endif
+
 		if (TupIsNull(innerTupleSlot))
 		{
 			ENL1_printf("no inner tuple, need new outer tuple");
@@ -296,6 +472,9 @@ NestLoopState *
 ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 {
 	NestLoopState *nlstate;
+#ifdef USE_PREFETCH
+	int i;
+#endif
 
 	/* check for unsupported flags */
 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -381,6 +560,15 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 	nlstate->nl_NeedNewOuter = true;
 	nlstate->nl_MatchedOuter = false;
 
+#ifdef USE_PREFETCH
+	nlstate->nl_lookAheadQueueHead = nlstate->nl_lookAheadQueueTail = -1;
+	nlstate->nl_lookAheadDone = false;
+	nlstate->nl_numInnerPrefetched = 0;
+
+	for (i = 0; i < NESTLOOP_PREFETCH_COUNT; i++)
+		nlstate->nl_lookAheadQueue[i] = NULL;
+#endif
+
 	NL1_printf("ExecInitNestLoop: %s\n",
 			   "node initialized");
 
@@ -409,6 +597,10 @@ ExecEndNestLoop(NestLoopState *node)
 	 */
 	ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
 
+#ifdef USE_PREFETCH
+	NestLoopLookAheadQueueClear(node);
+#endif
+
 	/*
 	 * close down subplans
 	 */
@@ -444,4 +636,10 @@ ExecReScanNestLoop(NestLoopState *node)
 	node->js.ps.ps_TupFromTlist = false;
 	node->nl_NeedNewOuter = true;
 	node->nl_MatchedOuter = false;
+
+#ifdef USE_PREFETCH
+	NestLoopLookAheadQueueClear(node);
+	node->nl_lookAheadDone = false;
+	node->nl_numInnerPrefetched = 0;
+#endif
 }
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index a800041..7733b3c 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -146,6 +146,10 @@ extern void index_markpos(IndexScanDesc scan);
 extern void index_restrpos(IndexScanDesc scan);
 extern ItemPointer index_getnext_tid(IndexScanDesc scan,
 				  ScanDirection direction);
+#ifdef USE_PREFETCH
+extern int index_prefetch(IndexScanDesc scan, int maxPrefetch,
+						  ScanDirection direction);
+#endif
 extern HeapTuple index_fetch_heap(IndexScanDesc scan);
 extern HeapTuple index_getnext(IndexScanDesc scan, ScanDirection direction);
 extern int64 index_getbitmap(IndexScanDesc scan, TIDBitmap *bitmap);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 75841c8..88d0522 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -221,6 +221,9 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
+#ifdef USE_PREFETCH
+extern int ExecPrefetchNode(PlanState *node, int maxPrefetch);
+#endif
 
 /*
  * prototypes from functions in execQual.c
diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h
index 71dbd9c..f93632c 100644
--- a/src/include/executor/nodeIndexscan.h
+++ b/src/include/executor/nodeIndexscan.h
@@ -18,6 +18,7 @@
 
 extern IndexScanState *ExecInitIndexScan(IndexScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecIndexScan(IndexScanState *node);
+extern int ExecPrefetchIndexScan(IndexScanState *node, int maxPrefetch);
 extern void ExecEndIndexScan(IndexScanState *node);
 extern void ExecIndexMarkPos(IndexScanState *node);
 extern void ExecIndexRestrPos(IndexScanState *node);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3b430e0..27fe65d 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1526,6 +1526,18 @@ typedef struct NestLoopState
 	bool		nl_NeedNewOuter;
 	bool		nl_MatchedOuter;
 	TupleTableSlot *nl_NullInnerTupleSlot;
+
+#ifdef USE_PREFETCH
+# ifndef NESTLOOP_PREFETCH_COUNT
+#  define NESTLOOP_PREFETCH_COUNT 32
+# endif
+	/* look-ahead queue (for prefetching) - ringbuffer */
+	TupleTableSlot *nl_lookAheadQueue[NESTLOOP_PREFETCH_COUNT];
+	int			nl_lookAheadQueueHead;
+	int			nl_lookAheadQueueTail;
+	bool		nl_lookAheadDone;
+	int			nl_numInnerPrefetched;
+#endif
 } NestLoopState;
 
 /* ----------------
-- 
2.0.5

