From a2ae44429a68f9776d99285930b16f46c4c9197d Mon Sep 17 00:00:00 2001
From: amitlan
Date: Wed, 16 Nov 2022 10:49:45 +0900
Subject: [PATCH v2] Teach crosstab() to use SPI_cursor_* interface

crosstab() ran its source query with SPI_execute(), which materializes
the query's entire result in SPI_tuptable before any of it is looked
at, so a source query returning many rows could consume a lot of
memory.  Instead, open a cursor on the query and fetch its result
CROSSTAB_BATCHSIZE rows at a time, freeing each batch once it has been
processed.

A run of source rows sharing a rowid may now be split across two
batches, so the output row under construction is carried over from
one batch to the next rather than being assembled from a single
SPI_tuptable.
---
Changes since v1:
- Don't read past the end of SPI_tuptable->vals[]: v1 set max_calls
  from the first batch only, so any later, smaller batch was
  over-read.
- Don't split one rowid into two output rows when its source rows
  straddle a batch boundary: v1 emitted the partial row at the end of
  each batch and reset firstpass/lastrowid for the next one.
- Don't call SPI_freetuptable() a second time on the last batch (SPI
  rejects that with a WARNING); every batch is now freed exactly once.
- Keep the original straight-line setup and early exit instead of a
  tupstore_initialized flag plus goto, and close the cursor and free
  the plan before SPI_finish() on both exit paths.

 contrib/tablefunc/tablefunc.c | 226 ++++++++++++++++++++++++------------------------
 1 file changed, 109 insertions(+), 117 deletions(-)

diff --git a/contrib/tablefunc/tablefunc.c b/contrib/tablefunc/tablefunc.c
--- a/contrib/tablefunc/tablefunc.c
+++ b/contrib/tablefunc/tablefunc.c
@@ -356,21 +356,22 @@ crosstab(PG_FUNCTION_ARGS)
 {
 	char	   *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	Tuplestorestate *tupstore;
 	TupleDesc	tupdesc;
 	uint64		call_cntr;
-	uint64		max_calls;
 	AttInMetadata *attinmeta;
 	SPITupleTable *spi_tuptable;
 	TupleDesc	spi_tupdesc;
-	bool		firstpass;
-	char	   *lastrowid;
+	char	  **values = NULL;	/* output row under construction */
+	int			ncats = 0;		/* category columns filled in "values" */
 	int			i;
 	int			num_categories;
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	int			ret;
 	uint64		proc;
+	SPIPlanPtr	plan;
+	Portal		portal;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -389,13 +390,30 @@ crosstab(PG_FUNCTION_ARGS)
 		/* internal error */
 		elog(ERROR, "crosstab: SPI_connect returned %d", ret);
 
-	/* Retrieve the desired rows */
-	ret = SPI_execute(sql, true, 0);
+	/*
+	 * Read the source query's result through a cursor, CROSSTAB_BATCHSIZE
+	 * rows at a time, instead of materializing all of it in SPI_tuptable at
+	 * once with SPI_execute().
+	 */
+#define CROSSTAB_BATCHSIZE 10000
+	if ((plan = SPI_prepare(sql, 0, NULL)) == NULL)
+		/* internal error */
+		elog(ERROR, "crosstab: SPI_prepare(\"%s\") failed: %s",
+			 sql, SPI_result_code_string(SPI_result));
+
+	if ((portal = SPI_cursor_open(NULL, plan, NULL, NULL, true)) == NULL)
+		/* internal error */
+		elog(ERROR, "crosstab: SPI_cursor_open(\"%s\") failed", sql);
+
+	/* Retrieve the first batch of rows */
+	SPI_cursor_fetch(portal, true, CROSSTAB_BATCHSIZE);
 	proc = SPI_processed;
 
 	/* If no qualifying tuples, fall out early */
-	if (ret != SPI_OK_SELECT || proc == 0)
+	if (proc == 0)
 	{
+		SPI_cursor_close(portal);
+		SPI_freeplan(plan);
 		SPI_finish();
 		rsinfo->isDone = ExprEndResult;
 		PG_RETURN_NULL();
@@ -474,127 +492,101 @@ crosstab(PG_FUNCTION_ARGS)
 	 */
 	attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
-	/* total number of tuples to be examined */
-	max_calls = proc;
-
 	/* the return tuple always must have 1 rowid + num_categories columns */
 	num_categories = tupdesc->natts - 1;
 
-	firstpass = true;
-	lastrowid = NULL;
-
-	for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
-	{
-		bool		skip_tuple = false;
-		char	  **values;
-
-		/* allocate and zero space */
-		values = (char **) palloc0((1 + num_categories) * sizeof(char *));
-
-		/*
-		 * now loop through the sql results and assign each value in sequence
-		 * to the next category
-		 */
-		for (i = 0; i < num_categories; i++)
-		{
-			HeapTuple	spi_tuple;
-			char	   *rowid;
-
-			/* see if we've gone too far already */
-			if (call_cntr >= max_calls)
-				break;
-
-			/* get the next sql result tuple */
-			spi_tuple = spi_tuptable->vals[call_cntr];
-
-			/* get the rowid from the current sql result tuple */
-			rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
-
-			/*
-			 * If this is the first pass through the values for this rowid,
-			 * set the first column to rowid
-			 */
-			if (i == 0)
-			{
-				xpstrdup(values[0], rowid);
-
-				/*
-				 * Check to see if the rowid is the same as that of the last
-				 * tuple sent -- if so, skip this tuple entirely
-				 */
-				if (!firstpass && xstreq(lastrowid, rowid))
-				{
-					xpfree(rowid);
-					skip_tuple = true;
-					break;
-				}
-			}
-
-			/*
-			 * If rowid hasn't changed on us, continue building the output
-			 * tuple.
-			 */
-			if (xstreq(rowid, values[0]))
-			{
-				/*
-				 * Get the next category item value, which is always attribute
-				 * number three.
-				 *
-				 * Be careful to assign the value to the array index based on
-				 * which category we are presently processing.
-				 */
-				values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
-
-				/*
-				 * increment the counter since we consume a row for each
-				 * category, but not for last pass because the outer loop will
-				 * do that for us
-				 */
-				if (i < (num_categories - 1))
-					call_cntr++;
-				xpfree(rowid);
-			}
-			else
-			{
-				/*
-				 * We'll fill in NULLs for the missing values, but we need to
-				 * decrement the counter since this sql result row doesn't
-				 * belong to the current output tuple.
-				 */
-				call_cntr--;
-				xpfree(rowid);
-				break;
-			}
-		}
-
-		if (!skip_tuple)
-		{
-			HeapTuple	tuple;
-
-			/* build the tuple and store it */
-			tuple = BuildTupleFromCStrings(attinmeta, values);
-			tuplestore_puttuple(tupstore, tuple);
-			heap_freetuple(tuple);
-		}
-
-		/* Remember current rowid */
-		xpfree(lastrowid);
-		xpstrdup(lastrowid, values[0]);
-		firstpass = false;
-
-		/* Clean up */
-		for (i = 0; i < num_categories + 1; i++)
-			if (values[i] != NULL)
-				pfree(values[i]);
-		pfree(values);
-	}
+	/*
+	 * Turn the source rows into output rows, one batch at a time.
+	 *
+	 * Consecutive source rows sharing a rowid make up one output row: their
+	 * values fill the category columns in the order the rows arrive, and rows
+	 * beyond the last category column are ignored.  Such a run of rows may
+	 * straddle a batch boundary, so the output row under construction
+	 * ("values", with "ncats" category columns filled so far) is carried
+	 * over from one batch to the next, and is emitted only once a row with a
+	 * different rowid shows up or the cursor is exhausted.
+	 */
+	for (;;)
+	{
+		for (call_cntr = 0; call_cntr < proc; call_cntr++)
+		{
+			HeapTuple	spi_tuple = spi_tuptable->vals[call_cntr];
+			char	   *rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
+
+			if (values != NULL && xstreq(rowid, values[0]))
+			{
+				/* same rowid as the output row under construction */
+				xpfree(rowid);
+			}
+			else
+			{
+				/* rowid changed, so the previous output row is complete */
+				if (values != NULL)
+				{
+					HeapTuple	tuple;
+
+					/* build the tuple and store it */
+					tuple = BuildTupleFromCStrings(attinmeta, values);
+					tuplestore_puttuple(tupstore, tuple);
+					heap_freetuple(tuple);
+
+					/* Clean up */
+					for (i = 0; i < num_categories + 1; i++)
+						if (values[i] != NULL)
+							pfree(values[i]);
+					pfree(values);
+				}
+
+				/* start a new output row; unfilled categories stay NULL */
+				values = (char **) palloc0((1 + num_categories) * sizeof(char *));
+				values[0] = rowid;
+				ncats = 0;
+			}
+
+			/*
+			 * The category value is always attribute number three.  It goes
+			 * into the next unfilled category column; once all of them are
+			 * filled, further rows for the same rowid are ignored.
+			 */
+			if (ncats < num_categories)
+			{
+				values[1 + ncats] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
+				ncats++;
+			}
+		}
+
+		/* Done with this batch; free its rows before fetching more. */
+		SPI_freetuptable(spi_tuptable);
+
+		SPI_cursor_fetch(portal, true, CROSSTAB_BATCHSIZE);
+		proc = SPI_processed;
+		spi_tuptable = SPI_tuptable;
+		if (proc == 0)
+			break;
+
+		/* the previous batch's tupdesc was freed along with it */
+		spi_tupdesc = spi_tuptable->tupdesc;
+	}
+	SPI_freetuptable(spi_tuptable);
+
+	/* emit the last output row (there is one, since proc was > 0) */
+	if (values != NULL)
+	{
+		HeapTuple	tuple;
+
+		tuple = BuildTupleFromCStrings(attinmeta, values);
+		tuplestore_puttuple(tupstore, tuple);
+		heap_freetuple(tuple);
+	}
 
 	/* let the caller know we're sending back a tuplestore */
 	rsinfo->returnMode = SFRM_Materialize;
 	rsinfo->setResult = tupstore;
 	rsinfo->setDesc = tupdesc;
 
 	/* release SPI related resources (and return to caller's context) */
+	SPI_cursor_close(portal);
+	SPI_freeplan(plan);
 	SPI_finish();
 
 	return (Datum) 0;
-- 
2.35.3