diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 40be506..a23add6 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -383,7 +383,9 @@ PersistHoldablePortal(Portal portal)
 		SetTuplestoreDestReceiverParams(queryDesc->dest,
 										portal->holdStore,
 										portal->holdContext,
-										true);
+										true,
+										NULL,
+										NULL);
 
 		/* Fetch the result set into the tuplestore */
 		ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index b108168..f2b698f 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -60,7 +60,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int	_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 							  Snapshot snapshot, Snapshot crosscheck_snapshot,
-							  bool read_only, bool fire_triggers, uint64 tcount);
+							  bool read_only, bool fire_triggers, uint64 tcount,
+							  DestReceiver *caller_dest);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
 										 Datum *Values, const char *Nulls);
@@ -513,7 +514,7 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
 	res = _SPI_execute_plan(&plan, NULL,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -547,7 +548,7 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -576,7 +577,34 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 
 	res = _SPI_execute_plan(plan, params,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
+
+	_SPI_end_call(true);
+	return res;
+}
+
+/*
+ * Execute a previously prepared plan, sending result tuples to the
+ * caller-supplied DestReceiver rather than the usual SPI output arrangements.
+ */
+int
+SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+							   ParamListInfo params,
+							   bool read_only, long tcount,
+							   DestReceiver *dest)
+{
+	int			res;
+
+	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+		return SPI_ERROR_ARGUMENT;
+
+	res = _SPI_begin_call(true);
+	if (res < 0)
+		return res;
+
+	res = _SPI_execute_plan(plan, params,
+							InvalidSnapshot, InvalidSnapshot,
+							read_only, true, tcount, dest);
 
 	_SPI_end_call(true);
 	return res;
@@ -617,7 +645,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							snapshot, crosscheck_snapshot,
-							read_only, fire_triggers, tcount);
+							read_only, fire_triggers, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -664,7 +692,56 @@ SPI_execute_with_args(const char *src,
 
 	res = _SPI_execute_plan(&plan, paramLI,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
+
+	_SPI_end_call(true);
+	return res;
+}
+
+/*
+ * SPI_execute_with_receiver -- plan and execute a query with arguments
+ *
+ * This is the same as SPI_execute_with_args except that we send result tuples
+ * to the caller-supplied DestReceiver rather than the usual SPI output
+ * arrangements.
+ */
+int
+SPI_execute_with_receiver(const char *src,
+						  int nargs, Oid *argtypes,
+						  Datum *Values, const char *Nulls,
+						  bool read_only, long tcount,
+						  DestReceiver *dest)
+{
+	int			res;
+	_SPI_plan	plan;
+	ParamListInfo paramLI;
+
+	if (src == NULL || nargs < 0 || tcount < 0)
+		return SPI_ERROR_ARGUMENT;
+
+	if (nargs > 0 && (argtypes == NULL || Values == NULL))
+		return SPI_ERROR_PARAM;
+
+	res = _SPI_begin_call(true);
+	if (res < 0)
+		return res;
+
+	memset(&plan, 0, sizeof(_SPI_plan));
+	plan.magic = _SPI_PLAN_MAGIC;
+	plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
+	plan.nargs = nargs;
+	plan.argtypes = argtypes;
+	plan.parserSetup = NULL;
+	plan.parserSetupArg = NULL;
+
+	paramLI = _SPI_convert_params(nargs, argtypes,
+								  Values, Nulls);
+
+	_SPI_prepare_oneshot_plan(src, &plan);
+
+	res = _SPI_execute_plan(&plan, paramLI,
+							InvalidSnapshot, InvalidSnapshot,
+							read_only, true, tcount, dest);
 
 	_SPI_end_call(true);
 	return res;
@@ -2090,11 +2167,13 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
  *		false means any AFTER triggers are postponed to end of outer query
  * tcount: execution tuple-count limit, or 0 for none
+ * caller_dest: DestReceiver to receive output, or NULL for normal SPI output
  */
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
-				  bool read_only, bool fire_triggers, uint64 tcount)
+				  bool read_only, bool fire_triggers, uint64 tcount,
+				  DestReceiver *caller_dest)
 {
 	int			my_res = 0;
 	uint64		my_processed = 0;
@@ -2228,6 +2307,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 			bool		canSetTag = stmt->canSetTag;
 			DestReceiver *dest;
 
+			/*
+			 * Reset output state.  (Note that if a non-SPI receiver is used,
+			 * _SPI_current->processed will stay zero, and that's what we'll
+			 * report to the caller.  It's the receiver's job to count tuples
+			 * in that case.)
+			 */
 			_SPI_current->processed = 0;
 			_SPI_current->tuptable = NULL;
 
@@ -2267,7 +2352,16 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				UpdateActiveSnapshotCommandId();
 			}
 
-			dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+			/*
+			 * Select appropriate tuple receiver.  Output from non-canSetTag
+			 * subqueries always goes to the bit bucket.
+			 */
+			if (!canSetTag)
+				dest = CreateDestReceiver(DestNone);
+			else if (caller_dest)
+				dest = caller_dest;
+			else
+				dest = CreateDestReceiver(DestSPI);
 
 			if (stmt->utilityStmt == NULL)
 			{
@@ -2373,7 +2467,13 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				SPI_freetuptable(_SPI_current->tuptable);
 				_SPI_current->tuptable = NULL;
 			}
-			/* we know that the receiver doesn't need a destroy call */
+
+			/*
+			 * We don't issue a destroy call to the receiver.  The SPI and
+			 * None receivers would ignore it anyway, while if the caller
+			 * supplied a receiver, it's not our job to destroy it.
+			 */
+
 			if (res < 0)
 			{
 				my_res = res;
@@ -2465,7 +2565,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 	switch (operation)
 	{
 		case CMD_SELECT:
-			if (queryDesc->dest->mydest != DestSPI)
+			if (queryDesc->dest->mydest == DestNone)
 			{
 				/* Don't return SPI_OK_SELECT if we're discarding result */
 				res = SPI_OK_UTILITY;
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index 6c2dfbc..e8172be 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -8,6 +8,8 @@
  * toasted values.  This is to support cursors WITH HOLD, which must retain
  * data even if the underlying table is dropped.
  *
+ * Also optionally, we can apply a tuple conversion map before storing.
+ *
  *
  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,6 +23,7 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "access/tupconvert.h"
 #include "executor/tstoreReceiver.h"
 
 
@@ -31,14 +34,19 @@ typedef struct
 	Tuplestorestate *tstore;	/* where to put the data */
 	MemoryContext cxt;			/* context containing tstore */
 	bool		detoast;		/* were we told to detoast? */
+	TupleDesc	target_tupdesc; /* target tupdesc, or NULL if none */
+	const char *map_failure_msg;	/* tupdesc mapping failure message */
 	/* workspace: */
 	Datum	   *outvalues;		/* values array for result tuple */
 	Datum	   *tofree;			/* temp values to be pfree'd */
+	TupleConversionMap *tupmap; /* conversion map, if needed */
+	TupleTableSlot *mapslot;	/* slot for mapped tuples */
 } TStoreState;
 
 
 static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
 static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self);
 
 
 /*
@@ -69,27 +77,46 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
 		}
 	}
 
+	/* Check if tuple conversion is needed */
+	if (myState->target_tupdesc)
+		myState->tupmap = convert_tuples_by_position(typeinfo,
+													 myState->target_tupdesc,
+													 myState->map_failure_msg);
+	else
+		myState->tupmap = NULL;
+
 	/* Set up appropriate callback */
 	if (needtoast)
 	{
+		Assert(!myState->tupmap);
 		myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
 		/* Create workspace */
 		myState->outvalues = (Datum *)
 			MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
 		myState->tofree = (Datum *)
 			MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+		myState->mapslot = NULL;
+	}
+	else if (myState->tupmap)
+	{
+		myState->pub.receiveSlot = tstoreReceiveSlot_tupmap;
+		myState->outvalues = NULL;
+		myState->tofree = NULL;
+		myState->mapslot = MakeSingleTupleTableSlot(myState->target_tupdesc,
+													&TTSOpsVirtual);
 	}
 	else
 	{
 		myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
 		myState->outvalues = NULL;
 		myState->tofree = NULL;
+		myState->mapslot = NULL;
 	}
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
- * This is for the easy case where we don't have to detoast.
+ * This is for the easy case where we don't have to detoast nor map anything.
  */
 static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
@@ -158,6 +185,21 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 }
 
 /*
+ * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the case where we must apply a tuple conversion map.
+ */
+static bool
+tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self)
+{
+	TStoreState *myState = (TStoreState *) self;
+
+	execute_attr_map_slot(myState->tupmap->attrMap, slot, myState->mapslot);
+	tuplestore_puttupleslot(myState->tstore, myState->mapslot);
+
+	return true;
+}
+
+/*
  * Clean up at end of an executor run
  */
 static void
@@ -172,6 +214,12 @@ tstoreShutdownReceiver(DestReceiver *self)
 	if (myState->tofree)
 		pfree(myState->tofree);
 	myState->tofree = NULL;
+	if (myState->tupmap)
+		free_conversion_map(myState->tupmap);
+	myState->tupmap = NULL;
+	if (myState->mapslot)
+		ExecDropSingleTupleTableSlot(myState->mapslot);
+	myState->mapslot = NULL;
 }
 
 /*
@@ -204,17 +252,32 @@ CreateTuplestoreDestReceiver(void)
 
 /*
  * Set parameters for a TuplestoreDestReceiver
+ *
+ * tStore: where to store the tuples
+ * tContext: memory context containing tStore
+ * detoast: forcibly detoast contained data?
+ * target_tupdesc: if not NULL, forcibly convert tuples to this rowtype
+ * map_failure_msg: error message to use if mapping to target_tupdesc fails
+ *
+ * We don't currently support both detoast and target_tupdesc at the same
+ * time, just because no existing caller needs that combination.
  */
 void
 SetTuplestoreDestReceiverParams(DestReceiver *self,
 								Tuplestorestate *tStore,
 								MemoryContext tContext,
-								bool detoast)
+								bool detoast,
+								TupleDesc target_tupdesc,
+								const char *map_failure_msg)
 {
 	TStoreState *myState = (TStoreState *) self;
 
+	Assert(!(detoast && target_tupdesc));
+
 	Assert(myState->pub.mydest == DestTuplestore);
 	myState->tstore = tStore;
 	myState->cxt = tContext;
 	myState->detoast = detoast;
+	myState->target_tupdesc = target_tupdesc;
+	myState->map_failure_msg = map_failure_msg;
 }
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5781fb2..96ea74f 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -996,7 +996,9 @@ FillPortalStore(Portal portal, bool isTopLevel)
 	SetTuplestoreDestReceiverParams(treceiver,
 									portal->holdStore,
 									portal->holdContext,
-									false);
+									false,
+									NULL,
+									NULL);
 
 	switch (portal->strategy)
 	{
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 06de20a..9be0c68 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -90,6 +90,10 @@ extern int	SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 extern int	SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
 											ParamListInfo params,
 											bool read_only, long tcount);
+extern int	SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+										   ParamListInfo params,
+										   bool read_only, long tcount,
+										   DestReceiver *dest);
 extern int	SPI_exec(const char *src, long tcount);
 extern int	SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 					  long tcount);
@@ -102,6 +106,11 @@ extern int	SPI_execute_with_args(const char *src,
 								  int nargs, Oid *argtypes,
 								  Datum *Values, const char *Nulls,
 								  bool read_only, long tcount);
+extern int	SPI_execute_with_receiver(const char *src,
+									  int nargs, Oid *argtypes,
+									  Datum *Values, const char *Nulls,
+									  bool read_only, long tcount,
+									  DestReceiver *dest);
 extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
 extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
 									 int cursorOptions);
diff --git a/src/include/executor/tstoreReceiver.h b/src/include/executor/tstoreReceiver.h
index b2390c4..e9461cf 100644
--- a/src/include/executor/tstoreReceiver.h
+++ b/src/include/executor/tstoreReceiver.h
@@ -24,6 +24,8 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
 extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
 											Tuplestorestate *tStore,
 											MemoryContext tContext,
-											bool detoast);
+											bool detoast,
+											TupleDesc target_tupdesc,
+											const char *map_failure_msg);
 
 #endif							/* TSTORE_RECEIVER_H */
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index d3ad4fa..b5563ee 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -27,6 +27,7 @@
 #include "executor/execExpr.h"
 #include "executor/spi.h"
 #include "executor/spi_priv.h"
+#include "executor/tstoreReceiver.h"
 #include "funcapi.h"
 #include "mb/stringinfo_mb.h"
 #include "miscadmin.h"
@@ -3491,9 +3492,11 @@ static int
 exec_stmt_return_query(PLpgSQL_execstate *estate,
 					   PLpgSQL_stmt_return_query *stmt)
 {
-	Portal		portal;
-	uint64		processed = 0;
-	TupleConversionMap *tupmap;
+	int64		tcount;
+	DestReceiver *treceiver;
+	int			rc;
+	uint64		processed;
+	MemoryContext stmt_mcontext = get_stmt_mcontext(estate);
 	MemoryContext oldcontext;
 
 	if (!estate->retisset)
@@ -3503,60 +3506,115 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
 
 	if (estate->tuple_store == NULL)
 		exec_init_tuple_store(estate);
+	/* There might be some tuples in the tuplestore already */
+	tcount = tuplestore_tuple_count(estate->tuple_store);
+
+	/*
+	 * Set up DestReceiver to transfer results directly to tuplestore,
+	 * converting rowtype if necessary.  DestReceiver lives in mcontext.
+	 */
+	oldcontext = MemoryContextSwitchTo(stmt_mcontext);
+	treceiver = CreateDestReceiver(DestTuplestore);
+	SetTuplestoreDestReceiverParams(treceiver,
+									estate->tuple_store,
+									estate->tuple_store_cxt,
+									false,
+									estate->tuple_store_desc,
+									gettext_noop("structure of query does not match function result type"));
+	MemoryContextSwitchTo(oldcontext);
 
 	if (stmt->query != NULL)
 	{
 		/* static query */
-		exec_run_select(estate, stmt->query, 0, &portal);
+		PLpgSQL_expr *expr = stmt->query;
+		ParamListInfo paramLI;
+
+		/*
+		 * On the first call for this expression generate the plan.
+		 */
+		if (expr->plan == NULL)
+			exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
+
+		/*
+		 * Set up ParamListInfo to pass to executor
+		 */
+		paramLI = setup_param_list(estate, expr);
+
+		/*
+		 * Execute the query
+		 */
+		rc = SPI_execute_plan_with_receiver(expr->plan, paramLI,
+											estate->readonly_func, 0,
+											treceiver);
+		if (rc != SPI_OK_SELECT)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("query \"%s\" is not a SELECT", expr->query)));
 	}
 	else
 	{
 		/* RETURN QUERY EXECUTE */
-		Assert(stmt->dynquery != NULL);
-		portal = exec_dynquery_with_params(estate, stmt->dynquery,
-										   stmt->params, NULL,
-										   0);
-	}
+		Datum		query;
+		bool		isnull;
+		Oid			restype;
+		int32		restypmod;
+		char	   *querystr;
 
-	/* Use eval_mcontext for tuple conversion work */
-	oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
+		/*
+		 * Evaluate the string expression after the EXECUTE keyword. Its
+		 * result is the querystring we have to execute.
+		 */
+		Assert(stmt->dynquery != NULL);
+		query = exec_eval_expr(estate, stmt->dynquery,
+							   &isnull, &restype, &restypmod);
+		if (isnull)
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("query string argument of EXECUTE is null")));
 
-	tupmap = convert_tuples_by_position(portal->tupDesc,
-										estate->tuple_store_desc,
-										gettext_noop("structure of query does not match function result type"));
+		/* Get the C-String representation */
+		querystr = convert_value_to_string(estate, query, restype);
 
-	while (true)
-	{
-		uint64		i;
+		/* copy it into the stmt_mcontext before we clean up */
+		querystr = MemoryContextStrdup(stmt_mcontext, querystr);
 
-		SPI_cursor_fetch(portal, true, 50);
+		exec_eval_cleanup(estate);
 
-		/* SPI will have changed CurrentMemoryContext */
-		MemoryContextSwitchTo(get_eval_mcontext(estate));
+		/* Execute query, passing params if necessary */
+		if (stmt->params)
+		{
+			PreparedParamsData *ppd;
 
-		if (SPI_processed == 0)
-			break;
+			ppd = exec_eval_using_params(estate, stmt->params);
 
-		for (i = 0; i < SPI_processed; i++)
+			rc = SPI_execute_with_receiver(querystr,
+										   ppd->nargs, ppd->types,
+										   ppd->values, ppd->nulls,
+										   estate->readonly_func,
+										   0,
+										   treceiver);
+		}
+		else
 		{
-			HeapTuple	tuple = SPI_tuptable->vals[i];
-
-			if (tupmap)
-				tuple = execute_attr_map_tuple(tuple, tupmap);
-			tuplestore_puttuple(estate->tuple_store, tuple);
-			if (tupmap)
-				heap_freetuple(tuple);
-			processed++;
+			rc = SPI_execute_with_receiver(querystr,
+										   0, NULL, NULL, NULL,
+										   estate->readonly_func,
+										   0,
+										   treceiver);
 		}
 
-		SPI_freetuptable(SPI_tuptable);
+		if (rc < 0)
+			elog(ERROR, "SPI_execute_with_receiver failed executing query \"%s\": %s",
+				 querystr, SPI_result_code_string(rc));
 	}
 
-	SPI_freetuptable(SPI_tuptable);
-	SPI_cursor_close(portal);
-
-	MemoryContextSwitchTo(oldcontext);
+	/* Clean up */
+	treceiver->rDestroy(treceiver);
 	exec_eval_cleanup(estate);
+	MemoryContextReset(stmt_mcontext);
+
+	/* Count how many tuples we got */
+	processed = tuplestore_tuple_count(estate->tuple_store) - tcount;
 
 	estate->eval_processed = processed;
 	exec_set_found(estate, processed != 0);
