From 4209ad4e9d3c46d143de07549061f55f23c50e9d Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Mon, 9 May 2016 11:48:11 -0400
Subject: [PATCH 3/3] Lightweight framework for waiting for events.

---
 src/backend/executor/Makefile       |   4 +-
 src/backend/executor/execAsync.c    | 256 ++++++++++++++++++++++++++++++++++++
 src/backend/executor/execProcnode.c |  82 ++++++++----
 src/include/executor/execAsync.h    |  23 ++++
 src/include/executor/executor.h     |   2 +
 src/include/nodes/execnodes.h       |  10 ++
 6 files changed, 352 insertions(+), 25 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 51edd4c..0675b01 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -12,8 +12,8 @@ subdir = src/backend/executor
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
-       execMain.o execParallel.o execProcnode.o execQual.o \
+OBJS = execAmi.o execAsync.o execCurrent.o execGrouping.o execIndexing.o \
+       execJunk.o execMain.o execParallel.o execProcnode.o execQual.o \
        execScan.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
        nodeBitmapAnd.o nodeBitmapOr.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000..20601fa
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,256 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *	  Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * This file contains routines that are intended to support asynchronous
+ * execution; that is, suspending an executor node until some external
+ * event occurs, or until one of its child nodes produces a tuple.
+ * This allows the executor to avoid blocking on a single external event,
+ * such as a file descriptor waiting on I/O, or a parallel worker which
+ * must complete work elsewhere in the plan tree, when there might at the
+ * same time be useful computation that could be accomplished in some
+ * other part of the plan tree.
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/executor.h"
+#include "storage/latch.h"
+
+#define	EVENT_BUFFER_SIZE		16
+
+static void ExecAsyncConfigureWait(PlanState *planstate, bool reinit);
+
+/*
+ * Block until planstate has a result ready: wait on the EState's
+ * WaitEventSet (creating it if needed), then dispatch each node whose event
+ * fired and, for as long as results become ready, those nodes' ancestors.
+ */
+void
+ExecAsyncWaitForNode(PlanState *planstate)
+{
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+	PlanState  *callbacks[EVENT_BUFFER_SIZE];
+	int			ncallbacks = 0;
+	EState *estate = planstate->state;
+
+	while (!planstate->result_ready)
+	{
+		bool	reinit = (estate->es_wait_event_set == NULL);
+		int		n;
+		int		noccurred;
+
+		if (reinit)
+		{
+			/*
+			 * Allow for a few extra events without reinitializing.  Anything
+			 * more aggressive seems not worth the complexity: plans needing
+			 * massive numbers of FDs would likely hit kernel limits anyway.
+			 */
+			estate->es_max_async_events = estate->es_total_async_events + 16;
+			estate->es_wait_event_set =
+				CreateWaitEventSet(estate->es_query_cxt,
+								   estate->es_max_async_events);
+		}
+
+		/* Give each waiting node a chance to add or modify events. */
+		for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+			ExecAsyncConfigureWait(estate->es_waiting_nodes[n], reinit);
+
+		/* Wait (timeout -1) for at least one event to occur. */
+		noccurred = WaitEventSetWait(estate->es_wait_event_set, -1,
+									 occurred_event, EVENT_BUFFER_SIZE);
+		Assert(noccurred > 0);
+
+		/*
+		 * Queue a callback for each event's node; waiters are expected to
+		 * have registered their events with user_data pointing at the node.
+		 */
+		for (n = 0; n < noccurred; ++n)
+		{
+			WaitEvent  *w = &occurred_event[n];
+			PlanState  *ps = w->user_data;
+
+			callbacks[ncallbacks++] = ps;
+		}
+
+		/*
+		 * Each pass dispatches every pending node.  A node that produces a
+		 * result is replaced in the set by its parent, so a result that
+		 * becomes available in a leaf can bubble up towards the root as far
+		 * as necessary.  The loop ends once no node is pending.
+		 */
+		while (ncallbacks > 0)
+		{
+			int		i,
+					j;
+
+			/* Visit every pending callback once per pass. */
+			for (i = 0; i < ncallbacks; ++i)
+			{
+				/* Skip NULL entries, e.g. duplicates cleared below. */
+				if (callbacks[i] == NULL)
+					continue;
+
+				/*
+				 * Drop later duplicates of this node.  Quadratic overall,
+				 * but fine as long as EVENT_BUFFER_SIZE stays small.
+				 */
+				for (j = i + 1; j < ncallbacks; ++j)
+					if (callbacks[i] == callbacks[j])
+						callbacks[j] = NULL;
+
+				/* Dispatch to node-type-specific code. */
+				ExecDispatchNode(callbacks[i]);
+
+				/*
+				 * If a result is now ready, the parent takes over this slot
+				 * for the next pass; otherwise, there's nothing more to do.
+				 */
+				if (callbacks[i]->result_ready)
+					callbacks[i] = callbacks[i]->parent;
+				else
+					callbacks[i] = NULL;
+			}
+
+			/* Squeeze out NULLs so ncallbacks counts only live entries. */
+			for (i = 0, j = 0; j < ncallbacks; ++j)
+				if (callbacks[j] != NULL)
+					callbacks[i++] = callbacks[j];
+			ncallbacks = i;
+		}
+	}
+}
+
+/*
+ * An executor node should call this function to signal that it needs to wait
+ * on one or more events that can be registered on a WaitEventSet.  nevents
+ * should be the maximum number of events that it will wish to register.
+ * reinit should be true if the node can't reuse the WaitEventSet it most
+ * recently initialized, for example because it needs to drop a wait event
+ * from the set.
+ */
+void
+ExecAsyncNeedsWait(PlanState *planstate, int nevents, bool reinit)
+{
+	EState *estate = planstate->state;
+
+	Assert(nevents > 0); 	/* otherwise, use ExecAsyncDoesNotNeedWait */
+
+	/*
+	 * Add this node to the array of waiting nodes unless it is already there,
+	 * (re)allocating the array first if it is unallocated or full.
+	 */
+	if (planstate->n_async_events == 0)
+	{
+		if (estate->es_num_waiting_nodes >= estate->es_max_waiting_nodes)
+		{
+			int		newmax;
+
+			if (estate->es_max_waiting_nodes == 0)
+			{
+				newmax = 16;
+				estate->es_waiting_nodes =
+					MemoryContextAlloc(estate->es_query_cxt,
+									   newmax * sizeof(PlanState *));
+			}
+			else
+			{
+				newmax = estate->es_max_waiting_nodes * 2;
+				estate->es_waiting_nodes =
+					repalloc(estate->es_waiting_nodes,
+							 newmax * sizeof(PlanState *));
+			}
+			estate->es_max_waiting_nodes = newmax;
+		}
+		estate->es_waiting_nodes[estate->es_num_waiting_nodes++] = planstate;
+	}
+
+	/* Replace this node's old contribution to the estate total with nevents. */
+	estate->es_total_async_events -= planstate->n_async_events;
+	planstate->n_async_events = nevents;
+	estate->es_total_async_events += planstate->n_async_events;
+
+	/*
+	 * If a WaitEventSet has already been created, we need to discard it and
+	 * start again if the user passed reinit = true, or if the total number of
+	 * required events exceeds the supported number.
+	 */
+	if (estate->es_wait_event_set != NULL && (reinit ||
+		estate->es_total_async_events > estate->es_max_async_events))
+	{
+		FreeWaitEventSet(estate->es_wait_event_set);
+		estate->es_wait_event_set = NULL;
+	}
+}
+
+/*
+ * If an executor node no longer needs to wait, it should call this function
+ * to report that fact.  It is a no-op for a node that isn't waiting, i.e.
+ * one whose n_async_events is not positive.
+ */
+void
+ExecAsyncDoesNotNeedWait(PlanState *planstate)
+{
+	int		n;
+	EState *estate = planstate->state;
+
+	if (planstate->n_async_events <= 0)
+		return;
+
+	/*
+	 * Find the node in the list of waiting nodes.  (Is a linear search
+	 * going to be a problem here?  I think probably not.)
+	 */
+	for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+		if (estate->es_waiting_nodes[n] == planstate)
+			break;
+
+	/* We should always find ourselves in the array (check before removal). */
+	Assert(n < estate->es_num_waiting_nodes);
+
+	/* Remove the node by moving the last entry into its slot. */
+	if (n < estate->es_num_waiting_nodes)
+		estate->es_waiting_nodes[n] =
+			estate->es_waiting_nodes[--estate->es_num_waiting_nodes];
+
+	/* We no longer need any asynchronous events. */
+	estate->es_total_async_events -= planstate->n_async_events;
+	planstate->n_async_events = 0;
+
+	/*
+	 * The next wait will need to rebuild the WaitEventSet, because whatever
+	 * events we registered are gone now.  It's probably OK that this code
+	 * assumes we actually did register some events at one point, because we
+	 * needed to wait at some point and we don't any more.
+	 */
+	if (estate->es_wait_event_set != NULL)
+	{
+		FreeWaitEventSet(estate->es_wait_event_set);
+		estate->es_wait_event_set = NULL;
+	}
+}
+
+/*
+ * Let the node's type-specific code register wait events (none exist yet).
+ */
+static void
+ExecAsyncConfigureWait(PlanState *planstate, bool reinit)
+{
+	switch (nodeTag(planstate))
+	{
+		/* XXX: Add calls to per-nodetype handlers here. */
+		default:
+			elog(ERROR, "unexpected node type: %d", (int) nodeTag(planstate));
+	}
+}
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 3f2ebff..b7ac08e 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -77,6 +77,7 @@
  */
 #include "postgres.h"
 
+#include "executor/execAsync.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
@@ -368,24 +369,14 @@ ExecInitNode(Plan *node, EState *estate, PlanState *parent, int eflags)
 
 
 /* ----------------------------------------------------------------
- *		ExecProcNode
+ *		ExecDispatchNode
  *
- *		Execute the given node to return a(nother) tuple.
+ *		Invoke the given node's dispatch function.
  * ----------------------------------------------------------------
  */
-TupleTableSlot *
-ExecProcNode(PlanState *node)
+void
+ExecDispatchNode(PlanState *node)
 {
-	TupleTableSlot *result;
-
-	CHECK_FOR_INTERRUPTS();
-
-	/* mark any previous result as having been consumed */
-	node->result_ready = false;
-
-	if (node->chgParam != NULL) /* something changed */
-		ExecReScan(node);		/* let ReScan handle this */
-
 	if (node->instrument)
 		InstrStartNode(node->instrument);
 
@@ -539,22 +530,67 @@ ExecProcNode(PlanState *node)
 
 		default:
 			elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
-			result = NULL;
 			break;
 	}
 
-	/* We don't support asynchronous execution yet. */
-	Assert(node->result_ready);
+	if (node->instrument)
+	{
+		double	nTuples = 0.0;
 
-	/* Result should be a TupleTableSlot, unless it's NULL. */
-	Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+		if (node->result_ready && node->result != NULL &&
+			IsA(node->result, TupleTableSlot))
+			nTuples = 1.0;
 
-	result = (TupleTableSlot *) node->result;
+		InstrStopNode(node->instrument, nTuples);
+	}
+}
 
-	if (node->instrument)
-		InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
-	return result;
+/* ----------------------------------------------------------------
+ *		ExecExecuteNode
+ *
+ *		Clear result_ready and dispatch the node once.  If the node
+ *		is asynchronous, result_ready may still be unset on return
+ *		(use ExecProcNode or ExecAsyncWaitForNode to wait).  Unlike
+ *		ExecProcNode, this checks neither interrupts nor chgParam.
+ * ----------------------------------------------------------------
+ */
+void
+ExecExecuteNode(PlanState *node)
+{
+	node->result_ready = false;
+	ExecDispatchNode(node);
+}
+
+
+/* ----------------------------------------------------------------
+ *		ExecProcNode
+ *
+ *		Get the next tuple from the given node, rescanning it first
+ *		if chgParam is set.  If dispatching the node leaves no result
+ *		ready, wait in ExecAsyncWaitForNode until there is one.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecProcNode(PlanState *node)
+{
+	CHECK_FOR_INTERRUPTS();
+
+	/* mark any previous result as having been consumed */
+	node->result_ready = false;
+
+	if (node->chgParam != NULL) /* something changed */
+		ExecReScan(node);		/* let ReScan handle this */
+
+	ExecDispatchNode(node);
+
+	if (!node->result_ready)
+		ExecAsyncWaitForNode(node);
+
+	/* The returned result must be a TupleTableSlot, unless it's NULL. */
+	Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+
+	return (TupleTableSlot *) node->result;
 }
 
 
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000..38b37a1
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ *		Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncWaitForNode(PlanState *planstate);
+extern void ExecAsyncNeedsWait(PlanState *planstate, int nevents,
+	bool reinit);
+extern void ExecAsyncDoesNotNeedWait(PlanState *planstate);
+
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 087735a..979dea3 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -223,6 +223,8 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
  */
 extern PlanState *ExecInitNode(Plan *node, EState *estate, PlanState *parent,
 			 int eflags);
+extern void ExecDispatchNode(PlanState *node);
+extern void ExecExecuteNode(PlanState *node);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a0bc8af..3dba03c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -382,6 +382,14 @@ typedef struct EState
 	ParamListInfo es_param_list_info;	/* values of external params */
 	ParamExecData *es_param_exec_vals;	/* values of internal params */
 
+	/* Asynchronous execution support */
+	struct PlanState **es_waiting_nodes;		/* array of waiting nodes */
+	int			es_num_waiting_nodes;	/* # of waiters in array */
+	int			es_max_waiting_nodes;	/* # of allocated entries */
+	int			es_total_async_events;	/* total of per-node n_async_events */
+	int			es_max_async_events;	/* # supported by event set */
+	struct WaitEventSet *es_wait_event_set;
+
 	/* Other working state: */
 	MemoryContext es_query_cxt; /* per-query context in which EState lives */
 
@@ -1034,6 +1042,8 @@ typedef struct PlanState
 	bool		result_ready;	/* true if result is ready */
 	Node	   *result;			/* result, most often TupleTableSlot */
 
+	int			n_async_events;	/* # of async events we want to register */
+
 	Instrumentation *instrument;	/* Optional runtime stats for this node */
 	WorkerInstrumentation *worker_instrument; /* per-worker instrumentation */
 
-- 
2.5.4 (Apple Git-61)

