From c846452b1b0ca30f2ae9c08488482ef1b9b9e9a9 Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 1 Mar 2017 15:46:53 -0600
Subject: [PATCH 2/2] Modify plpython to use SPI callbacks

This is a bare-minimum patch that switches PL/Python to SPI callbacks
in lieu of a tuplestore: each result row is converted to a Python object
as it is received. Basic testing shows a ~27% speedup on a simple
generate_series(1,10000000) query.
---
 src/pl/plpython/plpy_main.c |  13 ++
 src/pl/plpython/plpy_main.h |   3 +
 src/pl/plpython/plpy_spi.c  | 303 ++++++++++++++++++++++++++++++++------------
 3 files changed, 235 insertions(+), 84 deletions(-)

diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804..07501f1 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
 	return PLy_execution_contexts;
 }
 
+/* Make "new" the current execution context and return the one it replaces. */
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+	PLyExecutionContext *const previous = PLy_execution_contexts;
+
+	/* Refuse to switch when no execution context is currently active. */
+	if (!previous)
+		elog(ERROR, "no Python function is currently executing");
+	PLy_execution_contexts = new;
+	return previous;
+}
+
 MemoryContext
 PLy_get_scratch_context(PLyExecutionContext *context)
 {
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4..7cbe0a8 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
 /* Get the current execution context */
 extern PLyExecutionContext *PLy_current_execution_context(void);
 
+/* Switch to another execution context; returns the previously current one */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new_context);
+
 /* Get the scratch memory context for specified execution context */
 extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
 
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index c6856cc..dcec0c7 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,9 +28,27 @@
 #include "plpy_procedure.h"
 #include "plpy_resultobject.h"
 
+typedef struct					/* DestReceiver state for one SPI callback query */
+{
+	DestReceiver pub;			/* must stay first: struct is cast to/from DestReceiver * */
+	PLyExecutionContext *exec_ctx;	/* context the callbacks switch to while running */
+	MemoryContext parent_ctx;	/* context that was current in PLy_Callback_New */
+	MemoryContext cb_ctx;		/* private context, deleted by PLy_CSDestroy */
+	TupleDesc	desc;			/* descriptor handed to PLy_CSStartup */
+	PLyTypeInfo *args;			/* tuple conversion info set up in PLy_CSStartup */
+
+	PyObject	*result;		/* PLyResultObject created by PLyCSNewResult */
+} CallbackState;
+
+static void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
 							 uint64 rows, int status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
@@ -195,6 +213,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
 PyObject *
 PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 {
+	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+	CallbackState	*callback;
 	volatile int nargs;
 	int			i,
 				rv;
@@ -237,12 +257,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 
 	oldcontext = CurrentMemoryContext;
 	oldowner = CurrentResourceOwner;
+	callback = PLy_Callback_New(exec_ctx);
 
 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
 
 	PG_TRY();
 	{
-		PLyExecutionContext *exec_ctx = PLy_current_execution_context();
 		char	   *volatile nulls;
 		volatile int j;
 
@@ -288,9 +308,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 			}
 		}
 
-		rv = SPI_execute_plan(plan->plan, plan->values, nulls,
-							  exec_ctx->curr_proc->fn_readonly, limit);
-		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+							exec_ctx->curr_proc->fn_readonly, limit,
+							(DestReceiver *) callback);
+		ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
 		if (nargs > 0)
 			pfree(nulls);
@@ -315,9 +336,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 		}
 
 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
+		PLy_Callback_Free(callback);
 		return NULL;
 	}
 	PG_END_TRY();
+	callback = PLy_Callback_Free(callback);
 
 	for (i = 0; i < nargs; i++)
 	{
@@ -343,9 +366,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 static PyObject *
 PLy_spi_execute_query(char *query, long limit)
 {
+	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+	CallbackState	*callback = PLy_Callback_New(exec_ctx);
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
 	int			rv;
-	volatile MemoryContext oldcontext;
-	volatile ResourceOwner oldowner;
 	PyObject   *ret = NULL;
 
 	oldcontext = CurrentMemoryContext;
@@ -355,20 +380,22 @@ PLy_spi_execute_query(char *query, long limit)
 
 	PG_TRY();
 	{
-		PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
 		pg_verifymbstr(query, strlen(query), false);
-		rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
-		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+				(DestReceiver *) callback);
+
+		ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
 	}
 	PG_CATCH();
 	{
 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
+		PLy_Callback_Free(callback);
 		return NULL;
 	}
 	PG_END_TRY();
+	callback = PLy_Callback_Free(callback);
 
 	if (rv < 0)
 	{
@@ -382,94 +409,202 @@ PLy_spi_execute_query(char *query, long limit)
 	return ret;
 }
 
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+/* Build a CallbackState whose receiver hooks point at the PLy_CS* functions. */
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
 {
+	CallbackState *state = palloc0(sizeof(CallbackState));
+	MemoryContext private_ctx, caller_ctx;
+
+	/*
+	 * Use a private context to make cleanup easier.  It is a child of the
+	 * current context, so we don't have to worry about cleaning it up if
+	 * there's an error.
+	 */
+	private_ctx = AllocSetContextCreate(CurrentMemoryContext,
+										"PL/Python callback context",
+										ALLOCSET_DEFAULT_SIZES);
+	state->cb_ctx = private_ctx;
+	state->exec_ctx = exec_ctx;
+
+	/* Create the receiver template with the private context current. */
+	caller_ctx = MemoryContextSwitchTo(private_ctx);
+	state->parent_ctx = caller_ctx;
+	memcpy(&state->pub, CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+	/* Override the template's hooks with our implementations. */
+	state->pub.receiveSlot = PLy_CSreceive;
+	state->pub.rStartup = PLy_CSStartup;
+	state->pub.rDestroy = PLy_CSDestroy;
+	MemoryContextSwitchTo(caller_ctx);
+
+	return state;
+}
+
+/* Destroy a callback state (NULL is accepted); always returns NULL. */
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+	if (callback == NULL)
+		return NULL;
+
+	/* Run the destroy hook only while the private context still exists. */
+	if (callback->cb_ctx != NULL)
+		callback->pub.rDestroy(&callback->pub);
+	pfree(callback);
+	return NULL;
+}
+
+/* Create the PLyResultObject for this callback and remember it in myState. */
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
+{
+	/* The result info needs to be in the parent context */
+	MemoryContext saved = MemoryContextSwitchTo(myState->parent_ctx);
+	PyObject   *fresh = PLy_result_new();
+
+	myState->result = fresh;
+	if (!fresh)
+		PLy_elog(ERROR, "could not create new result object");
+	MemoryContextSwitchTo(saved);
+	return (PLyResultObject *) fresh;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+	PLyExecutionContext *old_exec_ctx;
+	CallbackState *myState = (CallbackState *) self;
 	PLyResultObject *result;
-	volatile MemoryContext oldcontext;
+	PLyTypeInfo *args;
+	MemoryContext mctx, old_mctx;
 
-	result = (PLyResultObject *) PLy_result_new();
-	Py_DECREF(result->status);
-	result->status = PyInt_FromLong(status);
+	/*
+	 * Receiver startup hook: prepare the state PLy_CSreceive relies on.  We may
+	 * be called in a different execution context, so switch to our own first.
+	 */
+	mctx = myState->cb_ctx;
+	old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+	old_mctx = MemoryContextSwitchTo(mctx);
 
-	if (status > 0 && tuptable == NULL)
+	/* Keep the descriptor: the one the receive function sees has no column names. */
+	myState->desc = typeinfo;
+
+	/* Set up type conversion info; it is allocated in the callback context */
+	myState->args = args = palloc0(sizeof(PLyTypeInfo));
+	PLy_typeinfo_init(args, mctx);
+	PLy_input_tuple_funcs(args, typeinfo);
+
+	result = PLyCSNewResult(myState);
+
+	/*
+	 * Save tuple descriptor for later use by result set metadata
+	 * functions.  Save it in TopMemoryContext so that it survives
+	 * outside of an SPI context.  We trust that PLy_result_dealloc()
+	 * will clean it up when the time is right.  XXX This might leak the copy
+	 * if an error happens and the result doesn't get dereferenced.
+	 */
+	MemoryContextSwitchTo(TopMemoryContext);
+	result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+	MemoryContextSwitchTo(old_mctx);
+	PLy_switch_execution_context(old_exec_ctx);
+}
+
+/* rDestroy hook: delete the callback's private memory context. */
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+	CallbackState *state = (CallbackState *) self;
+
+	MemoryContextDelete(state->cb_ctx);
+	state->cb_ctx = NULL;
+}
+
+/*
+ * receiveSlot callback: convert the tuple in "slot" to a Python dict and
+ * append it to the rows list of the result built by PLy_CSStartup.
+ */
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+	CallbackState 	*myState = (CallbackState *) self;
+	TupleDesc		desc = myState->desc;
+	PLyTypeInfo 	*args = myState->args;
+	PLyResultObject *result = (PLyResultObject *) myState->result;
+	PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+	MemoryContext 	scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+	MemoryContext 	oldcontext = CurrentMemoryContext;
+	int			rv = 1;			/* nonzero unless the row gets appended */
+	PyObject   *volatile row = NULL;	/* set in PG_TRY, read in PG_CATCH */
+
+	/* Verify saved state matches incoming slot */
+	Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+	Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
+	/*
+	 * Do the work in the scratch context to avoid leaking memory from the
+	 * datatype output function calls.
+	 */
+	MemoryContextSwitchTo(scratch_context);
+
+	PG_TRY();
 	{
-		Py_DECREF(result->nrows);
-		result->nrows = (rows > (uint64) LONG_MAX) ?
-			PyFloat_FromDouble((double) rows) :
-			PyInt_FromLong((long) rows);
+		row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
 	}
-	else if (status > 0 && tuptable != NULL)
+	PG_CATCH();
 	{
-		PLyTypeInfo args;
-		MemoryContext cxt;
+		Py_XDECREF(row);
+		MemoryContextSwitchTo(oldcontext);
+		PLy_switch_execution_context(old_exec_ctx);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	if (row)
+	{
+		rv = PyList_Append(result->rows, row);
+		Py_DECREF(row);
+	}
 
-		Py_DECREF(result->nrows);
-		result->nrows = (rows > (uint64) LONG_MAX) ?
-			PyFloat_FromDouble((double) rows) :
-			PyInt_FromLong((long) rows);
+	/* Restore the caller's state before possibly raising an error below */
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(scratch_context);
+	PLy_switch_execution_context(old_exec_ctx);
 
-		cxt = AllocSetContextCreate(CurrentMemoryContext,
-									"PL/Python temp context",
-									ALLOCSET_DEFAULT_SIZES);
-		PLy_typeinfo_init(&args, cxt);
+	if (rv != 0)
+		ereport(ERROR,
+				(errmsg("unable to append value to list")));
 
-		oldcontext = CurrentMemoryContext;
-		PG_TRY();
+	return true;
+}
+
+
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+	PLyResultObject *result = (PLyResultObject *) callback->result;
+
+	/* If status < 0 this stuff would just get thrown away anyway. */
+	if (status > 0)
+	{
+		if (!result)
 		{
-			MemoryContext oldcontext2;
-
-			if (rows)
-			{
-				uint64		i;
-
-				/*
-				 * PyList_New() and PyList_SetItem() use Py_ssize_t for list
-				 * size and list indices; so we cannot support a result larger
-				 * than PY_SSIZE_T_MAX.
-				 */
-				if (rows > (uint64) PY_SSIZE_T_MAX)
-					ereport(ERROR,
-							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-							 errmsg("query result has too many rows to fit in a Python list")));
-
-				Py_DECREF(result->rows);
-				result->rows = PyList_New(rows);
-
-				PLy_input_tuple_funcs(&args, tuptable->tupdesc);
-				for (i = 0; i < rows; i++)
-				{
-					PyObject   *row = PLyDict_FromTuple(&args,
-														tuptable->vals[i],
-														tuptable->tupdesc);
-
-					PyList_SetItem(result->rows, i, row);
-				}
-			}
-
 			/*
-			 * Save tuple descriptor for later use by result set metadata
-			 * functions.  Save it in TopMemoryContext so that it survives
-			 * outside of an SPI context.  We trust that PLy_result_dealloc()
-			 * will clean it up when the time is right.  (Do this as late as
-			 * possible, to minimize the number of ways the tupdesc could get
-			 * leaked due to errors.)
+			 * This happens if the command returned no results. Create a dummy result set.
 			 */
-			oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
-			result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
-			MemoryContextSwitchTo(oldcontext2);
+			result = PLyCSNewResult(callback);
+			callback->result = (PyObject *) result;
 		}
-		PG_CATCH();
-		{
-			MemoryContextSwitchTo(oldcontext);
-			MemoryContextDelete(cxt);
-			Py_DECREF(result);
-			PG_RE_THROW();
-		}
-		PG_END_TRY();
 
-		MemoryContextDelete(cxt);
-		SPI_freetuptable(tuptable);
+		Py_DECREF(result->status);
+		result->status = PyInt_FromLong(status);
+		Py_DECREF(result->nrows);
+		result->nrows = (rows > (uint64) LONG_MAX) ?
+			PyFloat_FromDouble((double) rows) :
+			PyInt_FromLong((long) rows);
 	}
 
 	return (PyObject *) result;
-- 
2.5.5

