>From a1fcab2d9d001505a5fc25accdca71e88148e4ff Mon Sep 17 00:00:00 2001
From: Daniel Bausch <bausch@dvs.tu-darmstadt.de>
Date: Tue, 29 Oct 2013 16:41:09 +0100
Subject: [PATCH 4/4] Limit recursive prefetching for merge join

Add a node-type dispatch function, ExecLimitPrefetchNode(), that
recursively limits the amount of prefetching a plan subtree may request.
As a first step, add support for some variants of merge join: a merge
join distributes its prefetch allowance evenly between its outer and
inner subplans.
---
 src/backend/access/index/indexam.c   |  5 +++-
 src/backend/executor/execProcnode.c  | 47 +++++++++++++++++++++++++++++++++++-
 src/backend/executor/nodeAgg.c       | 10 ++++++++
 src/backend/executor/nodeIndexscan.c | 18 ++++++++++++++
 src/backend/executor/nodeMaterial.c  | 14 +++++++++++
 src/backend/executor/nodeMergejoin.c | 22 +++++++++++++++++
 src/include/access/relscan.h         |  1 +
 src/include/executor/executor.h      |  1 +
 src/include/executor/nodeAgg.h       |  3 +++
 src/include/executor/nodeIndexscan.h |  3 +++
 src/include/executor/nodeMaterial.h  |  3 +++
 src/include/executor/nodeMergejoin.h |  3 +++
 src/include/nodes/execnodes.h        |  6 +++++
 13 files changed, 134 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 5f44dec..354bde6 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -255,6 +255,7 @@ index_beginscan(Relation heapRelation,
 	scan->xs_prefetch_head = scan->xs_prefetch_tail = -1;
 	scan->xs_last_prefetch = -1;
 	scan->xs_done = false;
+	scan->xs_prefetch_limit = INDEXSCAN_PREFETCH_COUNT;
 #endif
 
 	return scan;
@@ -506,7 +507,9 @@ index_prefetch(IndexScanDesc scan, int maxPrefetch, ScanDirection direction)
 	GET_SCAN_PROCEDURE(amgettuple);
 
 	while (numPrefetched < maxPrefetch && !scan->xs_done &&
-		   index_prefetch_queue_space(scan) > 0)
+		   index_prefetch_queue_space(scan) > 0 &&
+		   index_prefetch_queue_space(scan) >
+		   (INDEXSCAN_PREFETCH_COUNT - scan->xs_prefetch_limit))
 	{
 		/*
 		 * The AM's amgettuple proc finds the next index entry matching the
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a8f2c90..a14a0d0 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -745,6 +745,51 @@ ExecEndNode(PlanState *node)
 
 #ifdef USE_PREFETCH
 /* ----------------------------------------------------------------
+ *		ExecLimitPrefetchNode
+ *
+ *		Limit the amount of prefetching that may be requested by
+ *		a subplan.  A NULL node is silently ignored.
+ *
+ *		Handlers of nodes with one subplan that might prefetch just
+ *		pass the limit through.  Handlers of nodes with two such
+ *		subplans need a heuristic to split the allowance between
+ *		them.  For node types without a handler the limit is not
+ *		propagated; that is only reported at DEBUG1 level.
+ * ----------------------------------------------------------------
+ */
+void
+ExecLimitPrefetchNode(PlanState *node, int limit)
+{
+	if (node == NULL)
+		return;
+	/* Dispatch to the node-type specific handler. */
+	switch (nodeTag(node))
+	{
+		case T_IndexScanState:
+			ExecLimitPrefetchIndexScan((IndexScanState *) node, limit);
+			break;
+
+		case T_MergeJoinState:
+			ExecLimitPrefetchMergeJoin((MergeJoinState *) node, limit);
+			break;
+
+		case T_MaterialState:
+			ExecLimitPrefetchMaterial((MaterialState *) node, limit);
+			break;
+
+		case T_AggState:
+			ExecLimitPrefetchAgg((AggState *) node, limit);
+			break;
+
+		default:	/* developer diagnostic, must not reach the client */
+			elog(DEBUG1,
+				 "missing ExecLimitPrefetchNode handler for node type: %d",
+				 (int) nodeTag(node));
+			break;
+	}
+}
+
+/* ----------------------------------------------------------------
  *		ExecPrefetchNode
  *
  *		Request explicit prefetching from a subtree/node without
@@ -776,4 +821,4 @@ ExecPrefetchNode(PlanState *node, int maxPrefetch)
 			return 0;
 	}
 }
-#endif
+#endif /* USE_PREFETCH */
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index e02a6ff..94f6d77 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1877,6 +1877,16 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	return aggstate;
 }
 
+#ifdef USE_PREFETCH
+void
+ExecLimitPrefetchAgg(AggState *node, int limit)
+{
+	Assert(node != NULL);
+	/* Pass the prefetch limit unchanged to the outer subplan. */
+	ExecLimitPrefetchNode(outerPlanState(node), limit);
+}
+#endif /* USE_PREFETCH */
+
 static Datum
 GetAggInitVal(Datum textInitVal, Oid transtype)
 {
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index bab0e7a..6ea236e 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -640,6 +640,24 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
 	return indexstate;
 }
 
+#ifdef USE_PREFETCH
+/* ----------------------------------------------------------------
+ *		ExecLimitPrefetchIndexScan
+ *
+ *		Sets/changes the number of tuples whose pages may be requested
+ *		in advance.  Does nothing if no scan descriptor exists.
+ * ----------------------------------------------------------------
+ */
+void
+ExecLimitPrefetchIndexScan(IndexScanState *node, int limit)
+{
+	Assert(node != NULL);
+
+	if (node->iss_ScanDesc != NULL)	/* NOTE(review): presumably NULL for EXPLAIN-only init; confirm */
+		node->iss_ScanDesc->xs_prefetch_limit = limit;
+}
+#endif /* USE_PREFETCH */
+
 
 /*
  * ExecIndexBuildScanKeys
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 7a82f56..3370362 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -232,6 +232,20 @@ ExecInitMaterial(Material *node, EState *estate, int eflags)
 	return matstate;
 }
 
+#ifdef USE_PREFETCH
+/* ----------------------------------------------------------------
+ *		ExecLimitPrefetchMaterial
+ * ----------------------------------------------------------------
+ */
+void
+ExecLimitPrefetchMaterial(MaterialState *node, int limit)
+{
+	Assert(node != NULL);
+	/* Pass the prefetch limit unchanged to the outer subplan. */
+	ExecLimitPrefetchNode(outerPlanState(node), limit);
+}
+#endif /* USE_PREFETCH */
+
 /* ----------------------------------------------------------------
  *		ExecEndMaterial
  * ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index e69bc64..f25e074 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -1627,6 +1627,10 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
 	mergestate->mj_OuterTupleSlot = NULL;
 	mergestate->mj_InnerTupleSlot = NULL;
 
+#ifdef USE_PREFETCH
+	ExecLimitPrefetchMergeJoin(mergestate, MERGEJOIN_PREFETCH_COUNT);
+#endif
+
 	/*
 	 * initialization successful
 	 */
@@ -1636,6 +1640,24 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
 	return mergestate;
 }
 
+#ifdef USE_PREFETCH
+/* ----------------------------------------------------------------
+ *		ExecLimitPrefetchMergeJoin
+ * ----------------------------------------------------------------
+ */
+void
+ExecLimitPrefetchMergeJoin(MergeJoinState *node, int limit)
+{
+	int			innerLimit = limit / 2;
+	int			outerLimit = limit - innerLimit;	/* gets the odd remainder */
+
+	Assert(node != NULL);
+	/* Split the allowance so that outer share + inner share == limit. */
+	ExecLimitPrefetchNode(outerPlanState(node), outerLimit);
+	ExecLimitPrefetchNode(innerPlanState(node), innerLimit);
+}
+#endif /* USE_PREFETCH */
+
 /* ----------------------------------------------------------------
  *		ExecEndMergeJoin
  *
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index bccc1a4..3297900 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -104,6 +104,7 @@ typedef struct IndexScanDescData
 	int			xs_prefetch_tail;
 	BlockNumber	xs_last_prefetch;
 	bool		xs_done;
+	int			xs_prefetch_limit;
 #endif
 }	IndexScanDescData;
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 88d0522..09b94e0 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -222,6 +222,7 @@ extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 #ifdef USE_PREFETCH
+extern void ExecLimitPrefetchNode(PlanState *node, int limit);
 extern int ExecPrefetchNode(PlanState *node, int maxPrefetch);
 #endif
 
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index 38823d6..f775ec8 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -17,6 +17,9 @@
 #include "nodes/execnodes.h"
 
 extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags);
+#ifdef USE_PREFETCH
+extern void ExecLimitPrefetchAgg(AggState *node, int limit);
+#endif
 extern TupleTableSlot *ExecAgg(AggState *node);
 extern void ExecEndAgg(AggState *node);
 extern void ExecReScanAgg(AggState *node);
diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h
index f93632c..ccf3121 100644
--- a/src/include/executor/nodeIndexscan.h
+++ b/src/include/executor/nodeIndexscan.h
@@ -17,6 +17,9 @@
 #include "nodes/execnodes.h"
 
 extern IndexScanState *ExecInitIndexScan(IndexScan *node, EState *estate, int eflags);
+#ifdef USE_PREFETCH
+extern void ExecLimitPrefetchIndexScan(IndexScanState *node, int limit);
+#endif
 extern TupleTableSlot *ExecIndexScan(IndexScanState *node);
 extern int ExecPrefetchIndexScan(IndexScanState *node, int maxPrefetch);
 extern void ExecEndIndexScan(IndexScanState *node);
diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h
index cfca0a5..5c81fe8 100644
--- a/src/include/executor/nodeMaterial.h
+++ b/src/include/executor/nodeMaterial.h
@@ -17,6 +17,9 @@
 #include "nodes/execnodes.h"
 
 extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags);
+#ifdef USE_PREFETCH
+extern void ExecLimitPrefetchMaterial(MaterialState *node, int limit);
+#endif
 extern TupleTableSlot *ExecMaterial(MaterialState *node);
 extern void ExecEndMaterial(MaterialState *node);
 extern void ExecMaterialMarkPos(MaterialState *node);
diff --git a/src/include/executor/nodeMergejoin.h b/src/include/executor/nodeMergejoin.h
index fa6b5e0..e402b42 100644
--- a/src/include/executor/nodeMergejoin.h
+++ b/src/include/executor/nodeMergejoin.h
@@ -17,6 +17,9 @@
 #include "nodes/execnodes.h"
 
 extern MergeJoinState *ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags);
+#ifdef USE_PREFETCH
+extern void ExecLimitPrefetchMergeJoin(MergeJoinState *node, int limit);
+#endif
 extern TupleTableSlot *ExecMergeJoin(MergeJoinState *node);
 extern void ExecEndMergeJoin(MergeJoinState *node);
 extern void ExecReScanMergeJoin(MergeJoinState *node);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 27fe65d..64ed6fb 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1585,6 +1585,12 @@ typedef struct MergeJoinState
 	ExprContext *mj_InnerEContext;
 } MergeJoinState;
 
+#ifdef USE_PREFETCH
+# ifndef MERGEJOIN_PREFETCH_COUNT	/* may be predefined to override */
+#  define MERGEJOIN_PREFETCH_COUNT 32	/* limit applied at merge join init */
+# endif
+#endif /* USE_PREFETCH */
+
 /* ----------------
  *	 HashJoinState information
  *
-- 
2.0.5

