Faster methods for getting SPI results
I've been looking at the performance of SPI calls within plpython.
There's a roughly 1.5x difference from equivalent python code just in
pulling data out of the SPI tuplestore. Some of that is due to an
inefficiency in how plpython is creating result dictionaries, but fixing
that is ultimately a dead-end: if you're dealing with a lot of results
in python, you want a tuple of arrays, not an array of tuples.
While we could just brute-force a tuple of arrays by plowing through the
SPI tuplestore (this is what pl/r does), there's still a lot of extra
work involved in doing that. AFAICT there's at least 2 copies that
happen between the executor producing a tuple and it making it into the
tuplestore, plus the tuplestore is going to consume a potentially very
large amount of memory for a very short period of time, before all the
data gets duplicated (again) into python objects.
It would be a lot more efficient if we could just grab datums from the
executor and make a single copy into plpython (or R), letting the PL
deal with all the memory management overhead.
I briefly looked at using SPI cursors to do just that, but that looks
even worse: every fetch is executed in a subtransaction, and every fetch
creates an entire tuplestore even if it's just going to return a single
value. (But hey, we never claimed cursors were fast...)
Is there any way to avoid all of this? I'm guessing one issue might be
that we don't want to call an external interpreter while potentially
holding page pins, but even then couldn't we just copy a single tuple at
a time and save a huge amount of palloc overhead?
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 12/20/16 10:14 PM, Jim Nasby wrote:
It would be a lot more efficient if we could just grab datums from the
executor and make a single copy into plpython (or R), letting the PL
deal with all the memory management overhead.I briefly looked at using SPI cursors to do just that, but that looks
even worse: every fetch is executed in a subtransaction, and every fetch
creates an entire tuplestore even if it's just going to return a single
value. (But hey, we never claimed cursors were fast...)Is there any way to avoid all of this? I'm guessing one issue might be
that we don't want to call an external interpreter while potentially
holding page pins, but even then couldn't we just copy a single tuple at
a time and save a huge amount of palloc overhead?
AFAICT that's exactly how DestRemote works: it grabs a raw slot from the
executor, makes sure it's fully expanded, and sends it on it's way via
pq_send*(). So presumably the same could be done for SPI, by creating a
new CommandDest (ISTM none of the existing ones would do what we want).
I'm not sure what the API for this should look like. One possibility is
to have SPI_execute and friends accept a flag that indicates not to
build a tupletable. I don't think a query needs to be read-only to allow
for no tuplestore, so overloading read_only seems like a bad idea.
Another option is to treat this as a "lightweight cursor" that only
allows forward fetches. One nice thing about that option is it leaves
open the possibility of using a small tuplestore for each "fetch",
without all the overhead that a full blown cursor has. This assumes
there are some use cases where you want to operate on relatively small
sets of tuples at a time, but you don't need to materialize the whole
thing in one shot.
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 12/21/16 8:21 AM, Jim Nasby wrote:
On 12/20/16 10:14 PM, Jim Nasby wrote:
It would be a lot more efficient if we could just grab datums from the
executor and make a single copy into plpython (or R), letting the PL
deal with all the memory management overhead.I briefly looked at using SPI cursors to do just that, but that looks
even worse: every fetch is executed in a subtransaction, and every fetch
creates an entire tuplestore even if it's just going to return a single
value. (But hey, we never claimed cursors were fast...)Is there any way to avoid all of this? I'm guessing one issue might be
that we don't want to call an external interpreter while potentially
holding page pins, but even then couldn't we just copy a single tuple at
a time and save a huge amount of palloc overhead?AFAICT that's exactly how DestRemote works: it grabs a raw slot from the
executor, makes sure it's fully expanded, and sends it on it's way via
pq_send*(). So presumably the same could be done for SPI, by creating a
new CommandDest (ISTM none of the existing ones would do what we want).I'm not sure what the API for this should look like. One possibility is
to have SPI_execute and friends accept a flag that indicates not to
build a tupletable. I don't think a query needs to be read-only to allow
for no tuplestore, so overloading read_only seems like a bad idea.Another option is to treat this as a "lightweight cursor" that only
allows forward fetches. One nice thing about that option is it leaves
open the possibility of using a small tuplestore for each "fetch",
without all the overhead that a full blown cursor has. This assumes
there are some use cases where you want to operate on relatively small
sets of tuples at a time, but you don't need to materialize the whole
thing in one shot.
I've looked at this some more, and ITSM that the only way to do this
without some major surgery is to create a new type of Destination
specifically for SPI that allows for the execution of an arbitrary C
function for each tuple to be sent. AFAICT this should be fairly safe,
since DestRemote can potentially block while sending a tuple and also
runs output functions (which presumably could themselves generate errors).
_SPI_execute_plan() would need to accept an arbitrary DestReceiver
struct, and use that (if specified) instead of creating it's own.
Once that's done, my plan is to allow plpy to use this functionality
with a receiver function that adds tuple fields to corresponding python
lists. This should result in significantly less overhead than going
through a tuplestore when dealing with a large number of rows.
Before I go code this up, I'd like to know if there's some fatal flaw in
this, or if there's an easier way to hack this up just to test my
performance theory.
Suggestions?
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 28 December 2016 at 09:58, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
I've looked at this some more, and ITSM that the only way to do this without
some major surgery is to create a new type of Destination specifically for
SPI that allows for the execution of an arbitrary C function for each tuple
to be sent.
That sounds a lot more sensible than the prior proposals. Callback driven.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 12/27/16 9:10 PM, Craig Ringer wrote:
On 28 December 2016 at 09:58, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
I've looked at this some more, and ITSM that the only way to do this without
some major surgery is to create a new type of Destination specifically for
SPI that allows for the execution of an arbitrary C function for each tuple
to be sent.That sounds a lot more sensible than the prior proposals. Callback driven.
Are there other places this would be useful? I'm reluctant to write all
of this just to discover it doesn't help performance at all, but if it's
useful on it's own I can just submit it as a stand-alone patch.
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 28 December 2016 at 12:32, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
On 12/27/16 9:10 PM, Craig Ringer wrote:
On 28 December 2016 at 09:58, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
I've looked at this some more, and ITSM that the only way to do this
without
some major surgery is to create a new type of Destination specifically
for
SPI that allows for the execution of an arbitrary C function for each
tuple
to be sent.That sounds a lot more sensible than the prior proposals. Callback driven.
Are there other places this would be useful? I'm reluctant to write all of
this just to discover it doesn't help performance at all, but if it's useful
on it's own I can just submit it as a stand-alone patch.
I don't have a use for it personally. In BDR and pglogical anything
that does work with nontrivial numbers of tuples uses lower level
scans anyway.
I expect anything that uses the SPI to run arbitrary user queries
could have a use for something like this though. Any PL, for one.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 12/27/16 9:10 PM, Craig Ringer wrote:
On 28 December 2016 at 09:58, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
I've looked at this some more, and ITSM that the only way to do this without
some major surgery is to create a new type of Destination specifically for
SPI that allows for the execution of an arbitrary C function for each tuple
to be sent.That sounds a lot more sensible than the prior proposals. Callback driven.
Here's what I've got right now. I haven't bothered with
SPI_execute_callback() yet, and there's some missing sanity checks.
I'm not sure if the changes to CreateDestReceiver() are warranted or
necessary, though it would at least give you sane defaults. My
incomplete code that would make use of this currently does
CallbackState callback;
memcpy(callback.pub, CreateDestReceiver(DestSPICallback),
sizeof(DestReceiver));
callback.pub.receiveSlot = PLy_CSreceive;
callback.pub.rStartup = PLy_CSStartup;
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 12/28/16 3:14 AM, Craig Ringer wrote:
On 28 December 2016 at 12:32, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
On 12/27/16 9:10 PM, Craig Ringer wrote:
On 28 December 2016 at 09:58, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
I've looked at this some more, and ITSM that the only way to do this
without
some major surgery is to create a new type of Destination specifically
for
SPI that allows for the execution of an arbitrary C function for each
tuple
to be sent.That sounds a lot more sensible than the prior proposals. Callback driven.
Are there other places this would be useful? I'm reluctant to write all of
this just to discover it doesn't help performance at all, but if it's useful
on it's own I can just submit it as a stand-alone patch.I don't have a use for it personally. In BDR and pglogical anything
that does work with nontrivial numbers of tuples uses lower level
scans anyway.I expect anything that uses the SPI to run arbitrary user queries
could have a use for something like this though. Any PL, for one.
Just a quick update on this: I've gotten this working well enough in
plpython to do some performance testing. This patch does change python
results from being a list of dicts to a dict of lists, but I suspect the
vast majority of the speed improvement is from not creating a tuplestore.
The attached sample (from OS X /usr/bin/sample) is interesting. The
highlight is:
! 3398 SPI_execute_callback (in postgres) + 163 [0x110125793]
! 3394 _SPI_execute_plan (in postgres) + 1262 [0x1101253fe]
! : 2043 standard_ExecutorRun (in postgres) + 288 [0x1100f9a40]
! : | 1990 ExecProcNode (in postgres) + 250 [0x1100fd62a]
The top line is the entry into SPI from plpython. The bottom line is
generate_series into a tuplestore and then reading from that tuplestore.
Almost all the time being spent in standard_ExecutorRun is in
PLy_CSreceive, which is appending values to a set of python lists as
it's getting tuples.
The other noteworthy item in the sample is this:
535 list_dealloc (in Python) + 116,103,... [0x11982b1b4,0x11982b1a7,...]
that's how long it's taking python to free the 3 lists (each with
9999999 python int objects).
In short (and at best*), this makes plpython just as fast at processing
results as SELECT count(SELECT s, s, s FROM generate_series()).
The * on that is there's something odd going on where plpython starts
out really fast at this, then gets 100% slower. I've reached out to some
python folks about that. Even so, the overall results from a quick test
on my laptop are (IMHO) impressive:
Old Code New Code Improvement
Pure SQL 2 sec 2 sec
plpython 12.7-14 sec 4-10 sec ~1.3-3x
plpython - SQL 10.7-12 sec 2-8 sec ~1.3-6x
Pure SQL is how long an equivalent query takes to run with just SQL.
plpython - SQL is simply the raw python times minus the pure SQL time.
I suspect other PL languages that have fairly fast object alloc and
dealloc would see a similar benefit.
BTW, the patch currently breaks on nested calls to plpython, but I don't
think that should change the performance much.
The test function:
CREATE OR REPLACE FUNCTION test_series(
iter int
) RETURNS int LANGUAGE plpythonu AS $body$
d = plpy.execute('SELECT s AS some_table_id, s AS some_field_name, s AS some_other_field_name FROM generate_series(1,{}) s'.format(iter) )
return len(d['some_table_id'])
$body$;
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
Attachments:
plpython_callbeck_v2.difftext/plain; charset=UTF-8; name=plpython_callbeck_v2.diff; x-mac-creator=0; x-mac-type=0Download
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 80fc4c4725..ace30d75f0 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount);
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+ _SPI_convert_params(plan->nargs, plan->argtypes,
+ Values, Nulls),
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount)
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
Node *stmt = (Node *) lfirst(lc2);
bool canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2072,7 +2128,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (IsA(stmt, PlannedStmt) &&
((PlannedStmt *) stmt)->utilityStmt == NULL)
@@ -2098,6 +2155,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char completionTag[COMPLETION_TAG_BUFSIZE];
+ // XXX throw error if callback is set
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 60a92801f1..212cfbd101 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -81,6 +81,11 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -117,6 +122,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -162,6 +170,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -205,6 +214,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -250,6 +260,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 76ba394a2b..15187c47e3 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index dd80433f74..5389f496a2 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -90,6 +90,7 @@ typedef enum
DestRemote, /* results sent to frontend process */
DestRemoteExecute, /* sent to frontend, in Execute command */
DestSPI, /* results sent to SPI manager */
+ DestSPICallback, /* results sent to user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..7cbe0a8d35 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..ab303367d3 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,8 +28,27 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext mctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ /* Dictionary of Lists */
+ PyObject *dict;
+ PyObject **lists;
+} CallbackState;
+
+
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
static PyObject *PLy_spi_execute_query(char *query, long limit);
+static PyObject *PLy_spi_execute_query2(char *query, long limit);
static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
uint64 rows, int status);
@@ -341,8 +360,159 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
return ret;
}
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
+ PLyTypeInfo *args;
+
+ MemoryContext mctx, old_mctx;
+ PyObject *dict;
+ PyObject **lists;
+ int i;
+
+ /*
+ * We may be in a different execution context when we're called, so switch
+ * back to our original one.
+ */
+ mctx = myState->mctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
+
+ /* Store this as a sanity check */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ myState->args = args = palloc0(sizeof(PLyTypeInfo));
+ PLy_typeinfo_init(args, mctx);
+ PLy_input_tuple_funcs(args, typeinfo);
+
+ /*
+ * We never palloc python objects, but this is an array of object pointers,
+ * so it's OK.
+ */
+ myState->lists = lists = palloc0(args->in.r.natts * sizeof(PyObject *));
+
+ myState->dict = dict = PyDict_New();
+ if (dict == NULL)
+ PLy_elog(ERROR, "could not create new dictionary");
+
+
+ for (i = 0; i < args->in.r.natts; i++)
+ {
+ char *key;
+ PyObject *value;
+
+ if (typeinfo->attrs[i]->attisdropped)
+ continue;
+
+ /* NOTE: If size is > 0 then the list must get initialized! */
+ value = PyList_New(0);
+ if (value == NULL)
+ PLy_elog(ERROR, "could not create new list");
+
+ key = NameStr(typeinfo->attrs[i]->attname);
+ PyDict_SetItemString(dict, key, value);
+ Py_DECREF(value);
+
+ /* We want fast access to the lists, so we store them in our array of pointers */
+ lists[i] = value;
+ }
+
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
+
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+
+ MemoryContextDelete(myState->mctx);
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ TupleDesc desc = slot->tts_tupleDescriptor;
+ CallbackState *myState = (CallbackState *) self;
+ PLyTypeInfo *args = myState->args;
+ PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+ int i, rv;
+
+ /* Verify saved state matches incoming slot */
+ Assert(myState->desc == desc);
+ Assert(args->in.r.natts == desc->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ MemoryContextSwitchTo(scratch_context);
+
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory from the
+ * datatype output function calls.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ PyObject * volatile value = NULL;
+ PLyDatumToOb * volatile atts = &args->in.r.atts[i];
+
+ if (desc->attrs[i]->attisdropped)
+ continue;
+
+ if (myState->lists[i] == NULL)
+ ereport(ERROR,
+ (errmsg("missing list for attribute %d", i)));
+ /* XXX If the function can't be null, ditch that check */
+ if (slot->tts_isnull[i] || atts->func == NULL)
+ {
+ Py_INCREF(Py_None);
+ value = Py_None;
+ }
+ else
+ {
+ PG_TRY();
+ {
+ value = (atts->func) (atts, slot->tts_values[i]);
+ }
+ PG_CATCH();
+ {
+ Py_XDECREF(value);
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to mark value
+ * as volatile, but that won't work with PyList_Append, so just
+ * test the error code after doing Py_DECREF().
+ */
+ rv = PyList_Append(myState->lists[i], value);
+ Py_DECREF(value);
+
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value to list")));
+ }
+ MemoryContextSwitchTo(oldcontext);
+ /* Should we just do this once, for the whole tuple?? */
+ MemoryContextReset(scratch_context);
+ PLy_switch_execution_context(old_exec_ctx);
+
+ /* If we get here then we were successful */
+ return true;
+}
+
static PyObject *
-PLy_spi_execute_query(char *query, long limit)
+PLy_spi_execute_query2(char *query, long limit)
{
int rv;
volatile MemoryContext oldcontext;
@@ -384,6 +554,75 @@ PLy_spi_execute_query(char *query, long limit)
}
static PyObject *
+PLy_spi_execute_query(char *query, long limit)
+{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ int rv;
+ volatile MemoryContext oldcontext, cb_ctx;
+ volatile ResourceOwner oldowner;
+ PyObject *ret = NULL;
+ CallbackState callback;
+
+ oldowner = CurrentResourceOwner;
+
+ /*
+ * Use a new context to make cleanup easier. Allocate it in the current
+ * context so we don't have to worry about cleaning it up if there's an
+ * error.
+ */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python callback context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ //callback = palloc0(sizeof(CallbackState));
+ callback.mctx = cb_ctx;
+ memcpy(&(callback.pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+ callback.pub.receiveSlot = PLy_CSreceive;
+ callback.pub.rStartup = PLy_CSStartup;
+ //callback.pub.rDestroy = PLy_CSDestroy;
+ callback.exec_ctx = exec_ctx;
+
+ PLy_spi_subtransaction_begin(oldcontext, oldowner);
+
+ PG_TRY();
+ {
+
+ pg_verifymbstr(query, strlen(query), false);
+ rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) &callback);
+ /*
+ * callback.dict gets set in PLy_CSStartup, which happens during
+ * executor startup. It's not valid before then.
+ */
+ ret = callback.dict;
+
+ PLy_spi_subtransaction_commit(oldcontext, oldowner);
+ }
+ PG_CATCH();
+ {
+ PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ if (rv < 0)
+ {
+ Py_XDECREF(ret);
+ PLy_exception_set(PLy_exc_spi_error,
+ "SPI_execute failed: %s",
+ SPI_result_code_string(rv));
+ return NULL;
+ }
+
+ /* Free the callback context. */
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextDelete(cb_ctx);
+
+ return ret;
+}
+
+static PyObject *
PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
{
PLyResultObject *result;
On 1/5/17 9:50 PM, Jim Nasby wrote:
The * on that is there's something odd going on where plpython starts
out really fast at this, then gets 100% slower. I've reached out to some
python folks about that. Even so, the overall results from a quick test
on my laptop are (IMHO) impressive:Old Code New Code Improvement
Pure SQL 2 sec 2 sec
plpython 12.7-14 sec 4-10 sec ~1.3-3x
plpython - SQL 10.7-12 sec 2-8 sec ~1.3-6xPure SQL is how long an equivalent query takes to run with just SQL.
plpython - SQL is simply the raw python times minus the pure SQL time.
I finally got all the kinks worked out and did some testing with python
3. Performance for my test [1] improved ~460% when returning a dict of
lists (as opposed to the current list of dicts). Based on previous
testing, I expect that using this method to return a list of dicts will
be about 8% slower. The inconsistency in results on 2.7 has to do with
how python 2 handles ints.
Someone who's familiar with pl/perl should take a look at this and see
if it would apply there. I've attached the SPI portion of this patch.
I think the last step here is to figure out how to support switching
between the current behavior and the "columnar" behavior of a dict of
lists. I believe the best way to do that is to add two optional
arguments to the execution functions: container=[] and members={}, and
then copy those to produce the output objects. That means you can get
the new behavior by doing something like:
plpy.execute('...', container={}, members=[])
Or, more interesting, you could do:
plpy.execute('...', container=Pandas.DataFrame, members=Pandas.Series)
since that's what a lot of people are going to want anyway.
In the future we could also add a GUC to change the default behavior.
Any concerns with that approach?
1:
d = plpy.execute('SELECT s AS some_table_id, s AS some_field_name, s AS some_other_field_name FROM generate_series(1,{}) s'.format(iter) )
return len(d['some_table_id'])
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
Attachments:
spi_callback.patchtext/plain; charset=UTF-8; name=spi_callback.patch; x-mac-creator=0; x-mac-type=0Download
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 7bd37283b7..97585d272e 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount);
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+ _SPI_convert_params(plan->nargs, plan->argtypes,
+ Values, Nulls),
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount)
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
PlannedStmt *stmt = (PlannedStmt *) lfirst(lc2);
bool canSetTag = stmt->canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (stmt->utilityStmt == NULL)
{
@@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char completionTag[COMPLETION_TAG_BUFSIZE];
+ // XXX throw error if callback is set
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index da39f43f38..85141a0a0f 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -81,6 +81,11 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -117,6 +122,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -162,6 +170,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -205,6 +214,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -250,6 +260,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index a18ae63245..d779511130 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index 93f9b7463a..9a8c98e267 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -90,6 +90,7 @@ typedef enum
DestRemote, /* results sent to frontend process */
DestRemoteExecute, /* sent to frontend, in Execute command */
DestSPI, /* results sent to SPI manager */
+ DestSPICallback, /* results sent to user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
On 24 January 2017 at 11:23, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
I finally got all the kinks worked out and did some testing with python 3.
Performance for my test [1] improved ~460% when returning a dict of lists
(as opposed to the current list of dicts). Based on previous testing, I
expect that using this method to return a list of dicts will be about 8%
slower. The inconsistency in results on 2.7 has to do with how python 2
handles ints.
Impressive results.
I think the last step here is to figure out how to support switching between
the current behavior and the "columnar" behavior of a dict of lists.
That sounds like it'd be much better approached as a separate, later patch.
If I understand you correctly, you propose to return the resultset
a b
1 10
2 20
which is currently returned as
[ {"a":1, "b":10}, {"a":2, "b":20} ]
instead as
{ "a": [1, 2], "b": [10, 20] }
?
If so I see that as a lot more of a niche thing. I can see why it'd be
useful and would help performance, but it seems much more disruptive.
It requires users to discover it exists, actively adopt a different
style of ingesting data, etc. For a 10%-ish gain in a PL.
I strongly suggest making this design effort a separate thread, and
focusing on the SPI improvements that give "free" no-user-action
performance boosts here.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 1/23/17 10:36 PM, Craig Ringer wrote:
which is currently returned as
[ {"a":1, "b":10}, {"a":2, "b":20} ]
instead as
{ "a": [1, 2], "b": [10, 20] }
Correct.
If so I see that as a lot more of a niche thing. I can see why it'd be
useful and would help performance, but it seems much more disruptive.
It requires users to discover it exists, actively adopt a different
style of ingesting data, etc. For a 10%-ish gain in a PL.
In data science, what we're doing now is actually the niche. All real
analytics happens with something like a Pandas DataFrame, which is
organized as a dict of lists.
This isn't just idle nomenclature either: organizing results in what
amounts to a column store provides a significant speed improvement for
most analytics, because you're working on an array of contiguous memory
(at least, when you're using more advanced types like DataFrames and
Series).
I strongly suggest making this design effort a separate thread, and
focusing on the SPI improvements that give "free" no-user-action
performance boosts here.
Fair enough. I posted the SPI portion of that yesterday. That should be
useful for pl/R and possibly pl/perl. pl/tcl could make use of it, but
it would end up executing arbitrary tcl code in the middle of portal
execution, which doesn't strike me as a great idea. Unfortunately, I
don't think plpgsql could make much use of this for similar reasons.
I'll post a plpython patch that doesn't add the output format control.
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 1/23/17 9:23 PM, Jim Nasby wrote:
I think the last step here is to figure out how to support switching
between the current behavior and the "columnar" behavior of a dict of lists.
I've thought more about this... instead of trying to switch from the
current situation of 1 choice of how results are return to 2 choices, I
think it'd be better to just expose the API that the new Destination
type provides to SPI. Specifically, execute a python function during
Portal startup, and a different function for receiving tuples. There'd
be an optional 3rd function for Portal shutdown.
The startup function would be handed details of the resultset it was
about to receive, as a list that contained python tuples with the
results of SPI_fname, _gettype, _gettypeid. This function would return a
callback version number and a python object that would be kept in the
DestReceiver.
The receiver function would get the object created by the startup
function, as well as a python tuple of the TupleTableSlot that had gone
through type conversion. It would need to add the value to the object
from the startup function. It would return true or false, just like a
Portal receiver function does.
The shutdown function would receive the object that's been passed
around. It would be able to do any post-processing. Whatever it returned
is what would be handed back to python as the results of the query.
The version number returned by the startup function allows for future
improvements to this facility. One idea there is allowing the startup
function to control how Datums get mapped into python objects.
In order to support all of this without breaking backwards compatibility
or forking a new API, plpy.execute would accept a kwdict, to avoid
conflicting with the arbitrary number of arguments that can currently be
accepted. We'd look in the kwdict for a key called "portal_functions"
pointing at a 2 or 3 element tuple of the startup, receive and shutdown
functions. plpy would pre-define a tuple that provides the current
behavior, and that's what would be used by default. In the future, we
might add a way to control the default.
Comments?
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 1/24/17 10:43 PM, Jim Nasby wrote:
I strongly suggest making this design effort a separate thread, and
focusing on the SPI improvements that give "free" no-user-action
performance boosts here.Fair enough. I posted the SPI portion of that yesterday. That should be
useful for pl/R and possibly pl/perl. pl/tcl could make use of it, but
it would end up executing arbitrary tcl code in the middle of portal
execution, which doesn't strike me as a great idea. Unfortunately, I
don't think plpgsql could make much use of this for similar reasons.I'll post a plpython patch that doesn't add the output format control.
I've attached the results of that. Unfortunately the speed improvement
is only 27% at this point (with 9999999 tuples). Presumably that's
because it's constructing a brand new dictionary from scratch for each
tuple.
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
Attachments:
0002-Minimal-adoption-of-SPI-callbacks-in-plpython.patchtext/plain; charset=UTF-8; name=0002-Minimal-adoption-of-SPI-callbacks-in-plpython.patch; x-mac-creator=0; x-mac-type=0Download
>From 0fb68b2e2649ab81fae8c86c45c1522f1b0d56ab Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Tue, 28 Feb 2017 21:54:45 -0600
Subject: [PATCH 2/2] Minimal adoption of SPI callbacks in plpython
---
src/pl/plpython/plpy_main.c | 13 ++
src/pl/plpython/plpy_main.h | 3 +
src/pl/plpython/plpy_spi.c | 294 ++++++++++++++++++++++++++++++++------------
3 files changed, 231 insertions(+), 79 deletions(-)
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..7cbe0a8d35 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..6f48e70329 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,10 +28,30 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext parent_ctx;
+ MemoryContext cb_ctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ PyObject *result;
+} CallbackState;
+
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
+
static PyObject *PLy_spi_execute_query(char *query, long limit);
static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
uint64 rows, int status);
static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
@@ -196,6 +216,7 @@ PLy_spi_execute(PyObject *self, PyObject *args)
static PyObject *
PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
{
+ CallbackState *callback;
volatile int nargs;
int i,
rv;
@@ -289,9 +310,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
}
- rv = SPI_execute_plan(plan->plan, plan->values, nulls,
- exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ callback = PLy_Callback_New(exec_ctx);
+ rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+ exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
if (nargs > 0)
pfree(nulls);
@@ -316,9 +339,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
for (i = 0; i < nargs; i++)
{
@@ -344,6 +369,8 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
static PyObject *
PLy_spi_execute_query(char *query, long limit)
{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback;
int rv;
volatile MemoryContext oldcontext;
volatile ResourceOwner oldowner;
@@ -356,20 +383,23 @@ PLy_spi_execute_query(char *query, long limit)
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
pg_verifymbstr(query, strlen(query), false);
- rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ callback = PLy_Callback_New(exec_ctx);
+ rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
PLy_spi_subtransaction_commit(oldcontext, oldowner);
}
PG_CATCH();
{
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
if (rv < 0)
{
@@ -383,94 +413,200 @@ PLy_spi_execute_query(char *query, long limit)
return ret;
}
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
{
+ volatile MemoryContext oldcontext, cb_ctx;
+ CallbackState *callback;
+
+ callback = palloc0(sizeof(CallbackState));
+
+ /*
+ * Use a new context to make cleanup easier. Allocate it in the current
+ * context so we don't have to worry about cleaning it up if there's an
+ * error.
+ */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python callback context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ callback->parent_ctx = oldcontext;
+ callback->cb_ctx = cb_ctx;
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+ callback->pub.receiveSlot = PLy_CSreceive;
+ callback->pub.rStartup = PLy_CSStartup;
+ callback->pub.rDestroy = PLy_CSDestroy;
+ callback->exec_ctx = exec_ctx;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+ if (callback)
+ {
+ if (callback->cb_ctx)
+ (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+ pfree(callback);
+ }
+
+ return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
+{
+ MemoryContext oldctx;
+
+ /* The result info needs to be in the parent context */
+ oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+ myState->result = PLy_result_new();
+ if (myState->result == NULL)
+ PLy_elog(ERROR, "could not create new result object");
+
+ MemoryContextSwitchTo(oldctx);
+ return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
PLyResultObject *result;
- volatile MemoryContext oldcontext;
+ PLyTypeInfo *args;
+ MemoryContext mctx, old_mctx;
- result = (PLyResultObject *) PLy_result_new();
- Py_DECREF(result->status);
- result->status = PyInt_FromLong(status);
+ /*
+ * We may be in a different execution context when we're called, so switch
+ * back to our original one.
+ */
+ mctx = myState->cb_ctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
+
+ /* We need to store this because the TupleDesc the receive function gets has no names. */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ myState->args = args = palloc0(sizeof(PLyTypeInfo));
+ PLy_typeinfo_init(args, mctx);
+ PLy_input_tuple_funcs(args, typeinfo);
+
+ result = PLyCSNewResult(myState);
+
+ /*
+ * Save tuple descriptor for later use by result set metadata
+ * functions. Save it in TopMemoryContext so that it survives
+ * outside of an SPI context. We trust that PLy_result_dealloc()
+ * will clean it up when the time is right. XXX This might result in a leak
+ * if an error happens and the result doesn't get dereferenced.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
- if (status > 0 && tuptable == NULL)
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ MemoryContext cb_ctx = myState->cb_ctx;
+
+ MemoryContextDelete(cb_ctx);
+ myState->cb_ctx = 0;
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ TupleDesc slotdesc = slot->tts_tupleDescriptor;
+ CallbackState *myState = (CallbackState *) self;
+ TupleDesc desc = myState->desc;
+ PLyTypeInfo *args = myState->args;
+ PLyResultObject *result = (PLyResultObject *) myState->result;
+ PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+ int rv;
+ PyObject *row;
+
+ /* Verify saved state matches incoming slot */
+ Assert(desc->tdtypeid == slotdesc->tdtypeid);
+ Assert(args->in.r.natts == slotdesc->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory from the
+ * datatype output function calls.
+ */
+ MemoryContextSwitchTo(scratch_context);
+
+ PG_TRY();
{
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
}
- else if (status > 0 && tuptable != NULL)
+ PG_CATCH();
{
- PLyTypeInfo args;
- MemoryContext cxt;
+ Py_XDECREF(row);
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to mark value
+ * as volatile, but that won't work with PyList_Append, so just
+ * test the error code after doing Py_DECREF().
+ */
+ rv = PyList_Append(result->rows, row);
+ Py_DECREF(row);
+
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value to list")));
- cxt = AllocSetContextCreate(CurrentMemoryContext,
- "PL/Python temp context",
- ALLOCSET_DEFAULT_SIZES);
- PLy_typeinfo_init(&args, cxt);
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(scratch_context);
+ PLy_switch_execution_context(old_exec_ctx);
- oldcontext = CurrentMemoryContext;
- PG_TRY();
- {
- MemoryContext oldcontext2;
+ return true;
+}
- if (rows)
- {
- uint64 i;
-
- /*
- * PyList_New() and PyList_SetItem() use Py_ssize_t for list
- * size and list indices; so we cannot support a result larger
- * than PY_SSIZE_T_MAX.
- */
- if (rows > (uint64) PY_SSIZE_T_MAX)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("query result has too many rows to fit in a Python list")));
-
- Py_DECREF(result->rows);
- result->rows = PyList_New(rows);
-
- PLy_input_tuple_funcs(&args, tuptable->tupdesc);
- for (i = 0; i < rows; i++)
- {
- PyObject *row = PLyDict_FromTuple(&args,
- tuptable->vals[i],
- tuptable->tupdesc);
- PyList_SetItem(result->rows, i, row);
- }
- }
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+ PLyResultObject *result = (PLyResultObject *) callback->result;
+ /* If status < 0 this stuff would just get thrown away anyway. */
+ if (status > 0)
+ {
+ if (!result)
+ {
/*
- * Save tuple descriptor for later use by result set metadata
- * functions. Save it in TopMemoryContext so that it survives
- * outside of an SPI context. We trust that PLy_result_dealloc()
- * will clean it up when the time is right. (Do this as late as
- * possible, to minimize the number of ways the tupdesc could get
- * leaked due to errors.)
+ * This happens if the command returned no results. Create a dummy result set.
*/
- oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
- result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
- MemoryContextSwitchTo(oldcontext2);
- }
- PG_CATCH();
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextDelete(cxt);
- Py_DECREF(result);
- PG_RE_THROW();
+ result = PLyCSNewResult(callback);
+ callback->result = (PyObject *) result;
}
- PG_END_TRY();
- MemoryContextDelete(cxt);
- SPI_freetuptable(tuptable);
+ Py_DECREF(result->status);
+ result->status = PyInt_FromLong(status);
+ Py_DECREF(result->nrows);
+ result->nrows = (rows > (uint64) LONG_MAX) ?
+ PyFloat_FromDouble((double) rows) :
+ PyInt_FromLong((long) rows);
}
return (PyObject *) result;
--
2.11.1
0001-Add-SPI_execute_callback-and-callback-based-DestRece.patchtext/plain; charset=UTF-8; name=0001-Add-SPI_execute_callback-and-callback-based-DestRece.patch; x-mac-creator=0; x-mac-type=0Download
>From 7ef3e944c1ee8266d70fafae080afc6beb492102 Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 25 Jan 2017 12:57:40 -0600
Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based
DestReceiver.
Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
src/backend/executor/spi.c | 76 ++++++++++++++++++++++++++++++++++++++++------
src/backend/tcop/dest.c | 11 +++++++
src/include/executor/spi.h | 4 +++
src/include/tcop/dest.h | 1 +
4 files changed, 83 insertions(+), 9 deletions(-)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 55f97b14e6..d55e06509f 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount);
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+ _SPI_convert_params(plan->nargs, plan->argtypes,
+ Values, Nulls),
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount)
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
bool canSetTag = stmt->canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (stmt->utilityStmt == NULL)
{
@@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char completionTag[COMPLETION_TAG_BUFSIZE];
+ // XXX throw error if callback is set
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..bd671e0b26 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -216,6 +225,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index a18ae63245..d779511130 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
DestRemoteExecute, /* sent to frontend, in Execute command */
DestRemoteSimple, /* sent to frontend, w/no catalog access */
DestSPI, /* results sent to SPI manager */
+ DestSPICallback, /* results sent to user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
--
2.11.1
On 12/20/16 23:14, Jim Nasby wrote:
I've been looking at the performance of SPI calls within plpython.
There's a roughly 1.5x difference from equivalent python code just in
pulling data out of the SPI tuplestore. Some of that is due to an
inefficiency in how plpython is creating result dictionaries, but fixing
that is ultimately a dead-end: if you're dealing with a lot of results
in python, you want a tuple of arrays, not an array of tuples.
There is nothing that requires us to materialize the results into an
actual list of actual rows. We could wrap the SPI_tuptable into a
Python object and implement __getitem__ or __iter__ to emulate sequence
or mapping access.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Mar 2, 2017 at 10:03 AM, Peter Eisentraut <
peter.eisentraut@2ndquadrant.com> wrote:
On 12/20/16 23:14, Jim Nasby wrote:
I've been looking at the performance of SPI calls within plpython.
There's a roughly 1.5x difference from equivalent python code just in
pulling data out of the SPI tuplestore. Some of that is due to an
inefficiency in how plpython is creating result dictionaries, but fixing
that is ultimately a dead-end: if you're dealing with a lot of results
in python, you want a tuple of arrays, not an array of tuples.There is nothing that requires us to materialize the results into an
actual list of actual rows. We could wrap the SPI_tuptable into a
Python object and implement __getitem__ or __iter__ to emulate sequence
or mapping access.
Python objects have a small (but non-zero) overhead in terms of both memory
and speed. A built-in dictionary is probably one of the least-expensive
(memory/cpu) choices, although how the dictionary is constructed also
impacts performance. Another choice is a tuple.
Avoiding Py_BuildValue(...) in exchange for a bit more complexity (via
PyTuple_New(..) and PyTuple_SetItem(...)) is also a nice performance win in
my experience.
--
Jon
On 3/2/17 8:03 AM, Peter Eisentraut wrote:
On 12/20/16 23:14, Jim Nasby wrote:
I've been looking at the performance of SPI calls within plpython.
There's a roughly 1.5x difference from equivalent python code just in
pulling data out of the SPI tuplestore. Some of that is due to an
inefficiency in how plpython is creating result dictionaries, but fixing
that is ultimately a dead-end: if you're dealing with a lot of results
in python, you want a tuple of arrays, not an array of tuples.There is nothing that requires us to materialize the results into an
actual list of actual rows. We could wrap the SPI_tuptable into a
Python object and implement __getitem__ or __iter__ to emulate sequence
or mapping access.
Would it be possible to have that just pull tuples directly from the
executor? The overhead of populating the tuplestore just to drain it
again can become quite significant, and AFAICT it's completely unnecessary.
Unfortunately, I think adding support for that would be even more
invasive, which is why I haven't attempted it. On the flip side, I
believe that kind of an interface would be usable by plpgsql, whereas
the DestReceiver approach is not (AFAICT).
--
Jim Nasby, Chief Data Architect, OpenSCG
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2/28/17 9:42 PM, Jim Nasby wrote:
I'll post a plpython patch that doesn't add the output format control.
I've attached the results of that. Unfortunately the speed improvement
is only 27% at this point (with 9999999 tuples). Presumably that's
because it's constructing a brand new dictionary from scratch for each
tuple.
I found a couple bugs. New patches attached.
--
Jim Nasby, Chief Data Architect, OpenSCG
Attachments:
0001-Add-SPI_execute_callback-and-callback-based-DestRece.patchtext/plain; charset=UTF-8; name=0001-Add-SPI_execute_callback-and-callback-based-DestRece.patch; x-mac-creator=0; x-mac-type=0Download
From 116b6a45b0146e42f1faa130d78e9362950c18c3 Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 1 Mar 2017 15:45:51 -0600
Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based
DestReceiver.
Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
src/backend/executor/spi.c | 79 ++++++++++++++++++++++++++++++++++++++++------
src/backend/tcop/dest.c | 11 +++++++
src/include/executor/spi.h | 4 +++
src/include/tcop/dest.h | 1 +
4 files changed, 85 insertions(+), 10 deletions(-)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 55f97b14e6..ffeba679da 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount);
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+ _SPI_convert_params(plan->nargs, plan->argtypes,
+ Values, Nulls),
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount)
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
bool canSetTag = stmt->canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (stmt->utilityStmt == NULL)
{
@@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char completionTag[COMPLETION_TAG_BUFSIZE];
+ // XXX throw error if callback is set
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
@@ -2262,7 +2320,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
switch (operation)
{
case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest != DestSPI &&
+ queryDesc->dest->mydest != DestSPICallback)
{
/* Don't return SPI_OK_SELECT if we're discarding result */
res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..bd671e0b26 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -216,6 +225,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index a18ae63245..d779511130 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
DestRemoteExecute, /* sent to frontend, in Execute command */
DestRemoteSimple, /* sent to frontend, w/no catalog access */
DestSPI, /* results sent to SPI manager */
+ DestSPICallback, /* results sent to user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
--
2.11.1
0002-Modify-plpython-to-use-SPI-callbacks.patchtext/plain; charset=UTF-8; name=0002-Modify-plpython-to-use-SPI-callbacks.patch; x-mac-creator=0; x-mac-type=0Download
From f6a623637870f67036e375b2928ac1adc9c7184b Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 1 Mar 2017 15:46:53 -0600
Subject: [PATCH 2/2] Modify plpython to use SPI callbacks
This is a bare minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Simple testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
src/pl/plpython/plpy_main.c | 13 ++
src/pl/plpython/plpy_main.h | 3 +
src/pl/plpython/plpy_spi.c | 299 ++++++++++++++++++++++++++++++++------------
3 files changed, 234 insertions(+), 81 deletions(-)
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..7cbe0a8d35 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..031b2ed8a9 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,10 +28,30 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext parent_ctx;
+ MemoryContext cb_ctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ PyObject *result;
+} CallbackState;
+
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
+
static PyObject *PLy_spi_execute_query(char *query, long limit);
static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
uint64 rows, int status);
static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
@@ -196,6 +216,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
static PyObject *
PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback;
volatile int nargs;
int i,
rv;
@@ -238,12 +260,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
oldcontext = CurrentMemoryContext;
oldowner = CurrentResourceOwner;
+ callback = PLy_Callback_New(exec_ctx);
PLy_spi_subtransaction_begin(oldcontext, oldowner);
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
char *volatile nulls;
volatile int j;
@@ -289,9 +311,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
}
- rv = SPI_execute_plan(plan->plan, plan->values, nulls,
- exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+ exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
if (nargs > 0)
pfree(nulls);
@@ -316,9 +339,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
for (i = 0; i < nargs; i++)
{
@@ -344,9 +369,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
static PyObject *
PLy_spi_execute_query(char *query, long limit)
{
- int rv;
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback = PLy_Callback_New(exec_ctx);
volatile MemoryContext oldcontext;
volatile ResourceOwner oldowner;
+ int rv;
PyObject *ret = NULL;
oldcontext = CurrentMemoryContext;
@@ -356,20 +383,22 @@ PLy_spi_execute_query(char *query, long limit)
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
pg_verifymbstr(query, strlen(query), false);
- rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
PLy_spi_subtransaction_commit(oldcontext, oldowner);
}
PG_CATCH();
{
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
if (rv < 0)
{
@@ -383,94 +412,202 @@ PLy_spi_execute_query(char *query, long limit)
return ret;
}
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
+{
+ MemoryContext oldcontext, cb_ctx;
+ CallbackState *callback;
+
+ callback = palloc0(sizeof(CallbackState));
+
+ /*
+ * Use a new context to make cleanup easier. Allocate it in the current
+ * context so we don't have to worry about cleaning it up if there's an
+ * error.
+ */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python callback context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ callback->parent_ctx = oldcontext;
+ callback->cb_ctx = cb_ctx;
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+ callback->pub.receiveSlot = PLy_CSreceive;
+ callback->pub.rStartup = PLy_CSStartup;
+ callback->pub.rDestroy = PLy_CSDestroy;
+ callback->exec_ctx = exec_ctx;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+ if (callback)
+ {
+ if (callback->cb_ctx)
+ (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+ pfree(callback);
+ }
+
+ return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
{
+ MemoryContext oldctx;
+
+ /* The result info needs to be in the parent context */
+ oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+ myState->result = PLy_result_new();
+ if (myState->result == NULL)
+ PLy_elog(ERROR, "could not create new result object");
+
+ MemoryContextSwitchTo(oldctx);
+ return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
PLyResultObject *result;
- volatile MemoryContext oldcontext;
+ PLyTypeInfo *args;
+ MemoryContext mctx, old_mctx;
+
+ /*
+ * We may be in a different execution context when we're called, so switch
+ * back to our original one.
+ */
+ mctx = myState->cb_ctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
+
+ /* We need to store this because the TupleDesc the receive function gets has no names. */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ myState->args = args = palloc0(sizeof(PLyTypeInfo));
+ PLy_typeinfo_init(args, mctx);
+ PLy_input_tuple_funcs(args, typeinfo);
+
+ result = PLyCSNewResult(myState);
+
+ /*
+ * Save tuple descriptor for later use by result set metadata
+ * functions. Save it in TopMemoryContext so that it survives
+ * outside of an SPI context. We trust that PLy_result_dealloc()
+ * will clean it up when the time is right. XXX This might result in a leak
+ * if an error happens and the result doesn't get dereferenced.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
- result = (PLyResultObject *) PLy_result_new();
- Py_DECREF(result->status);
- result->status = PyInt_FromLong(status);
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
- if (status > 0 && tuptable == NULL)
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ MemoryContext cb_ctx = myState->cb_ctx;
+
+ MemoryContextDelete(cb_ctx);
+ myState->cb_ctx = 0;
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ TupleDesc desc = myState->desc;
+ PLyTypeInfo *args = myState->args;
+ PLyResultObject *result = (PLyResultObject *) myState->result;
+ PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+ int rv = 1;
+ PyObject *row;
+
+ /* Verify saved state matches incoming slot */
+ Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+ Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory from the
+ * datatype output function calls.
+ */
+ MemoryContextSwitchTo(scratch_context);
+
+ PG_TRY();
{
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
}
- else if (status > 0 && tuptable != NULL)
+ PG_CATCH();
{
- PLyTypeInfo args;
- MemoryContext cxt;
+ Py_XDECREF(row);
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to mark row
+ * as volatile, but that won't work with PyList_Append, so just
+ * test the error code after doing Py_DECREF().
+ */
+ if (row)
+ {
+ rv = PyList_Append(result->rows, row);
+ Py_DECREF(row);
+ }
+
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value to list")));
- cxt = AllocSetContextCreate(CurrentMemoryContext,
- "PL/Python temp context",
- ALLOCSET_DEFAULT_SIZES);
- PLy_typeinfo_init(&args, cxt);
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(scratch_context);
+ PLy_switch_execution_context(old_exec_ctx);
- oldcontext = CurrentMemoryContext;
- PG_TRY();
- {
- MemoryContext oldcontext2;
+ return true;
+}
- if (rows)
- {
- uint64 i;
-
- /*
- * PyList_New() and PyList_SetItem() use Py_ssize_t for list
- * size and list indices; so we cannot support a result larger
- * than PY_SSIZE_T_MAX.
- */
- if (rows > (uint64) PY_SSIZE_T_MAX)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("query result has too many rows to fit in a Python list")));
-
- Py_DECREF(result->rows);
- result->rows = PyList_New(rows);
-
- PLy_input_tuple_funcs(&args, tuptable->tupdesc);
- for (i = 0; i < rows; i++)
- {
- PyObject *row = PLyDict_FromTuple(&args,
- tuptable->vals[i],
- tuptable->tupdesc);
- PyList_SetItem(result->rows, i, row);
- }
- }
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+ PLyResultObject *result = (PLyResultObject *) callback->result;
+ /* If status < 0 this stuff would just get thrown away anyway. */
+ if (status > 0)
+ {
+ if (!result)
+ {
/*
- * Save tuple descriptor for later use by result set metadata
- * functions. Save it in TopMemoryContext so that it survives
- * outside of an SPI context. We trust that PLy_result_dealloc()
- * will clean it up when the time is right. (Do this as late as
- * possible, to minimize the number of ways the tupdesc could get
- * leaked due to errors.)
+ * This happens if the command returned no results. Create a dummy result set.
*/
- oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
- result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
- MemoryContextSwitchTo(oldcontext2);
- }
- PG_CATCH();
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextDelete(cxt);
- Py_DECREF(result);
- PG_RE_THROW();
+ result = PLyCSNewResult(callback);
+ callback->result = (PyObject *) result;
}
- PG_END_TRY();
- MemoryContextDelete(cxt);
- SPI_freetuptable(tuptable);
+ Py_DECREF(result->status);
+ result->status = PyInt_FromLong(status);
+ Py_DECREF(result->nrows);
+ result->nrows = (rows > (uint64) LONG_MAX) ?
+ PyFloat_FromDouble((double) rows) :
+ PyInt_FromLong((long) rows);
}
return (PyObject *) result;
--
2.11.1
On 3/5/17 16:07, Jim Nasby wrote:
There is nothing that requires us to materialize the results into an
actual list of actual rows. We could wrap the SPI_tuptable into a
Python object and implement __getitem__ or __iter__ to emulate sequence
or mapping access.Would it be possible to have that just pull tuples directly from the
executor? The overhead of populating the tuplestore just to drain it
again can become quite significant, and AFAICT it's completely unnecessary.
I think there are many options, depending on what you want. If you want
to materialize the result, then you have to materialize it somewhere,
and then make a Python object around that. Or you could make an
iterator interface that just reads a tuple at a time. Or maybe there
are other options.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6 March 2017 at 05:09, Jim Nasby <jim.nasby@openscg.com> wrote:
On 2/28/17 9:42 PM, Jim Nasby wrote:
I'll post a plpython patch that doesn't add the output format control.
I've attached the results of that. Unfortunately the speed improvement
is only 27% at this point (with 9999999 tuples). Presumably that's
because it's constructing a brand new dictionary from scratch for each
tuple.
Taking a look at this now.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 5 April 2017 at 08:00, Craig Ringer <craig@2ndquadrant.com> wrote:
Taking a look at this now.
Rebased to current master with conflicts and whitespace errors fixed.
Review pending.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Add-SPI_execute_callback-and-callback-based-DestRece.patchtext/x-patch; charset=US-ASCII; name=0001-Add-SPI_execute_callback-and-callback-based-DestRece.patchDownload
From 71b6163734934db3160de81fd064e7d113b9122f Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 1 Mar 2017 15:45:51 -0600
Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based
DestReceiver.
Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
src/backend/executor/spi.c | 79 ++++++++++++++++++++++++++++++++++++++++------
src/backend/tcop/dest.c | 11 +++++++
src/include/executor/spi.h | 4 +++
src/include/tcop/dest.h | 1 +
4 files changed, 85 insertions(+), 10 deletions(-)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 3a89ccd..a0af31a 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount);
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -321,7 +322,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -355,7 +383,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+ _SPI_convert_params(plan->nargs, plan->argtypes,
+ Values, Nulls),
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -384,7 +439,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -425,7 +480,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -472,7 +527,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1907,7 +1962,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount)
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1918,6 +1974,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2037,7 +2094,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
bool canSetTag = stmt->canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2082,7 +2138,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (stmt->utilityStmt == NULL)
{
@@ -2108,6 +2165,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char completionTag[COMPLETION_TAG_BUFSIZE];
+ // XXX throw error if callback is set
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
@@ -2281,7 +2339,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
switch (operation)
{
case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest != DestSPI &&
+ queryDesc->dest->mydest != DestSPICallback)
{
/* Don't return SPI_OK_SELECT if we're discarding result */
res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3..bd671e0 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -216,6 +225,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 94a805d..13719e1 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2..1d1d641 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
DestRemoteExecute, /* sent to frontend, in Execute command */
DestRemoteSimple, /* sent to frontend, w/no catalog access */
DestSPI, /* results sent to SPI manager */
+ DestSPICallback, /* results sent to user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
--
2.5.5
0002-Modify-plpython-to-use-SPI-callbacks.patchtext/x-patch; charset=US-ASCII; name=0002-Modify-plpython-to-use-SPI-callbacks.patchDownload
From c846452b1b0ca30f2ae9c08488482ef1b9b9e9a9 Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 1 Mar 2017 15:46:53 -0600
Subject: [PATCH 2/2] Modify plpython to use SPI callbacks
This is a bare minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Simple testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
src/pl/plpython/plpy_main.c | 13 ++
src/pl/plpython/plpy_main.h | 3 +
src/pl/plpython/plpy_spi.c | 303 ++++++++++++++++++++++++++++++++------------
3 files changed, 235 insertions(+), 84 deletions(-)
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804..07501f1 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4..7cbe0a8 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index c6856cc..dcec0c7 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,9 +28,27 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext parent_ctx;
+ MemoryContext cb_ctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ PyObject *result;
+} CallbackState;
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
static PyObject *PLy_spi_execute_query(char *query, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
uint64 rows, int status);
static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
@@ -195,6 +213,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
PyObject *
PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback;
volatile int nargs;
int i,
rv;
@@ -237,12 +257,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
oldcontext = CurrentMemoryContext;
oldowner = CurrentResourceOwner;
+ callback = PLy_Callback_New(exec_ctx);
PLy_spi_subtransaction_begin(oldcontext, oldowner);
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
char *volatile nulls;
volatile int j;
@@ -288,9 +308,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
}
- rv = SPI_execute_plan(plan->plan, plan->values, nulls,
- exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+ exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
if (nargs > 0)
pfree(nulls);
@@ -315,9 +336,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
for (i = 0; i < nargs; i++)
{
@@ -343,9 +366,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
static PyObject *
PLy_spi_execute_query(char *query, long limit)
{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback = PLy_Callback_New(exec_ctx);
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
int rv;
- volatile MemoryContext oldcontext;
- volatile ResourceOwner oldowner;
PyObject *ret = NULL;
oldcontext = CurrentMemoryContext;
@@ -355,20 +380,22 @@ PLy_spi_execute_query(char *query, long limit)
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
pg_verifymbstr(query, strlen(query), false);
- rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
PLy_spi_subtransaction_commit(oldcontext, oldowner);
}
PG_CATCH();
{
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
if (rv < 0)
{
@@ -382,94 +409,202 @@ PLy_spi_execute_query(char *query, long limit)
return ret;
}
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
{
+ MemoryContext oldcontext, cb_ctx;
+ CallbackState *callback;
+
+ callback = palloc0(sizeof(CallbackState));
+
+ /*
+ * Use a new context to make cleanup easier. Allocate it in the current
+ * context so we don't have to worry about cleaning it up if there's an
+ * error.
+ */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python callback context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ callback->parent_ctx = oldcontext;
+ callback->cb_ctx = cb_ctx;
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+ callback->pub.receiveSlot = PLy_CSreceive;
+ callback->pub.rStartup = PLy_CSStartup;
+ callback->pub.rDestroy = PLy_CSDestroy;
+ callback->exec_ctx = exec_ctx;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+ if (callback)
+ {
+ if (callback->cb_ctx)
+ (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+ pfree(callback);
+ }
+
+ return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
+{
+ MemoryContext oldctx;
+
+ /* The result info needs to be in the parent context */
+ oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+ myState->result = PLy_result_new();
+ if (myState->result == NULL)
+ PLy_elog(ERROR, "could not create new result object");
+
+ MemoryContextSwitchTo(oldctx);
+ return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
PLyResultObject *result;
- volatile MemoryContext oldcontext;
+ PLyTypeInfo *args;
+ MemoryContext mctx, old_mctx;
- result = (PLyResultObject *) PLy_result_new();
- Py_DECREF(result->status);
- result->status = PyInt_FromLong(status);
+ /*
+ * We may be in a different execution context when we're called, so switch
+ * back to our original one.
+ */
+ mctx = myState->cb_ctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
- if (status > 0 && tuptable == NULL)
+ /* We need to store this because the TupleDesc the receive function gets has no names. */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ myState->args = args = palloc0(sizeof(PLyTypeInfo));
+ PLy_typeinfo_init(args, mctx);
+ PLy_input_tuple_funcs(args, typeinfo);
+
+ result = PLyCSNewResult(myState);
+
+ /*
+ * Save tuple descriptor for later use by result set metadata
+ * functions. Save it in TopMemoryContext so that it survives
+ * outside of an SPI context. We trust that PLy_result_dealloc()
+ * will clean it up when the time is right. XXX This might result in a leak
+ * if an error happens and the result doesn't get dereferenced.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
+
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ MemoryContext cb_ctx = myState->cb_ctx;
+
+ MemoryContextDelete(cb_ctx);
+ myState->cb_ctx = 0;
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ TupleDesc desc = myState->desc;
+ PLyTypeInfo *args = myState->args;
+ PLyResultObject *result = (PLyResultObject *) myState->result;
+ PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+ int rv = 1;
+ PyObject *row;
+
+ /* Verify saved state matches incoming slot */
+ Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+ Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory from the
+ * datatype output function calls.
+ */
+ MemoryContextSwitchTo(scratch_context);
+
+ PG_TRY();
{
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
}
- else if (status > 0 && tuptable != NULL)
+ PG_CATCH();
{
- PLyTypeInfo args;
- MemoryContext cxt;
+ Py_XDECREF(row);
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to mark row
+ * as volatile, but that won't work with PyList_Append, so just
+ * test the error code after doing Py_DECREF().
+ */
+ if (row)
+ {
+ rv = PyList_Append(result->rows, row);
+ Py_DECREF(row);
+ }
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value to list")));
- cxt = AllocSetContextCreate(CurrentMemoryContext,
- "PL/Python temp context",
- ALLOCSET_DEFAULT_SIZES);
- PLy_typeinfo_init(&args, cxt);
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(scratch_context);
+ PLy_switch_execution_context(old_exec_ctx);
- oldcontext = CurrentMemoryContext;
- PG_TRY();
+ return true;
+}
+
+
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+ PLyResultObject *result = (PLyResultObject *) callback->result;
+
+ /* If status < 0 this stuff would just get thrown away anyway. */
+ if (status > 0)
+ {
+ if (!result)
{
- MemoryContext oldcontext2;
-
- if (rows)
- {
- uint64 i;
-
- /*
- * PyList_New() and PyList_SetItem() use Py_ssize_t for list
- * size and list indices; so we cannot support a result larger
- * than PY_SSIZE_T_MAX.
- */
- if (rows > (uint64) PY_SSIZE_T_MAX)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("query result has too many rows to fit in a Python list")));
-
- Py_DECREF(result->rows);
- result->rows = PyList_New(rows);
-
- PLy_input_tuple_funcs(&args, tuptable->tupdesc);
- for (i = 0; i < rows; i++)
- {
- PyObject *row = PLyDict_FromTuple(&args,
- tuptable->vals[i],
- tuptable->tupdesc);
-
- PyList_SetItem(result->rows, i, row);
- }
- }
-
/*
- * Save tuple descriptor for later use by result set metadata
- * functions. Save it in TopMemoryContext so that it survives
- * outside of an SPI context. We trust that PLy_result_dealloc()
- * will clean it up when the time is right. (Do this as late as
- * possible, to minimize the number of ways the tupdesc could get
- * leaked due to errors.)
+ * This happens if the command returned no results. Create a dummy result set.
*/
- oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
- result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
- MemoryContextSwitchTo(oldcontext2);
+ result = PLyCSNewResult(callback);
+ callback->result = (PyObject *) result;
}
- PG_CATCH();
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextDelete(cxt);
- Py_DECREF(result);
- PG_RE_THROW();
- }
- PG_END_TRY();
- MemoryContextDelete(cxt);
- SPI_freetuptable(tuptable);
+ Py_DECREF(result->status);
+ result->status = PyInt_FromLong(status);
+ Py_DECREF(result->nrows);
+ result->nrows = (rows > (uint64) LONG_MAX) ?
+ PyFloat_FromDouble((double) rows) :
+ PyInt_FromLong((long) rows);
}
return (PyObject *) result;
--
2.5.5
On 5 April 2017 at 08:23, Craig Ringer <craig@2ndquadrant.com> wrote:
On 5 April 2017 at 08:00, Craig Ringer <craig@2ndquadrant.com> wrote:
Taking a look at this now.
Rebased to current master with conflicts and whitespace errors fixed.
Review pending.
This patch fails to update the documentation at all.
https://www.postgresql.org/docs/devel/static/spi.html
The patch crashes in initdb with --enable-cassert builds:
performing post-bootstrap initialization ... TRAP:
FailedAssertion("!(myState->pub.mydest == DestSQLFunction)", File:
"functions.c", Line: 800)
sh: line 1: 30777 Aborted (core dumped)
"/home/craig/projects/2Q/postgres/tmp_install/home/craig/pg/10/bin/postgres"
--single -F -O -j -c search_path=pg_catalog -c exit_on_error=true
template1 > /dev/null
child process exited with exit code 134
Backtrace attached.
Details on patch 1:
missing newline
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+/* Execute a previously prepared plan with a callback Destination */
caps "Destination"
+ // XXX throw error if callback is set
^^
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
Is the callback destreceiver supposed to be a blackhole? Why? Its
name, spi_callbackDR and DestSPICallback, doesn't seem to indicate
that it drops its input.
Presumably that's a default destination you're then supposed to modify
with your own callbacks? There aren't any comments to indicate what's
going on here.
Comments on patch 2:
If this is the "bare minimum" patch, what is pending? Does it impose
any downsides or limits?
+/* Get switch execution contexts */
+extern PLyExecutionContext
*PLy_switch_execution_context(PLyExecutionContext *new);
Comment makes no sense to me. This seems something like memory context
switch, where you supply the new and return the old. But the comment
doesn't explain it at all.
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
These are declared in the plpy_spi.c. Why aren't these static?
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
int rv;
- volatile MemoryContext oldcontext;
- volatile ResourceOwner oldowner;
Unnecessary code movement.
In PLy_Callback_New, I think your use of a sub-context is sensible. Is
it necessary to palloc0 though?
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
The code here looks like it could be part of spi.c's callback support,
rather than plpy_spi specific, since you provide a destroy callback in
the SPI callback struct.
+ /* We need to store this because the TupleDesc the receive
function gets has no names. */
+ myState->desc = typeinfo;
Is it safe to just store the pointer to the TupleDesc here? What's the lifetime?
+ * will clean it up when the time is right. XXX This might result in a leak
+ * if an error happens and the result doesn't get dereferenced.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
especially given this XXX comment...
Patch needs bug fix, docs updates, fixes for issues marked in
comments. But overall approach looks sensible enough.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
On 4/4/17 7:44 PM, Craig Ringer wrote:
The patch crashes in initdb with --enable-cassert builds:
Thanks for the review! I'll get to the rest of it in a bit, but I'm
unable to reproduce the initdb failure. I looked at the assert line and
I don't see anything obvious either. :/
Can you send your full configure call? uname -a? Mine is:
./configure --with-includes=/opt/local/include
--with-libraries=/opt/local/lib --enable-debug --with-libxml --with-tcl
--with-perl --with-python --enable-depend --enable-dtrace
--enable-tap-tests --prefix=/Users/decibel/pgsql/HEAD/i/i
--with-pgport=$PGC_PORT -C --enable-cassert --enable-debug CFLAGS='-ggdb
-O0 -fno-omit-frame-pointer'
Darwin decina.local 15.6.0 Darwin Kernel Version 15.6.0: Mon Jan 9
23:07:29 PST 2017; root:xnu-3248.60.11.2.1~1/RELEASE_X86_64 x86_64
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG http://OpenSCG.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 4/4/17 7:44 PM, Craig Ringer wrote:
On 5 April 2017 at 08:23, Craig Ringer <craig@2ndquadrant.com> wrote:
On 5 April 2017 at 08:00, Craig Ringer <craig@2ndquadrant.com> wrote:
This patch fails to update the documentation at all.
I'll fix that soon.
missing newline
Fixed.
+/* Execute a previously prepared plan with a callback Destination */
caps "Destination"
Hmm, I capitalized it since DestReceiver is capitalized. I guess it's
best to just drop Destination.
+ // XXX throw error if callback is set
Fixed (opted to just use an Assert).
+static DestReceiver spi_callbackDR = { + donothingReceive, donothingStartup, donothingCleanup, donothingCleanup, + DestSPICallback +}; Presumably that's a default destination you're then supposed to modify with your own callbacks? There aren't any comments to indicate what's going on here.
Correct. Added the following comment:
/*
* This is strictly a starting point for creating a callback. It should not
* actually be used.
*/
Comments on patch 2:
If this is the "bare minimum" patch, what is pending? Does it impose
any downsides or limits?
No limits. I'm not aware of any downsides.
It's "bare minimum" because a follow-on step is to allow different
methods of returning results. In particular, my testing indicates that
patch 1 + returning a dict of lists (as opposed to the current list of
dicts) results in a 460% improvement vs ~30% with patch 2.
+/* Get switch execution contexts */ +extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);Comment makes no sense to me. This seems something like memory context
switch, where you supply the new and return the old. But the comment
doesn't explain it at all.
Comment changed to
/* Switch execution context (similar to MemoryContextSwitchTo */
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo); +void PLy_CSDestroy(DestReceiver *self);These are declared in the plpy_spi.c. Why aren't these static?
Derp. Fixed.
+ volatile MemoryContext oldcontext; + volatile ResourceOwner oldowner; int rv; - volatile MemoryContext oldcontext; - volatile ResourceOwner oldowner;Unnecessary code movement.
IMHO it reads better that way. I've left it for now so $COMMITTER can
decide, but if it really bugs you let me know and I'll swap it.
In PLy_Callback_New, I think your use of a sub-context is sensible. Is
it necessary to palloc0 though?
Hrm, maybe not... but it seems like cheap insurance considering the
amount of other stuff involved in just starting a new SPI call. And
honestly, I'd rather not mess with it at this point. :) I have added an
XXX comment though.
+static CallbackState * +PLy_Callback_Free(CallbackState *callback)The code here looks like it could be part of spi.c's callback support,
rather than plpy_spi specific, since you provide a destroy callback in
the SPI callback struct.
I added this for use in PG_CATCH() blocks, because it's not clear to me
that the portal gets cleaned up in those cases. It's certainly possible
that it's pointless.
FWIW, I'm pretty sure I copied that pattern from somewhere else...
probably plpgsql or pltcl.
+ /* We need to store this because the TupleDesc the receive function gets has no names. */ + myState->desc = typeinfo;Is it safe to just store the pointer to the TupleDesc here? What's the lifetime?
Only Portal lifetime.
+ * will clean it up when the time is right. XXX This might result in a leak + * if an error happens and the result doesn't get dereferenced. + */ + MemoryContextSwitchTo(TopMemoryContext); + result->tupdesc = CreateTupleDescCopy(typeinfo);especially given this XXX comment...
I've changed the comment to the following. Hopefully this clarifies things:
/*
* Save tuple descriptor for later use by result set metadata
* functions. Save it in TopMemoryContext so that it survives outside of
* an SPI context. We trust that PLy_result_dealloc() will clean it up
* when the time is right. The difference between result and everything
* else is that result needs to survive after the portal is destroyed,
* because result is what's handed back to the plpython function. While
* it's tempting to use something other than TopMemoryContext, that won't
* work: the user could potentially put result into the global dictionary,
* which means it could survive as long as the session does. This might
* result in a leak if an error happens and the result doesn't get
* dereferenced, but if that happens it means the python GC has failed us,
* at which point we probably have bigger problems.
*
* This still isn't perfect though; if something the result tupledesc
* references has it's OID changed then the tupledesc will be invalid. I'm
* not sure it's worth worrying about that though.
*/
Updated patches attached, but I still need to update the docs.
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG http://OpenSCG.com
Attachments:
0001-Add-SPI_execute_callback.patchtext/plain; charset=UTF-8; name=0001-Add-SPI_execute_callback.patch; x-mac-creator=0; x-mac-type=0Download
From 0a2ef661f55a047763a43b0eebd7483760e4a427 Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 5 Apr 2017 20:52:39 -0500
Subject: [PATCH 1/2] Add SPI_execute_callback
Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
src/backend/executor/spi.c | 80 ++++++++++++++++++++++++++++++++++++++++------
src/backend/tcop/dest.c | 15 +++++++++
src/include/executor/spi.h | 4 +++
src/include/tcop/dest.h | 1 +
4 files changed, 90 insertions(+), 10 deletions(-)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index ca547dc6d9..4f6c3011f9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount);
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -321,7 +322,35 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -355,7 +384,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+ _SPI_convert_params(plan->nargs, plan->argtypes,
+ Values, Nulls),
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -384,7 +440,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -425,7 +481,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -472,7 +528,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1907,7 +1963,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount)
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1918,6 +1975,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2037,7 +2095,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
bool canSetTag = stmt->canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2082,7 +2139,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (stmt->utilityStmt == NULL)
{
@@ -2108,6 +2166,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char completionTag[COMPLETION_TAG_BUFSIZE];
+ Assert(!callback);
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
@@ -2281,7 +2340,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
switch (operation)
{
case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest != DestSPI &&
+ queryDesc->dest->mydest != DestSPICallback)
{
/* Don't return SPI_OK_SELECT if we're discarding result */
res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..f68b6e1b51 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,15 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+/*
+ * This is strictly a starting point for creating a callback. It should not
+ * actually be used.
+ */
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -126,6 +135,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -172,6 +184,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -216,6 +229,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -262,6 +276,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 94a805d477..13719e1df5 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
DestRemoteExecute, /* sent to frontend, in Execute command */
DestRemoteSimple, /* sent to frontend, w/no catalog access */
DestSPI, /* results sent to SPI manager */
+ DestSPICallback, /* results sent to user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
--
2.11.1
0002-Switch-plpython-to-using-SPI_execute_callback.patchtext/plain; charset=UTF-8; name=0002-Switch-plpython-to-using-SPI_execute_callback.patch; x-mac-creator=0; x-mac-type=0Download
From d693fa42135e5f773cc8affcd26eba4d2ef22f2b Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 5 Apr 2017 21:30:39 -0500
Subject: [PATCH 2/2] Switch plpython to using SPI_execute_callback
This is a bare minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Simple testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
src/pl/plpython/plpy_main.c | 13 ++
src/pl/plpython/plpy_main.h | 3 +
src/pl/plpython/plpy_spi.c | 313 ++++++++++++++++++++++++++++++++------------
3 files changed, 248 insertions(+), 81 deletions(-)
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..fe30dbc14b 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Switch execution context (similar to MemoryContextSwitchTo */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index c6856ccbac..236cc6d998 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,9 +28,27 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext parent_ctx;
+ MemoryContext cb_ctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ PyObject *result;
+} CallbackState;
+
+static void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
static PyObject *PLy_spi_execute_query(char *query, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
uint64 rows, int status);
static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
@@ -195,6 +213,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
PyObject *
PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback;
volatile int nargs;
int i,
rv;
@@ -237,12 +257,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
oldcontext = CurrentMemoryContext;
oldowner = CurrentResourceOwner;
+ callback = PLy_Callback_New(exec_ctx);
PLy_spi_subtransaction_begin(oldcontext, oldowner);
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
char *volatile nulls;
volatile int j;
@@ -288,9 +308,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
}
- rv = SPI_execute_plan(plan->plan, plan->values, nulls,
- exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+ exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
if (nargs > 0)
pfree(nulls);
@@ -315,9 +336,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
for (i = 0; i < nargs; i++)
{
@@ -343,9 +366,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
static PyObject *
PLy_spi_execute_query(char *query, long limit)
{
- int rv;
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback = PLy_Callback_New(exec_ctx);
volatile MemoryContext oldcontext;
volatile ResourceOwner oldowner;
+ int rv;
PyObject *ret = NULL;
oldcontext = CurrentMemoryContext;
@@ -355,20 +380,22 @@ PLy_spi_execute_query(char *query, long limit)
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
pg_verifymbstr(query, strlen(query), false);
- rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
PLy_spi_subtransaction_commit(oldcontext, oldowner);
}
PG_CATCH();
{
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
if (rv < 0)
{
@@ -382,94 +409,218 @@ PLy_spi_execute_query(char *query, long limit)
return ret;
}
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
+{
+ MemoryContext oldcontext, cb_ctx;
+ CallbackState *callback;
+
+ /* XXX does this really need palloc0? */
+ callback = palloc0(sizeof(CallbackState));
+
+ /*
+ * Use a new context to make cleanup easier. Allocate it in the current
+ * context so we don't have to worry about cleaning it up if there's an
+ * error.
+ */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python callback context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ callback->parent_ctx = oldcontext;
+ callback->cb_ctx = cb_ctx;
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+ callback->pub.receiveSlot = PLy_CSreceive;
+ callback->pub.rStartup = PLy_CSStartup;
+ callback->pub.rDestroy = PLy_CSDestroy;
+ callback->exec_ctx = exec_ctx;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+ if (callback)
+ {
+ if (callback->cb_ctx)
+ (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+ pfree(callback);
+ }
+
+ return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
{
+ MemoryContext oldctx;
+
+ /* The result info needs to be in the parent context */
+ oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+ myState->result = PLy_result_new();
+ if (myState->result == NULL)
+ PLy_elog(ERROR, "could not create new result object");
+
+ MemoryContextSwitchTo(oldctx);
+ return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
PLyResultObject *result;
- volatile MemoryContext oldcontext;
+ PLyTypeInfo *args;
+ MemoryContext mctx, old_mctx;
+
+ /*
+ * We may be in a different execution context when we're called, so switch
+ * back to our original one.
+ */
+ mctx = myState->cb_ctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
+
+ /*
+ * We need to store this because the TupleDesc that the receive function
+ * gets has no names.
+ */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ myState->args = args = palloc0(sizeof(PLyTypeInfo));
+ PLy_typeinfo_init(args, mctx);
+ PLy_input_tuple_funcs(args, typeinfo);
+
+ /* result is actually myState.result */
+ result = PLyCSNewResult(myState);
+
+ /*
+ * Save tuple descriptor for later use by result set metadata
+ * functions. Save it in TopMemoryContext so that it survives outside of
+ * an SPI context. We trust that PLy_result_dealloc() will clean it up
+ * when the time is right. The difference between result and everything
+ * else is that result needs to survive after the portal is destroyed,
+ * because result is what's handed back to the plpython function. While
+ * it's tempting to use something other than TopMemoryContext, that won't
+ * work: the user could potentially put result into the global dictionary,
+ * which means it could survive as long as the session does. This might
+ * result in a leak if an error happens and the result doesn't get
+ * dereferenced, but if that happens it means the python GC has failed us,
+ * at which point we probably have bigger problems.
+ *
+ * This still isn't perfect though; if something the result tupledesc
+ * references has it's OID changed then the tupledesc will be invalid. I'm
+ * not sure it's worth worrying about that though.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
+
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ MemoryContext cb_ctx = myState->cb_ctx;
- result = (PLyResultObject *) PLy_result_new();
- Py_DECREF(result->status);
- result->status = PyInt_FromLong(status);
+ MemoryContextDelete(cb_ctx);
+ myState->cb_ctx = 0;
+}
- if (status > 0 && tuptable == NULL)
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ TupleDesc desc = myState->desc;
+ PLyTypeInfo *args = myState->args;
+ PLyResultObject *result = (PLyResultObject *) myState->result;
+ PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+ int rv = 1;
+ PyObject *row;
+
+ /* Verify saved state matches incoming slot */
+ Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+ Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory from the
+ * datatype output function calls.
+ */
+ MemoryContextSwitchTo(scratch_context);
+
+ PG_TRY();
{
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
}
- else if (status > 0 && tuptable != NULL)
+ PG_CATCH();
{
- PLyTypeInfo args;
- MemoryContext cxt;
+ Py_XDECREF(row);
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to mark row
+ * as volatile, but that won't work with PyList_Append, so just
+ * test the error code after doing Py_DECREF().
+ */
+ if (row)
+ {
+ rv = PyList_Append(result->rows, row);
+ Py_DECREF(row);
+ }
- cxt = AllocSetContextCreate(CurrentMemoryContext,
- "PL/Python temp context",
- ALLOCSET_DEFAULT_SIZES);
- PLy_typeinfo_init(&args, cxt);
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value to list")));
- oldcontext = CurrentMemoryContext;
- PG_TRY();
- {
- MemoryContext oldcontext2;
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(scratch_context);
+ PLy_switch_execution_context(old_exec_ctx);
- if (rows)
- {
- uint64 i;
-
- /*
- * PyList_New() and PyList_SetItem() use Py_ssize_t for list
- * size and list indices; so we cannot support a result larger
- * than PY_SSIZE_T_MAX.
- */
- if (rows > (uint64) PY_SSIZE_T_MAX)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("query result has too many rows to fit in a Python list")));
-
- Py_DECREF(result->rows);
- result->rows = PyList_New(rows);
-
- PLy_input_tuple_funcs(&args, tuptable->tupdesc);
- for (i = 0; i < rows; i++)
- {
- PyObject *row = PLyDict_FromTuple(&args,
- tuptable->vals[i],
- tuptable->tupdesc);
+ return true;
+}
- PyList_SetItem(result->rows, i, row);
- }
- }
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+ PLyResultObject *result = (PLyResultObject *) callback->result;
+
+ /* If status < 0 this stuff would just get thrown away anyway. */
+ if (status > 0)
+ {
+ if (!result)
+ {
/*
- * Save tuple descriptor for later use by result set metadata
- * functions. Save it in TopMemoryContext so that it survives
- * outside of an SPI context. We trust that PLy_result_dealloc()
- * will clean it up when the time is right. (Do this as late as
- * possible, to minimize the number of ways the tupdesc could get
- * leaked due to errors.)
+ * This happens if the command returned no results. Create a dummy result set.
*/
- oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
- result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
- MemoryContextSwitchTo(oldcontext2);
+ result = PLyCSNewResult(callback);
+ callback->result = (PyObject *) result;
}
- PG_CATCH();
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextDelete(cxt);
- Py_DECREF(result);
- PG_RE_THROW();
- }
- PG_END_TRY();
- MemoryContextDelete(cxt);
- SPI_freetuptable(tuptable);
+ Py_DECREF(result->status);
+ result->status = PyInt_FromLong(status);
+ Py_DECREF(result->nrows);
+ result->nrows = (rows > (uint64) LONG_MAX) ?
+ PyFloat_FromDouble((double) rows) :
+ PyInt_FromLong((long) rows);
}
return (PyObject *) result;
--
2.11.1
On 6 April 2017 at 09:40, Jim Nasby <jim.nasby@openscg.com> wrote:
On 4/4/17 7:44 PM, Craig Ringer wrote:
The patch crashes in initdb with --enable-cassert builds:
Thanks for the review! I'll get to the rest of it in a bit, but I'm unable
to reproduce the initdb failure. I looked at the assert line and I don't see
anything obvious either. :/Can you send your full configure call? uname -a? Mine is:
./configure --with-includes=/opt/local/include
--with-libraries=/opt/local/lib --enable-debug --with-libxml --with-tcl
--with-perl --with-python --enable-depend --enable-dtrace --enable-tap-tests
--prefix=/Users/decibel/pgsql/HEAD/i/i --with-pgport=$PGC_PORT -C
--enable-cassert --enable-debug CFLAGS='-ggdb -O0 -fno-omit-frame-pointer'
./configure --prefix=/home/craig/pg/10 --enable-cassert --enable-debug
--enable-tap-tests --with-python
make -s clean
make -s -j4
make check
results in the crash here.
if I add
CFLAGS=""
to the arguments (which suppresses the default "-O2"), or pass
CFLAGS="-O0"
then the crash goes away.
$ python --version
Python 2.7.11
$ lsb_release -a
LSB Version: :core-4.1-amd64:core-4.1-noarch:cxx-4.1-amd64:cxx-4.1-noarch:desktop-4.1-amd64:desktop-4.1-noarch:languages-4.1-amd64:languages-4.1-noarch:printing-4.1-amd64:printing-4.1-noarch
Distributor ID: Fedora
Description: Fedora release 23 (Twenty Three)
Release: 23
Codename: TwentyThree
$ gcc --version
gcc (GCC) 5.3.1 20160406 (Red Hat 5.3.1-6)
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 4/5/17 7:44 PM, Jim Nasby wrote:
Updated patches attached, but I still need to update the docs.
Attached is a complete series of patches that includes the docs patch.
Right now, the docs don't include a concrete example, because adding one
would be a pretty large if it demonstrated real usage, which presumably
means Yet Another Contrib Module strictly for the purpose of
demonstrating something. Rather than doing that, ISTM it'd be better to
point the user at what plpythonu is doing.
Another option would be to have a very simple example that only uses
*receiveSlot, but that seems rather pointless to me.
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG http://OpenSCG.com
Attachments:
0001-Add-SPI_execute_callback.patchtext/plain; charset=UTF-8; name=0001-Add-SPI_execute_callback.patch; x-mac-creator=0; x-mac-type=0Download
From 0a2ef661f55a047763a43b0eebd7483760e4a427 Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 5 Apr 2017 20:52:39 -0500
Subject: [PATCH 1/3] Add SPI_execute_callback
Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
src/backend/executor/spi.c | 80 ++++++++++++++++++++++++++++++++++++++++------
src/backend/tcop/dest.c | 15 +++++++++
src/include/executor/spi.h | 4 +++
src/include/tcop/dest.h | 1 +
4 files changed, 90 insertions(+), 10 deletions(-)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index ca547dc6d9..4f6c3011f9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount);
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -321,7 +322,35 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -355,7 +384,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan with a callback */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+ _SPI_convert_params(plan->nargs, plan->argtypes,
+ Values, Nulls),
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, callback);
_SPI_end_call(true);
return res;
@@ -384,7 +440,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -425,7 +481,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -472,7 +528,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1907,7 +1963,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64 tcount)
+ bool read_only, bool fire_triggers, uint64 tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1918,6 +1975,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2037,7 +2095,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
bool canSetTag = stmt->canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2082,7 +2139,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
if (stmt->utilityStmt == NULL)
{
@@ -2108,6 +2166,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char completionTag[COMPLETION_TAG_BUFSIZE];
+ Assert(!callback);
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
@@ -2281,7 +2340,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
switch (operation)
{
case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest != DestSPI &&
+ queryDesc->dest->mydest != DestSPICallback)
{
/* Don't return SPI_OK_SELECT if we're discarding result */
res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..f68b6e1b51 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,15 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+/*
+ * This is strictly a starting point for creating a callback. It should not
+ * actually be used.
+ */
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -126,6 +135,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -172,6 +184,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -216,6 +229,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -262,6 +276,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 94a805d477..13719e1df5 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver *callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
DestRemoteExecute, /* sent to frontend, in Execute command */
DestRemoteSimple, /* sent to frontend, w/no catalog access */
DestSPI, /* results sent to SPI manager */
+ DestSPICallback, /* results sent to user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code */
--
2.11.1
0002-Add-documentation-for-SPI_execute_callback.patchtext/plain; charset=UTF-8; name=0002-Add-documentation-for-SPI_execute_callback.patch; x-mac-creator=0; x-mac-type=0Download
From 093cbf519296ff262a534bbe912cafac477f5692 Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 5 Apr 2017 22:42:13 -0500
Subject: [PATCH 2/3] Add documentation for SPI_execute_callback
---
doc/src/sgml/spi.sgml | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 118 insertions(+)
diff --git a/doc/src/sgml/spi.sgml b/doc/src/sgml/spi.sgml
index 86be87c0fd..901bb9cbb1 100644
--- a/doc/src/sgml/spi.sgml
+++ b/doc/src/sgml/spi.sgml
@@ -600,6 +600,124 @@ int SPI_exec(const char * <parameter>command</parameter>, long <parameter>count<
<!-- *********************************************** -->
+<refentry id="spi-spi-execute-callback">
+ <indexterm><primary>SPI_exececute_callback</primary></indexterm>
+
+ <refmeta>
+ <refentrytitle>SPI_execute_callback</refentrytitle>
+ <manvolnum>3</manvolnum>
+ </refmeta>
+
+ <refnamediv>
+ <refname>SPI_execute_callback</refname>
+ <refpurpose>execute a read/write command </refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+int SPI_execute_callback(const char * <parameter>command</parameter>, bool <parameter>read_only</parameter>,
+ long <parameter>count</parameter>, DestReceiver * <parameter>callback</parameter>)
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <function>SPI_execute_callback</function> is the same as
+ <function>SPI_execute</function>, except that instead of returning results
+ via <structname>SPITupleTable</structname>, the user-supplied <parameter>callback</parameter>
+ is used. Unlike
+ <function>SPI_execute</function>,
+ <function>SPI_execute_callback</function>
+ will run the callback for every SQL command passed in to <parameter>command</parameter>.
+ </para>
+
+ <para>
+ The structure <structname>DestReceiver</structname> is defined
+ as:
+<programlisting>
+typedef struct _DestReceiver DestReceiver;
+
+struct _DestReceiver
+{
+ /* Called for each tuple to be output: */
+ bool (*receiveSlot) (TupleTableSlot *slot,
+ DestReceiver *self);
+ /* Per-executor-run initialization and shutdown: */
+ void (*rStartup) (DestReceiver *self,
+ int operation,
+ TupleDesc typeinfo);
+ void (*rShutdown) (DestReceiver *self);
+ /* Destroy the receiver object itself (if dynamically allocated) */
+ void (*rDestroy) (DestReceiver *self);
+ /* CommandDest code for this receiver */
+ CommandDest mydest;
+ /* Private fields might appear beyond this point... */
+};
+</programlisting>
+
+ <structfield>(*receiveSlot)</structfield> is a function that is called for
+ every tuple generated.
+
+ <structfield>(*rStartup)</structfield> and
+ <structfield>(*rShutdown)</structfield> are called when query execution
+ starts and finishes. Their use is optional.
+
+ <structfield>(*rDestroy)</structfield> is called when the query execution
+ context is freed. If your <structname>DestReceiver</structname> dynamically
+ allocates memory, you can use <structfield>(*rDestroy)</structfield> to free
+ that memory. You should do this if <structname>DestReceiver</structname> is
+ created in a long-lived memory context.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Arguments</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><literal>const char * <parameter>command</parameter></literal></term>
+ <listitem>
+ <para>
+ string containing command to execute
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>long <parameter>count</parameter></literal></term>
+ <listitem>
+ <para>
+ maximum number of rows to return,
+ or <literal>0</> for no limit
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>DestReceiver <parameter>callback</parameter></literal></term>
+ <listitem>
+ <para>
+ callback to be executed for each tuple generated by
+ <parameter>command</parameter>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Return Value</title>
+
+ <para>
+ See <function>SPI_execute</function>.
+ </para>
+ </refsect1>
+</refentry>
+
+<!-- *********************************************** -->
+
<refentry id="spi-spi-execute-with-args">
<indexterm><primary>SPI_execute_with_args</primary></indexterm>
--
2.11.1
0003-Switch-plpython-to-using-SPI_execute_callback.patchtext/plain; charset=UTF-8; name=0003-Switch-plpython-to-using-SPI_execute_callback.patch; x-mac-creator=0; x-mac-type=0Download
From 0ce5131c8bb03117d356b555431877baeb149175 Mon Sep 17 00:00:00 2001
From: Jim Nasby <Jim.Nasby@BlueTreble.com>
Date: Wed, 5 Apr 2017 21:30:39 -0500
Subject: [PATCH 3/3] Switch plpython to using SPI_execute_callback
This is a bare minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Simple testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
src/pl/plpython/plpy_main.c | 13 ++
src/pl/plpython/plpy_main.h | 3 +
src/pl/plpython/plpy_spi.c | 313 ++++++++++++++++++++++++++++++++------------
3 files changed, 248 insertions(+), 81 deletions(-)
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..fe30dbc14b 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Switch execution context (similar to MemoryContextSwitchTo */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index c6856ccbac..236cc6d998 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,9 +28,27 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext parent_ctx;
+ MemoryContext cb_ctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ PyObject *result;
+} CallbackState;
+
+static void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+static void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
static PyObject *PLy_spi_execute_query(char *query, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
uint64 rows, int status);
static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
@@ -195,6 +213,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
PyObject *
PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback;
volatile int nargs;
int i,
rv;
@@ -237,12 +257,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
oldcontext = CurrentMemoryContext;
oldowner = CurrentResourceOwner;
+ callback = PLy_Callback_New(exec_ctx);
PLy_spi_subtransaction_begin(oldcontext, oldowner);
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
char *volatile nulls;
volatile int j;
@@ -288,9 +308,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
}
- rv = SPI_execute_plan(plan->plan, plan->values, nulls,
- exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+ exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
if (nargs > 0)
pfree(nulls);
@@ -315,9 +336,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
}
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
for (i = 0; i < nargs; i++)
{
@@ -343,9 +366,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
static PyObject *
PLy_spi_execute_query(char *query, long limit)
{
- int rv;
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback = PLy_Callback_New(exec_ctx);
volatile MemoryContext oldcontext;
volatile ResourceOwner oldowner;
+ int rv;
PyObject *ret = NULL;
oldcontext = CurrentMemoryContext;
@@ -355,20 +380,22 @@ PLy_spi_execute_query(char *query, long limit)
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
pg_verifymbstr(query, strlen(query), false);
- rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+ rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
PLy_spi_subtransaction_commit(oldcontext, oldowner);
}
PG_CATCH();
{
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
if (rv < 0)
{
@@ -382,94 +409,218 @@ PLy_spi_execute_query(char *query, long limit)
return ret;
}
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
+{
+ MemoryContext oldcontext, cb_ctx;
+ CallbackState *callback;
+
+ /* XXX does this really need palloc0? */
+ callback = palloc0(sizeof(CallbackState));
+
+ /*
+ * Use a new context to make cleanup easier. Allocate it in the current
+ * context so we don't have to worry about cleaning it up if there's an
+ * error.
+ */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python callback context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ callback->parent_ctx = oldcontext;
+ callback->cb_ctx = cb_ctx;
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+ callback->pub.receiveSlot = PLy_CSreceive;
+ callback->pub.rStartup = PLy_CSStartup;
+ callback->pub.rDestroy = PLy_CSDestroy;
+ callback->exec_ctx = exec_ctx;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+ if (callback)
+ {
+ if (callback->cb_ctx)
+ (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+ pfree(callback);
+ }
+
+ return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
{
+ MemoryContext oldctx;
+
+ /* The result info needs to be in the parent context */
+ oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+ myState->result = PLy_result_new();
+ if (myState->result == NULL)
+ PLy_elog(ERROR, "could not create new result object");
+
+ MemoryContextSwitchTo(oldctx);
+ return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
PLyResultObject *result;
- volatile MemoryContext oldcontext;
+ PLyTypeInfo *args;
+ MemoryContext mctx, old_mctx;
+
+ /*
+ * We may be in a different execution context when we're called, so switch
+ * back to our original one.
+ */
+ mctx = myState->cb_ctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
+
+ /*
+ * We need to store this because the TupleDesc that the receive function
+ * gets has no names.
+ */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ myState->args = args = palloc0(sizeof(PLyTypeInfo));
+ PLy_typeinfo_init(args, mctx);
+ PLy_input_tuple_funcs(args, typeinfo);
+
+ /* result is actually myState.result */
+ result = PLyCSNewResult(myState);
+
+ /*
+ * Save tuple descriptor for later use by result set metadata
+ * functions. Save it in TopMemoryContext so that it survives outside of
+ * an SPI context. We trust that PLy_result_dealloc() will clean it up
+ * when the time is right. The difference between result and everything
+ * else is that result needs to survive after the portal is destroyed,
+ * because result is what's handed back to the plpython function. While
+ * it's tempting to use something other than TopMemoryContext, that won't
+ * work: the user could potentially put result into the global dictionary,
+ * which means it could survive as long as the session does. This might
+ * result in a leak if an error happens and the result doesn't get
+ * dereferenced, but if that happens it means the python GC has failed us,
+ * at which point we probably have bigger problems.
+ *
+ * This still isn't perfect though; if something the result tupledesc
+ * references has it's OID changed then the tupledesc will be invalid. I'm
+ * not sure it's worth worrying about that though.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
+
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ MemoryContext cb_ctx = myState->cb_ctx;
- result = (PLyResultObject *) PLy_result_new();
- Py_DECREF(result->status);
- result->status = PyInt_FromLong(status);
+ MemoryContextDelete(cb_ctx);
+ myState->cb_ctx = 0;
+}
- if (status > 0 && tuptable == NULL)
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ TupleDesc desc = myState->desc;
+ PLyTypeInfo *args = myState->args;
+ PLyResultObject *result = (PLyResultObject *) myState->result;
+ PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+ int rv = 1;
+ PyObject *row;
+
+ /* Verify saved state matches incoming slot */
+ Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+ Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory from the
+ * datatype output function calls.
+ */
+ MemoryContextSwitchTo(scratch_context);
+
+ PG_TRY();
{
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
}
- else if (status > 0 && tuptable != NULL)
+ PG_CATCH();
{
- PLyTypeInfo args;
- MemoryContext cxt;
+ Py_XDECREF(row);
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to mark row
+ * as volatile, but that won't work with PyList_Append, so just
+ * test the error code after doing Py_DECREF().
+ */
+ if (row)
+ {
+ rv = PyList_Append(result->rows, row);
+ Py_DECREF(row);
+ }
- cxt = AllocSetContextCreate(CurrentMemoryContext,
- "PL/Python temp context",
- ALLOCSET_DEFAULT_SIZES);
- PLy_typeinfo_init(&args, cxt);
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value to list")));
- oldcontext = CurrentMemoryContext;
- PG_TRY();
- {
- MemoryContext oldcontext2;
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(scratch_context);
+ PLy_switch_execution_context(old_exec_ctx);
- if (rows)
- {
- uint64 i;
-
- /*
- * PyList_New() and PyList_SetItem() use Py_ssize_t for list
- * size and list indices; so we cannot support a result larger
- * than PY_SSIZE_T_MAX.
- */
- if (rows > (uint64) PY_SSIZE_T_MAX)
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("query result has too many rows to fit in a Python list")));
-
- Py_DECREF(result->rows);
- result->rows = PyList_New(rows);
-
- PLy_input_tuple_funcs(&args, tuptable->tupdesc);
- for (i = 0; i < rows; i++)
- {
- PyObject *row = PLyDict_FromTuple(&args,
- tuptable->vals[i],
- tuptable->tupdesc);
+ return true;
+}
- PyList_SetItem(result->rows, i, row);
- }
- }
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+ PLyResultObject *result = (PLyResultObject *) callback->result;
+
+ /* If status < 0 this stuff would just get thrown away anyway. */
+ if (status > 0)
+ {
+ if (!result)
+ {
/*
- * Save tuple descriptor for later use by result set metadata
- * functions. Save it in TopMemoryContext so that it survives
- * outside of an SPI context. We trust that PLy_result_dealloc()
- * will clean it up when the time is right. (Do this as late as
- * possible, to minimize the number of ways the tupdesc could get
- * leaked due to errors.)
+ * This happens if the command returned no results. Create a dummy result set.
*/
- oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
- result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
- MemoryContextSwitchTo(oldcontext2);
+ result = PLyCSNewResult(callback);
+ callback->result = (PyObject *) result;
}
- PG_CATCH();
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextDelete(cxt);
- Py_DECREF(result);
- PG_RE_THROW();
- }
- PG_END_TRY();
- MemoryContextDelete(cxt);
- SPI_freetuptable(tuptable);
+ Py_DECREF(result->status);
+ result->status = PyInt_FromLong(status);
+ Py_DECREF(result->nrows);
+ result->nrows = (rows > (uint64) LONG_MAX) ?
+ PyFloat_FromDouble((double) rows) :
+ PyInt_FromLong((long) rows);
}
return (PyObject *) result;
--
2.11.1
On 6 April 2017 at 11:46, Craig Ringer <craig@2ndquadrant.com> wrote:
On 6 April 2017 at 09:40, Jim Nasby <jim.nasby@openscg.com> wrote:
On 4/4/17 7:44 PM, Craig Ringer wrote:
The patch crashes in initdb with --enable-cassert builds:
Thanks for the review! I'll get to the rest of it in a bit, but I'm unable
to reproduce the initdb failure. I looked at the assert line and I don't see
anything obvious either. :/Can you send your full configure call? uname -a? Mine is:
./configure --with-includes=/opt/local/include
--with-libraries=/opt/local/lib --enable-debug --with-libxml --with-tcl
--with-perl --with-python --enable-depend --enable-dtrace --enable-tap-tests
--prefix=/Users/decibel/pgsql/HEAD/i/i --with-pgport=$PGC_PORT -C
--enable-cassert --enable-debug CFLAGS='-ggdb -O0 -fno-omit-frame-pointer'./configure --prefix=/home/craig/pg/10 --enable-cassert --enable-debug
--enable-tap-tests --with-python
make -s clean
make -s -j4
make checkresults in the crash here.
... which I can't reproduce now. Even though I cleared ccache and "git
reset -fdx" before I ran the above and got the crash.
Assume it's a local system peculiarity. If I can reproduce again I'll
dig into it.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 4/5/17 9:08 PM, Craig Ringer wrote:
... which I can't reproduce now. Even though I cleared ccache and "git
reset -fdx" before I ran the above and got the crash.
Glad to hear that, since I can't repro at all. :)
Assume it's a local system peculiarity. If I can reproduce again I'll
dig into it.
Sounds good. Thanks!
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG http://OpenSCG.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6 April 2017 at 11:50, Jim Nasby <jim.nasby@openscg.com> wrote:
Attached is a complete series of patches that includes the docs patch.
+ <function>SPI_execute_callback</function> is the same as
+ <function>SPI_execute</function>, except that instead of returning results
+ via <structname>SPITupleTable</structname>, the user-supplied
<parameter>callback</parameter>
+ is used. Unlike
+ <function>SPI_execute</function>,
+ <function>SPI_execute_callback</function>
+ will run the callback for every SQL command passed in to
<parameter>command</parameter>.
This doesn't explain why the user should care or prefer this approach.
Maybe after "Unlike":
"<function>SPI_execute_callback</> does not need to accumulate all the
query results into memory before the caller can process them. Instead
of building a <literal>SPI_tuptable</> containing all the results, the
supplied callback is invoked for each row processed by SPI. The row
data is passed to the callback then discarded when the callback
returns. This reduces copying and allows the application to process
results sooner."
The docs do not discuss memory lifetimes. What memory context is each
call invoked in and when is it reset? Something like:
"<literal>rStartup</>, <literal>receiveSlot</> and
<literal>rShutdown</> are all called in a memory context that is reset
for each <function>SPI_execute_callback</>."
Also, what rules apply in terms of what you can/cannot do from within
a callback? Presumably it's unsafe to perform additional SPI calls,
perform transactions, call into the executor, change the current
snapshot, etc, but I would consider that reasonably obvious. Are there
any specific things to avoid?
Under what circumstances is the query execution context [...] freed"
such that the destroy callback is called?
It is not clear to me when reading the document that the user of
SPI_execute_callback is expected to define their own
DestReceiver-compatible struct, and that the private fields that "may
appear beyond this point" are those defined by the application. How
does the app know that its DestReceiver is compatible with the result
of CreateDestReceiver(DestSPICallback)? Should it copy the fields?
etc. Your PLPython example incorporates DestReceiver by-value in its
own state struct; maybe your docs program listing should illustrate
that instead, show how to initialize the destreceiver member, and
separately list the callbacks in destreceiver in an <itemisedlist> ?
It definitely needs to mention the relevant parts of plpython to look
at for an example of usage.
Right now, the docs don't include a concrete example, because adding one
would be a pretty large if it demonstrated real usage, which presumably
means Yet Another Contrib Module strictly for the purpose of demonstrating
something. Rather than doing that, ISTM it'd be better to point the user at
what plpythonu is doing.
Seems fine to me.
Notes on the docs aside, I am pretty happy with this and think it's
reasonable to proceed with it for Pg 10.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6 April 2017 at 15:38, Craig Ringer <craig@2ndquadrant.com> wrote:
Notes on the docs aside, I am pretty happy with this and think it's
reasonable to proceed with it for Pg 10.
Actually, I'm a bit hesitant about returning a static struct that you
expect callers to copy and modify. But it seems to be an issue with
CreateDestReceiver's interface where it mixes returning pointers to
static structs with returning pointers to palloc'd memory, not a new
issue added by this patch.
I think you're better off bypassing CreateDestReceiver here, and doing
what CreateTransientRelDestReceiver(), etc do directly instead. That
way there's no need for the hoop jumping of returning a pointer to a
static struct to memcpy() into another struct in:
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback),
sizeof(DestReceiver));
Just document that the spi callback struct must have DestReciever as
its first member and must be palloc0'd.
But otherwise, pending docs changes, I think it's ready for committer.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 4/6/17 03:50, Craig Ringer wrote:
But otherwise, pending docs changes, I think it's ready for committer.
My opinion is still that this is ultimately the wrong approach. The
right fix for performance issues in PL/Python is to change PL/Python not
to materialize the list of tuples. Now with this change we would be
moving from two result materializations to one, but I think we are
keeping the wrong one.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 4/6/17 9:04 AM, Peter Eisentraut wrote:
On 4/6/17 03:50, Craig Ringer wrote:
But otherwise, pending docs changes, I think it's ready for committer.
My opinion is still that this is ultimately the wrong approach. The
right fix for performance issues in PL/Python is to change PL/Python not
to materialize the list of tuples. Now with this change we would be
moving from two result materializations to one, but I think we are
keeping the wrong one.
That's an option for future improvement, but I see no way to accomplish
that without completely breaking plpy.
I think the best way to handle this would be to allow plpython functions
to define their own callback function, which would be handed a python
tuple that was translated from the SPI result tuple. How best to do that
without breaking plpy will require some thought though.
In the meantime, I don't think a 27% performance gain is anything to
sneeze at, and the SPI changes would be directly usable by pl/r and pl/tcl.
--
Jim C. Nasby, Data Architect jim@nasby.net
512.569.9461 (cell) http://jim.nasby.net
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Peter Eisentraut <peter.eisentraut@2ndquadrant.com> writes:
On 4/6/17 03:50, Craig Ringer wrote:
But otherwise, pending docs changes, I think it's ready for committer.
My opinion is still that this is ultimately the wrong approach. The
right fix for performance issues in PL/Python is to change PL/Python not
to materialize the list of tuples. Now with this change we would be
moving from two result materializations to one, but I think we are
keeping the wrong one.
I just looked at these patches for the first time, and TBH I'm a bit
dubious too. I don't particularly have an opinion about the 0003 patch,
but I think that's mostly what Peter is on about. I do have opinions
about 0001 (and 0002, which ought to be merged; we do not commit features
separately from documentation around here).
I can certainly get on board with the idea of letting a SPI caller provide
a DestReceiver instead of accumulating the query results into a
SPITupleTable, but the way it was done here seems quite bizarre. I think
for instance you've mishandled non-canSetTag queries; those should have
any results discarded, full stop. External callers will only be
interested in the result of the canSetTag subquery.
I don't much like DestSPICallback either. We may well need some better
way for extension code to create a custom DestReceiver that does something
out of the ordinary with query result tuples. But if so, it should not be
tied to SPI, not even just by name.
I think that 0001/0002 need to be refactored as (A) a change to make
DestReceiver creation more flexible, and then (B) a change to SPI to allow
a caller to pass in the receiver to use.
After poking around a bit, it seems like we've allowed the original
notion of CommandDest to get way out of hand. The only places where
CreateDestReceiver gets called with a non-constant argument (that is,
where the caller doesn't know perfectly well which kind of DestReceiver
it wants) are two calls in postgres.c, and both of those are ultimately
passing whereToSendOutput, which has got only a really limited set of
possible values. I am thinking that we should cut down enum CommandDest
to be just
DestNone, /* results are discarded */
DestDebug, /* results go to debugging output */
DestRemote, /* results sent to frontend process */
DestOther /* something else */
and change CreateDestReceiver so it throws an error for DestOther; the
way to create any type of receiver other than these few is to call the
underlying creation function directly, rather than going through
CreateDestReceiver.
Having done that, the means for creating a custom receiver is just to
set up an appropriate struct that embeds struct _DestReceiver and
always has mydest = DestOther (or maybe we should just lose the mydest
field altogether). We could document that a bit better, but it's really
not complicated.
There's enough cruft that's accumulated in this area that this would
probably not be an entirely trivial patch, but I think it would be
a good way to make things cleaner.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 7 April 2017 at 00:54, Tom Lane <tgl@sss.pgh.pa.us> wrote:
I can certainly get on board with the idea of letting a SPI caller provide
a DestReceiver instead of accumulating the query results into a
SPITupleTable, but the way it was done here seems quite bizarre. I think
for instance you've mishandled non-canSetTag queries; those should have
any results discarded, full stop. External callers will only be
interested in the result of the canSetTag subquery.
That's something I didn't even know was a thing to look for. Thanks
for spotting it.
For other readers who may also be confused, this refers to internally
generated queries that should not be visible to the caller. We use
these in things like CREATE SCHEMA, ALTER TABLE, in FKs, etc.
I wasn't aware that such queries could ever return a result set, though.
I don't much like DestSPICallback either. We may well need some better
way for extension code to create a custom DestReceiver that does something
out of the ordinary with query result tuples. But if so, it should not be
tied to SPI, not even just by name.I think that 0001/0002 need to be refactored as (A) a change to make
DestReceiver creation more flexible, and then (B) a change to SPI to allow
a caller to pass in the receiver to use.
That's exactly what I tried to avoid suggesting upthread, since it'd
be quite much more invasive than the current patch, though definitely
a desirable cleanup.
Personally I think this patch should be allowed to bypass
CreateDestReceiver and create its own struct. I don't really see that
it should be required to clean up the whole API first.
It's on the pointy end for Pg10, and I thought we'd be fine to include
this in pg10 then aim to clean up DestReceiver in early pg11, or even
as a post-feature-freeze refactoring fixup in pg10. Should the
callback approach be blocked because the API it has to use is a bit
ugly?
After poking around a bit, it seems like we've allowed the original
notion of CommandDest to get way out of hand. The only places where
CreateDestReceiver gets called with a non-constant argument (that is,
where the caller doesn't know perfectly well which kind of DestReceiver
it wants) are two calls in postgres.c, and both of those are ultimately
passing whereToSendOutput, which has got only a really limited set of
possible values. I am thinking that we should cut down enum CommandDest
to be justDestNone, /* results are discarded */
DestDebug, /* results go to debugging output */
DestRemote, /* results sent to frontend process */
DestOther /* something else */and change CreateDestReceiver so it throws an error for DestOther; the
way to create any type of receiver other than these few is to call the
underlying creation function directly, rather than going through
CreateDestReceiver.Having done that, the means for creating a custom receiver is just to
set up an appropriate struct that embeds struct _DestReceiver and
always has mydest = DestOther (or maybe we should just lose the mydest
field altogether). We could document that a bit better, but it's really
not complicated.
I strongly agree that this is the way DestReceiver creation should be
refactored.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Craig Ringer <craig@2ndquadrant.com> writes:
On 7 April 2017 at 00:54, Tom Lane <tgl@sss.pgh.pa.us> wrote:
... External callers will only be
interested in the result of the canSetTag subquery.
I wasn't aware that such queries could ever return a result set, though.
Possibly not, but the point is that they should be invisible to the
caller. As the patch is set up, I think they'd look like empty result
sets instead, because the passed-in DestReceiver is used for them.
What we want is to use DestNone for non-canSetTag queries, and either
DestSPI or the caller's receiver for the canSetTag query.
Personally I think this patch should be allowed to bypass
CreateDestReceiver and create its own struct. I don't really see that
it should be required to clean up the whole API first.
Well, if you bypass CreateDestReceiver then the question is what you're
putting in mydest and whether anything will get confused by that. The
core problem with the existing API is that there's no provision for
adding new kinds of DestReceivers without a corresponding addition to
the CommandDest enum. I think we really need some non-kluge solution
to that before moving forward.
It's on the pointy end for Pg10, and I thought we'd be fine to include
this in pg10 then aim to clean up DestReceiver in early pg11, or even
as a post-feature-freeze refactoring fixup in pg10. Should the
callback approach be blocked because the API it has to use is a bit
ugly?
Given Peter's objections, I don't think this is getting into v10 anyway,
so we might as well take a bit more time and do it right.
Also, I'm entirely -1 on "post-feature-freeze refactoring fixups".
We're going to have more than enough to do trying to stabilize the
existing committed code, I fear (cf Robert's pessimistic summary of
the open-items list, a couple days ago). We don't need to be
planning on doing new design post-freeze, whether it's painted as
mere refactoring or not.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 4/6/17 8:13 PM, Tom Lane wrote:
It's on the pointy end for Pg10, and I thought we'd be fine to include
this in pg10 then aim to clean up DestReceiver in early pg11, or even
as a post-feature-freeze refactoring fixup in pg10. Should the
callback approach be blocked because the API it has to use is a bit
ugly?Given Peter's objections, I don't think this is getting into v10 anyway,
so we might as well take a bit more time and do it right.
Well, Peter's objection is that we're not going far enough in plpython,
but there's absolutely no way to do more without breaking plpy, which
seems a non-starter. We should certainly be able to expand the existing
API to provide even more benefit, but I see no reason to leave the
performance gain this patch provides on the floor just because there's
more to be had with a different API.
Also, I'm entirely -1 on "post-feature-freeze refactoring fixups".
We're going to have more than enough to do trying to stabilize the
existing committed code, I fear (cf Robert's pessimistic summary of
the open-items list, a couple days ago). We don't need to be
planning on doing new design post-freeze, whether it's painted as
mere refactoring or not.
Agreed, and I agree that the current patch is a bit of a hack when it
comes to DestReceiver (or really, DestReceiver has become an ugly wart
over the years, as you pointed out).
I'll plan to pick this up again once the dust settles on this commitfest.
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG http://OpenSCG.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-04-06 09:14:43 -0700, Jim Nasby wrote:
On 4/6/17 9:04 AM, Peter Eisentraut wrote:
On 4/6/17 03:50, Craig Ringer wrote:
But otherwise, pending docs changes, I think it's ready for committer.
My opinion is still that this is ultimately the wrong approach. The
right fix for performance issues in PL/Python is to change PL/Python not
to materialize the list of tuples. Now with this change we would be
moving from two result materializations to one, but I think we are
keeping the wrong one.That's an option for future improvement, but I see no way to accomplish that
without completely breaking plpy.
Why? We could very well return a somewhat "smarter" object. Returning
rows row-by-row if accessed via iterator, materializes when accessed via
row offset.
- Andres
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 4/6/17 9:04 PM, Andres Freund wrote:
On 2017-04-06 09:14:43 -0700, Jim Nasby wrote:
On 4/6/17 9:04 AM, Peter Eisentraut wrote:
On 4/6/17 03:50, Craig Ringer wrote:
But otherwise, pending docs changes, I think it's ready for committer.
My opinion is still that this is ultimately the wrong approach. The
right fix for performance issues in PL/Python is to change PL/Python not
to materialize the list of tuples. Now with this change we would be
moving from two result materializations to one, but I think we are
keeping the wrong one.That's an option for future improvement, but I see no way to accomplish that
without completely breaking plpy.Why? We could very well return a somewhat "smarter" object. Returning
rows row-by-row if accessed via iterator, materializes when accessed via
row offset.
I completely agree with that. What I don't understand is the objection
to speeding up the old access method. Or are you thinking we'd just
abandon the old method?
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG http://OpenSCG.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-04-06 21:06:59 -0700, Jim Nasby wrote:
On 4/6/17 9:04 PM, Andres Freund wrote:
On 2017-04-06 09:14:43 -0700, Jim Nasby wrote:
On 4/6/17 9:04 AM, Peter Eisentraut wrote:
On 4/6/17 03:50, Craig Ringer wrote:
But otherwise, pending docs changes, I think it's ready for committer.
My opinion is still that this is ultimately the wrong approach. The
right fix for performance issues in PL/Python is to change PL/Python not
to materialize the list of tuples. Now with this change we would be
moving from two result materializations to one, but I think we are
keeping the wrong one.That's an option for future improvement, but I see no way to accomplish that
without completely breaking plpy.Why? We could very well return a somewhat "smarter" object. Returning
rows row-by-row if accessed via iterator, materializes when accessed via
row offset.I completely agree with that. What I don't understand is the objection to
speeding up the old access method. Or are you thinking we'd just abandon the
old method?
What I'm saying is that we can do that transparently, with the current
API. And there's no need to materialize anything in plpython, we can
transparently use the SPI materialized version.
Greetings,
Andres Freund
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Apr 6, 2017, at 9:10 PM, Andres Freund <andres@anarazel.de> wrote:
Why? We could very well return a somewhat "smarter" object. Returning
rows row-by-row if accessed via iterator, materializes when accessed via
row offset.I completely agree with that. What I don't understand is the objection to
speeding up the old access method. Or are you thinking we'd just abandon the
old method?What I'm saying is that we can do that transparently, with the current
API. And there's no need to materialize anything in plpython, we can
transparently use the SPI materialized version.
Oh, just switching from a list to an iterator. Ok, I finally get it.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Jim Nasby <jim.nasby@openscg.com> writes:
On 4/6/17 8:13 PM, Tom Lane wrote:
Given Peter's objections, I don't think this is getting into v10 anyway,
so we might as well take a bit more time and do it right.
Well, Peter's objection is that we're not going far enough in plpython,
but there's absolutely no way to do more without breaking plpy, which
seems a non-starter. We should certainly be able to expand the existing
API to provide even more benefit, but I see no reason to leave the
performance gain this patch provides on the floor just because there's
more to be had with a different API.
Personally I'm way more excited about what a SPI feature like this
could do for plpgsql than about what it can do for plpython. If the
latter is what floats your boat, that's fine; but I want a feature
that we can build on for other uses, not a hack that we know we need
to redesign next month.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-04-07 00:11:59 -0400, Tom Lane wrote:
Jim Nasby <jim.nasby@openscg.com> writes:
On 4/6/17 8:13 PM, Tom Lane wrote:
Given Peter's objections, I don't think this is getting into v10 anyway,
so we might as well take a bit more time and do it right.Well, Peter's objection is that we're not going far enough in plpython,
but there's absolutely no way to do more without breaking plpy, which
seems a non-starter. We should certainly be able to expand the existing
API to provide even more benefit, but I see no reason to leave the
performance gain this patch provides on the floor just because there's
more to be had with a different API.Personally I'm way more excited about what a SPI feature like this
could do for plpgsql than about what it can do for plpython. If the
latter is what floats your boat, that's fine; but I want a feature
that we can build on for other uses, not a hack that we know we need
to redesign next month.
Dislike of the proposed implementation, alternative proposals, and the
refutation of the "absolutely no way to do more without breaking plpy"
argument leads to me to conclude that this should be returned with
feedback.
- Andres
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 4/6/17 9:21 PM, Andres Freund wrote:
Personally I'm way more excited about what a SPI feature like this
could do for plpgsql than about what it can do for plpython. If the
latter is what floats your boat, that's fine; but I want a feature
that we can build on for other uses, not a hack that we know we need
to redesign next month.
Yeah, I thought about plpgsql and I can't see any way to make that work
through an SPI callback (perhaps just due to my ignorance on things C).
I suspect what plpgsql actually wants is a way to tell SPI to start the
executor up, a function that pulls individual tuples out of the
executor, and then a function to shut the executor down.
Dislike of the proposed implementation, alternative proposals, and the
refutation of the "absolutely no way to do more without breaking plpy"
argument leads to me to conclude that this should be returned with
feedback.
Agreed.
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG http://OpenSCG.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 12/20/16 23:14, Jim Nasby wrote:
I'm guessing one issue might be that
we don't want to call an external interpreter while potentially holding page
pins, but even then couldn't we just copy a single tuple at a time and save
a huge amount of palloc overhead?
On 04/06/17 03:38, Craig Ringer wrote:
Also, what rules apply in terms of what you can/cannot do from within
a callback? Presumably it's unsafe to perform additional SPI calls,
perform transactions, call into the executor, change the current
snapshot, etc, but I would consider that reasonably obvious. Are there
any specific things to avoid?
Confessing, right up front, that I'm not very familiar with the executor
or DestReceiver code, but thinking of issues that might be expected with
PLs, I wonder if there could be a design where the per-tuple callback
could sometimes return a status HAVE_SLOW_STUFF_TO_DO.
If it does, the executor could release some pins or locks, stack some
state, whatever allows it to (as far as practicable) relax restrictions
on what the callback would be allowed to do, then reinvoke the callback,
not with another tuple, but with OK_GO_DO_YOUR_SLOW_STUFF.
On return from that call, the executor could reacquire its stacked
state/locks/pins and resume generating tuples.
That way, a callback could, say, return normally 9 out of 10 times, just
quickly buffering up 10 tuples, and every 10th time return SLOW_STUFF_TO_DO
and get a chance to jump into the PL interpreter and deal with those 10 ...
(a) minimizing the restrictions on what the PL routine may do, and (b)
allowing any costs of state-stacking/lock-releasing-reacquiring, and control
transfer to the interpreter, to be amortized over some number of tuples.
How many tuples that should be might be an empirical question for any given
PL, but with a protocol like this, the callback has an easy way to control
it.
Or would that be overcomplicated?
-Chap
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
So the conclusion at the end of the last commitfest was that this patch
should be marked Returned With Feedback, and no new work appears to have
been done on it since then. Why is it in this fest at all? There
certainly doesn't seem to be any reason to review it again.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 09/12/2017 03:41 PM, Tom Lane wrote:
So the conclusion at the end of the last commitfest was that this patch
should be marked Returned With Feedback, and no new work appears to have
been done on it since then. Why is it in this fest at all? There
certainly doesn't seem to be any reason to review it again.
I'm not sure how to read the history of the CF entry. Could it
have rolled over to 2017-09 by default if its status was simply
never changed to Returned with Feedback as intended in the last
one? The history doesn't seem to show anything since 2017-03-19.
I would still advocate for a fast-callback/slow-callback distinction,
as in
/messages/by-id/59813946.40508@anastigmatix.net
if that does not seem overcomplicated to more experienced hands.
-Chap
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Chapman Flack <chap@anastigmatix.net> writes:
On 09/12/2017 03:41 PM, Tom Lane wrote:
So the conclusion at the end of the last commitfest was that this patch
should be marked Returned With Feedback, and no new work appears to have
been done on it since then. Why is it in this fest at all? There
certainly doesn't seem to be any reason to review it again.
I'm not sure how to read the history of the CF entry. Could it
have rolled over to 2017-09 by default if its status was simply
never changed to Returned with Feedback as intended in the last
one? The history doesn't seem to show anything since 2017-03-19.
Maybe, or whoever was closing out the last CF didn't notice Andres'
recommendation to mark it RWF.
I would still advocate for a fast-callback/slow-callback distinction,
as in
/messages/by-id/59813946.40508@anastigmatix.net
if that does not seem overcomplicated to more experienced hands.
I did not see any reason given in the thread why we should need that.
If you want to accumulate tuples ten at a time before you do something
with them, you can do that now, by calling ExecutorRun with count=10.
(plpgsql does something much like that IIRC.) The only reason not to
just use count=1 is that ExecutorRun and ExecutePlan have accumulated
assorted startup/shutdown cruft on the assumption that their runtime
didn't particularly matter. It still doesn't look that awful, but
it might be noticeable.
regards, tom lane
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 09/12/17 17:00, Tom Lane wrote:
I did not see any reason given in the thread why we should need that.
If you want to accumulate tuples ten at a time before you do something
with them, you can do that now, by calling ExecutorRun with count=10.
Ah, that sounds easy enough. I'll withdraw the more-complicated suggestion.
-Chap
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 12 Sep 2017, at 23:00, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Chapman Flack <chap@anastigmatix.net> writes:
On 09/12/2017 03:41 PM, Tom Lane wrote:
So the conclusion at the end of the last commitfest was that this patch
should be marked Returned With Feedback, and no new work appears to have
been done on it since then. Why is it in this fest at all? There
certainly doesn't seem to be any reason to review it again.I'm not sure how to read the history of the CF entry. Could it
have rolled over to 2017-09 by default if its status was simply
never changed to Returned with Feedback as intended in the last
one? The history doesn't seem to show anything since 2017-03-19.Maybe, or whoever was closing out the last CF didn't notice Andres'
recommendation to mark it RWF.
It doesn’t seem to have been moved to this CF but was actually created here in
the first place. Reading this thread it seems like there is clear concensus on
the status though so changing to RWF.
cheers ./daniel
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers