From 71b6163734934db3160de81fd064e7d113b9122f Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 1 Mar 2017 15:45:51 -0600
Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based
 DestReceiver.

Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
 src/backend/executor/spi.c | 79 ++++++++++++++++++++++++++++++++++++++++------
 src/backend/tcop/dest.c    | 11 +++++++
 src/include/executor/spi.h |  4 +++
 src/include/tcop/dest.h    |  1 +
 4 files changed, 85 insertions(+), 10 deletions(-)

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 3a89ccd..a0af31a 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
-				  bool read_only, bool fire_triggers, uint64 tcount);
+				  bool read_only, bool fire_triggers, uint64 tcount,
+				  DestReceiver *callback);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
 					Datum *Values, const char *Nulls);
@@ -321,7 +322,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
 	res = _SPI_execute_plan(&plan, NULL,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
+
+	_SPI_end_call(true);
+	return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+		DestReceiver *callback)
+{
+	_SPI_plan	plan;
+	int			res;
+
+	if (src == NULL || tcount < 0)
+		return SPI_ERROR_ARGUMENT;
+
+	res = _SPI_begin_call(true);
+	if (res < 0)
+		return res;
+
+	memset(&plan, 0, sizeof(_SPI_plan));
+	plan.magic = _SPI_PLAN_MAGIC;
+	plan.cursor_options = 0;
+
+	_SPI_prepare_oneshot_plan(src, &plan);
+
+	res = _SPI_execute_plan(&plan, NULL,
+							InvalidSnapshot, InvalidSnapshot,
+							read_only, true, tcount, callback);
 
 	_SPI_end_call(true);
 	return res;
@@ -355,7 +383,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
+
+	_SPI_end_call(true);
+	return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+				 bool read_only, long tcount, DestReceiver *callback)
+{
+	int			res;
+
+	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+		return SPI_ERROR_ARGUMENT;
+
+	if (plan->nargs > 0 && Values == NULL)
+		return SPI_ERROR_PARAM;
+
+	res = _SPI_begin_call(true);
+	if (res < 0)
+		return res;
+
+	res = _SPI_execute_plan(plan,
+							_SPI_convert_params(plan->nargs, plan->argtypes,
+												Values, Nulls),
+							InvalidSnapshot, InvalidSnapshot,
+							read_only, true, tcount, callback);
 
 	_SPI_end_call(true);
 	return res;
@@ -384,7 +439,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 
 	res = _SPI_execute_plan(plan, params,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -425,7 +480,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							snapshot, crosscheck_snapshot,
-							read_only, fire_triggers, tcount);
+							read_only, fire_triggers, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -472,7 +527,7 @@ SPI_execute_with_args(const char *src,
 
 	res = _SPI_execute_plan(&plan, paramLI,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -1907,7 +1962,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
-				  bool read_only, bool fire_triggers, uint64 tcount)
+				  bool read_only, bool fire_triggers, uint64 tcount,
+				  DestReceiver *callback)
 {
 	int			my_res = 0;
 	uint64		my_processed = 0;
@@ -1918,6 +1974,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 	ErrorContextCallback spierrcontext;
 	CachedPlan *cplan = NULL;
 	ListCell   *lc1;
+	DestReceiver *dest = callback;
 
 	/*
 	 * Setup error traceback support for ereport()
@@ -2037,7 +2094,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 		{
 			PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
 			bool		canSetTag = stmt->canSetTag;
-			DestReceiver *dest;
 
 			_SPI_current->processed = 0;
 			_SPI_current->lastoid = InvalidOid;
@@ -2082,7 +2138,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				UpdateActiveSnapshotCommandId();
 			}
 
-			dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+			if (!callback)
+				dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
 
 			if (stmt->utilityStmt == NULL)
 			{
@@ -2108,6 +2165,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 			{
 				char		completionTag[COMPLETION_TAG_BUFSIZE];
 
+				// XXX throw error if callback is set
 				ProcessUtility(stmt,
 							   plansource->query_string,
 							   PROCESS_UTILITY_QUERY,
@@ -2281,7 +2339,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 	switch (operation)
 	{
 		case CMD_SELECT:
-			if (queryDesc->dest->mydest != DestSPI)
+			if (queryDesc->dest->mydest != DestSPI &&
+					queryDesc->dest->mydest != DestSPICallback)
 			{
 				/* Don't return SPI_OK_SELECT if we're discarding result */
 				res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3..bd671e0 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = {
 	DestSPI
 };
 
+static DestReceiver spi_callbackDR = {
+	donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+	DestSPICallback
+};
+
 /* Globally available receiver for DestNone */
 DestReceiver *None_Receiver = &donothingDR;
 
@@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest)
 		case DestSPI:
 			return &spi_printtupDR;
 
+		case DestSPICallback:
+			return &spi_callbackDR;
+
 		case DestTuplestore:
 			return CreateTuplestoreDestReceiver();
 
@@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest)
 		case DestNone:
 		case DestDebug:
 		case DestSPI:
+		case DestSPICallback:
 		case DestTuplestore:
 		case DestIntoRel:
 		case DestCopyOut:
@@ -216,6 +225,7 @@ NullCommand(CommandDest dest)
 		case DestNone:
 		case DestDebug:
 		case DestSPI:
+		case DestSPICallback:
 		case DestTuplestore:
 		case DestIntoRel:
 		case DestCopyOut:
@@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest)
 		case DestNone:
 		case DestDebug:
 		case DestSPI:
+		case DestSPICallback:
 		case DestTuplestore:
 		case DestIntoRel:
 		case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 94a805d..13719e1 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result;
 extern int	SPI_connect(void);
 extern int	SPI_finish(void);
 extern int	SPI_execute(const char *src, bool read_only, long tcount);
+extern int	SPI_execute_callback(const char *src, bool read_only, long tcount,
+									DestReceiver *callback);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 				 bool read_only, long tcount);
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
 								ParamListInfo params,
 								bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+				 bool read_only, long tcount, DestReceiver *callback);
 extern int	SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 		  long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2..1d1d641 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
 	DestRemoteExecute,			/* sent to frontend, in Execute command */
 	DestRemoteSimple,			/* sent to frontend, w/no catalog access */
 	DestSPI,					/* results sent to SPI manager */
+	DestSPICallback,			/* results sent to user-specified callback function */
 	DestTuplestore,				/* results sent to Tuplestore */
 	DestIntoRel,				/* results sent to relation (SELECT INTO) */
 	DestCopyOut,				/* results sent to COPY TO code */
-- 
2.5.5

