psql's FETCH_COUNT (cursor) is not being respected for CTEs
Hi -hackers,
I've spent some time fighting against "out of memory" errors coming
out of psql when trying to use the cursor via FETCH_COUNT. It might be
a not so well known fact (?) that CTEs are not executed with cursor
when asked to do so, but instead silently executed with potential huge
memory allocation going on. Patch is attached. My one doubt is that
not every statement starting with "WITH" is WITH(..) SELECT of course.
Demo (one might also get the "out of memory for query result"):
postgres@hive:~$ psql -Ant --variable='FETCH_COUNT=100' -c "WITH data
AS (SELECT generate_series(1, 20000000) as Total) select repeat('a',
100) || data.Total || repeat('b', 800) as total_pat from data;"
Killed
postgres@hive:~$ tail -4 /var/log/postgresql/postgresql-14-main.log
[..]
2023-01-04 12:46:20.193 CET [32936] postgres@postgres LOG: could not
send data to client: Broken pipe
[..]
2023-01-04 12:46:20.195 CET [32936] postgres@postgres FATAL:
connection to client lost
With the patch:
postgres@hive:~$ /tmp/psql16-with-patch -Ant
--variable='FETCH_COUNT=100' -c "WITH data AS (SELECT
generate_series(1, 20000000) as Total) select repeat('a', 100) ||
data.Total || repeat('b', 800) as total_pat from data;" | wc -l
20000000
postgres@hive:~$
Regards,
-Jakub Wartak.
Attachments:
0001-psql-allow-CTE-queries-to-be-executed-also-using-cur.patchapplication/octet-stream; name=0001-psql-allow-CTE-queries-to-be-executed-also-using-cur.patchDownload
From 8d84514c4ddce412ef8cfa5296109bb223047f9c Mon Sep 17 00:00:00 2001
From: Jakub Wartak <jakub.wartak@enterprisedb.com>
Date: Wed, 4 Jan 2023 12:57:58 +0100
Subject: [PATCH] psql: allow CTE queries to be executed also using cursor
---
src/bin/psql/common.c | 3 +++
1 file changed, 3 insertions(+)
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index f5909f9547..9428f4fd75 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -2228,6 +2228,9 @@ is_select_command(const char *query)
if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
return true;
+ if (wordlen == 4 && pg_strncasecmp(query, "with", 4) == 0)
+ return true;
+
return false;
}
--
2.30.2
Jakub Wartak wrote:
It might be a not so well known fact (?) that CTEs are not executed
with cursor when asked to do so, but instead silently executed with
potential huge memory allocation going on. Patch is attached. My one
doubt is that not every statement starting with "WITH" is WITH(..)
SELECT of course.
Yes, that's why WITH queries are currently filtered out by the
FETCH_COUNT feature.
Case in point:
test=> begin;
BEGIN
test=> create table tbl(i int);
CREATE TABLE
test=> declare psql_cursor cursor for
with r(i) as (values (1))
insert into tbl(i) select i from r;
ERROR: syntax error at or near "insert"
LINE 3: insert into tbl(i) select i from r;
So the fix you're proposing would fail on that kind of queries.
A solution would be for psql to use PQsetSingleRowMode() to retrieve
results row-by-row, as opposed to using a cursor, and then allocate
memory for only FETCH_COUNT rows at a time. Incidentally it solves
other problems like queries containing multiple statements, that also
fail to work properly with cursors, or UPDATE/INSERT... RETURNING.. on
large number of rows that could also benefit from pagination in
memory.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
On Wed, Jan 4, 2023 at 10:22 AM Daniel Verite <daniel@manitou-mail.org> wrote:
A solution would be for psql to use PQsetSingleRowMode() to retrieve
results row-by-row, as opposed to using a cursor, and then allocate
memory for only FETCH_COUNT rows at a time. Incidentally it solves
other problems like queries containing multiple statements, that also
fail to work properly with cursors, or UPDATE/INSERT... RETURNING.. on
large number of rows that could also benefit from pagination in
memory.
Is there any reason that someone hasn't, like, already done this?
Because if there isn't, we should really do this. And if there is,
like say that it would hurt performance or something, then we should
come up with a fix for that problem and then do something like this.
--
Robert Haas
EDB: http://www.enterprisedb.com
Robert Haas <robertmhaas@gmail.com> writes:
On Wed, Jan 4, 2023 at 10:22 AM Daniel Verite <daniel@manitou-mail.org> wrote:
A solution would be for psql to use PQsetSingleRowMode() to retrieve
results row-by-row, as opposed to using a cursor, and then allocate
memory for only FETCH_COUNT rows at a time.
Is there any reason that someone hasn't, like, already done this?
As you well know, psql's FETCH_COUNT mechanism is far older than
single-row mode. I don't think anyone's tried to transpose it
onto that. I agree that it seems like a good idea to try.
There will be more per-row overhead, but the increase in flexibility
is likely to justify that.
regards, tom lane
On Wed, Jan 4, 2023 at 11:36 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
As you well know, psql's FETCH_COUNT mechanism is far older than
single-row mode. I don't think anyone's tried to transpose it
onto that. I agree that it seems like a good idea to try.
There will be more per-row overhead, but the increase in flexibility
is likely to justify that.
Yeah, I was vaguely worried that there might be more per-row overhead,
not that I know a lot about this topic. I wonder if there's a way to
mitigate that. I'm a bit suspicious that what we want here is really
more of an incremental mode than a single-row mode i.e. yeah, you want
to fetch rows without materializing the whole result, but maybe not in
batches of exactly size one.
--
Robert Haas
EDB: http://www.enterprisedb.com
On Wed, Jan 4, 2023 at 6:38 PM Robert Haas <robertmhaas@gmail.com> wrote:
On Wed, Jan 4, 2023 at 11:36 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
As you well know, psql's FETCH_COUNT mechanism is far older than
single-row mode. I don't think anyone's tried to transpose it
onto that. I agree that it seems like a good idea to try.
There will be more per-row overhead, but the increase in flexibility
is likely to justify that.Yeah, I was vaguely worried that there might be more per-row overhead,
not that I know a lot about this topic. I wonder if there's a way to
mitigate that. I'm a bit suspicious that what we want here is really
more of an incremental mode than a single-row mode i.e. yeah, you want
to fetch rows without materializing the whole result, but maybe not in
batches of exactly size one.
Given the low importance and very low priority of this, how about
adding it as a TODO wiki item then and maybe adding just some warning
instead? I've intentionally avoided parsing grammar and regexp so it's
not perfect (not that I do care about this too much either, as web
crawlers already have indexed this $thread). BTW I've found two
threads if know what are you looking for [1]/messages/by-id/a0a854b6-563c-4a11-bf1c-d6c6f924004d@manitou-mail.org[2]/messages/by-id/1274761885.4261.233.camel@minidragon
-Jakub Wartak.
[1]: /messages/by-id/a0a854b6-563c-4a11-bf1c-d6c6f924004d@manitou-mail.org
[2]: /messages/by-id/1274761885.4261.233.camel@minidragon
Attachments:
0001-psql-warn-about-CTE-queries-to-be-executed-without-u.patchapplication/octet-stream; name=0001-psql-warn-about-CTE-queries-to-be-executed-without-u.patchDownload
From ded00ed978bfa15591aa30b768400517e539f1c3 Mon Sep 17 00:00:00 2001
From: Jakub Wartak <jakub.wartak@enterprisedb.com>
Date: Tue, 10 Jan 2023 13:11:45 +0100
Subject: [PATCH] psql: warn about CTE queries to be executed without using
cursor
---
src/bin/psql/common.c | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 38f9b10b7c..566f3668e3 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -2241,6 +2241,11 @@ is_select_command(const char *query)
if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
return true;
+ if (wordlen == 4 && pg_strncasecmp(query, "with", 4) == 0 &&
+ strcasestr(query, "select" ) != NULL)
+ pg_log_warning("attempt to use cursor (due to FETCH_COUNT set) "
+ "has been ignored for WITH/CTE query");
+
return false;
}
--
2.30.2
Tom Lane wrote:
I agree that it seems like a good idea to try.
There will be more per-row overhead, but the increase in flexibility
is likely to justify that.
Here's a POC patch implementing row-by-row fetching.
If it wasn't for the per-row overhead, we could probably get rid of
ExecQueryUsingCursor() and use row-by-row fetches whenever
FETCH_COUNT is set, independently of the form of the query.
However the difference in processing time seems to be substantial: on
some quick tests with FETCH_COUNT=10000, I'm seeing almost a 1.5x
increase on large datasets. I assume it's the cost of more allocations.
I would have hoped that avoiding the FETCH queries and associated
round-trips with the cursor method would compensate for that, but it
doesn't appear to be the case, at least with a fast local connection.
So in this patch, psql still uses the cursor method if the
query starts with "select", and falls back to the row-by-row in
the main code (ExecQueryAndProcessResults) otherwise.
Anyway it solves the main issue of the over-consumption of memory
for CTE and update/insert queries returning large resultsets.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Attachments:
psql-fetchcount-single-row-mode.difftext/x-patch; name=psql-fetchcount-single-row-mode.diffDownload
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 00627830c4..d3de9d8336 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -372,6 +372,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_SINGLE_TUPLE:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@@ -675,13 +676,13 @@ PrintNotifications(void)
* Returns true if successful, false otherwise.
*/
static bool
-PrintQueryTuples(const PGresult *result, const printQueryOpt *opt,
+PrintQueryTuples(const PGresult **result, int nresults, const printQueryOpt *opt,
FILE *printQueryFout)
{
bool ok = true;
FILE *fout = printQueryFout ? printQueryFout : pset.queryFout;
- printQuery(result, opt ? opt : &pset.popt, fout, false, pset.logfile);
+ printQueryChunks(result, nresults, opt ? opt : &pset.popt, fout, false, pset.logfile);
fflush(fout);
if (ferror(fout))
{
@@ -958,7 +959,7 @@ PrintQueryResult(PGresult *result, bool last,
else if (last && pset.crosstab_flag)
success = PrintResultInCrosstab(result);
else if (last || pset.show_all_results)
- success = PrintQueryTuples(result, opt, printQueryFout);
+ success = PrintQueryTuples((const PGresult**)&result, 1, opt, printQueryFout);
else
success = true;
@@ -1369,6 +1370,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
+/*
+ * Check if an output stream for \g needs to be opened, and if
+ * yes, open it.
+ * Return false if an error occurred, true otherwise.
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+ ExecStatusType status = PQresultStatus(result);
+ if (pset.gfname != NULL && /* there is a \g file or program */
+ *gfile_fout == NULL && /* and it's not already opened */
+ (status == PGRES_TUPLES_OK ||
+ status == PGRES_SINGLE_TUPLE ||
+ status == PGRES_COPY_OUT))
+ {
+ if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+ {
+ if (is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ return false;
+ }
+ return true;
+}
+
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+ /* close \g file if we opened it */
+ if (gfile_fout)
+ {
+ if (is_pipe)
+ {
+ pclose(gfile_fout);
+ restore_sigpipe_trap();
+ }
+ else
+ fclose(gfile_fout);
+ }
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1400,10 +1442,16 @@ ExecQueryAndProcessResults(const char *query,
bool success;
instr_time before,
after;
+ int fetch_count = pset.fetch_count;
PGresult *result;
+
FILE *gfile_fout = NULL;
bool gfile_is_pipe = false;
+ PGresult **result_array = NULL; /* to collect results in single row mode */
+ int64 total_tuples = 0;
+ int ntuples;
+
if (timing)
INSTR_TIME_SET_CURRENT(before);
@@ -1424,6 +1472,33 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
+ /*
+ * If FETCH_COUNT is set and the context allows it, use the single row
+ * mode to fetch results and have no more than FETCH_COUNT rows in
+ * memory.
+ */
+ if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+ && !pset.gset_prefix && pset.show_all_results)
+ {
+ /*
+ * The row-by-row fetch is not enabled when SHOW_ALL_RESULTS is false,
+ * since we would need to accumulate all rows before knowing
+ * whether they need to be discarded or displayed, which contradicts
+ * FETCH_COUNT.
+ */
+ if (!PQsetSingleRowMode(pset.db))
+ {
+ pg_log_warning("fetching results in single row mode is unavailable");
+ fetch_count = 0;
+ }
+ else
+ {
+ result_array = (PGresult**) pg_malloc(fetch_count * sizeof(PGresult*));
+ }
+ }
+ else
+ fetch_count = 0; /* disable single-row mode */
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@@ -1443,6 +1518,7 @@ ExecQueryAndProcessResults(const char *query,
ExecStatusType result_status;
PGresult *next_result;
bool last;
+ bool partial_display = false;
if (!AcceptResult(result, false))
{
@@ -1569,6 +1645,85 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
+ if (fetch_count > 0 && result_status == PGRES_SINGLE_TUPLE)
+ {
+ FILE *tuples_fout = printQueryFout;
+ printQueryOpt my_popt = pset.popt;
+
+ ntuples = 0;
+ partial_display = true;
+
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ while (success)
+ {
+ result_array[ntuples++] = result;
+ if (ntuples == fetch_count)
+ {
+ /* TODO: handle paging */
+ /* display the current chunk of results */
+ PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout);
+ /* clear and reuse result_array */
+ for (int i=0; i < ntuples; i++)
+ PQclear(result_array[i]);
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+ my_popt.topt.prior_records += ntuples;
+ total_tuples += ntuples;
+ ntuples = 0;
+ }
+
+ result = PQgetResult(pset.db);
+ if (result == NULL)
+ {
+ /*
+ * Error. We expect a PGRES_TUPLES_OK result with
+ * zero tuple in it to finish the row-by-row sequence.
+ */
+ success = false;
+ break;
+ }
+
+ if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ /* TODO: merge this block with the code above? */
+ /*
+ * The last row has been read. Display the last chunk of
+ * results and the footer.
+ */
+ my_popt.topt.stop_table = true;
+ PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout);
+ for (int i=0; i < ntuples; i++)
+ PQclear(result_array[i]);
+ total_tuples += ntuples;
+ ntuples = 0;
+
+ result = NULL;
+ {
+ /*
+ * fake SetResultVariables() as in ExecQueryUsingCursor().
+ */
+ char buf[32];
+
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ }
+ break;
+ }
+ }
+ }
+ else
+ partial_display = false;
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@@ -1597,7 +1752,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* this may or may not print something depending on settings */
- if (result != NULL)
+ if (result != NULL && !partial_display)
{
/*
* If results need to be printed into the file specified by \g,
@@ -1606,25 +1761,10 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
-
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
@@ -1643,17 +1783,10 @@ ExecQueryAndProcessResults(const char *query,
}
}
- /* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- pclose(gfile_fout);
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
+
+ if (result_array)
+ pg_free(result_array);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
diff --git a/src/fe_utils/print.c b/src/fe_utils/print.c
index 3396f9b462..d8f0a29773 100644
--- a/src/fe_utils/print.c
+++ b/src/fe_utils/print.c
@@ -3533,17 +3533,42 @@ printTable(const printTableContent *cont,
void
printQuery(const PGresult *result, const printQueryOpt *opt,
FILE *fout, bool is_pager, FILE *flog)
+{
+ printQueryChunks(&result, 1, opt, fout, is_pager, flog);
+}
+
+/*
+ * Print the results of a query that may have been obtained by a
+ * succession of calls to PQgetResult in single-row mode.
+ *
+ * results: array of results of a successful query. They must have the same columns.
+ * nbresults: size of results
+ * opt: formatting options
+ * fout: where to print to
+ * is_pager: true if caller has already redirected fout to be a pager pipe
+ * flog: if not null, also print the data there (for --log-file option)
+ */
+void
+printQueryChunks(const PGresult *results[], int nresults, const printQueryOpt *opt,
+ FILE *fout, bool is_pager, FILE *flog)
{
printTableContent cont;
int i,
r,
c;
+ int nrows = 0; /* total number of rows */
+ int ri; /* index into results[] */
if (cancel_pressed)
return;
+ for (ri = 0; ri < nresults; ri++)
+ {
+ nrows += PQntuples(results[ri]);
+ }
+
printTableInit(&cont, &opt->topt, opt->title,
- PQnfields(result), PQntuples(result));
+ (nresults > 0) ? PQnfields(results[0]) : 0, nrows);
/* Assert caller supplied enough translate_columns[] entries */
Assert(opt->translate_columns == NULL ||
@@ -3551,34 +3576,37 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
for (i = 0; i < cont.ncolumns; i++)
{
- printTableAddHeader(&cont, PQfname(result, i),
+ printTableAddHeader(&cont, PQfname(results[0], i),
opt->translate_header,
- column_type_alignment(PQftype(result, i)));
+ column_type_alignment(PQftype(results[0], i)));
}
/* set cells */
- for (r = 0; r < cont.nrows; r++)
+ for (ri = 0; ri < nresults; ri++)
{
- for (c = 0; c < cont.ncolumns; c++)
+ for (r = 0; r < PQntuples(results[ri]); r++)
{
- char *cell;
- bool mustfree = false;
- bool translate;
-
- if (PQgetisnull(result, r, c))
- cell = opt->nullPrint ? opt->nullPrint : "";
- else
+ for (c = 0; c < cont.ncolumns; c++)
{
- cell = PQgetvalue(result, r, c);
- if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+ char *cell;
+ bool mustfree = false;
+ bool translate;
+
+ if (PQgetisnull(results[ri], r, c))
+ cell = opt->nullPrint ? opt->nullPrint : "";
+ else
{
- cell = format_numeric_locale(cell);
- mustfree = true;
+ cell = PQgetvalue(results[ri], r, c);
+ if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+ {
+ cell = format_numeric_locale(cell);
+ mustfree = true;
+ }
}
- }
- translate = (opt->translate_columns && opt->translate_columns[c]);
- printTableAddCell(&cont, cell, translate, mustfree);
+ translate = (opt->translate_columns && opt->translate_columns[c]);
+ printTableAddCell(&cont, cell, translate, mustfree);
+ }
}
}
diff --git a/src/include/fe_utils/print.h b/src/include/fe_utils/print.h
index 54f783c907..3befc41bdc 100644
--- a/src/include/fe_utils/print.h
+++ b/src/include/fe_utils/print.h
@@ -220,7 +220,10 @@ extern void printTableCleanup(printTableContent *const content);
extern void printTable(const printTableContent *cont,
FILE *fout, bool is_pager, FILE *flog);
extern void printQuery(const PGresult *result, const printQueryOpt *opt,
- FILE *fout, bool is_pager, FILE *flog);
+ FILE *fout, bool is_pager, FILE *flog);
+extern void printQueryChunks(const PGresult *results[], int nresults,
+ const printQueryOpt *opt,
+ FILE *fout, bool is_pager, FILE *flog);
extern char column_type_alignment(Oid);
I wrote:
Here's a POC patch implementing row-by-row fetching.
PFA an updated patch.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Attachments:
psql-fetchcount-single-row-mode-v2.difftext/x-patch; name=psql-fetchcount-single-row-mode-v2.diffDownload
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index f907f5d4e8..ad5e8a5de9 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -372,6 +372,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_SINGLE_TUPLE:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@@ -675,13 +676,13 @@ PrintNotifications(void)
* Returns true if successful, false otherwise.
*/
static bool
-PrintQueryTuples(const PGresult *result, const printQueryOpt *opt,
+PrintQueryTuples(const PGresult **result, int nresults, const printQueryOpt *opt,
FILE *printQueryFout)
{
bool ok = true;
FILE *fout = printQueryFout ? printQueryFout : pset.queryFout;
- printQuery(result, opt ? opt : &pset.popt, fout, false, pset.logfile);
+ printQueryChunks(result, nresults, opt ? opt : &pset.popt, fout, false, pset.logfile);
fflush(fout);
if (ferror(fout))
{
@@ -958,7 +959,7 @@ PrintQueryResult(PGresult *result, bool last,
else if (last && pset.crosstab_flag)
success = PrintResultInCrosstab(result);
else if (last || pset.show_all_results)
- success = PrintQueryTuples(result, opt, printQueryFout);
+ success = PrintQueryTuples((const PGresult**)&result, 1, opt, printQueryFout);
else
success = true;
@@ -1371,6 +1372,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
+/*
+ * Check if an output stream for \g needs to be opened, and if
+ * yes, open it.
+ * Return false if an error occurred, true otherwise.
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+ ExecStatusType status = PQresultStatus(result);
+ if (pset.gfname != NULL && /* there is a \g file or program */
+ *gfile_fout == NULL && /* and it's not already opened */
+ (status == PGRES_TUPLES_OK ||
+ status == PGRES_SINGLE_TUPLE ||
+ status == PGRES_COPY_OUT))
+ {
+ if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+ {
+ if (is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ return false;
+ }
+ return true;
+}
+
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+ /* close \g file if we opened it */
+ if (gfile_fout)
+ {
+ if (is_pipe)
+ {
+ pclose(gfile_fout);
+ restore_sigpipe_trap();
+ }
+ else
+ fclose(gfile_fout);
+ }
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1402,10 +1444,16 @@ ExecQueryAndProcessResults(const char *query,
bool success;
instr_time before,
after;
+ int fetch_count = pset.fetch_count;
PGresult *result;
+
FILE *gfile_fout = NULL;
bool gfile_is_pipe = false;
+ PGresult **result_array = NULL; /* to collect results in single row mode */
+ int64 total_tuples = 0;
+ int ntuples;
+
if (timing)
INSTR_TIME_SET_CURRENT(before);
else
@@ -1428,6 +1476,33 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
+ /*
+ * If FETCH_COUNT is set and the context allows it, use the single row
+ * mode to fetch results and have no more than FETCH_COUNT rows in
+ * memory.
+ */
+ if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+ && !pset.gset_prefix && pset.show_all_results)
+ {
+ /*
+ * The row-by-row fetch is not enabled when SHOW_ALL_RESULTS is false,
+ * since we would need to accumulate all rows before knowing
+ * whether they need to be discarded or displayed, which contradicts
+ * FETCH_COUNT.
+ */
+ if (!PQsetSingleRowMode(pset.db))
+ {
+ pg_log_warning("fetching results in single row mode is unavailable");
+ fetch_count = 0;
+ }
+ else
+ {
+ result_array = (PGresult**) pg_malloc(fetch_count * sizeof(PGresult*));
+ }
+ }
+ else
+ fetch_count = 0; /* disable single-row mode */
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@@ -1447,6 +1522,7 @@ ExecQueryAndProcessResults(const char *query,
ExecStatusType result_status;
PGresult *next_result;
bool last;
+ bool partial_display = false;
if (!AcceptResult(result, false))
{
@@ -1573,6 +1649,96 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
+ if (fetch_count > 0 && result_status == PGRES_SINGLE_TUPLE)
+ {
+ FILE *tuples_fout = printQueryFout;
+ printQueryOpt my_popt = pset.popt;
+ bool is_pager = false;
+ int flush_error = 0;
+
+ ntuples = 0;
+ total_tuples = 0;
+ partial_display = true;
+
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ while (success)
+ {
+ result_array[ntuples++] = result;
+ if (ntuples == fetch_count)
+ {
+ /* pager: open at most once per resultset */
+ if (tuples_fout == stdout && !is_pager)
+ {
+ tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
+ is_pager = true;
+ }
+ /* display the current chunk of results unless the output stream is not working */
+ if (!flush_error)
+ {
+ PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout);
+ flush_error = fflush(tuples_fout);
+ }
+ /* clear and reuse result_array */
+ for (int i=0; i < ntuples; i++)
+ PQclear(result_array[i]);
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+ my_popt.topt.prior_records += ntuples;
+ total_tuples += ntuples;
+ ntuples = 0;
+ }
+
+ result = PQgetResult(pset.db);
+ if (result == NULL)
+ {
+ /*
+ * Error. We expect a PGRES_TUPLES_OK result with
+ * zero tuple in it to finish the row-by-row sequence.
+ */
+ success = false;
+ break;
+ }
+
+ if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ /* TODO: merge this block with the code above? */
+ /*
+ * The last row has been read. Display the last chunk of
+ * results and the footer.
+ */
+ my_popt.topt.stop_table = true;
+ if (!flush_error)
+ {
+ PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout);
+ flush_error = fflush(tuples_fout);
+ }
+ for (int i=0; i < ntuples; i++)
+ PQclear(result_array[i]);
+ total_tuples += ntuples;
+ ntuples = 0;
+
+ if (is_pager)
+ {
+ ClosePager(tuples_fout);
+ }
+
+ result = NULL;
+ /*partial_display_rowcount = total_tuples;*/
+ break;
+ }
+ }
+ }
+ else
+ partial_display = false;
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@@ -1601,7 +1767,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* this may or may not print something depending on settings */
- if (result != NULL)
+ if (result != NULL && !partial_display)
{
/*
* If results need to be printed into the file specified by \g,
@@ -1610,33 +1776,32 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
-
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
/* set variables on last result if all went well */
if (!is_watch && last && success)
+ {
SetResultVariables(result, true);
+ if (partial_display)
+ {
+ /*
+ * fake SetResultVariables() as in ExecQueryUsingCursor().
+ */
+ char buf[32];
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ }
+ }
+
ClearOrSaveResult(result);
result = next_result;
@@ -1647,17 +1812,10 @@ ExecQueryAndProcessResults(const char *query,
}
}
- /* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- pclose(gfile_fout);
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
+
+ if (result_array)
+ pg_free(result_array);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
diff --git a/src/fe_utils/print.c b/src/fe_utils/print.c
index 3396f9b462..d8f0a29773 100644
--- a/src/fe_utils/print.c
+++ b/src/fe_utils/print.c
@@ -3533,17 +3533,42 @@ printTable(const printTableContent *cont,
void
printQuery(const PGresult *result, const printQueryOpt *opt,
FILE *fout, bool is_pager, FILE *flog)
+{
+ printQueryChunks(&result, 1, opt, fout, is_pager, flog);
+}
+
+/*
+ * Print the results of a query that may have been obtained by a
+ * succession of calls to PQgetResult in single-row mode.
+ *
+ * results: array of results of a successful query. They must have the same columns.
+ * nbresults: size of results
+ * opt: formatting options
+ * fout: where to print to
+ * is_pager: true if caller has already redirected fout to be a pager pipe
+ * flog: if not null, also print the data there (for --log-file option)
+ */
+void
+printQueryChunks(const PGresult *results[], int nresults, const printQueryOpt *opt,
+ FILE *fout, bool is_pager, FILE *flog)
{
printTableContent cont;
int i,
r,
c;
+ int nrows = 0; /* total number of rows */
+ int ri; /* index into results[] */
if (cancel_pressed)
return;
+ for (ri = 0; ri < nresults; ri++)
+ {
+ nrows += PQntuples(results[ri]);
+ }
+
printTableInit(&cont, &opt->topt, opt->title,
- PQnfields(result), PQntuples(result));
+ (nresults > 0) ? PQnfields(results[0]) : 0, nrows);
/* Assert caller supplied enough translate_columns[] entries */
Assert(opt->translate_columns == NULL ||
@@ -3551,34 +3576,37 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
for (i = 0; i < cont.ncolumns; i++)
{
- printTableAddHeader(&cont, PQfname(result, i),
+ printTableAddHeader(&cont, PQfname(results[0], i),
opt->translate_header,
- column_type_alignment(PQftype(result, i)));
+ column_type_alignment(PQftype(results[0], i)));
}
/* set cells */
- for (r = 0; r < cont.nrows; r++)
+ for (ri = 0; ri < nresults; ri++)
{
- for (c = 0; c < cont.ncolumns; c++)
+ for (r = 0; r < PQntuples(results[ri]); r++)
{
- char *cell;
- bool mustfree = false;
- bool translate;
-
- if (PQgetisnull(result, r, c))
- cell = opt->nullPrint ? opt->nullPrint : "";
- else
+ for (c = 0; c < cont.ncolumns; c++)
{
- cell = PQgetvalue(result, r, c);
- if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+ char *cell;
+ bool mustfree = false;
+ bool translate;
+
+ if (PQgetisnull(results[ri], r, c))
+ cell = opt->nullPrint ? opt->nullPrint : "";
+ else
{
- cell = format_numeric_locale(cell);
- mustfree = true;
+ cell = PQgetvalue(results[ri], r, c);
+ if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+ {
+ cell = format_numeric_locale(cell);
+ mustfree = true;
+ }
}
- }
- translate = (opt->translate_columns && opt->translate_columns[c]);
- printTableAddCell(&cont, cell, translate, mustfree);
+ translate = (opt->translate_columns && opt->translate_columns[c]);
+ printTableAddCell(&cont, cell, translate, mustfree);
+ }
}
}
diff --git a/src/include/fe_utils/print.h b/src/include/fe_utils/print.h
index 54f783c907..3befc41bdc 100644
--- a/src/include/fe_utils/print.h
+++ b/src/include/fe_utils/print.h
@@ -220,7 +220,10 @@ extern void printTableCleanup(printTableContent *const content);
extern void printTable(const printTableContent *cont,
FILE *fout, bool is_pager, FILE *flog);
extern void printQuery(const PGresult *result, const printQueryOpt *opt,
- FILE *fout, bool is_pager, FILE *flog);
+ FILE *fout, bool is_pager, FILE *flog);
+extern void printQueryChunks(const PGresult *results[], int nresults,
+ const printQueryOpt *opt,
+ FILE *fout, bool is_pager, FILE *flog);
extern char column_type_alignment(Oid);
"Daniel Verite" <daniel@manitou-mail.org> writes:
PFA an updated patch.
This gives me several "-Wincompatible-pointer-types" warnings
(as are also reported by the cfbot):
common.c: In function 'ExecQueryAndProcessResults':
common.c:1686:24: warning: passing argument 1 of 'PrintQueryTuples' from incompatible pointer type [-Wincompatible-pointer-types]
PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout);
^~~~~~~~~~~~
common.c:679:35: note: expected 'const PGresult **' {aka 'const struct pg_result **'} but argument is of type 'PGresult **' {aka 'struct pg_result **'}
PrintQueryTuples(const PGresult **result, int nresults, const printQueryOpt *opt,
~~~~~~~~~~~~~~~~~^~~~~~
common.c:1720:24: warning: passing argument 1 of 'PrintQueryTuples' from incompatible pointer type [-Wincompatible-pointer-types]
PrintQueryTuples(result_array, ntuples, &my_popt, tuples_fout);
^~~~~~~~~~~~
common.c:679:35: note: expected 'const PGresult **' {aka 'const struct pg_result **'} but argument is of type 'PGresult **' {aka 'struct pg_result **'}
PrintQueryTuples(const PGresult **result, int nresults, const printQueryOpt *opt,
~~~~~~~~~~~~~~~~~^~~~~~
I think the cause is the inconsistency about whether PGresult pointers
are pointer-to-const or not. Even without compiler warnings, I find
code like this very ugly:
- success = PrintQueryTuples(result, opt, printQueryFout);
+ success = PrintQueryTuples((const PGresult**)&result, 1, opt, printQueryFout);
I think what you probably ought to do to avoid all that is to change
the arguments of PrintQueryResult and nearby routines to be "const
PGresult *result" not just "PGresult *result".
I find it sad that we can't get rid of ExecQueryUsingCursor().
Maybe a little effort towards reducing overhead in the single-row
mode would help?
regards, tom lane
Tom Lane wrote:
This gives me several "-Wincompatible-pointer-types" warnings
[...]
I think what you probably ought to do to avoid all that is to change
the arguments of PrintQueryResult and nearby routines to be "const
PGresult *result" not just "PGresult *result".
The const-ness issue that I ignored in the previous patch is that
while C is fine with passing T* to a function expecting const T*, it's
not okay with passing T** to a function expecting const T**,
or more generally converting T** to const T**.
When callers need to pass arrays of PGresult* instead of const
PGresult*, I've opted to remove the const qualifiers for the functions
that are concerned by this change.
PFA an updated patch.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Attachments:
psql-fetchcount-single-row-mode-v3.difftext/x-patch; name=psql-fetchcount-single-row-mode-v3.diffDownload
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 5973df2e39..476a9770f0 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -372,6 +372,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_SINGLE_TUPLE:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@@ -695,7 +696,7 @@ PrintNotifications(void)
* Returns true if successful, false otherwise.
*/
static bool
-PrintQueryTuples(const PGresult *result, const printQueryOpt *opt,
+PrintQueryTuples(PGresult *result, const printQueryOpt *opt,
FILE *printQueryFout)
{
bool ok = true;
@@ -1391,6 +1392,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
+/*
+ * Check if an output stream for \g needs to be opened, and if
+ * yes, open it.
+ * Return false if an error occurred, true otherwise.
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+ ExecStatusType status = PQresultStatus(result);
+ if (pset.gfname != NULL && /* there is a \g file or program */
+ *gfile_fout == NULL && /* and it's not already opened */
+ (status == PGRES_TUPLES_OK ||
+ status == PGRES_SINGLE_TUPLE ||
+ status == PGRES_COPY_OUT))
+ {
+ if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+ {
+ if (is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ return false;
+ }
+ return true;
+}
+
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+ /* close \g file if we opened it */
+ if (gfile_fout)
+ {
+ if (is_pipe)
+ {
+ SetShellResultVariables(pclose(gfile_fout));
+ restore_sigpipe_trap();
+ }
+ else
+ fclose(gfile_fout);
+ }
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1422,10 +1464,18 @@ ExecQueryAndProcessResults(const char *query,
bool success;
instr_time before,
after;
+ int fetch_count = pset.fetch_count;
PGresult *result;
+
FILE *gfile_fout = NULL;
bool gfile_is_pipe = false;
+ PGresult **result_array = NULL; /* to collect results in single row mode */
+ int64 total_tuples = 0;
+ int ntuples;
+ int flush_error = 0;
+ bool is_pager = false;
+
if (timing)
INSTR_TIME_SET_CURRENT(before);
else
@@ -1448,6 +1498,33 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
+ /*
+ * If FETCH_COUNT is set and the context allows it, use the single row
+ * mode to fetch results and have no more than FETCH_COUNT rows in
+ * memory.
+ */
+ if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+ && !pset.gset_prefix && pset.show_all_results)
+ {
+ /*
+ * The row-by-row fetch is not enabled when SHOW_ALL_RESULTS is false,
+ * since we would need to accumulate all rows before knowing
+ * whether they need to be discarded or displayed, which contradicts
+ * FETCH_COUNT.
+ */
+ if (!PQsetSingleRowMode(pset.db))
+ {
+ pg_log_warning("fetching results in single row mode is unavailable");
+ fetch_count = 0;
+ }
+ else
+ {
+ result_array = (PGresult**) pg_malloc(fetch_count * sizeof(PGresult*));
+ }
+ }
+ else
+ fetch_count = 0; /* disable single-row mode */
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@@ -1467,6 +1544,8 @@ ExecQueryAndProcessResults(const char *query,
ExecStatusType result_status;
PGresult *next_result;
bool last;
+ /* whether the output starts before results are fully fetched */
+ bool partial_display = false;
if (!AcceptResult(result, false))
{
@@ -1593,6 +1672,94 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
+ if (fetch_count > 0 && result_status == PGRES_SINGLE_TUPLE)
+ {
+ FILE *tuples_fout = printQueryFout ? printQueryFout : stdout;
+ printQueryOpt my_popt = pset.popt;
+
+ ntuples = 0;
+ total_tuples = 0;
+ partial_display = true;
+
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ while (success)
+ {
+ result_array[ntuples++] = result;
+ if (ntuples == fetch_count)
+ {
+ /* pager: open at most once per resultset */
+ if (tuples_fout == stdout && !is_pager)
+ {
+ tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
+ is_pager = true;
+ }
+ /* display the current chunk of results unless the output stream is not working */
+ if (!flush_error)
+ {
+ printQueryChunks(result_array, ntuples, &my_popt, tuples_fout,
+ is_pager, pset.logfile);
+ flush_error = fflush(tuples_fout);
+ }
+ /* clear and reuse result_array */
+ for (int i=0; i < ntuples; i++)
+ PQclear(result_array[i]);
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+ my_popt.topt.prior_records += ntuples;
+ total_tuples += ntuples;
+ ntuples = 0;
+ }
+
+ result = PQgetResult(pset.db);
+ if (result == NULL)
+ {
+ /*
+ * Error. We expect a PGRES_TUPLES_OK result with
+ * zero tuple in it to finish the row-by-row sequence.
+ */
+ success = false;
+ break;
+ }
+
+ if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ /*
+ * The last row has been read. Display the last chunk of
+ * results and the footer.
+ */
+ my_popt.topt.stop_table = true;
+ if (!flush_error)
+ {
+ printQueryChunks(result_array, ntuples, &my_popt, tuples_fout,
+ is_pager, pset.logfile);
+ flush_error = fflush(tuples_fout);
+ }
+ for (int i=0; i < ntuples; i++)
+ PQclear(result_array[i]);
+ total_tuples += ntuples;
+ ntuples = 0;
+
+ if (is_pager)
+ {
+ ClosePager(tuples_fout);
+ }
+
+ result = NULL;
+ break;
+ }
+ }
+ }
+ else
+ partial_display = false;
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@@ -1621,7 +1788,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* this may or may not print something depending on settings */
- if (result != NULL)
+ if (result != NULL && !partial_display)
{
/*
* If results need to be printed into the file specified by \g,
@@ -1630,32 +1797,31 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
-
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
/* set variables on last result if all went well */
if (!is_watch && last && success)
+ {
SetResultVariables(result, true);
+ if (partial_display)
+ {
+ /*
+ * fake SetResultVariables() as in ExecQueryUsingCursor().
+ */
+ char buf[32];
+
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ }
+ }
ClearOrSaveResult(result);
result = next_result;
@@ -1667,17 +1833,10 @@ ExecQueryAndProcessResults(const char *query,
}
}
- /* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- SetShellResultVariables(pclose(gfile_fout));
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
+
+ if (result_array)
+ pg_free(result_array);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
diff --git a/src/fe_utils/print.c b/src/fe_utils/print.c
index 7af1ccb6b5..35f4f3d398 100644
--- a/src/fe_utils/print.c
+++ b/src/fe_utils/print.c
@@ -3532,19 +3532,44 @@ printTable(const printTableContent *cont,
* flog: if not null, also print the data there (for --log-file option)
*/
void
-printQuery(const PGresult *result, const printQueryOpt *opt,
+printQuery(PGresult *result, const printQueryOpt *opt,
FILE *fout, bool is_pager, FILE *flog)
+{
+ printQueryChunks(&result, 1, opt, fout, is_pager, flog);
+}
+
+/*
+ * Print the results of a query that may have been obtained by a
+ * succession of calls to PQgetResult in single-row mode.
+ *
+ * results: array of results of a successful query. They must have the same columns.
+ * nbresults: size of results
+ * opt: formatting options
+ * fout: where to print to
+ * is_pager: true if caller has already redirected fout to be a pager pipe
+ * flog: if not null, also print the data there (for --log-file option)
+ */
+void
+printQueryChunks(PGresult *results[], int nresults, const printQueryOpt *opt,
+ FILE *fout, bool is_pager, FILE *flog)
{
printTableContent cont;
int i,
r,
c;
+ int nrows = 0; /* total number of rows */
+ int ri; /* index into results[] */
if (cancel_pressed)
return;
+ for (ri = 0; ri < nresults; ri++)
+ {
+ nrows += PQntuples(results[ri]);
+ }
+
printTableInit(&cont, &opt->topt, opt->title,
- PQnfields(result), PQntuples(result));
+ (nresults > 0) ? PQnfields(results[0]) : 0, nrows);
/* Assert caller supplied enough translate_columns[] entries */
Assert(opt->translate_columns == NULL ||
@@ -3552,34 +3577,37 @@ printQuery(const PGresult *result, const printQueryOpt *opt,
for (i = 0; i < cont.ncolumns; i++)
{
- printTableAddHeader(&cont, PQfname(result, i),
+ printTableAddHeader(&cont, PQfname(results[0], i),
opt->translate_header,
- column_type_alignment(PQftype(result, i)));
+ column_type_alignment(PQftype(results[0], i)));
}
/* set cells */
- for (r = 0; r < cont.nrows; r++)
+ for (ri = 0; ri < nresults; ri++)
{
- for (c = 0; c < cont.ncolumns; c++)
+ for (r = 0; r < PQntuples(results[ri]); r++)
{
- char *cell;
- bool mustfree = false;
- bool translate;
-
- if (PQgetisnull(result, r, c))
- cell = opt->nullPrint ? opt->nullPrint : "";
- else
+ for (c = 0; c < cont.ncolumns; c++)
{
- cell = PQgetvalue(result, r, c);
- if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+ char *cell;
+ bool mustfree = false;
+ bool translate;
+
+ if (PQgetisnull(results[ri], r, c))
+ cell = opt->nullPrint ? opt->nullPrint : "";
+ else
{
- cell = format_numeric_locale(cell);
- mustfree = true;
+ cell = PQgetvalue(results[ri], r, c);
+ if (cont.aligns[c] == 'r' && opt->topt.numericLocale)
+ {
+ cell = format_numeric_locale(cell);
+ mustfree = true;
+ }
}
- }
- translate = (opt->translate_columns && opt->translate_columns[c]);
- printTableAddCell(&cont, cell, translate, mustfree);
+ translate = (opt->translate_columns && opt->translate_columns[c]);
+ printTableAddCell(&cont, cell, translate, mustfree);
+ }
}
}
diff --git a/src/include/fe_utils/print.h b/src/include/fe_utils/print.h
index cc6652def9..4d73aad251 100644
--- a/src/include/fe_utils/print.h
+++ b/src/include/fe_utils/print.h
@@ -224,8 +224,11 @@ extern void printTableSetFooter(printTableContent *const content,
extern void printTableCleanup(printTableContent *const content);
extern void printTable(const printTableContent *cont,
FILE *fout, bool is_pager, FILE *flog);
-extern void printQuery(const PGresult *result, const printQueryOpt *opt,
+extern void printQuery(PGresult *result, const printQueryOpt *opt,
FILE *fout, bool is_pager, FILE *flog);
+extern void printQueryChunks(PGresult *results[], int nresults,
+ const printQueryOpt *opt,
+ FILE *fout, bool is_pager, FILE *flog);
extern char column_type_alignment(Oid);
Hi,
Here's a new version to improve the performance of FETCH_COUNT
and extend the cases when it can be used.
Patch 0001 adds a new mode in libpq to allow the app to retrieve
larger chunks of results than the single row of the row-by-row mode.
The maximum number of rows per PGresult is set by the user.
Patch 0002 uses that mode in psql and gets rid of the cursor
implementation as suggested upthread.
The performance numbers look good.
For a query retrieving 50M rows of about 200 bytes:
select repeat('abc', 200) from generate_series(1,5000000)
/usr/bin/time -v psql -At -c $query reports these metrics
(medians of 5 runs):
version | fetch_count | clock_time | user_time | sys_time | max_rss_size
(kB)
-----------+-------------+------------+-----------+----------+-------------------
16-stable | 0 | 6.58 | 3.98 | 2.09 |
3446276
16-stable | 100 | 9.25 | 4.10 | 1.90 |
8768
16-stable | 1000 | 11.13 | 5.17 | 1.66 |
8904
17-patch | 0 | 6.5 | 3.94 | 2.09 |
3442696
17-patch | 100 | 5 | 3.56 | 0.93 |
4096
17-patch | 1000 | 6.48 | 4.00 | 1.55 |
4344
Interestingly, retrieving by chunks of 100 rows appears to be a bit faster
than the default one big chunk. It means that independently
of using less memory, FETCH_COUNT implemented that way
would be a performance enhancement compared to both
not using it and using it in v16 with the cursor implementation.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Attachments:
v4-0001-Implement-retrieval-of-results-in-chunks-with-lib.patchtext/plainDownload
From 766bbe84def2db494f646caeaf29eefeba893c1a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <daniel@manitou-mail.org>
Date: Mon, 20 Nov 2023 17:24:55 +0100
Subject: [PATCH v4 1/2] Implement retrieval of results in chunks with libpq.
This mode is similar to the single-row mode except that chunks
of results contain up to N rows instead of a single row.
It is meant to reduce the overhead of the row-by-row allocations
for large result sets.
The mode is selected with PQsetChunkedRowsMode(int maxRows) and results
have the new status code PGRES_TUPLES_CHUNK.
---
doc/src/sgml/libpq.sgml | 96 +++++++++++++++++++------
src/bin/pg_amcheck/pg_amcheck.c | 1 +
src/interfaces/libpq/exports.txt | 1 +
src/interfaces/libpq/fe-exec.c | 118 +++++++++++++++++++++++++------
src/interfaces/libpq/libpq-fe.h | 4 +-
src/interfaces/libpq/libpq-int.h | 7 +-
6 files changed, 183 insertions(+), 44 deletions(-)
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index ed88ac001a..8007bf67d8 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3537,7 +3537,20 @@ ExecStatusType PQresultStatus(const PGresult *res);
The <structname>PGresult</structname> contains a single result tuple
from the current command. This status occurs only when
single-row mode has been selected for the query
- (see <xref linkend="libpq-single-row-mode"/>).
+ (see <xref linkend="libpq-chunked-results-modes"/>).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pgres-tuples-chunk">
+ <term><literal>PGRES_TUPLES_CHUNK</literal></term>
+ <listitem>
+ <para>
+ The <structname>PGresult</structname> contains several tuples
+ from the current command. The count of tuples cannot exceed
+ the maximum passed to <xref linkend="libpq-PQsetChunkedRowsMode"/>.
+ This status occurs only when the chunked mode has been selected
+ for the query (see <xref linkend="libpq-chunked-results-modes"/>).
</para>
</listitem>
</varlistentry>
@@ -5187,8 +5200,8 @@ PGresult *PQgetResult(PGconn *conn);
<para>
Another frequently-desired feature that can be obtained with
<xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/>
- is retrieving large query results a row at a time. This is discussed
- in <xref linkend="libpq-single-row-mode"/>.
+ is retrieving large query results a limited number of rows at a time. This is discussed
+ in <xref linkend="libpq-chunked-results-modes"/>.
</para>
<para>
@@ -5551,12 +5564,13 @@ int PQflush(PGconn *conn);
</para>
<para>
- To enter single-row mode, call <function>PQsetSingleRowMode</function>
- before retrieving results with <function>PQgetResult</function>.
- This mode selection is effective only for the query currently
- being processed. For more information on the use of
- <function>PQsetSingleRowMode</function>,
- refer to <xref linkend="libpq-single-row-mode"/>.
+ To enter single-row or chunked modes, call
+ respectively <function>PQsetSingleRowMode</function>
+ or <function>PQsetChunkedRowsMode</function> before retrieving results
+ with <function>PQgetResult</function>. This mode selection is effective
+ only for the query currently being processed. For more information on the
+ use of these functions refer
+ to <xref linkend="libpq-chunked-results-modes" />.
</para>
<para>
@@ -5895,10 +5909,10 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
</sect2>
</sect1>
- <sect1 id="libpq-single-row-mode">
- <title>Retrieving Query Results Row-by-Row</title>
+ <sect1 id="libpq-chunked-results-modes">
+ <title>Retrieving Query Results by chunks</title>
- <indexterm zone="libpq-single-row-mode">
+ <indexterm zone="libpq-chunked-results-modes">
<primary>libpq</primary>
<secondary>single-row mode</secondary>
</indexterm>
@@ -5909,13 +5923,15 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
<structname>PGresult</structname>. This can be unworkable for commands
that return a large number of rows. For such cases, applications can use
<xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/> in
- <firstterm>single-row mode</firstterm>. In this mode, the result row(s) are
- returned to the application one at a time, as they are received from the
- server.
+ <firstterm>single-row mode</firstterm> or <firstterm>chunked mode</firstterm>.
+ In these modes, the result row(s) are returned to the application one at a
+ time for the single-row mode and by chunks for the chunked mode, as they
+ are received from the server.
</para>
<para>
- To enter single-row mode, call <xref linkend="libpq-PQsetSingleRowMode"/>
+ To enter these modes, call <xref linkend="libpq-PQsetSingleRowMode"/>
+ or <xref linkend="libpq-PQsetChunkedRowsMode"/>
immediately after a successful call of <xref linkend="libpq-PQsendQuery"/>
(or a sibling function). This mode selection is effective only for the
currently executing query. Then call <xref linkend="libpq-PQgetResult"/>
@@ -5923,7 +5939,8 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
linkend="libpq-async"/>. If the query returns any rows, they are returned
as individual <structname>PGresult</structname> objects, which look like
normal query results except for having status code
- <literal>PGRES_SINGLE_TUPLE</literal> instead of
+ <literal>PGRES_SINGLE_TUPLE</literal> for the single-row mode and
+ <literal>PGRES_TUPLES_CHUNK</literal> for the chunked mode, instead of
<literal>PGRES_TUPLES_OK</literal>. After the last row, or immediately if
the query returns zero rows, a zero-row object with status
<literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
@@ -5936,9 +5953,9 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
</para>
<para>
- When using pipeline mode, single-row mode needs to be activated for each
- query in the pipeline before retrieving results for that query
- with <function>PQgetResult</function>.
+ When using pipeline mode, the single-row or chunked mode need to be
+ activated for each query in the pipeline before retrieving results for that
+ query with <function>PQgetResult</function>.
See <xref linkend="libpq-pipeline-mode"/> for more information.
</para>
@@ -5972,14 +5989,49 @@ int PQsetSingleRowMode(PGconn *conn);
</variablelist>
</para>
+ <para>
+ <variablelist>
+ <varlistentry id="libpq-PQsetChunkedRowsMode">
+ <term><function>PQsetChunkedRowsMode</function>
+ <indexterm><primary>PQsetChunkedRowsMode</primary></indexterm></term>
+ <listitem>
+ <para>
+ Select the mode retrieving results in chunks for the currently-executing query.
+
+<synopsis>
+ int PQsetChunkedRowsMode(PGconn *conn,
+ int maxRows);
+</synopsis>
+ </para>
+
+ <para>
+ This function is similar to <xref linkend="libpq-PQsetSingleRowMode"/>,
+ except that it can retrieve a user-specified number of rows
+ per call to <xref linkend="libpq-PQgetResult"/>, instead of a single row.
+ This function can only be called immediately after
+ <xref linkend="libpq-PQsendQuery"/> or one of its sibling functions,
+ before any other operation on the connection such as
+ <xref linkend="libpq-PQconsumeInput"/> or
+ <xref linkend="libpq-PQgetResult"/>. If called at the correct time,
+ the function activates the chunked mode for the current query and
+ returns 1. Otherwise the mode stays unchanged and the function
+ returns 0. In any case, the mode reverts to normal after
+ completion of the current query.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
<caution>
<para>
While processing a query, the server may return some rows and then
encounter an error, causing the query to be aborted. Ordinarily,
<application>libpq</application> discards any such rows and reports only the
- error. But in single-row mode, those rows will have already been
+ error. But in single-row or chunked modes, those rows will have already been
returned to the application. Hence, the application will see some
- <literal>PGRES_SINGLE_TUPLE</literal> <structname>PGresult</structname>
+ <literal>PGRES_SINGLE_TUPLE</literal> or <literal>PGRES_TUPLES_CHUNK</literal>
+ <structname>PGresult</structname>
objects followed by a <literal>PGRES_FATAL_ERROR</literal> object. For
proper transactional behavior, the application must be designed to
discard or undo whatever has been done with the previously-processed
diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c
index a6b3b56457..9c8a0916c7 100644
--- a/src/bin/pg_amcheck/pg_amcheck.c
+++ b/src/bin/pg_amcheck/pg_amcheck.c
@@ -989,6 +989,7 @@ should_processing_continue(PGresult *res)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
return false;
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 850734ac96..ae7c84247b 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -191,3 +191,4 @@ PQclosePrepared 188
PQclosePortal 189
PQsendClosePrepared 190
PQsendClosePortal 191
+PQsetChunkedRowsMode 192
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 04610ccf5e..2e96d1b538 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -41,7 +41,8 @@ char *const pgresStatus[] = {
"PGRES_COPY_BOTH",
"PGRES_SINGLE_TUPLE",
"PGRES_PIPELINE_SYNC",
- "PGRES_PIPELINE_ABORTED"
+ "PGRES_PIPELINE_ABORTED",
+ "PGRES_TUPLES_CHUNK"
};
/* We return this if we're unable to make a PGresult at all */
@@ -82,7 +83,7 @@ static int PQsendTypedCommand(PGconn *conn, char command, char type,
static int check_field_number(const PGresult *res, int field_num);
static void pqPipelineProcessQueue(PGconn *conn);
static int pqPipelineFlush(PGconn *conn);
-
+static bool canChangeRowMode(PGconn *conn);
/* ----------------
* Space management for PGresult.
@@ -199,6 +200,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
/* non-error cases */
break;
default:
@@ -910,8 +912,9 @@ pqPrepareAsyncResult(PGconn *conn)
/*
* Replace conn->result with next_result, if any. In the normal case
* there isn't a next result and we're just dropping ownership of the
- * current result. In single-row mode this restores the situation to what
- * it was before we created the current single-row result.
+ * current result. In single-row and chunked modes this restores the
+ * situation to what it was before we created the current single-row or
+ * chunk-of-rows result.
*/
conn->result = conn->next_result;
conn->error_result = false; /* next_result is never an error */
@@ -1197,10 +1200,11 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
* (Such a string should already be translated via libpq_gettext().)
* If it is left NULL, the error is presumed to be "out of memory".
*
- * In single-row mode, we create a new result holding just the current row,
- * stashing the previous result in conn->next_result so that it becomes
- * active again after pqPrepareAsyncResult(). This allows the result metadata
- * (column descriptions) to be carried forward to each result row.
+ * In single-row or chunked mode, we create a new result holding just the
+ * current set of rows, stashing the previous result in conn->next_result so
+ * that it becomes active again after pqPrepareAsyncResult(). This allows the
+ * result metadata (column descriptions) to be carried forward to each result
+ * row.
*/
int
pqRowProcessor(PGconn *conn, const char **errmsgp)
@@ -1225,6 +1229,28 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
if (!res)
return 0;
}
+ else if (conn->rowsChunkSize > 0)
+ {
+ /*
+ * In chunked mode, make a new PGresult that will hold N rows; the
+ * original conn->result is left unchanged, as in the single-row mode.
+ */
+ if (!conn->chunk_result)
+ {
+ /* Allocate and initialize the result to hold a chunk of rows */
+ res = PQcopyResult(res,
+ PG_COPYRES_ATTRS | PG_COPYRES_EVENTS |
+ PG_COPYRES_NOTICEHOOKS);
+ if (!res)
+ return 0;
+ /* Change result status to special chunk-of-rows value */
+ res->resultStatus = PGRES_TUPLES_CHUNK;
+ /* Keep this result to reuse for the next rows of the chunk */
+ conn->chunk_result = res;
+ }
+ else
+ res = conn->chunk_result; /* Use the current chunk */
+ }
/*
* Basically we just allocate space in the PGresult for each field and
@@ -1287,6 +1313,21 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->asyncStatus = PGASYNC_READY_MORE;
}
+ /*
+ * In chunked mode, if the count has reached the requested limit, make the
+ * rows of the current chunk available immediately.
+ */
+ else if (conn->rowsChunkSize > 0 && res->ntups >= conn->rowsChunkSize)
+ {
+ /* Stash old result for re-use later */
+ conn->next_result = conn->result;
+ conn->result = res;
+ /* Do not reuse that chunk of results */
+ conn->chunk_result = NULL;
+ /* And mark the result ready to return */
+ conn->asyncStatus = PGASYNC_READY_MORE;
+ }
+
return 1;
fail:
@@ -1742,8 +1783,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
*/
pqClearAsyncResult(conn);
- /* reset single-row processing mode */
+ /* reset row-by-row and chunked processing modes */
conn->singleRowMode = false;
+ conn->rowsChunkSize = 0;
}
/* ready to send command message */
@@ -1927,25 +1969,51 @@ sendFailed:
*/
int
PQsetSingleRowMode(PGconn *conn)
+{
+ if (canChangeRowMode(conn))
+ {
+ conn->singleRowMode = true;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+/*
+ * Select chunked results processing mode
+ */
+int
+PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
+{
+ if (chunkSize >= 0 && canChangeRowMode(conn))
+ {
+ conn->rowsChunkSize = chunkSize;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+static
+bool
+canChangeRowMode(PGconn *conn)
{
/*
- * Only allow setting the flag when we have launched a query and not yet
- * received any results.
+ * Only allow setting the row-by-row or by-chunks modes when we have
+ * launched a query and not yet received any results.
*/
if (!conn)
- return 0;
+ return false;
if (conn->asyncStatus != PGASYNC_BUSY)
- return 0;
+ return false;
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
- return 0;
+ return false;
if (pgHavePendingResult(conn))
- return 0;
+ return false;
- /* OK, set flag */
- conn->singleRowMode = true;
- return 1;
+ return true;
}
/*
@@ -2113,6 +2181,16 @@ PQgetResult(PGconn *conn)
case PGASYNC_READY:
+ /*
+ * If there is a pending chunk of results, return it
+ */
+ if (conn->chunk_result != NULL)
+ {
+ res = conn->chunk_result;
+ conn->chunk_result = NULL;
+ break;
+ }
+
/*
* For any query type other than simple query protocol, we advance
* the command queue here. This is because for simple query
@@ -3151,10 +3229,10 @@ pqPipelineProcessQueue(PGconn *conn)
}
/*
- * Reset single-row processing mode. (Client has to set it up for each
+ * Reset processing mode in chunks. (Client has to set it up for each
* query, if desired.)
*/
- conn->singleRowMode = false;
+ conn->rowsChunkSize = 0;
/*
* If there are no further commands to process in the queue, get us in
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 97762d56f5..002ed772c8 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -109,8 +109,9 @@ typedef enum
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
- PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort
+ PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
* earlier in a pipeline */
+ PGRES_TUPLES_CHUNK /* set of tuples from larger resultset */
} ExecStatusType;
typedef enum
@@ -463,6 +464,7 @@ extern int PQsendQueryPrepared(PGconn *conn,
const int *paramFormats,
int resultFormat);
extern int PQsetSingleRowMode(PGconn *conn);
+extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize);
extern PGresult *PQgetResult(PGconn *conn);
/* Routines for managing an asynchronous query */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index c745facfec..7786bd2435 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -431,6 +431,8 @@ struct pg_conn
* sending semantics */
PGpipelineStatus pipelineStatus; /* status of pipeline mode */
bool singleRowMode; /* return current query result row-by-row? */
+ int rowsChunkSize; /* non-zero to return query results by chunks
+ * not exceeding that number of rows */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
PGnotify *notifyHead; /* oldest unreported Notify msg */
@@ -536,7 +538,10 @@ struct pg_conn
*/
PGresult *result; /* result being constructed */
bool error_result; /* do we need to make an ERROR result? */
- PGresult *next_result; /* next result (used in single-row mode) */
+ PGresult *next_result; /* next result (used in single-row and
+ * by-chunks modes) */
+ PGresult *chunk_result; /* current chunk of results (limited to
+ * rowsChunkSize) */
/* Assorted state for SASL, SSL, GSS, etc */
const pg_fe_sasl_mech *sasl;
--
2.34.1
v4-0002-Reimplement-FETCH_COUNT-with-the-chunked-mode-in-.patchtext/plainDownload
From a5555f339d88ad2a77e567cfd249cb948871b796 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <daniel@manitou-mail.org>
Date: Mon, 20 Nov 2023 18:42:41 +0100
Subject: [PATCH v4 2/2] Reimplement FETCH_COUNT with the chunked mode in libpq
instead of cursors.
Cursors were used only when the command starts with the keyword "SELECT",
excluding queries that start with "WITH" or "UPDATE" or "INSERT" that
may also return large result sets.
Also, cursors imply more commands sent to the server (begin/declare cursor
/repeated fetch/close cursor/commit), whereas in chunked mode, only the actual
user query is sent, resulting in less round-trips.
This also fixes the bug that combined queries (query1 \; query2;) were
not correctly handled with FETCH_COUNT set, due to cursors not supporting
multiple queries.
---
src/bin/psql/common.c | 545 ++++++++++-------------------
src/bin/psql/t/001_basic.pl | 6 +-
src/test/regress/expected/psql.out | 9 +-
src/test/regress/sql/psql.sql | 4 +-
4 files changed, 196 insertions(+), 368 deletions(-)
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index daabf6f12b..adb915e5c2 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -31,7 +31,6 @@
#include "settings.h"
static bool DescribeQuery(const char *query, double *elapsed_msec);
-static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
static int ExecQueryAndProcessResults(const char *query,
double *elapsed_msec,
bool *svpt_gone_p,
@@ -40,8 +39,6 @@ static int ExecQueryAndProcessResults(const char *query,
const printQueryOpt *opt,
FILE *printQueryFout);
static bool command_no_begin(const char *query);
-static bool is_select_command(const char *query);
-
/*
* openQueryOutputFile --- attempt to open a query output file
@@ -373,6 +370,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_TUPLES_CHUNK:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@@ -1131,16 +1129,10 @@ SendQuery(const char *query)
/* Describe query's result columns, without executing it */
OK = DescribeQuery(query, &elapsed_msec);
}
- else if (pset.fetch_count <= 0 || pset.gexec_flag ||
- pset.crosstab_flag || !is_select_command(query))
- {
- /* Default fetch-it-all-and-print mode */
- OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
- }
else
{
- /* Fetch-in-segments mode */
- OK = ExecQueryUsingCursor(query, &elapsed_msec);
+ /* Default fetch-and-print mode */
+ OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
}
if (!OK && pset.echo == PSQL_ECHO_ERRORS)
@@ -1392,6 +1384,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
+/*
+ * Check if an output stream for \g needs to be opened, and if
+ * yes, open it.
+ * Return false if an error occurred, true otherwise.
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+ ExecStatusType status = PQresultStatus(result);
+ if (pset.gfname != NULL && /* there is a \g file or program */
+ *gfile_fout == NULL && /* and it's not already opened */
+ (status == PGRES_TUPLES_OK ||
+ status == PGRES_TUPLES_CHUNK ||
+ status == PGRES_COPY_OUT))
+ {
+ if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+ {
+ if (is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ return false;
+ }
+ return true;
+}
+
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+ /* close \g file if we opened it */
+ if (gfile_fout)
+ {
+ if (is_pipe)
+ {
+ SetShellResultVariables(pclose(gfile_fout));
+ restore_sigpipe_trap();
+ }
+ else
+ fclose(gfile_fout);
+ }
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1424,10 +1457,16 @@ ExecQueryAndProcessResults(const char *query,
bool return_early = false;
instr_time before,
after;
+ int fetch_count = pset.fetch_count;
PGresult *result;
+
FILE *gfile_fout = NULL;
bool gfile_is_pipe = false;
+ int64 total_tuples = 0;
+ int flush_error = 0;
+ bool is_pager = false;
+
if (timing)
INSTR_TIME_SET_CURRENT(before);
else
@@ -1450,6 +1489,29 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
+ /*
+ * If FETCH_COUNT is set and the context allows it, use the single row
+ * mode to fetch results and have no more than FETCH_COUNT rows in
+ * memory.
+ */
+ if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+ && !pset.gset_prefix && pset.show_all_results)
+ {
+ /*
+ * The row-by-chunks fetch is not enabled when SHOW_ALL_RESULTS is false,
+ * since we would need to accumulate all rows before knowing
+ * whether they need to be discarded or displayed, which contradicts
+ * FETCH_COUNT.
+ */
+ if (!PQsetChunkedRowsMode(pset.db, fetch_count))
+ {
+ pg_log_warning("fetching results in chunks mode is unavailable");
+ fetch_count = 0;
+ }
+ }
+ else
+ fetch_count = 0; /* fetch one resultset per query */
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@@ -1473,6 +1535,8 @@ ExecQueryAndProcessResults(const char *query,
ExecStatusType result_status;
PGresult *next_result;
bool last;
+ /* whether the output starts before results are fully fetched */
+ bool partial_display = false;
if (!AcceptResult(result, false))
{
@@ -1568,20 +1632,9 @@ ExecQueryAndProcessResults(const char *query,
}
else if (pset.gfname)
{
- /* send to \g file, which we may have opened already */
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- copy_stream = gfile_fout;
- }
- else
- success = false;
- }
- else
+ /* COPY followed by \g filename or \g |program */
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (success)
copy_stream = gfile_fout;
}
else
@@ -1599,6 +1652,90 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
+ if (fetch_count > 0 && result_status == PGRES_TUPLES_CHUNK)
+ {
+ FILE *tuples_fout = printQueryFout ? printQueryFout : stdout;
+ printQueryOpt my_popt = pset.popt;
+
+ total_tuples = 0;
+ partial_display = true;
+
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ while (success)
+ {
+ /* pager: open at most once per resultset */
+ if (tuples_fout == stdout && !is_pager)
+ {
+ tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
+ is_pager = true;
+ }
+ /* display the current chunk of results unless the output stream is not working */
+ if (!flush_error)
+ {
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ flush_error = fflush(tuples_fout);
+ }
+
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+ my_popt.topt.prior_records += PQntuples(result);
+ total_tuples += PQntuples(result);
+
+ ClearOrSaveResult(result);
+
+ result = PQgetResult(pset.db);
+ if (result == NULL)
+ {
+ /*
+ * Error. We expect a PGRES_TUPLES_OK result with
+ * zero tuple in it to finish the fetch sequence.
+ */
+ success = false;
+ if (is_pager)
+ ClosePager(tuples_fout);
+ break;
+ }
+ else if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ /*
+ * The last row has been read. Display the footer.
+ */
+ my_popt.topt.stop_table = true;
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ total_tuples += PQntuples(result);
+
+ if (is_pager)
+ ClosePager(tuples_fout);
+ ClearOrSaveResult(result);
+ result = NULL;
+ break;
+ }
+ else if (PQresultStatus(result) != PGRES_TUPLES_CHUNK)
+ {
+ /*
+ * Error. We expect either PGRES_TUPLES_CHUNK or
+ * PGRES_TUPLES_OK.
+ */
+ if (is_pager)
+ ClosePager(tuples_fout);
+ success = false;
+ AcceptResult(result, true); /* display error whenever appropriate */
+ SetResultVariables(result, success);
+ break;
+ }
+ }
+ }
+ else
+ partial_display = false;
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@@ -1627,7 +1764,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* this may or may not print something depending on settings */
- if (result != NULL)
+ if (result != NULL && !partial_display)
{
/*
* If results need to be printed into the file specified by \g,
@@ -1636,32 +1773,33 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
-
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
/* set variables from last result */
if (!is_watch && last)
- SetResultVariables(result, success);
+ {
+ if (!partial_display)
+ SetResultVariables(result, success);
+ else if (success)
+ {
+ /*
+ * fake SetResultVariables(). If an error occurred when
+ * retrieving chunks, these variables have been set already.
+ */
+ char buf[32];
+
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ }
+ }
ClearOrSaveResult(result);
result = next_result;
@@ -1673,17 +1811,7 @@ ExecQueryAndProcessResults(const char *query,
}
}
- /* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- SetShellResultVariables(pclose(gfile_fout));
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
@@ -1696,274 +1824,6 @@ ExecQueryAndProcessResults(const char *query,
}
-/*
- * ExecQueryUsingCursor: run a SELECT-like query using a cursor
- *
- * This feature allows result sets larger than RAM to be dealt with.
- *
- * Returns true if the query executed successfully, false otherwise.
- *
- * If pset.timing is on, total query time (exclusive of result-printing) is
- * stored into *elapsed_msec.
- */
-static bool
-ExecQueryUsingCursor(const char *query, double *elapsed_msec)
-{
- bool OK = true;
- PGresult *result;
- PQExpBufferData buf;
- printQueryOpt my_popt = pset.popt;
- bool timing = pset.timing;
- FILE *fout;
- bool is_pipe;
- bool is_pager = false;
- bool started_txn = false;
- int64 total_tuples = 0;
- int ntuples;
- int fetch_count;
- char fetch_cmd[64];
- instr_time before,
- after;
- int flush_error;
-
- *elapsed_msec = 0;
-
- /* initialize print options for partial table output */
- my_popt.topt.start_table = true;
- my_popt.topt.stop_table = false;
- my_popt.topt.prior_records = 0;
-
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
- else
- INSTR_TIME_SET_ZERO(before);
-
- /* if we're not in a transaction, start one */
- if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
- {
- result = PQexec(pset.db, "BEGIN");
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- if (!OK)
- return false;
- started_txn = true;
- }
-
- /* Send DECLARE CURSOR */
- initPQExpBuffer(&buf);
- appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
- query);
-
- result = PQexec(pset.db, buf.data);
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- if (!OK)
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- termPQExpBuffer(&buf);
- if (!OK)
- goto cleanup;
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- /*
- * In \gset mode, we force the fetch count to be 2, so that we will throw
- * the appropriate error if the query returns more than one row.
- */
- if (pset.gset_prefix)
- fetch_count = 2;
- else
- fetch_count = pset.fetch_count;
-
- snprintf(fetch_cmd, sizeof(fetch_cmd),
- "FETCH FORWARD %d FROM _psql_cursor",
- fetch_count);
-
- /* prepare to write output to \g argument, if any */
- if (pset.gfname)
- {
- if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe))
- {
- OK = false;
- goto cleanup;
- }
- if (is_pipe)
- disable_sigpipe_trap();
- }
- else
- {
- fout = pset.queryFout;
- is_pipe = false; /* doesn't matter */
- }
-
- /* clear any pre-existing error indication on the output stream */
- clearerr(fout);
-
- for (;;)
- {
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /* get fetch_count tuples at a time */
- result = PQexec(pset.db, fetch_cmd);
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- if (PQresultStatus(result) != PGRES_TUPLES_OK)
- {
- /* shut down pager before printing error message */
- if (is_pager)
- {
- ClosePager(fout);
- is_pager = false;
- }
-
- OK = AcceptResult(result, true);
- Assert(!OK);
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- break;
- }
-
- if (pset.gset_prefix)
- {
- /* StoreQueryTuple will complain if not exactly one row */
- OK = StoreQueryTuple(result);
- ClearOrSaveResult(result);
- break;
- }
-
- /*
- * Note we do not deal with \gdesc, \gexec or \crosstabview modes here
- */
-
- ntuples = PQntuples(result);
- total_tuples += ntuples;
-
- if (ntuples < fetch_count)
- {
- /* this is the last result set, so allow footer decoration */
- my_popt.topt.stop_table = true;
- }
- else if (fout == stdout && !is_pager)
- {
- /*
- * If query requires multiple result sets, hack to ensure that
- * only one pager instance is used for the whole mess
- */
- fout = PageOutput(INT_MAX, &(my_popt.topt));
- is_pager = true;
- }
-
- printQuery(result, &my_popt, fout, is_pager, pset.logfile);
-
- ClearOrSaveResult(result);
-
- /* after the first result set, disallow header decoration */
- my_popt.topt.start_table = false;
- my_popt.topt.prior_records += ntuples;
-
- /*
- * Make sure to flush the output stream, so intermediate results are
- * visible to the client immediately. We check the results because if
- * the pager dies/exits/etc, there's no sense throwing more data at
- * it.
- */
- flush_error = fflush(fout);
-
- /*
- * Check if we are at the end, if a cancel was pressed, or if there
- * were any errors either trying to flush out the results, or more
- * generally on the output stream at all. If we hit any errors
- * writing things to the stream, we presume $PAGER has disappeared and
- * stop bothering to pull down more data.
- */
- if (ntuples < fetch_count || cancel_pressed || flush_error ||
- ferror(fout))
- break;
- }
-
- if (pset.gfname)
- {
- /* close \g argument file/pipe */
- if (is_pipe)
- {
- SetShellResultVariables(pclose(fout));
- restore_sigpipe_trap();
- }
- else
- fclose(fout);
- }
- else if (is_pager)
- {
- /* close transient pager */
- ClosePager(fout);
- }
-
- if (OK)
- {
- /*
- * We don't have a PGresult here, and even if we did it wouldn't have
- * the right row count, so fake SetResultVariables(). In error cases,
- * we already set the result variables above.
- */
- char buf[32];
-
- SetVariable(pset.vars, "ERROR", "false");
- SetVariable(pset.vars, "SQLSTATE", "00000");
- snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
- SetVariable(pset.vars, "ROW_COUNT", buf);
- }
-
-cleanup:
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /*
- * We try to close the cursor on either success or failure, but on failure
- * ignore the result (it's probably just a bleat about being in an aborted
- * transaction)
- */
- result = PQexec(pset.db, "CLOSE _psql_cursor");
- if (OK)
- {
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
- else
- PQclear(result);
-
- if (started_txn)
- {
- result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
- OK &= AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- return OK;
-}
-
-
/*
* Advance the given char pointer over white space and SQL comments.
*/
@@ -2243,43 +2103,6 @@ command_no_begin(const char *query)
}
-/*
- * Check whether the specified command is a SELECT (or VALUES).
- */
-static bool
-is_select_command(const char *query)
-{
- int wordlen;
-
- /*
- * First advance over any whitespace, comments and left parentheses.
- */
- for (;;)
- {
- query = skip_white_space(query);
- if (query[0] == '(')
- query++;
- else
- break;
- }
-
- /*
- * Check word length (since "selectx" is not "select").
- */
- wordlen = 0;
- while (isalpha((unsigned char) query[wordlen]))
- wordlen += PQmblenBounded(&query[wordlen], pset.encoding);
-
- if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0)
- return true;
-
- if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
- return true;
-
- return false;
-}
-
-
/*
* Test if the current user is a database superuser.
*/
diff --git a/src/bin/psql/t/001_basic.pl b/src/bin/psql/t/001_basic.pl
index 95f4e60ab2..62a5d0f383 100644
--- a/src/bin/psql/t/001_basic.pl
+++ b/src/bin/psql/t/001_basic.pl
@@ -161,7 +161,7 @@ psql_like(
'\errverbose with no previous error');
# There are three main ways to run a query that might affect
-# \errverbose: The normal way, using a cursor by setting FETCH_COUNT,
+# \errverbose: The normal way, piecemeal retrieval using FETCH_COUNT,
# and using \gdesc. Test them all.
like(
@@ -184,10 +184,10 @@ like(
"\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose",
on_error_stop => 0))[2],
qr/\A^psql:<stdin>:2: ERROR: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
^ *^.*$
^psql:<stdin>:3: error: ERROR: [0-9A-Z]{5}: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
^ *^.*$
^LOCATION: .*$/m,
'\errverbose after FETCH_COUNT query with error');
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 13e4f6db7b..aa53f11682 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -4754,7 +4754,7 @@ number of rows: 0
last error message: syntax error at end of input
\echo 'last error code:' :LAST_ERROR_SQLSTATE
last error code: 42601
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
unique2
@@ -4786,7 +4786,7 @@ error: false
error code: 00000
\echo 'number of rows:' :ROW_COUNT
number of rows: 19
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
?column?
----------
@@ -4800,6 +4800,11 @@ select 1/(15-unique2) from tenk1 order by unique2 limit 19;
0
0
0
+ 0
+ 0
+ 0
+ 0
+ 1
ERROR: division by zero
\echo 'error:' :ERROR
error: true
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
index 695c72d866..3c4e6962ba 100644
--- a/src/test/regress/sql/psql.sql
+++ b/src/test/regress/sql/psql.sql
@@ -1160,14 +1160,14 @@ SELECT 4 AS \gdesc
\echo 'last error message:' :LAST_ERROR_MESSAGE
\echo 'last error code:' :LAST_ERROR_SQLSTATE
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
\echo 'number of rows:' :ROW_COUNT
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
--
2.34.1
Hi,
PFA a rebased version.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Attachments:
v5-0001-Implement-retrieval-of-results-in-chunks-with-lib.patchtext/plainDownload
From cd0fe1d517a0e31e031fbbea1e603a715c77ea97 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <daniel@manitou-mail.org>
Date: Tue, 2 Jan 2024 14:15:48 +0100
Subject: [PATCH v5 1/2] Implement retrieval of results in chunks with libpq.
This mode is similar to the single-row mode except that chunks
of results contain up to N rows instead of a single row.
It is meant to reduce the overhead of the row-by-row allocations
for large result sets.
The mode is selected with PQsetChunkedRowsMode(int maxRows) and results
have the new status code PGRES_TUPLES_CHUNK.
---
doc/src/sgml/libpq.sgml | 96 ++++++++++----
.../libpqwalreceiver/libpqwalreceiver.c | 1 +
src/bin/pg_amcheck/pg_amcheck.c | 1 +
src/interfaces/libpq/exports.txt | 1 +
src/interfaces/libpq/fe-exec.c | 117 +++++++++++++++---
src/interfaces/libpq/libpq-fe.h | 4 +-
src/interfaces/libpq/libpq-int.h | 7 +-
7 files changed, 184 insertions(+), 43 deletions(-)
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index ed88ac001a..8007bf67d8 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3537,7 +3537,20 @@ ExecStatusType PQresultStatus(const PGresult *res);
The <structname>PGresult</structname> contains a single result tuple
from the current command. This status occurs only when
single-row mode has been selected for the query
- (see <xref linkend="libpq-single-row-mode"/>).
+ (see <xref linkend="libpq-chunked-results-modes"/>).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pgres-tuples-chunk">
+ <term><literal>PGRES_TUPLES_CHUNK</literal></term>
+ <listitem>
+ <para>
+ The <structname>PGresult</structname> contains several tuples
+ from the current command. The count of tuples cannot exceed
+ the maximum passed to <xref linkend="libpq-PQsetChunkedRowsMode"/>.
+ This status occurs only when the chunked mode has been selected
+ for the query (see <xref linkend="libpq-chunked-results-modes"/>).
</para>
</listitem>
</varlistentry>
@@ -5187,8 +5200,8 @@ PGresult *PQgetResult(PGconn *conn);
<para>
Another frequently-desired feature that can be obtained with
<xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/>
- is retrieving large query results a row at a time. This is discussed
- in <xref linkend="libpq-single-row-mode"/>.
+ is retrieving large query results a limited number of rows at a time. This is discussed
+ in <xref linkend="libpq-chunked-results-modes"/>.
</para>
<para>
@@ -5551,12 +5564,13 @@ int PQflush(PGconn *conn);
</para>
<para>
- To enter single-row mode, call <function>PQsetSingleRowMode</function>
- before retrieving results with <function>PQgetResult</function>.
- This mode selection is effective only for the query currently
- being processed. For more information on the use of
- <function>PQsetSingleRowMode</function>,
- refer to <xref linkend="libpq-single-row-mode"/>.
+ To enter single-row or chunked modes, call
+ respectively <function>PQsetSingleRowMode</function>
+ or <function>PQsetChunkedRowsMode</function> before retrieving results
+ with <function>PQgetResult</function>. This mode selection is effective
+ only for the query currently being processed. For more information on the
+ use of these functions refer
+ to <xref linkend="libpq-chunked-results-modes" />.
</para>
<para>
@@ -5895,10 +5909,10 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
</sect2>
</sect1>
- <sect1 id="libpq-single-row-mode">
- <title>Retrieving Query Results Row-by-Row</title>
+ <sect1 id="libpq-chunked-results-modes">
+ <title>Retrieving Query Results by chunks</title>
- <indexterm zone="libpq-single-row-mode">
+ <indexterm zone="libpq-chunked-results-modes">
<primary>libpq</primary>
<secondary>single-row mode</secondary>
</indexterm>
@@ -5909,13 +5923,15 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
<structname>PGresult</structname>. This can be unworkable for commands
that return a large number of rows. For such cases, applications can use
<xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/> in
- <firstterm>single-row mode</firstterm>. In this mode, the result row(s) are
- returned to the application one at a time, as they are received from the
- server.
+ <firstterm>single-row mode</firstterm> or <firstterm>chunked mode</firstterm>.
+ In these modes, the result row(s) are returned to the application one at a
+ time for the single-row mode and by chunks for the chunked mode, as they
+ are received from the server.
</para>
<para>
- To enter single-row mode, call <xref linkend="libpq-PQsetSingleRowMode"/>
+ To enter these modes, call <xref linkend="libpq-PQsetSingleRowMode"/>
+ or <xref linkend="libpq-PQsetChunkedRowsMode"/>
immediately after a successful call of <xref linkend="libpq-PQsendQuery"/>
(or a sibling function). This mode selection is effective only for the
currently executing query. Then call <xref linkend="libpq-PQgetResult"/>
@@ -5923,7 +5939,8 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
linkend="libpq-async"/>. If the query returns any rows, they are returned
as individual <structname>PGresult</structname> objects, which look like
normal query results except for having status code
- <literal>PGRES_SINGLE_TUPLE</literal> instead of
+ <literal>PGRES_SINGLE_TUPLE</literal> for the single-row mode and
+ <literal>PGRES_TUPLES_CHUNK</literal> for the chunked mode, instead of
<literal>PGRES_TUPLES_OK</literal>. After the last row, or immediately if
the query returns zero rows, a zero-row object with status
<literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
@@ -5936,9 +5953,9 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
</para>
<para>
- When using pipeline mode, single-row mode needs to be activated for each
- query in the pipeline before retrieving results for that query
- with <function>PQgetResult</function>.
+ When using pipeline mode, the single-row or chunked mode need to be
+ activated for each query in the pipeline before retrieving results for that
+ query with <function>PQgetResult</function>.
See <xref linkend="libpq-pipeline-mode"/> for more information.
</para>
@@ -5972,14 +5989,49 @@ int PQsetSingleRowMode(PGconn *conn);
</variablelist>
</para>
+ <para>
+ <variablelist>
+ <varlistentry id="libpq-PQsetChunkedRowsMode">
+ <term><function>PQsetChunkedRowsMode</function>
+ <indexterm><primary>PQsetChunkedRowsMode</primary></indexterm></term>
+ <listitem>
+ <para>
+ Select the mode retrieving results in chunks for the currently-executing query.
+
+<synopsis>
+ int PQsetChunkedRowsMode(PGconn *conn,
+ int maxRows);
+</synopsis>
+ </para>
+
+ <para>
+ This function is similar to <xref linkend="libpq-PQsetSingleRowMode"/>,
+ except that it can retrieve a user-specified number of rows
+ per call to <xref linkend="libpq-PQgetResult"/>, instead of a single row.
+ This function can only be called immediately after
+ <xref linkend="libpq-PQsendQuery"/> or one of its sibling functions,
+ before any other operation on the connection such as
+ <xref linkend="libpq-PQconsumeInput"/> or
+ <xref linkend="libpq-PQgetResult"/>. If called at the correct time,
+ the function activates the chunked mode for the current query and
+ returns 1. Otherwise the mode stays unchanged and the function
+ returns 0. In any case, the mode reverts to normal after
+ completion of the current query.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
<caution>
<para>
While processing a query, the server may return some rows and then
encounter an error, causing the query to be aborted. Ordinarily,
<application>libpq</application> discards any such rows and reports only the
- error. But in single-row mode, those rows will have already been
+ error. But in single-row or chunked modes, those rows will have already been
returned to the application. Hence, the application will see some
- <literal>PGRES_SINGLE_TUPLE</literal> <structname>PGresult</structname>
+ <literal>PGRES_SINGLE_TUPLE</literal> or <literal>PGRES_TUPLES_CHUNK</literal>
+ <structname>PGresult</structname>
objects followed by a <literal>PGRES_FATAL_ERROR</literal> object. For
proper transactional behavior, the application must be designed to
discard or undo whatever has been done with the previously-processed
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 693b3669ba..7498ee8bf4 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1093,6 +1093,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
switch (PQresultStatus(pgres))
{
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
case PGRES_TUPLES_OK:
walres->status = WALRCV_OK_TUPLES;
libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c
index a6b3b56457..9c8a0916c7 100644
--- a/src/bin/pg_amcheck/pg_amcheck.c
+++ b/src/bin/pg_amcheck/pg_amcheck.c
@@ -989,6 +989,7 @@ should_processing_continue(PGresult *res)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
return false;
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 850734ac96..ae7c84247b 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -191,3 +191,4 @@ PQclosePrepared 188
PQclosePortal 189
PQsendClosePrepared 190
PQsendClosePortal 191
+PQsetChunkedRowsMode 192
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b9511df2c2..a02d97d006 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -41,7 +41,8 @@ char *const pgresStatus[] = {
"PGRES_COPY_BOTH",
"PGRES_SINGLE_TUPLE",
"PGRES_PIPELINE_SYNC",
- "PGRES_PIPELINE_ABORTED"
+ "PGRES_PIPELINE_ABORTED",
+ "PGRES_TUPLES_CHUNK"
};
/* We return this if we're unable to make a PGresult at all */
@@ -82,7 +83,7 @@ static int PQsendTypedCommand(PGconn *conn, char command, char type,
static int check_field_number(const PGresult *res, int field_num);
static void pqPipelineProcessQueue(PGconn *conn);
static int pqPipelineFlush(PGconn *conn);
-
+static bool canChangeRowMode(PGconn *conn);
/* ----------------
* Space management for PGresult.
@@ -199,6 +200,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
/* non-error cases */
break;
default:
@@ -910,8 +912,9 @@ pqPrepareAsyncResult(PGconn *conn)
/*
* Replace conn->result with next_result, if any. In the normal case
* there isn't a next result and we're just dropping ownership of the
- * current result. In single-row mode this restores the situation to what
- * it was before we created the current single-row result.
+ * current result. In single-row and chunked modes this restores the
+ * situation to what it was before we created the current single-row or
+ * chunk-of-rows result.
*/
conn->result = conn->next_result;
conn->error_result = false; /* next_result is never an error */
@@ -1197,10 +1200,11 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
* (Such a string should already be translated via libpq_gettext().)
* If it is left NULL, the error is presumed to be "out of memory".
*
- * In single-row mode, we create a new result holding just the current row,
- * stashing the previous result in conn->next_result so that it becomes
- * active again after pqPrepareAsyncResult(). This allows the result metadata
- * (column descriptions) to be carried forward to each result row.
+ * In single-row or chunked mode, we create a new result holding just the
+ * current set of rows, stashing the previous result in conn->next_result so
+ * that it becomes active again after pqPrepareAsyncResult(). This allows the
+ * result metadata (column descriptions) to be carried forward to each result
+ * row.
*/
int
pqRowProcessor(PGconn *conn, const char **errmsgp)
@@ -1225,6 +1229,28 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
if (!res)
return 0;
}
+ else if (conn->rowsChunkSize > 0)
+ {
+ /*
+ * In chunked mode, make a new PGresult that will hold N rows; the
+ * original conn->result is left unchanged, as in the single-row mode.
+ */
+ if (!conn->chunk_result)
+ {
+ /* Allocate and initialize the result to hold a chunk of rows */
+ res = PQcopyResult(res,
+ PG_COPYRES_ATTRS | PG_COPYRES_EVENTS |
+ PG_COPYRES_NOTICEHOOKS);
+ if (!res)
+ return 0;
+ /* Change result status to special chunk-of-rows value */
+ res->resultStatus = PGRES_TUPLES_CHUNK;
+ /* Keep this result to reuse for the next rows of the chunk */
+ conn->chunk_result = res;
+ }
+ else
+ res = conn->chunk_result; /* Use the current chunk */
+ }
/*
* Basically we just allocate space in the PGresult for each field and
@@ -1287,6 +1313,21 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->asyncStatus = PGASYNC_READY_MORE;
}
+ /*
+ * In chunked mode, if the count has reached the requested limit, make the
+ * rows of the current chunk available immediately.
+ */
+ else if (conn->rowsChunkSize > 0 && res->ntups >= conn->rowsChunkSize)
+ {
+ /* Stash old result for re-use later */
+ conn->next_result = conn->result;
+ conn->result = res;
+ /* Do not reuse that chunk of results */
+ conn->chunk_result = NULL;
+ /* And mark the result ready to return */
+ conn->asyncStatus = PGASYNC_READY_MORE;
+ }
+
return 1;
fail:
@@ -1742,8 +1783,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
*/
pqClearAsyncResult(conn);
- /* reset single-row processing mode */
+ /* reset row-by-row and chunked processing modes */
conn->singleRowMode = false;
+ conn->rowsChunkSize = 0;
}
/* ready to send command message */
@@ -1927,25 +1969,51 @@ sendFailed:
*/
int
PQsetSingleRowMode(PGconn *conn)
+{
+ if (canChangeRowMode(conn))
+ {
+ conn->singleRowMode = true;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+/*
+ * Select chunked results processing mode
+ */
+int
+PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
+{
+ if (chunkSize >= 0 && canChangeRowMode(conn))
+ {
+ conn->rowsChunkSize = chunkSize;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+static
+bool
+canChangeRowMode(PGconn *conn)
{
/*
- * Only allow setting the flag when we have launched a query and not yet
- * received any results.
+ * Only allow setting the row-by-row or by-chunks modes when we have
+ * launched a query and not yet received any results.
*/
if (!conn)
- return 0;
+ return false;
if (conn->asyncStatus != PGASYNC_BUSY)
- return 0;
+ return false;
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
- return 0;
+ return false;
if (pgHavePendingResult(conn))
- return 0;
+ return false;
- /* OK, set flag */
- conn->singleRowMode = true;
- return 1;
+ return true;
}
/*
@@ -2112,6 +2180,16 @@ PQgetResult(PGconn *conn)
break;
case PGASYNC_READY:
+ /*
+ * If there is a pending chunk of results, return it
+ */
+ if (conn->chunk_result != NULL)
+ {
+ res = conn->chunk_result;
+ conn->chunk_result = NULL;
+ break;
+ }
+
res = pqPrepareAsyncResult(conn);
/* Advance the queue as appropriate */
@@ -3170,10 +3248,11 @@ pqPipelineProcessQueue(PGconn *conn)
}
/*
- * Reset single-row processing mode. (Client has to set it up for each
+ * Reset to full result sets mode. (Client has to set it up for each
* query, if desired.)
*/
conn->singleRowMode = false;
+ conn->rowsChunkSize = 0;
/*
* If there are no further commands to process in the queue, get us in
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 97762d56f5..002ed772c8 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -109,8 +109,9 @@ typedef enum
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
- PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort
+ PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
* earlier in a pipeline */
+ PGRES_TUPLES_CHUNK /* set of tuples from larger resultset */
} ExecStatusType;
typedef enum
@@ -463,6 +464,7 @@ extern int PQsendQueryPrepared(PGconn *conn,
const int *paramFormats,
int resultFormat);
extern int PQsetSingleRowMode(PGconn *conn);
+extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize);
extern PGresult *PQgetResult(PGconn *conn);
/* Routines for managing an asynchronous query */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 7888199b0d..da51fa1ee6 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -431,6 +431,8 @@ struct pg_conn
* sending semantics */
PGpipelineStatus pipelineStatus; /* status of pipeline mode */
bool singleRowMode; /* return current query result row-by-row? */
+ int rowsChunkSize; /* non-zero to return query results by chunks
+ * not exceeding that number of rows */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
PGnotify *notifyHead; /* oldest unreported Notify msg */
@@ -536,7 +538,10 @@ struct pg_conn
*/
PGresult *result; /* result being constructed */
bool error_result; /* do we need to make an ERROR result? */
- PGresult *next_result; /* next result (used in single-row mode) */
+ PGresult *next_result; /* next result (used in single-row and
+ * by-chunks modes) */
+ PGresult *chunk_result; /* current chunk of results (limited to
+ * rowsChunkSize) */
/* Assorted state for SASL, SSL, GSS, etc */
const pg_fe_sasl_mech *sasl;
--
2.34.1
v5-0002-Reimplement-FETCH_COUNT-with-the-chunked-mode-in-.patchtext/plainDownload
From 2b4c45804408f94835d4ed4a7869649de143193c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <daniel@manitou-mail.org>
Date: Tue, 2 Jan 2024 14:17:18 +0100
Subject: [PATCH v5 2/2] Reimplement FETCH_COUNT with the chunked mode in libpq
instead of cursors.
Cursors were used only when the command starts with the keyword "SELECT",
excluding queries that start with "WITH" or "UPDATE" or "INSERT" that
may also return large result sets.
---
src/bin/psql/common.c | 545 ++++++++++-------------------
src/bin/psql/t/001_basic.pl | 6 +-
src/test/regress/expected/psql.out | 9 +-
src/test/regress/sql/psql.sql | 4 +-
4 files changed, 196 insertions(+), 368 deletions(-)
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index daabf6f12b..adb915e5c2 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -31,7 +31,6 @@
#include "settings.h"
static bool DescribeQuery(const char *query, double *elapsed_msec);
-static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
static int ExecQueryAndProcessResults(const char *query,
double *elapsed_msec,
bool *svpt_gone_p,
@@ -40,8 +39,6 @@ static int ExecQueryAndProcessResults(const char *query,
const printQueryOpt *opt,
FILE *printQueryFout);
static bool command_no_begin(const char *query);
-static bool is_select_command(const char *query);
-
/*
* openQueryOutputFile --- attempt to open a query output file
@@ -373,6 +370,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_TUPLES_CHUNK:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@@ -1131,16 +1129,10 @@ SendQuery(const char *query)
/* Describe query's result columns, without executing it */
OK = DescribeQuery(query, &elapsed_msec);
}
- else if (pset.fetch_count <= 0 || pset.gexec_flag ||
- pset.crosstab_flag || !is_select_command(query))
- {
- /* Default fetch-it-all-and-print mode */
- OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
- }
else
{
- /* Fetch-in-segments mode */
- OK = ExecQueryUsingCursor(query, &elapsed_msec);
+ /* Default fetch-and-print mode */
+ OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
}
if (!OK && pset.echo == PSQL_ECHO_ERRORS)
@@ -1392,6 +1384,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
+/*
+ * Check if an output stream for \g needs to be opened, and if
+ * yes, open it.
+ * Return false if an error occurred, true otherwise.
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+ ExecStatusType status = PQresultStatus(result);
+ if (pset.gfname != NULL && /* there is a \g file or program */
+ *gfile_fout == NULL && /* and it's not already opened */
+ (status == PGRES_TUPLES_OK ||
+ status == PGRES_TUPLES_CHUNK ||
+ status == PGRES_COPY_OUT))
+ {
+ if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+ {
+ if (is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ return false;
+ }
+ return true;
+}
+
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+ /* close \g file if we opened it */
+ if (gfile_fout)
+ {
+ if (is_pipe)
+ {
+ SetShellResultVariables(pclose(gfile_fout));
+ restore_sigpipe_trap();
+ }
+ else
+ fclose(gfile_fout);
+ }
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1424,10 +1457,16 @@ ExecQueryAndProcessResults(const char *query,
bool return_early = false;
instr_time before,
after;
+ int fetch_count = pset.fetch_count;
PGresult *result;
+
FILE *gfile_fout = NULL;
bool gfile_is_pipe = false;
+ int64 total_tuples = 0;
+ int flush_error = 0;
+ bool is_pager = false;
+
if (timing)
INSTR_TIME_SET_CURRENT(before);
else
@@ -1450,6 +1489,29 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
+ /*
+ * If FETCH_COUNT is set and the context allows it, use the single row
+ * mode to fetch results and have no more than FETCH_COUNT rows in
+ * memory.
+ */
+ if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+ && !pset.gset_prefix && pset.show_all_results)
+ {
+ /*
+ * The row-by-chunks fetch is not enabled when SHOW_ALL_RESULTS is false,
+ * since we would need to accumulate all rows before knowing
+ * whether they need to be discarded or displayed, which contradicts
+ * FETCH_COUNT.
+ */
+ if (!PQsetChunkedRowsMode(pset.db, fetch_count))
+ {
+ pg_log_warning("fetching results in chunks mode is unavailable");
+ fetch_count = 0;
+ }
+ }
+ else
+ fetch_count = 0; /* fetch one resultset per query */
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@@ -1473,6 +1535,8 @@ ExecQueryAndProcessResults(const char *query,
ExecStatusType result_status;
PGresult *next_result;
bool last;
+ /* whether the output starts before results are fully fetched */
+ bool partial_display = false;
if (!AcceptResult(result, false))
{
@@ -1568,20 +1632,9 @@ ExecQueryAndProcessResults(const char *query,
}
else if (pset.gfname)
{
- /* send to \g file, which we may have opened already */
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- copy_stream = gfile_fout;
- }
- else
- success = false;
- }
- else
+ /* COPY followed by \g filename or \g |program */
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (success)
copy_stream = gfile_fout;
}
else
@@ -1599,6 +1652,90 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
+ if (fetch_count > 0 && result_status == PGRES_TUPLES_CHUNK)
+ {
+ FILE *tuples_fout = printQueryFout ? printQueryFout : stdout;
+ printQueryOpt my_popt = pset.popt;
+
+ total_tuples = 0;
+ partial_display = true;
+
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ while (success)
+ {
+ /* pager: open at most once per resultset */
+ if (tuples_fout == stdout && !is_pager)
+ {
+ tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
+ is_pager = true;
+ }
+ /* display the current chunk of results unless the output stream is not working */
+ if (!flush_error)
+ {
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ flush_error = fflush(tuples_fout);
+ }
+
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+ my_popt.topt.prior_records += PQntuples(result);
+ total_tuples += PQntuples(result);
+
+ ClearOrSaveResult(result);
+
+ result = PQgetResult(pset.db);
+ if (result == NULL)
+ {
+ /*
+ * Error. We expect a PGRES_TUPLES_OK result with
+ * zero tuple in it to finish the fetch sequence.
+ */
+ success = false;
+ if (is_pager)
+ ClosePager(tuples_fout);
+ break;
+ }
+ else if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ /*
+ * The last row has been read. Display the footer.
+ */
+ my_popt.topt.stop_table = true;
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ total_tuples += PQntuples(result);
+
+ if (is_pager)
+ ClosePager(tuples_fout);
+ ClearOrSaveResult(result);
+ result = NULL;
+ break;
+ }
+ else if (PQresultStatus(result) != PGRES_TUPLES_CHUNK)
+ {
+ /*
+ * Error. We expect either PGRES_TUPLES_CHUNK or
+ * PGRES_TUPLES_OK.
+ */
+ if (is_pager)
+ ClosePager(tuples_fout);
+ success = false;
+ AcceptResult(result, true); /* display error whenever appropriate */
+ SetResultVariables(result, success);
+ break;
+ }
+ }
+ }
+ else
+ partial_display = false;
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@@ -1627,7 +1764,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* this may or may not print something depending on settings */
- if (result != NULL)
+ if (result != NULL && !partial_display)
{
/*
* If results need to be printed into the file specified by \g,
@@ -1636,32 +1773,33 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
-
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
/* set variables from last result */
if (!is_watch && last)
- SetResultVariables(result, success);
+ {
+ if (!partial_display)
+ SetResultVariables(result, success);
+ else if (success)
+ {
+ /*
+ * fake SetResultVariables(). If an error occurred when
+ * retrieving chunks, these variables have been set already.
+ */
+ char buf[32];
+
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ }
+ }
ClearOrSaveResult(result);
result = next_result;
@@ -1673,17 +1811,7 @@ ExecQueryAndProcessResults(const char *query,
}
}
- /* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- SetShellResultVariables(pclose(gfile_fout));
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
@@ -1696,274 +1824,6 @@ ExecQueryAndProcessResults(const char *query,
}
-/*
- * ExecQueryUsingCursor: run a SELECT-like query using a cursor
- *
- * This feature allows result sets larger than RAM to be dealt with.
- *
- * Returns true if the query executed successfully, false otherwise.
- *
- * If pset.timing is on, total query time (exclusive of result-printing) is
- * stored into *elapsed_msec.
- */
-static bool
-ExecQueryUsingCursor(const char *query, double *elapsed_msec)
-{
- bool OK = true;
- PGresult *result;
- PQExpBufferData buf;
- printQueryOpt my_popt = pset.popt;
- bool timing = pset.timing;
- FILE *fout;
- bool is_pipe;
- bool is_pager = false;
- bool started_txn = false;
- int64 total_tuples = 0;
- int ntuples;
- int fetch_count;
- char fetch_cmd[64];
- instr_time before,
- after;
- int flush_error;
-
- *elapsed_msec = 0;
-
- /* initialize print options for partial table output */
- my_popt.topt.start_table = true;
- my_popt.topt.stop_table = false;
- my_popt.topt.prior_records = 0;
-
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
- else
- INSTR_TIME_SET_ZERO(before);
-
- /* if we're not in a transaction, start one */
- if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
- {
- result = PQexec(pset.db, "BEGIN");
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- if (!OK)
- return false;
- started_txn = true;
- }
-
- /* Send DECLARE CURSOR */
- initPQExpBuffer(&buf);
- appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
- query);
-
- result = PQexec(pset.db, buf.data);
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- if (!OK)
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- termPQExpBuffer(&buf);
- if (!OK)
- goto cleanup;
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- /*
- * In \gset mode, we force the fetch count to be 2, so that we will throw
- * the appropriate error if the query returns more than one row.
- */
- if (pset.gset_prefix)
- fetch_count = 2;
- else
- fetch_count = pset.fetch_count;
-
- snprintf(fetch_cmd, sizeof(fetch_cmd),
- "FETCH FORWARD %d FROM _psql_cursor",
- fetch_count);
-
- /* prepare to write output to \g argument, if any */
- if (pset.gfname)
- {
- if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe))
- {
- OK = false;
- goto cleanup;
- }
- if (is_pipe)
- disable_sigpipe_trap();
- }
- else
- {
- fout = pset.queryFout;
- is_pipe = false; /* doesn't matter */
- }
-
- /* clear any pre-existing error indication on the output stream */
- clearerr(fout);
-
- for (;;)
- {
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /* get fetch_count tuples at a time */
- result = PQexec(pset.db, fetch_cmd);
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- if (PQresultStatus(result) != PGRES_TUPLES_OK)
- {
- /* shut down pager before printing error message */
- if (is_pager)
- {
- ClosePager(fout);
- is_pager = false;
- }
-
- OK = AcceptResult(result, true);
- Assert(!OK);
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- break;
- }
-
- if (pset.gset_prefix)
- {
- /* StoreQueryTuple will complain if not exactly one row */
- OK = StoreQueryTuple(result);
- ClearOrSaveResult(result);
- break;
- }
-
- /*
- * Note we do not deal with \gdesc, \gexec or \crosstabview modes here
- */
-
- ntuples = PQntuples(result);
- total_tuples += ntuples;
-
- if (ntuples < fetch_count)
- {
- /* this is the last result set, so allow footer decoration */
- my_popt.topt.stop_table = true;
- }
- else if (fout == stdout && !is_pager)
- {
- /*
- * If query requires multiple result sets, hack to ensure that
- * only one pager instance is used for the whole mess
- */
- fout = PageOutput(INT_MAX, &(my_popt.topt));
- is_pager = true;
- }
-
- printQuery(result, &my_popt, fout, is_pager, pset.logfile);
-
- ClearOrSaveResult(result);
-
- /* after the first result set, disallow header decoration */
- my_popt.topt.start_table = false;
- my_popt.topt.prior_records += ntuples;
-
- /*
- * Make sure to flush the output stream, so intermediate results are
- * visible to the client immediately. We check the results because if
- * the pager dies/exits/etc, there's no sense throwing more data at
- * it.
- */
- flush_error = fflush(fout);
-
- /*
- * Check if we are at the end, if a cancel was pressed, or if there
- * were any errors either trying to flush out the results, or more
- * generally on the output stream at all. If we hit any errors
- * writing things to the stream, we presume $PAGER has disappeared and
- * stop bothering to pull down more data.
- */
- if (ntuples < fetch_count || cancel_pressed || flush_error ||
- ferror(fout))
- break;
- }
-
- if (pset.gfname)
- {
- /* close \g argument file/pipe */
- if (is_pipe)
- {
- SetShellResultVariables(pclose(fout));
- restore_sigpipe_trap();
- }
- else
- fclose(fout);
- }
- else if (is_pager)
- {
- /* close transient pager */
- ClosePager(fout);
- }
-
- if (OK)
- {
- /*
- * We don't have a PGresult here, and even if we did it wouldn't have
- * the right row count, so fake SetResultVariables(). In error cases,
- * we already set the result variables above.
- */
- char buf[32];
-
- SetVariable(pset.vars, "ERROR", "false");
- SetVariable(pset.vars, "SQLSTATE", "00000");
- snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
- SetVariable(pset.vars, "ROW_COUNT", buf);
- }
-
-cleanup:
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /*
- * We try to close the cursor on either success or failure, but on failure
- * ignore the result (it's probably just a bleat about being in an aborted
- * transaction)
- */
- result = PQexec(pset.db, "CLOSE _psql_cursor");
- if (OK)
- {
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
- else
- PQclear(result);
-
- if (started_txn)
- {
- result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
- OK &= AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- return OK;
-}
-
-
/*
* Advance the given char pointer over white space and SQL comments.
*/
@@ -2243,43 +2103,6 @@ command_no_begin(const char *query)
}
-/*
- * Check whether the specified command is a SELECT (or VALUES).
- */
-static bool
-is_select_command(const char *query)
-{
- int wordlen;
-
- /*
- * First advance over any whitespace, comments and left parentheses.
- */
- for (;;)
- {
- query = skip_white_space(query);
- if (query[0] == '(')
- query++;
- else
- break;
- }
-
- /*
- * Check word length (since "selectx" is not "select").
- */
- wordlen = 0;
- while (isalpha((unsigned char) query[wordlen]))
- wordlen += PQmblenBounded(&query[wordlen], pset.encoding);
-
- if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0)
- return true;
-
- if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
- return true;
-
- return false;
-}
-
-
/*
* Test if the current user is a database superuser.
*/
diff --git a/src/bin/psql/t/001_basic.pl b/src/bin/psql/t/001_basic.pl
index 95f4e60ab2..62a5d0f383 100644
--- a/src/bin/psql/t/001_basic.pl
+++ b/src/bin/psql/t/001_basic.pl
@@ -161,7 +161,7 @@ psql_like(
'\errverbose with no previous error');
# There are three main ways to run a query that might affect
-# \errverbose: The normal way, using a cursor by setting FETCH_COUNT,
+# \errverbose: The normal way, piecemeal retrieval using FETCH_COUNT,
# and using \gdesc. Test them all.
like(
@@ -184,10 +184,10 @@ like(
"\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose",
on_error_stop => 0))[2],
qr/\A^psql:<stdin>:2: ERROR: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
^ *^.*$
^psql:<stdin>:3: error: ERROR: [0-9A-Z]{5}: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
^ *^.*$
^LOCATION: .*$/m,
'\errverbose after FETCH_COUNT query with error');
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 5d61e4c7bb..0aa7a6dae1 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -4754,7 +4754,7 @@ number of rows: 0
last error message: syntax error at end of input
\echo 'last error code:' :LAST_ERROR_SQLSTATE
last error code: 42601
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
unique2
@@ -4786,7 +4786,7 @@ error: false
error code: 00000
\echo 'number of rows:' :ROW_COUNT
number of rows: 19
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
?column?
----------
@@ -4800,6 +4800,11 @@ select 1/(15-unique2) from tenk1 order by unique2 limit 19;
0
0
0
+ 0
+ 0
+ 0
+ 0
+ 1
ERROR: division by zero
\echo 'error:' :ERROR
error: true
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
index f199d624d3..0d45dc2d95 100644
--- a/src/test/regress/sql/psql.sql
+++ b/src/test/regress/sql/psql.sql
@@ -1160,14 +1160,14 @@ SELECT 4 AS \gdesc
\echo 'last error message:' :LAST_ERROR_MESSAGE
\echo 'last error code:' :LAST_ERROR_SQLSTATE
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
\echo 'number of rows:' :ROW_COUNT
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
--
2.34.1
On Tue, 2 Jan 2024 at 20:28, Daniel Verite <daniel@manitou-mail.org> wrote:
Hi,
PFA a rebased version.
CFBot shows that the patch does not apply anymore as in [1]http://cfbot.cputube.org/patch_46_4233.log:
=== Applying patches on top of PostgreSQL commit ID
a3a836fb5e51183eae624d43225279306c2285b8 ===
=== applying patch
./v5-0001-Implement-retrieval-of-results-in-chunks-with-lib.patch
patching file doc/src/sgml/libpq.sgml
...
patching file src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
...
patching file src/interfaces/libpq/exports.txt
Hunk #1 FAILED at 191.
1 out of 1 hunk FAILED -- saving rejects to file
src/interfaces/libpq/exports.txt.rej
Please post an updated version for the same.
[1]: http://cfbot.cputube.org/patch_46_4233.log
Regards,
Vignesh
vignesh C wrote:
patching file src/interfaces/libpq/exports.txt
Hunk #1 FAILED at 191.
1 out of 1 hunk FAILED -- saving rejects to file
src/interfaces/libpq/exports.txt.rejPlease post an updated version for the same.
PFA a rebased version.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Attachments:
v6-0001-Implement-retrieval-of-results-in-chunks-with-lib.patchtext/plainDownload
From 8cfb82b1e36e96996637948a231ae35b9af1e074 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <daniel@manitou-mail.org>
Date: Tue, 30 Jan 2024 14:38:21 +0100
Subject: [PATCH v6 1/2] Implement retrieval of results in chunks with libpq.
This mode is similar to the single-row mode except that chunks
of results contain up to N rows instead of a single row.
It is meant to reduce the overhead of the row-by-row allocations
for large result sets.
The mode is selected with PQsetChunkedRowsMode(int maxRows) and results
have the new status code PGRES_TUPLES_CHUNK.
---
doc/src/sgml/libpq.sgml | 96 ++++++++++----
.../libpqwalreceiver/libpqwalreceiver.c | 1 +
src/bin/pg_amcheck/pg_amcheck.c | 1 +
src/interfaces/libpq/exports.txt | 1 +
src/interfaces/libpq/fe-exec.c | 117 +++++++++++++++---
src/interfaces/libpq/libpq-fe.h | 4 +-
src/interfaces/libpq/libpq-int.h | 7 +-
7 files changed, 184 insertions(+), 43 deletions(-)
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index d0d5aefadc..f7f5a04df6 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3537,7 +3537,20 @@ ExecStatusType PQresultStatus(const PGresult *res);
The <structname>PGresult</structname> contains a single result tuple
from the current command. This status occurs only when
single-row mode has been selected for the query
- (see <xref linkend="libpq-single-row-mode"/>).
+ (see <xref linkend="libpq-chunked-results-modes"/>).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pgres-tuples-chunk">
+ <term><literal>PGRES_TUPLES_CHUNK</literal></term>
+ <listitem>
+ <para>
+ The <structname>PGresult</structname> contains several tuples
+ from the current command. The count of tuples cannot exceed
+ the maximum passed to <xref linkend="libpq-PQsetChunkedRowsMode"/>.
+ This status occurs only when the chunked mode has been selected
+ for the query (see <xref linkend="libpq-chunked-results-modes"/>).
</para>
</listitem>
</varlistentry>
@@ -5189,8 +5202,8 @@ PGresult *PQgetResult(PGconn *conn);
<para>
Another frequently-desired feature that can be obtained with
<xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/>
- is retrieving large query results a row at a time. This is discussed
- in <xref linkend="libpq-single-row-mode"/>.
+ is retrieving large query results a limited number of rows at a time. This is discussed
+ in <xref linkend="libpq-chunked-results-modes"/>.
</para>
<para>
@@ -5554,12 +5567,13 @@ int PQflush(PGconn *conn);
</para>
<para>
- To enter single-row mode, call <function>PQsetSingleRowMode</function>
- before retrieving results with <function>PQgetResult</function>.
- This mode selection is effective only for the query currently
- being processed. For more information on the use of
- <function>PQsetSingleRowMode</function>,
- refer to <xref linkend="libpq-single-row-mode"/>.
+ To enter single-row or chunked modes, call
+ respectively <function>PQsetSingleRowMode</function>
+ or <function>PQsetChunkedRowsMode</function> before retrieving results
+ with <function>PQgetResult</function>. This mode selection is effective
+ only for the query currently being processed. For more information on the
+ use of these functions refer
+ to <xref linkend="libpq-chunked-results-modes" />.
</para>
<para>
@@ -5926,10 +5940,10 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
</sect2>
</sect1>
- <sect1 id="libpq-single-row-mode">
- <title>Retrieving Query Results Row-by-Row</title>
+ <sect1 id="libpq-chunked-results-modes">
+ <title>Retrieving Query Results by chunks</title>
- <indexterm zone="libpq-single-row-mode">
+ <indexterm zone="libpq-chunked-results-modes">
<primary>libpq</primary>
<secondary>single-row mode</secondary>
</indexterm>
@@ -5940,13 +5954,15 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
<structname>PGresult</structname>. This can be unworkable for commands
that return a large number of rows. For such cases, applications can use
<xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/> in
- <firstterm>single-row mode</firstterm>. In this mode, the result row(s) are
- returned to the application one at a time, as they are received from the
- server.
+ <firstterm>single-row mode</firstterm> or <firstterm>chunked mode</firstterm>.
+ In these modes, the result row(s) are returned to the application one at a
+ time for the single-row mode and by chunks for the chunked mode, as they
+ are received from the server.
</para>
<para>
- To enter single-row mode, call <xref linkend="libpq-PQsetSingleRowMode"/>
+ To enter these modes, call <xref linkend="libpq-PQsetSingleRowMode"/>
+ or <xref linkend="libpq-PQsetChunkedRowsMode"/>
immediately after a successful call of <xref linkend="libpq-PQsendQuery"/>
(or a sibling function). This mode selection is effective only for the
currently executing query. Then call <xref linkend="libpq-PQgetResult"/>
@@ -5954,7 +5970,8 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
linkend="libpq-async"/>. If the query returns any rows, they are returned
as individual <structname>PGresult</structname> objects, which look like
normal query results except for having status code
- <literal>PGRES_SINGLE_TUPLE</literal> instead of
+ <literal>PGRES_SINGLE_TUPLE</literal> for the single-row mode and
+ <literal>PGRES_TUPLES_CHUNK</literal> for the chunked mode, instead of
<literal>PGRES_TUPLES_OK</literal>. After the last row, or immediately if
the query returns zero rows, a zero-row object with status
<literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
@@ -5967,9 +5984,9 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
</para>
<para>
- When using pipeline mode, single-row mode needs to be activated for each
- query in the pipeline before retrieving results for that query
- with <function>PQgetResult</function>.
+ When using pipeline mode, the single-row or chunked mode need to be
+ activated for each query in the pipeline before retrieving results for that
+ query with <function>PQgetResult</function>.
See <xref linkend="libpq-pipeline-mode"/> for more information.
</para>
@@ -6003,14 +6020,49 @@ int PQsetSingleRowMode(PGconn *conn);
</variablelist>
</para>
+ <para>
+ <variablelist>
+ <varlistentry id="libpq-PQsetChunkedRowsMode">
+ <term><function>PQsetChunkedRowsMode</function>
+ <indexterm><primary>PQsetChunkedRowsMode</primary></indexterm></term>
+ <listitem>
+ <para>
+ Select the mode retrieving results in chunks for the currently-executing query.
+
+<synopsis>
+ int PQsetChunkedRowsMode(PGconn *conn,
+ int maxRows);
+</synopsis>
+ </para>
+
+ <para>
+ This function is similar to <xref linkend="libpq-PQsetSingleRowMode"/>,
+ except that it can retrieve a user-specified number of rows
+ per call to <xref linkend="libpq-PQgetResult"/>, instead of a single row.
+ This function can only be called immediately after
+ <xref linkend="libpq-PQsendQuery"/> or one of its sibling functions,
+ before any other operation on the connection such as
+ <xref linkend="libpq-PQconsumeInput"/> or
+ <xref linkend="libpq-PQgetResult"/>. If called at the correct time,
+ the function activates the chunked mode for the current query and
+ returns 1. Otherwise the mode stays unchanged and the function
+ returns 0. In any case, the mode reverts to normal after
+ completion of the current query.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
<caution>
<para>
While processing a query, the server may return some rows and then
encounter an error, causing the query to be aborted. Ordinarily,
<application>libpq</application> discards any such rows and reports only the
- error. But in single-row mode, those rows will have already been
+ error. But in single-row or chunked modes, those rows will have already been
returned to the application. Hence, the application will see some
- <literal>PGRES_SINGLE_TUPLE</literal> <structname>PGresult</structname>
+ <literal>PGRES_SINGLE_TUPLE</literal> or <literal>PGRES_TUPLES_CHUNK</literal>
+ <structname>PGresult</structname>
objects followed by a <literal>PGRES_FATAL_ERROR</literal> object. For
proper transactional behavior, the application must be designed to
discard or undo whatever has been done with the previously-processed
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 2439733b55..97111d966d 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1183,6 +1183,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
switch (PQresultStatus(pgres))
{
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
case PGRES_TUPLES_OK:
walres->status = WALRCV_OK_TUPLES;
libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c
index e5f9eedc47..728305a7cf 100644
--- a/src/bin/pg_amcheck/pg_amcheck.c
+++ b/src/bin/pg_amcheck/pg_amcheck.c
@@ -989,6 +989,7 @@ should_processing_continue(PGresult *res)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
return false;
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 088592deb1..20effc8337 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -193,3 +193,4 @@ PQsendClosePrepared 190
PQsendClosePortal 191
PQchangePassword 192
PQsendPipelineSync 193
+PQsetChunkedRowsMode 194
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index c02a9180b2..b9a73b583e 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -41,7 +41,8 @@ char *const pgresStatus[] = {
"PGRES_COPY_BOTH",
"PGRES_SINGLE_TUPLE",
"PGRES_PIPELINE_SYNC",
- "PGRES_PIPELINE_ABORTED"
+ "PGRES_PIPELINE_ABORTED",
+ "PGRES_TUPLES_CHUNK"
};
/* We return this if we're unable to make a PGresult at all */
@@ -83,7 +84,7 @@ static int check_field_number(const PGresult *res, int field_num);
static void pqPipelineProcessQueue(PGconn *conn);
static int pqPipelineSyncInternal(PGconn *conn, bool immediate_flush);
static int pqPipelineFlush(PGconn *conn);
-
+static bool canChangeRowMode(PGconn *conn);
/* ----------------
* Space management for PGresult.
@@ -200,6 +201,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
/* non-error cases */
break;
default:
@@ -913,8 +915,9 @@ pqPrepareAsyncResult(PGconn *conn)
/*
* Replace conn->result with next_result, if any. In the normal case
* there isn't a next result and we're just dropping ownership of the
- * current result. In single-row mode this restores the situation to what
- * it was before we created the current single-row result.
+ * current result. In single-row and chunked modes this restores the
+ * situation to what it was before we created the current single-row or
+ * chunk-of-rows result.
*/
conn->result = conn->next_result;
conn->error_result = false; /* next_result is never an error */
@@ -1200,10 +1203,11 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
* (Such a string should already be translated via libpq_gettext().)
* If it is left NULL, the error is presumed to be "out of memory".
*
- * In single-row mode, we create a new result holding just the current row,
- * stashing the previous result in conn->next_result so that it becomes
- * active again after pqPrepareAsyncResult(). This allows the result metadata
- * (column descriptions) to be carried forward to each result row.
+ * In single-row or chunked mode, we create a new result holding just the
+ * current set of rows, stashing the previous result in conn->next_result so
+ * that it becomes active again after pqPrepareAsyncResult(). This allows the
+ * result metadata (column descriptions) to be carried forward to each result
+ * row.
*/
int
pqRowProcessor(PGconn *conn, const char **errmsgp)
@@ -1228,6 +1232,28 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
if (!res)
return 0;
}
+ else if (conn->rowsChunkSize > 0)
+ {
+ /*
+ * In chunked mode, make a new PGresult that will hold N rows; the
+ * original conn->result is left unchanged, as in the single-row mode.
+ */
+ if (!conn->chunk_result)
+ {
+ /* Allocate and initialize the result to hold a chunk of rows */
+ res = PQcopyResult(res,
+ PG_COPYRES_ATTRS | PG_COPYRES_EVENTS |
+ PG_COPYRES_NOTICEHOOKS);
+ if (!res)
+ return 0;
+ /* Change result status to special chunk-of-rows value */
+ res->resultStatus = PGRES_TUPLES_CHUNK;
+ /* Keep this result to reuse for the next rows of the chunk */
+ conn->chunk_result = res;
+ }
+ else
+ res = conn->chunk_result; /* Use the current chunk */
+ }
/*
* Basically we just allocate space in the PGresult for each field and
@@ -1290,6 +1316,21 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->asyncStatus = PGASYNC_READY_MORE;
}
+ /*
+ * In chunked mode, if the count has reached the requested limit, make the
+ * rows of the current chunk available immediately.
+ */
+ else if (conn->rowsChunkSize > 0 && res->ntups >= conn->rowsChunkSize)
+ {
+ /* Stash old result for re-use later */
+ conn->next_result = conn->result;
+ conn->result = res;
+ /* Do not reuse that chunk of results */
+ conn->chunk_result = NULL;
+ /* And mark the result ready to return */
+ conn->asyncStatus = PGASYNC_READY_MORE;
+ }
+
return 1;
fail:
@@ -1745,8 +1786,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
*/
pqClearAsyncResult(conn);
- /* reset single-row processing mode */
+ /* reset row-by-row and chunked processing modes */
conn->singleRowMode = false;
+ conn->rowsChunkSize = 0;
}
/* ready to send command message */
@@ -1930,25 +1972,51 @@ sendFailed:
*/
int
PQsetSingleRowMode(PGconn *conn)
+{
+ if (canChangeRowMode(conn))
+ {
+ conn->singleRowMode = true;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+/*
+ * Select chunked results processing mode
+ */
+int
+PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
+{
+ if (chunkSize >= 0 && canChangeRowMode(conn))
+ {
+ conn->rowsChunkSize = chunkSize;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+static
+bool
+canChangeRowMode(PGconn *conn)
{
/*
- * Only allow setting the flag when we have launched a query and not yet
- * received any results.
+ * Only allow setting the row-by-row or by-chunks modes when we have
+ * launched a query and not yet received any results.
*/
if (!conn)
- return 0;
+ return false;
if (conn->asyncStatus != PGASYNC_BUSY)
- return 0;
+ return false;
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
- return 0;
+ return false;
if (pgHavePendingResult(conn))
- return 0;
+ return false;
- /* OK, set flag */
- conn->singleRowMode = true;
- return 1;
+ return true;
}
/*
@@ -2115,6 +2183,16 @@ PQgetResult(PGconn *conn)
break;
case PGASYNC_READY:
+ /*
+ * If there is a pending chunk of results, return it
+ */
+ if (conn->chunk_result != NULL)
+ {
+ res = conn->chunk_result;
+ conn->chunk_result = NULL;
+ break;
+ }
+
res = pqPrepareAsyncResult(conn);
/* Advance the queue as appropriate */
@@ -3173,10 +3251,11 @@ pqPipelineProcessQueue(PGconn *conn)
}
/*
- * Reset single-row processing mode. (Client has to set it up for each
+ * Reset to full result sets mode. (Client has to set it up for each
* query, if desired.)
*/
conn->singleRowMode = false;
+ conn->rowsChunkSize = 0;
/*
* If there are no further commands to process in the queue, get us in
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index defc415fa3..21de21fbe8 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -109,8 +109,9 @@ typedef enum
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
- PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort
+ PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
* earlier in a pipeline */
+ PGRES_TUPLES_CHUNK /* set of tuples from larger resultset */
} ExecStatusType;
typedef enum
@@ -463,6 +464,7 @@ extern int PQsendQueryPrepared(PGconn *conn,
const int *paramFormats,
int resultFormat);
extern int PQsetSingleRowMode(PGconn *conn);
+extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize);
extern PGresult *PQgetResult(PGconn *conn);
/* Routines for managing an asynchronous query */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index ff8e0dce77..76130e2912 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -431,6 +431,8 @@ struct pg_conn
* sending semantics */
PGpipelineStatus pipelineStatus; /* status of pipeline mode */
bool singleRowMode; /* return current query result row-by-row? */
+ int rowsChunkSize; /* non-zero to return query results by chunks
+ * not exceeding that number of rows */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
PGnotify *notifyHead; /* oldest unreported Notify msg */
@@ -536,7 +538,10 @@ struct pg_conn
*/
PGresult *result; /* result being constructed */
bool error_result; /* do we need to make an ERROR result? */
- PGresult *next_result; /* next result (used in single-row mode) */
+ PGresult *next_result; /* next result (used in single-row and
+ * by-chunks modes) */
+ PGresult *chunk_result; /* current chunk of results (limited to
+ * rowsChunkSize) */
/* Assorted state for SASL, SSL, GSS, etc */
const pg_fe_sasl_mech *sasl;
--
2.34.1
v6-0002-Reimplement-FETCH_COUNT-with-the-chunked-mode-in-.patchtext/plainDownload
From 0c551ad95357b524eed7e6d12da1332f3b1db5b9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <daniel@manitou-mail.org>
Date: Tue, 30 Jan 2024 14:40:12 +0100
Subject: [PATCH v6 2/2] Reimplement FETCH_COUNT with the chunked mode in
libpq.
Cursors were used only when the command starts with the keyword "SELECT",
excluding queries that start with "WITH" or "UPDATE" or "INSERT" that
may also return large result sets.
---
src/bin/psql/common.c | 545 ++++++++++-------------------
src/bin/psql/t/001_basic.pl | 6 +-
src/test/regress/expected/psql.out | 9 +-
src/test/regress/sql/psql.sql | 4 +-
4 files changed, 196 insertions(+), 368 deletions(-)
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 76e01b02a3..29e352ddde 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -31,7 +31,6 @@
#include "settings.h"
static bool DescribeQuery(const char *query, double *elapsed_msec);
-static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
static int ExecQueryAndProcessResults(const char *query,
double *elapsed_msec,
bool *svpt_gone_p,
@@ -40,8 +39,6 @@ static int ExecQueryAndProcessResults(const char *query,
const printQueryOpt *opt,
FILE *printQueryFout);
static bool command_no_begin(const char *query);
-static bool is_select_command(const char *query);
-
/*
* openQueryOutputFile --- attempt to open a query output file
@@ -373,6 +370,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_TUPLES_CHUNK:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@@ -1131,16 +1129,10 @@ SendQuery(const char *query)
/* Describe query's result columns, without executing it */
OK = DescribeQuery(query, &elapsed_msec);
}
- else if (pset.fetch_count <= 0 || pset.gexec_flag ||
- pset.crosstab_flag || !is_select_command(query))
- {
- /* Default fetch-it-all-and-print mode */
- OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
- }
else
{
- /* Fetch-in-segments mode */
- OK = ExecQueryUsingCursor(query, &elapsed_msec);
+ /* Default fetch-and-print mode */
+ OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
}
if (!OK && pset.echo == PSQL_ECHO_ERRORS)
@@ -1392,6 +1384,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
+/*
+ * Check if an output stream for \g needs to be opened, and if
+ * yes, open it.
+ * Return false if an error occurred, true otherwise.
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+ ExecStatusType status = PQresultStatus(result);
+ if (pset.gfname != NULL && /* there is a \g file or program */
+ *gfile_fout == NULL && /* and it's not already opened */
+ (status == PGRES_TUPLES_OK ||
+ status == PGRES_TUPLES_CHUNK ||
+ status == PGRES_COPY_OUT))
+ {
+ if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+ {
+ if (is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ return false;
+ }
+ return true;
+}
+
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+ /* close \g file if we opened it */
+ if (gfile_fout)
+ {
+ if (is_pipe)
+ {
+ SetShellResultVariables(pclose(gfile_fout));
+ restore_sigpipe_trap();
+ }
+ else
+ fclose(gfile_fout);
+ }
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1424,10 +1457,16 @@ ExecQueryAndProcessResults(const char *query,
bool return_early = false;
instr_time before,
after;
+ int fetch_count = pset.fetch_count;
PGresult *result;
+
FILE *gfile_fout = NULL;
bool gfile_is_pipe = false;
+ int64 total_tuples = 0;
+ int flush_error = 0;
+ bool is_pager = false;
+
if (timing)
INSTR_TIME_SET_CURRENT(before);
else
@@ -1450,6 +1489,29 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
+ /*
+ * If FETCH_COUNT is set and the context allows it, use the single row
+ * mode to fetch results and have no more than FETCH_COUNT rows in
+ * memory.
+ */
+ if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+ && !pset.gset_prefix && pset.show_all_results)
+ {
+ /*
+ * The row-by-chunks fetch is not enabled when SHOW_ALL_RESULTS is false,
+ * since we would need to accumulate all rows before knowing
+ * whether they need to be discarded or displayed, which contradicts
+ * FETCH_COUNT.
+ */
+ if (!PQsetChunkedRowsMode(pset.db, fetch_count))
+ {
+ pg_log_warning("fetching results in chunks mode is unavailable");
+ fetch_count = 0;
+ }
+ }
+ else
+ fetch_count = 0; /* fetch one resultset per query */
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@@ -1473,6 +1535,8 @@ ExecQueryAndProcessResults(const char *query,
ExecStatusType result_status;
PGresult *next_result;
bool last;
+ /* whether the output starts before results are fully fetched */
+ bool partial_display = false;
if (!AcceptResult(result, false))
{
@@ -1568,20 +1632,9 @@ ExecQueryAndProcessResults(const char *query,
}
else if (pset.gfname)
{
- /* send to \g file, which we may have opened already */
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- copy_stream = gfile_fout;
- }
- else
- success = false;
- }
- else
+ /* COPY followed by \g filename or \g |program */
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (success)
copy_stream = gfile_fout;
}
else
@@ -1599,6 +1652,90 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
+ if (fetch_count > 0 && result_status == PGRES_TUPLES_CHUNK)
+ {
+ FILE *tuples_fout = printQueryFout ? printQueryFout : stdout;
+ printQueryOpt my_popt = pset.popt;
+
+ total_tuples = 0;
+ partial_display = true;
+
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ while (success)
+ {
+ /* pager: open at most once per resultset */
+ if (tuples_fout == stdout && !is_pager)
+ {
+ tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
+ is_pager = true;
+ }
+ /* display the current chunk of results unless the output stream is not working */
+ if (!flush_error)
+ {
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ flush_error = fflush(tuples_fout);
+ }
+
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+ my_popt.topt.prior_records += PQntuples(result);
+ total_tuples += PQntuples(result);
+
+ ClearOrSaveResult(result);
+
+ result = PQgetResult(pset.db);
+ if (result == NULL)
+ {
+ /*
+ * Error. We expect a PGRES_TUPLES_OK result with
+ * zero tuple in it to finish the fetch sequence.
+ */
+ success = false;
+ if (is_pager)
+ ClosePager(tuples_fout);
+ break;
+ }
+ else if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ /*
+ * The last row has been read. Display the footer.
+ */
+ my_popt.topt.stop_table = true;
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ total_tuples += PQntuples(result);
+
+ if (is_pager)
+ ClosePager(tuples_fout);
+ ClearOrSaveResult(result);
+ result = NULL;
+ break;
+ }
+ else if (PQresultStatus(result) != PGRES_TUPLES_CHUNK)
+ {
+ /*
+ * Error. We expect either PGRES_TUPLES_CHUNK or
+ * PGRES_TUPLES_OK.
+ */
+ if (is_pager)
+ ClosePager(tuples_fout);
+ success = false;
+ AcceptResult(result, true); /* display error whenever appropriate */
+ SetResultVariables(result, success);
+ break;
+ }
+ }
+ }
+ else
+ partial_display = false;
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@@ -1627,7 +1764,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* this may or may not print something depending on settings */
- if (result != NULL)
+ if (result != NULL && !partial_display)
{
/*
* If results need to be printed into the file specified by \g,
@@ -1636,32 +1773,33 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
-
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
/* set variables from last result */
if (!is_watch && last)
- SetResultVariables(result, success);
+ {
+ if (!partial_display)
+ SetResultVariables(result, success);
+ else if (success)
+ {
+ /*
+ * fake SetResultVariables(). If an error occurred when
+ * retrieving chunks, these variables have been set already.
+ */
+ char buf[32];
+
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ }
+ }
ClearOrSaveResult(result);
result = next_result;
@@ -1673,17 +1811,7 @@ ExecQueryAndProcessResults(const char *query,
}
}
- /* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- SetShellResultVariables(pclose(gfile_fout));
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
@@ -1696,274 +1824,6 @@ ExecQueryAndProcessResults(const char *query,
}
-/*
- * ExecQueryUsingCursor: run a SELECT-like query using a cursor
- *
- * This feature allows result sets larger than RAM to be dealt with.
- *
- * Returns true if the query executed successfully, false otherwise.
- *
- * If pset.timing is on, total query time (exclusive of result-printing) is
- * stored into *elapsed_msec.
- */
-static bool
-ExecQueryUsingCursor(const char *query, double *elapsed_msec)
-{
- bool OK = true;
- PGresult *result;
- PQExpBufferData buf;
- printQueryOpt my_popt = pset.popt;
- bool timing = pset.timing;
- FILE *fout;
- bool is_pipe;
- bool is_pager = false;
- bool started_txn = false;
- int64 total_tuples = 0;
- int ntuples;
- int fetch_count;
- char fetch_cmd[64];
- instr_time before,
- after;
- int flush_error;
-
- *elapsed_msec = 0;
-
- /* initialize print options for partial table output */
- my_popt.topt.start_table = true;
- my_popt.topt.stop_table = false;
- my_popt.topt.prior_records = 0;
-
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
- else
- INSTR_TIME_SET_ZERO(before);
-
- /* if we're not in a transaction, start one */
- if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
- {
- result = PQexec(pset.db, "BEGIN");
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- if (!OK)
- return false;
- started_txn = true;
- }
-
- /* Send DECLARE CURSOR */
- initPQExpBuffer(&buf);
- appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
- query);
-
- result = PQexec(pset.db, buf.data);
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- if (!OK)
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- termPQExpBuffer(&buf);
- if (!OK)
- goto cleanup;
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- /*
- * In \gset mode, we force the fetch count to be 2, so that we will throw
- * the appropriate error if the query returns more than one row.
- */
- if (pset.gset_prefix)
- fetch_count = 2;
- else
- fetch_count = pset.fetch_count;
-
- snprintf(fetch_cmd, sizeof(fetch_cmd),
- "FETCH FORWARD %d FROM _psql_cursor",
- fetch_count);
-
- /* prepare to write output to \g argument, if any */
- if (pset.gfname)
- {
- if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe))
- {
- OK = false;
- goto cleanup;
- }
- if (is_pipe)
- disable_sigpipe_trap();
- }
- else
- {
- fout = pset.queryFout;
- is_pipe = false; /* doesn't matter */
- }
-
- /* clear any pre-existing error indication on the output stream */
- clearerr(fout);
-
- for (;;)
- {
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /* get fetch_count tuples at a time */
- result = PQexec(pset.db, fetch_cmd);
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- if (PQresultStatus(result) != PGRES_TUPLES_OK)
- {
- /* shut down pager before printing error message */
- if (is_pager)
- {
- ClosePager(fout);
- is_pager = false;
- }
-
- OK = AcceptResult(result, true);
- Assert(!OK);
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- break;
- }
-
- if (pset.gset_prefix)
- {
- /* StoreQueryTuple will complain if not exactly one row */
- OK = StoreQueryTuple(result);
- ClearOrSaveResult(result);
- break;
- }
-
- /*
- * Note we do not deal with \gdesc, \gexec or \crosstabview modes here
- */
-
- ntuples = PQntuples(result);
- total_tuples += ntuples;
-
- if (ntuples < fetch_count)
- {
- /* this is the last result set, so allow footer decoration */
- my_popt.topt.stop_table = true;
- }
- else if (fout == stdout && !is_pager)
- {
- /*
- * If query requires multiple result sets, hack to ensure that
- * only one pager instance is used for the whole mess
- */
- fout = PageOutput(INT_MAX, &(my_popt.topt));
- is_pager = true;
- }
-
- printQuery(result, &my_popt, fout, is_pager, pset.logfile);
-
- ClearOrSaveResult(result);
-
- /* after the first result set, disallow header decoration */
- my_popt.topt.start_table = false;
- my_popt.topt.prior_records += ntuples;
-
- /*
- * Make sure to flush the output stream, so intermediate results are
- * visible to the client immediately. We check the results because if
- * the pager dies/exits/etc, there's no sense throwing more data at
- * it.
- */
- flush_error = fflush(fout);
-
- /*
- * Check if we are at the end, if a cancel was pressed, or if there
- * were any errors either trying to flush out the results, or more
- * generally on the output stream at all. If we hit any errors
- * writing things to the stream, we presume $PAGER has disappeared and
- * stop bothering to pull down more data.
- */
- if (ntuples < fetch_count || cancel_pressed || flush_error ||
- ferror(fout))
- break;
- }
-
- if (pset.gfname)
- {
- /* close \g argument file/pipe */
- if (is_pipe)
- {
- SetShellResultVariables(pclose(fout));
- restore_sigpipe_trap();
- }
- else
- fclose(fout);
- }
- else if (is_pager)
- {
- /* close transient pager */
- ClosePager(fout);
- }
-
- if (OK)
- {
- /*
- * We don't have a PGresult here, and even if we did it wouldn't have
- * the right row count, so fake SetResultVariables(). In error cases,
- * we already set the result variables above.
- */
- char buf[32];
-
- SetVariable(pset.vars, "ERROR", "false");
- SetVariable(pset.vars, "SQLSTATE", "00000");
- snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
- SetVariable(pset.vars, "ROW_COUNT", buf);
- }
-
-cleanup:
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /*
- * We try to close the cursor on either success or failure, but on failure
- * ignore the result (it's probably just a bleat about being in an aborted
- * transaction)
- */
- result = PQexec(pset.db, "CLOSE _psql_cursor");
- if (OK)
- {
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
- else
- PQclear(result);
-
- if (started_txn)
- {
- result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
- OK &= AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- return OK;
-}
-
-
/*
* Advance the given char pointer over white space and SQL comments.
*/
@@ -2243,43 +2103,6 @@ command_no_begin(const char *query)
}
-/*
- * Check whether the specified command is a SELECT (or VALUES).
- */
-static bool
-is_select_command(const char *query)
-{
- int wordlen;
-
- /*
- * First advance over any whitespace, comments and left parentheses.
- */
- for (;;)
- {
- query = skip_white_space(query);
- if (query[0] == '(')
- query++;
- else
- break;
- }
-
- /*
- * Check word length (since "selectx" is not "select").
- */
- wordlen = 0;
- while (isalpha((unsigned char) query[wordlen]))
- wordlen += PQmblenBounded(&query[wordlen], pset.encoding);
-
- if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0)
- return true;
-
- if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
- return true;
-
- return false;
-}
-
-
/*
* Test if the current user is a database superuser.
*/
diff --git a/src/bin/psql/t/001_basic.pl b/src/bin/psql/t/001_basic.pl
index 9f0b6cf8ca..b5fedbc091 100644
--- a/src/bin/psql/t/001_basic.pl
+++ b/src/bin/psql/t/001_basic.pl
@@ -161,7 +161,7 @@ psql_like(
'\errverbose with no previous error');
# There are three main ways to run a query that might affect
-# \errverbose: The normal way, using a cursor by setting FETCH_COUNT,
+# \errverbose: The normal way, piecemeal retrieval using FETCH_COUNT,
# and using \gdesc. Test them all.
like(
@@ -184,10 +184,10 @@ like(
"\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose",
on_error_stop => 0))[2],
qr/\A^psql:<stdin>:2: ERROR: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
^ *^.*$
^psql:<stdin>:3: error: ERROR: [0-9A-Z]{5}: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
^ *^.*$
^LOCATION: .*$/m,
'\errverbose after FETCH_COUNT query with error');
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index ad02772562..46f998df12 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -4755,7 +4755,7 @@ number of rows: 0
last error message: syntax error at end of input
\echo 'last error code:' :LAST_ERROR_SQLSTATE
last error code: 42601
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
unique2
@@ -4787,7 +4787,7 @@ error: false
error code: 00000
\echo 'number of rows:' :ROW_COUNT
number of rows: 19
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
?column?
----------
@@ -4801,6 +4801,11 @@ select 1/(15-unique2) from tenk1 order by unique2 limit 19;
0
0
0
+ 0
+ 0
+ 0
+ 0
+ 1
ERROR: division by zero
\echo 'error:' :ERROR
error: true
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
index 129f853353..33076cad79 100644
--- a/src/test/regress/sql/psql.sql
+++ b/src/test/regress/sql/psql.sql
@@ -1161,14 +1161,14 @@ SELECT 4 AS \gdesc
\echo 'last error message:' :LAST_ERROR_MESSAGE
\echo 'last error code:' :LAST_ERROR_SQLSTATE
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
\echo 'number of rows:' :ROW_COUNT
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
--
2.34.1
Hi Daniel,
On Tue, Jan 30, 2024 at 3:29 PM Daniel Verite <daniel@manitou-mail.org> wrote:
PFA a rebased version.
Thanks for the patch! I've tested it using my original reproducer and
it works great now against the original problem description. I've
taken a quick look at the patch, it looks good for me. I've tested
using -Werror for both gcc 10.2 and clang 11.0 and it was clean. I
have one slight doubt:
when I run with default pager (more or less):
\set FETCH_COUNT 1000
WITH data AS (SELECT generate_series(1, 20000000) as Total) select
repeat('a',100) || data.Total || repeat('b', 800) as total_pat from
data;
-- it enters pager, a skip couple of pages and then "q"
.. then - both backend and psql - go into 100% CPU as it were still
receiving (that doesn't happen e.g. with export PAGER=cat). So I'm
not sure, maybe ExecQueryAndProcessResults() should somewhat faster
abort when the $PAGER is exiting normally(?).
And oh , btw, in v6-0001 (so if you would be sending v7 for any other
reason -- other reviewers -- maybe worth realigning it as detail):
+ int PQsetChunkedRowsMode(PGconn *conn,
+ int maxRows);
but the code has (so "maxRows" != "chunkSize"):
+PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
-J.
Jakub Wartak wrote:
when I run with default pager (more or less):
\set FETCH_COUNT 1000
WITH data AS (SELECT generate_series(1, 20000000) as Total) select
repeat('a',100) || data.Total || repeat('b', 800) as total_pat from
data;
-- it enters pager, a skip couple of pages and then "q".. then - both backend and psql - go into 100% CPU as it were still
receiving
Thanks for looking into this patch!
What's happening after the pager has quit is that psql continues
to pump results from the server until there are no more results.
If the user wants to interrupt that, they should hit Ctrl+C to
cancel the query. I think psql should not cancel it implicitly
on their behalf, as it also cancels the transaction.
The behavior differs from the cursor implementation, because in
the cursor case, when the pager is displaying results, no query is
running. The previous FETCH results have been entirely
read, and the next FETCH has not been sent to the server yet.
This is why quitting the pager in the middle of this can
be dealt with instantly.
(that doesn't happen e.g. with export PAGER=cat). So I'm
not sure, maybe ExecQueryAndProcessResults() should somewhat
faster abort when the $PAGER is exiting normally(?).
I assume that when using PAGER=cat, you cancel the display
with Ctrl+C, which propagates to psql and have the effect
to also cancel the query. In that case it displays
"Cancel request sent",
and then shortly after it gets back from the server:
"ERROR: canceling statement due to user request".
That case corresponds to the generic query canceling flow.
OTOH if killing the "cat" process with kill -TERM I see the same
behavior than with "more" or "less", that is postgres running
the query to completion and psql pumping the results.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
On Tue, 2024-01-30 at 15:29 +0100, Daniel Verite wrote:
PFA a rebased version.
I had a look at patch 0001 (0002 will follow).
- <sect1 id="libpq-single-row-mode"> - <title>Retrieving Query Results Row-by-Row</title> + <sect1 id="libpq-chunked-results-modes"> + <title>Retrieving Query Results by chunks</title>
That should be "in chunks".
+ <para> + <variablelist> + <varlistentry id="libpq-PQsetChunkedRowsMode"> + <term><function>PQsetChunkedRowsMode</function> + <indexterm><primary>PQsetChunkedRowsMode</primary></indexterm></term> + <listitem> + <para> + Select the mode retrieving results in chunks for the currently-executing query.
That is questionable English. How about
Select to receive the results for the currently-executing query in chunks.
+ This function is similar to <xref linkend="libpq-PQsetSingleRowMode"/>, + except that it can retrieve a user-specified number of rows + per call to <xref linkend="libpq-PQgetResult"/>, instead of a single row.
The "user-specified number" is "maxRows". So a better wording would be:
... except that it can retrieve <replaceable>maxRows</replaceable> rows
per call to <xref linkend="libpq-PQgetResult"/> instead of a single row.
- error. But in single-row mode, those rows will have already been + error. But in single-row or chunked modes, those rows will have already been
I'd say it should be "in *the* single-row or chunk modes".
--- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -41,7 +41,8 @@ char *const pgresStatus[] = { "PGRES_COPY_BOTH", "PGRES_SINGLE_TUPLE", "PGRES_PIPELINE_SYNC", - "PGRES_PIPELINE_ABORTED" + "PGRES_PIPELINE_ABORTED", + "PGRES_TUPLES_CHUNK" };
I think that PGRES_SINGLE_TUPLE and PGRES_TUPLES_CHUNK should be next to each
other, but that's no big thing.
The same applies to the change in src/interfaces/libpq/libpq-fe.h
I understand that we need to keep the single-row mode for compatibility
reasons. But I think that under the hood, "single-row mode" should be the
same as "chunk mode with chunk size one".
That should save some code repetition.
Yours,
Laurenz Albe
On Fri, 2024-03-29 at 14:07 +0100, Laurenz Albe wrote:
I had a look at patch 0001 (0002 will follow).
Here is the code review for patch number 2:
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
[...]
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
[...]
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
It makes sense to factor out this code.
But shouldn't these functions have a prototype at the beginning of the file?
+ /* + * If FETCH_COUNT is set and the context allows it, use the single row + * mode to fetch results and have no more than FETCH_COUNT rows in + * memory. + */
That comment talks about single-row mode, whey you are using chunked mode.
You probably forgot to modify the comment from a previous version of the patch.
+ if (fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch + && !pset.gset_prefix && pset.show_all_results) + { + /* + * The row-by-chunks fetch is not enabled when SHOW_ALL_RESULTS is false, + * since we would need to accumulate all rows before knowing + * whether they need to be discarded or displayed, which contradicts + * FETCH_COUNT. + */ + if (!PQsetChunkedRowsMode(pset.db, fetch_count)) + {
I think that comment should be before the "if" statement, not inside it.
Here is a suggestion for a consolidated comment:
Fetch the result in chunks if FETCH_COUNT is set. We don't enable chunking
if SHOW_ALL_RESULTS is false, since that requires us to accumulate all rows
before we can tell what should be displayed, which would counter the idea
of FETCH_COUNT. Chunk fetching is also disabled if \gset, \crosstab,
\gexec and \watch are used.
+ if (fetch_count > 0 && result_status == PGRES_TUPLES_CHUNK)
Could it be that result_status == PGRES_TUPLES_CHUNK, but fetch_count is 0?
if not, perhaps there should be an Assert that verifies that, and the "if"
statement should only check for the latter condition.
--- a/src/bin/psql/t/001_basic.pl +++ b/src/bin/psql/t/001_basic.pl @@ -184,10 +184,10 @@ like( "\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose", on_error_stop => 0))[2], qr/\A^psql:<stdin>:2: ERROR: .*$ -^LINE 2: SELECT error;$ +^LINE 1: SELECT error;$ ^ *^.*$ ^psql:<stdin>:3: error: ERROR: [0-9A-Z]{5}: .*$ -^LINE 2: SELECT error;$ +^LINE 1: SELECT error;$
Why does the output change? Perhaps there is a good and harmless
explanation, but the naïve expectation would be that it doesn't.
The patch does not apply any more because of a conflict with the
non-blocking PQcancel patch.
After fixing the problem manually, it builds without warning.
The regression tests pass, and the feature works as expected.
Yours,
Laurenz Albe
Laurenz Albe wrote:
I had a look at patch 0001 (0002 will follow).
Thanks for reviewing this!
I've implemented the suggested doc changes. A patch update
will follow with the next part of the review.
--- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -41,7 +41,8 @@ char *const pgresStatus[] = { "PGRES_COPY_BOTH", "PGRES_SINGLE_TUPLE", "PGRES_PIPELINE_SYNC", - "PGRES_PIPELINE_ABORTED" + "PGRES_PIPELINE_ABORTED", + "PGRES_TUPLES_CHUNK" };I think that PGRES_SINGLE_TUPLE and PGRES_TUPLES_CHUNK should be next to
each other, but that's no big thing.
The same applies to the change in src/interfaces/libpq/libpq-fe.h
I assume we can't renumber/reorder existing values, otherwise it would be
an ABI break. We can only add new values.
I understand that we need to keep the single-row mode for compatibility
reasons. But I think that under the hood, "single-row mode" should be the
same as "chunk mode with chunk size one".
I've implemented it like that at first, and wasn't thrilled with the result.
libpq still has to return PGRES_SINGLE_TUPLE in single-row
mode and PGRES_TUPLES_CHUNK with chunks of size 1, so
the mutualization did not work that well in practice.
I also contemplated not creating PGRES_TUPLES_CHUNK
and instead using PGRES_SINGLE_TUPLE for N rows, but I found
it too ugly.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Laurenz Albe wrote:
Here is the code review for patch number 2:
+static void +CloseGOutput(FILE *gfile_fout, bool is_pipe)It makes sense to factor out this code.
But shouldn't these functions have a prototype at the beginning of the file?
Looking at the other static functions in psql/common.c, there
are 22 of them but only 3 have prototypes at the top of the file.
These 3 functions are called before being defined, so these prototypes
are mandatory.
The other static functions that are defined before being called happen
not to have forward declarations, so SetupGOutput() and CloseGOutput()
follow that model.
Here is a suggestion for a consolidated comment:
Fetch the result in chunks if FETCH_COUNT is set. We don't enable chunking
if SHOW_ALL_RESULTS is false, since that requires us to accumulate all rows
before we can tell what should be displayed, which would counter the idea
of FETCH_COUNT. Chunk fetching is also disabled if \gset, \crosstab,
\gexec and \watch are used.
OK, done like that.
+ if (fetch_count > 0 && result_status == PGRES_TUPLES_CHUNK)
Could it be that result_status == PGRES_TUPLES_CHUNK, but fetch_count is 0?
if not, perhaps there should be an Assert that verifies that, and the "if"
statement should only check for the latter condition.
Good point. In fact it can be simplified to
if (result_status == PGRES_TUPLES_CHUNK),
and fetch_count as a variable can be removed from the function.
Done that way.
--- a/src/bin/psql/t/001_basic.pl +++ b/src/bin/psql/t/001_basic.pl @@ -184,10 +184,10 @@ like( "\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose", on_error_stop => 0))[2], qr/\A^psql:<stdin>:2: ERROR: .*$ -^LINE 2: SELECT error;$ +^LINE 1: SELECT error;$
^ *^.*$ ^psql:<stdin>:3: error: ERROR: [0-9A-Z]{5}: .*$ -^LINE 2: SELECT error;$ +^LINE 1: SELECT error;$Why does the output change? Perhaps there is a good and harmless
explanation, but the naïve expectation would be that it doesn't.
Unpatched, psql builds this query:
DECLARE _psql_cursor NO SCROLL CURSOR FOR \n
<user-query>
therefore the user query starts at line 2.
With the patch, the user query is sent as-is, starting at line1,
hence the different error location.
After fixing the problem manually, it builds without warning.
The regression tests pass, and the feature works as expected.
Thanks for testing.
Updated patches are attached.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Attachments:
v7-0001-Implement-retrieval-of-results-in-chunks-with-lib.patchtext/plainDownload
From 0fefaee7d5b3003ad0d089ea9e92675c6f50245f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <daniel@manitou-mail.org>
Date: Mon, 1 Apr 2024 19:46:20 +0200
Subject: [PATCH v7 1/2] Implement retrieval of results in chunks with libpq.
This mode is similar to the single-row mode except that chunks
of results contain up to N rows instead of a single row.
It is meant to reduce the overhead of the row-by-row allocations
for large result sets.
The mode is selected with PQsetChunkedRowsMode(int maxRows) and results
have the new status code PGRES_TUPLES_CHUNK.
---
doc/src/sgml/libpq.sgml | 98 +++++++++++----
.../libpqwalreceiver/libpqwalreceiver.c | 1 +
src/bin/pg_amcheck/pg_amcheck.c | 1 +
src/interfaces/libpq/exports.txt | 1 +
src/interfaces/libpq/fe-exec.c | 117 +++++++++++++++---
src/interfaces/libpq/libpq-fe.h | 4 +-
src/interfaces/libpq/libpq-int.h | 7 +-
7 files changed, 185 insertions(+), 44 deletions(-)
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index d3e87056f2..1814921d5a 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3545,7 +3545,20 @@ ExecStatusType PQresultStatus(const PGresult *res);
The <structname>PGresult</structname> contains a single result tuple
from the current command. This status occurs only when
single-row mode has been selected for the query
- (see <xref linkend="libpq-single-row-mode"/>).
+ (see <xref linkend="libpq-chunked-results-modes"/>).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pgres-tuples-chunk">
+ <term><literal>PGRES_TUPLES_CHUNK</literal></term>
+ <listitem>
+ <para>
+ The <structname>PGresult</structname> contains several tuples
+ from the current command. The count of tuples cannot exceed
+ the maximum passed to <xref linkend="libpq-PQsetChunkedRowsMode"/>.
+ This status occurs only when the chunked mode has been selected
+ for the query (see <xref linkend="libpq-chunked-results-modes"/>).
</para>
</listitem>
</varlistentry>
@@ -5197,8 +5210,8 @@ PGresult *PQgetResult(PGconn *conn);
<para>
Another frequently-desired feature that can be obtained with
<xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/>
- is retrieving large query results a row at a time. This is discussed
- in <xref linkend="libpq-single-row-mode"/>.
+ is retrieving large query results a limited number of rows at a time. This is discussed
+ in <xref linkend="libpq-chunked-results-modes"/>.
</para>
<para>
@@ -5562,12 +5575,13 @@ int PQflush(PGconn *conn);
</para>
<para>
- To enter single-row mode, call <function>PQsetSingleRowMode</function>
- before retrieving results with <function>PQgetResult</function>.
- This mode selection is effective only for the query currently
- being processed. For more information on the use of
- <function>PQsetSingleRowMode</function>,
- refer to <xref linkend="libpq-single-row-mode"/>.
+ To enter single-row or chunked modes, call
+ respectively <function>PQsetSingleRowMode</function>
+ or <function>PQsetChunkedRowsMode</function> before retrieving results
+ with <function>PQgetResult</function>. This mode selection is effective
+ only for the query currently being processed. For more information on the
+ use of these functions refer
+ to <xref linkend="libpq-chunked-results-modes" />.
</para>
<para>
@@ -5934,10 +5948,10 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
</sect2>
</sect1>
- <sect1 id="libpq-single-row-mode">
- <title>Retrieving Query Results Row-by-Row</title>
+ <sect1 id="libpq-chunked-results-modes">
+ <title>Retrieving Query Results in chunks</title>
- <indexterm zone="libpq-single-row-mode">
+ <indexterm zone="libpq-chunked-results-modes">
<primary>libpq</primary>
<secondary>single-row mode</secondary>
</indexterm>
@@ -5948,13 +5962,15 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
<structname>PGresult</structname>. This can be unworkable for commands
that return a large number of rows. For such cases, applications can use
<xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/> in
- <firstterm>single-row mode</firstterm>. In this mode, the result row(s) are
- returned to the application one at a time, as they are received from the
- server.
+ <firstterm>single-row mode</firstterm> or <firstterm>chunked mode</firstterm>.
+ In these modes, the result row(s) are returned to the application one at a
+ time for the single-row mode and by chunks for the chunked mode, as they
+ are received from the server.
</para>
<para>
- To enter single-row mode, call <xref linkend="libpq-PQsetSingleRowMode"/>
+ To enter these modes, call <xref linkend="libpq-PQsetSingleRowMode"/>
+ or <xref linkend="libpq-PQsetChunkedRowsMode"/>
immediately after a successful call of <xref linkend="libpq-PQsendQuery"/>
(or a sibling function). This mode selection is effective only for the
currently executing query. Then call <xref linkend="libpq-PQgetResult"/>
@@ -5962,7 +5978,8 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
linkend="libpq-async"/>. If the query returns any rows, they are returned
as individual <structname>PGresult</structname> objects, which look like
normal query results except for having status code
- <literal>PGRES_SINGLE_TUPLE</literal> instead of
+ <literal>PGRES_SINGLE_TUPLE</literal> for the single-row mode and
+ <literal>PGRES_TUPLES_CHUNK</literal> for the chunked mode, instead of
<literal>PGRES_TUPLES_OK</literal>. After the last row, or immediately if
the query returns zero rows, a zero-row object with status
<literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
@@ -5975,9 +5992,9 @@ UPDATE mytable SET x = x + 1 WHERE id = 42;
</para>
<para>
- When using pipeline mode, single-row mode needs to be activated for each
- query in the pipeline before retrieving results for that query
- with <function>PQgetResult</function>.
+ When using pipeline mode, the single-row or chunked mode need to be
+ activated for each query in the pipeline before retrieving results for that
+ query with <function>PQgetResult</function>.
See <xref linkend="libpq-pipeline-mode"/> for more information.
</para>
@@ -6011,14 +6028,49 @@ int PQsetSingleRowMode(PGconn *conn);
</variablelist>
</para>
+ <para>
+ <variablelist>
+ <varlistentry id="libpq-PQsetChunkedRowsMode">
+ <term><function>PQsetChunkedRowsMode</function>
+ <indexterm><primary>PQsetChunkedRowsMode</primary></indexterm></term>
+ <listitem>
+ <para>
+ Select to receive the results for the currently-executing query in chunks.
+
+<synopsis>
+ int PQsetChunkedRowsMode(PGconn *conn,
+ int maxRows);
+</synopsis>
+ </para>
+
+ <para>
+ This function is similar to <xref linkend="libpq-PQsetSingleRowMode"/>,
+ except that it can retrieve <replaceable>maxRows</replaceable> rows
+ per call to <xref linkend="libpq-PQgetResult"/> instead of a single row.
+ This function can only be called immediately after
+ <xref linkend="libpq-PQsendQuery"/> or one of its sibling functions,
+ before any other operation on the connection such as
+ <xref linkend="libpq-PQconsumeInput"/> or
+ <xref linkend="libpq-PQgetResult"/>. If called at the correct time,
+ the function activates the chunked mode for the current query and
+ returns 1. Otherwise the mode stays unchanged and the function
+ returns 0. In any case, the mode reverts to normal after
+ completion of the current query.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
<caution>
<para>
While processing a query, the server may return some rows and then
encounter an error, causing the query to be aborted. Ordinarily,
<application>libpq</application> discards any such rows and reports only the
- error. But in single-row mode, those rows will have already been
- returned to the application. Hence, the application will see some
- <literal>PGRES_SINGLE_TUPLE</literal> <structname>PGresult</structname>
+ error. But in the single-row or chunked modes, those rows will have already
+ been returned to the application. Hence, the application will see some
+ <literal>PGRES_SINGLE_TUPLE</literal> or <literal>PGRES_TUPLES_CHUNK</literal>
+ <structname>PGresult</structname>
objects followed by a <literal>PGRES_FATAL_ERROR</literal> object. For
proper transactional behavior, the application must be designed to
discard or undo whatever has been done with the previously-processed
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 761bf0f677..83a465a390 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1249,6 +1249,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
switch (PQresultStatus(pgres))
{
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
case PGRES_TUPLES_OK:
walres->status = WALRCV_OK_TUPLES;
libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c
index e5f9eedc47..728305a7cf 100644
--- a/src/bin/pg_amcheck/pg_amcheck.c
+++ b/src/bin/pg_amcheck/pg_amcheck.c
@@ -989,6 +989,7 @@ should_processing_continue(PGresult *res)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
return false;
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 9fbd3d3407..c7d01958ab 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -202,3 +202,4 @@ PQcancelSocket 199
PQcancelErrorMessage 200
PQcancelReset 201
PQcancelFinish 202
+PQsetChunkedRowsMode 203
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index c02a9180b2..b9a73b583e 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -41,7 +41,8 @@ char *const pgresStatus[] = {
"PGRES_COPY_BOTH",
"PGRES_SINGLE_TUPLE",
"PGRES_PIPELINE_SYNC",
- "PGRES_PIPELINE_ABORTED"
+ "PGRES_PIPELINE_ABORTED",
+ "PGRES_TUPLES_CHUNK"
};
/* We return this if we're unable to make a PGresult at all */
@@ -83,7 +84,7 @@ static int check_field_number(const PGresult *res, int field_num);
static void pqPipelineProcessQueue(PGconn *conn);
static int pqPipelineSyncInternal(PGconn *conn, bool immediate_flush);
static int pqPipelineFlush(PGconn *conn);
-
+static bool canChangeRowMode(PGconn *conn);
/* ----------------
* Space management for PGresult.
@@ -200,6 +201,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_TUPLES_CHUNK:
/* non-error cases */
break;
default:
@@ -913,8 +915,9 @@ pqPrepareAsyncResult(PGconn *conn)
/*
* Replace conn->result with next_result, if any. In the normal case
* there isn't a next result and we're just dropping ownership of the
- * current result. In single-row mode this restores the situation to what
- * it was before we created the current single-row result.
+ * current result. In single-row and chunked modes this restores the
+ * situation to what it was before we created the current single-row or
+ * chunk-of-rows result.
*/
conn->result = conn->next_result;
conn->error_result = false; /* next_result is never an error */
@@ -1200,10 +1203,11 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
* (Such a string should already be translated via libpq_gettext().)
* If it is left NULL, the error is presumed to be "out of memory".
*
- * In single-row mode, we create a new result holding just the current row,
- * stashing the previous result in conn->next_result so that it becomes
- * active again after pqPrepareAsyncResult(). This allows the result metadata
- * (column descriptions) to be carried forward to each result row.
+ * In single-row or chunked mode, we create a new result holding just the
+ * current set of rows, stashing the previous result in conn->next_result so
+ * that it becomes active again after pqPrepareAsyncResult(). This allows the
+ * result metadata (column descriptions) to be carried forward to each result
+ * row.
*/
int
pqRowProcessor(PGconn *conn, const char **errmsgp)
@@ -1228,6 +1232,28 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
if (!res)
return 0;
}
+ else if (conn->rowsChunkSize > 0)
+ {
+ /*
+ * In chunked mode, make a new PGresult that will hold N rows; the
+ * original conn->result is left unchanged, as in the single-row mode.
+ */
+ if (!conn->chunk_result)
+ {
+ /* Allocate and initialize the result to hold a chunk of rows */
+ res = PQcopyResult(res,
+ PG_COPYRES_ATTRS | PG_COPYRES_EVENTS |
+ PG_COPYRES_NOTICEHOOKS);
+ if (!res)
+ return 0;
+ /* Change result status to special chunk-of-rows value */
+ res->resultStatus = PGRES_TUPLES_CHUNK;
+ /* Keep this result to reuse for the next rows of the chunk */
+ conn->chunk_result = res;
+ }
+ else
+ res = conn->chunk_result; /* Use the current chunk */
+ }
/*
* Basically we just allocate space in the PGresult for each field and
@@ -1290,6 +1316,21 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->asyncStatus = PGASYNC_READY_MORE;
}
+ /*
+ * In chunked mode, if the count has reached the requested limit, make the
+ * rows of the current chunk available immediately.
+ */
+ else if (conn->rowsChunkSize > 0 && res->ntups >= conn->rowsChunkSize)
+ {
+ /* Stash old result for re-use later */
+ conn->next_result = conn->result;
+ conn->result = res;
+ /* Do not reuse that chunk of results */
+ conn->chunk_result = NULL;
+ /* And mark the result ready to return */
+ conn->asyncStatus = PGASYNC_READY_MORE;
+ }
+
return 1;
fail:
@@ -1745,8 +1786,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
*/
pqClearAsyncResult(conn);
- /* reset single-row processing mode */
+ /* reset row-by-row and chunked processing modes */
conn->singleRowMode = false;
+ conn->rowsChunkSize = 0;
}
/* ready to send command message */
@@ -1930,25 +1972,51 @@ sendFailed:
*/
int
PQsetSingleRowMode(PGconn *conn)
+{
+ if (canChangeRowMode(conn))
+ {
+ conn->singleRowMode = true;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+/*
+ * Select chunked results processing mode
+ */
+int
+PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
+{
+ if (chunkSize >= 0 && canChangeRowMode(conn))
+ {
+ conn->rowsChunkSize = chunkSize;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+static
+bool
+canChangeRowMode(PGconn *conn)
{
/*
- * Only allow setting the flag when we have launched a query and not yet
- * received any results.
+ * Only allow setting the row-by-row or by-chunks modes when we have
+ * launched a query and not yet received any results.
*/
if (!conn)
- return 0;
+ return false;
if (conn->asyncStatus != PGASYNC_BUSY)
- return 0;
+ return false;
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
- return 0;
+ return false;
if (pgHavePendingResult(conn))
- return 0;
+ return false;
- /* OK, set flag */
- conn->singleRowMode = true;
- return 1;
+ return true;
}
/*
@@ -2115,6 +2183,16 @@ PQgetResult(PGconn *conn)
break;
case PGASYNC_READY:
+ /*
+ * If there is a pending chunk of results, return it
+ */
+ if (conn->chunk_result != NULL)
+ {
+ res = conn->chunk_result;
+ conn->chunk_result = NULL;
+ break;
+ }
+
res = pqPrepareAsyncResult(conn);
/* Advance the queue as appropriate */
@@ -3173,10 +3251,11 @@ pqPipelineProcessQueue(PGconn *conn)
}
/*
- * Reset single-row processing mode. (Client has to set it up for each
+ * Reset to full result sets mode. (Client has to set it up for each
* query, if desired.)
*/
conn->singleRowMode = false;
+ conn->rowsChunkSize = 0;
/*
* If there are no further commands to process in the queue, get us in
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 09b485bd2b..0cea4f6b5b 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -112,8 +112,9 @@ typedef enum
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
- PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort
+ PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
* earlier in a pipeline */
+ PGRES_TUPLES_CHUNK /* set of tuples from larger resultset */
} ExecStatusType;
typedef enum
@@ -489,6 +490,7 @@ extern int PQsendQueryPrepared(PGconn *conn,
const int *paramFormats,
int resultFormat);
extern int PQsetSingleRowMode(PGconn *conn);
+extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize);
extern PGresult *PQgetResult(PGconn *conn);
/* Routines for managing an asynchronous query */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 9c05f11a6e..23c7a399ab 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -435,6 +435,8 @@ struct pg_conn
* sending semantics */
PGpipelineStatus pipelineStatus; /* status of pipeline mode */
bool singleRowMode; /* return current query result row-by-row? */
+ int rowsChunkSize; /* non-zero to return query results by chunks
+ * not exceeding that number of rows */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
PGnotify *notifyHead; /* oldest unreported Notify msg */
@@ -540,7 +542,10 @@ struct pg_conn
*/
PGresult *result; /* result being constructed */
bool error_result; /* do we need to make an ERROR result? */
- PGresult *next_result; /* next result (used in single-row mode) */
+ PGresult *next_result; /* next result (used in single-row and
+ * by-chunks modes) */
+ PGresult *chunk_result; /* current chunk of results (limited to
+ * rowsChunkSize) */
/* Assorted state for SASL, SSL, GSS, etc */
const pg_fe_sasl_mech *sasl;
--
2.34.1
v7-0002-Reimplement-FETCH_COUNT-with-the-chunked-mode-in-.patchtext/plainDownload
From 33f043aeaf3969e66f6c80af6ef6ea27499b4740 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <daniel@manitou-mail.org>
Date: Mon, 1 Apr 2024 19:46:42 +0200
Subject: [PATCH v7 2/2] Reimplement FETCH_COUNT with the chunked mode in
libpq.
Cursors were used only when the command starts with the keyword "SELECT",
excluding queries that start with "WITH" or "UPDATE" or "INSERT" that
may also return large result sets.
---
src/bin/psql/common.c | 538 ++++++++++-------------------
src/bin/psql/t/001_basic.pl | 6 +-
src/test/regress/expected/psql.out | 9 +-
src/test/regress/sql/psql.sql | 4 +-
4 files changed, 189 insertions(+), 368 deletions(-)
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 2830bde495..2112e1a423 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -31,7 +31,6 @@
#include "settings.h"
static bool DescribeQuery(const char *query, double *elapsed_msec);
-static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
static int ExecQueryAndProcessResults(const char *query,
double *elapsed_msec,
bool *svpt_gone_p,
@@ -40,8 +39,6 @@ static int ExecQueryAndProcessResults(const char *query,
const printQueryOpt *opt,
FILE *printQueryFout);
static bool command_no_begin(const char *query);
-static bool is_select_command(const char *query);
-
/*
* openQueryOutputFile --- attempt to open a query output file
@@ -373,6 +370,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
+ case PGRES_TUPLES_CHUNK:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@@ -1135,16 +1133,10 @@ SendQuery(const char *query)
/* Describe query's result columns, without executing it */
OK = DescribeQuery(query, &elapsed_msec);
}
- else if (pset.fetch_count <= 0 || pset.gexec_flag ||
- pset.crosstab_flag || !is_select_command(query))
- {
- /* Default fetch-it-all-and-print mode */
- OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
- }
else
{
- /* Fetch-in-segments mode */
- OK = ExecQueryUsingCursor(query, &elapsed_msec);
+ /* Default fetch-and-print mode */
+ OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
}
if (!OK && pset.echo == PSQL_ECHO_ERRORS)
@@ -1396,6 +1388,47 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
+/*
+ * Check if an output stream for \g needs to be opened, and if
+ * yes, open it.
+ * Return false if an error occurred, true otherwise.
+ */
+static bool
+SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe)
+{
+ ExecStatusType status = PQresultStatus(result);
+ if (pset.gfname != NULL && /* there is a \g file or program */
+ *gfile_fout == NULL && /* and it's not already opened */
+ (status == PGRES_TUPLES_OK ||
+ status == PGRES_TUPLES_CHUNK ||
+ status == PGRES_COPY_OUT))
+ {
+ if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
+ {
+ if (is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ return false;
+ }
+ return true;
+}
+
+static void
+CloseGOutput(FILE *gfile_fout, bool is_pipe)
+{
+ /* close \g file if we opened it */
+ if (gfile_fout)
+ {
+ if (is_pipe)
+ {
+ SetShellResultVariables(pclose(gfile_fout));
+ restore_sigpipe_trap();
+ }
+ else
+ fclose(gfile_fout);
+ }
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1429,9 +1462,14 @@ ExecQueryAndProcessResults(const char *query,
instr_time before,
after;
PGresult *result;
+
FILE *gfile_fout = NULL;
bool gfile_is_pipe = false;
+ int64 total_tuples = 0;
+ int flush_error = 0;
+ bool is_pager = false;
+
if (timing)
INSTR_TIME_SET_CURRENT(before);
else
@@ -1454,6 +1492,23 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
+ /*
+ * Fetch the result in chunks if FETCH_COUNT is set.
+ * We don't enable chunking if SHOW_ALL_RESULTS is false, since that
+ * requires us to accumulate all rows before we can tell what should be
+ * displayed, which would counter the idea of FETCH_COUNT.
+ * Chunk fetching is also disabled if \gset, \crosstab, \gexec and \watch
+ * are used.
+ */
+ if (pset.fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch
+ && !pset.gset_prefix && pset.show_all_results)
+ {
+ if (!PQsetChunkedRowsMode(pset.db, pset.fetch_count))
+ {
+ pg_log_warning("fetching results in chunks mode is unavailable");
+ }
+ }
+
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@@ -1477,6 +1532,8 @@ ExecQueryAndProcessResults(const char *query,
ExecStatusType result_status;
PGresult *next_result;
bool last;
+ /* whether the output starts before results are fully fetched */
+ bool partial_display = false;
if (!AcceptResult(result, false))
{
@@ -1572,20 +1629,9 @@ ExecQueryAndProcessResults(const char *query,
}
else if (pset.gfname)
{
- /* send to \g file, which we may have opened already */
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- copy_stream = gfile_fout;
- }
- else
- success = false;
- }
- else
+ /* COPY followed by \g filename or \g |program */
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (success)
copy_stream = gfile_fout;
}
else
@@ -1603,6 +1649,90 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
+ if (result_status == PGRES_TUPLES_CHUNK)
+ {
+ FILE *tuples_fout = printQueryFout ? printQueryFout : stdout;
+ printQueryOpt my_popt = pset.popt;
+
+ total_tuples = 0;
+ partial_display = true;
+
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
+ tuples_fout = gfile_fout;
+
+ /* initialize print options for partial table output */
+ my_popt.topt.start_table = true;
+ my_popt.topt.stop_table = false;
+ my_popt.topt.prior_records = 0;
+
+ while (success)
+ {
+ /* pager: open at most once per resultset */
+ if (tuples_fout == stdout && !is_pager)
+ {
+ tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
+ is_pager = true;
+ }
+ /* display the current chunk of results unless the output stream is not working */
+ if (!flush_error)
+ {
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ flush_error = fflush(tuples_fout);
+ }
+
+ /* after the first result set, disallow header decoration */
+ my_popt.topt.start_table = false;
+ my_popt.topt.prior_records += PQntuples(result);
+ total_tuples += PQntuples(result);
+
+ ClearOrSaveResult(result);
+
+ result = PQgetResult(pset.db);
+ if (result == NULL)
+ {
+ /*
+ * Error. We expect a PGRES_TUPLES_OK result with
+ * zero tuple in it to finish the fetch sequence.
+ */
+ success = false;
+ if (is_pager)
+ ClosePager(tuples_fout);
+ break;
+ }
+ else if (PQresultStatus(result) == PGRES_TUPLES_OK)
+ {
+ /*
+ * The last row has been read. Display the footer.
+ */
+ my_popt.topt.stop_table = true;
+ printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
+ total_tuples += PQntuples(result);
+
+ if (is_pager)
+ ClosePager(tuples_fout);
+ ClearOrSaveResult(result);
+ result = NULL;
+ break;
+ }
+ else if (PQresultStatus(result) != PGRES_TUPLES_CHUNK)
+ {
+ /*
+ * Error. We expect either PGRES_TUPLES_CHUNK or
+ * PGRES_TUPLES_OK.
+ */
+ if (is_pager)
+ ClosePager(tuples_fout);
+ success = false;
+ AcceptResult(result, true); /* display error whenever appropriate */
+ SetResultVariables(result, success);
+ break;
+ }
+ }
+ }
+ else
+ partial_display = false;
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@@ -1631,7 +1761,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* this may or may not print something depending on settings */
- if (result != NULL)
+ if (result != NULL && !partial_display)
{
/*
* If results need to be printed into the file specified by \g,
@@ -1640,32 +1770,33 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
- bool do_print = true;
-
- if (PQresultStatus(result) == PGRES_TUPLES_OK &&
- pset.gfname)
- {
- if (gfile_fout == NULL)
- {
- if (openQueryOutputFile(pset.gfname,
- &gfile_fout, &gfile_is_pipe))
- {
- if (gfile_is_pipe)
- disable_sigpipe_trap();
- }
- else
- success = do_print = false;
- }
+ success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe);
+ if (gfile_fout)
tuples_fout = gfile_fout;
- }
- if (do_print)
+ if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
/* set variables from last result */
if (!is_watch && last)
- SetResultVariables(result, success);
+ {
+ if (!partial_display)
+ SetResultVariables(result, success);
+ else if (success)
+ {
+ /*
+ * fake SetResultVariables(). If an error occurred when
+ * retrieving chunks, these variables have been set already.
+ */
+ char buf[32];
+
+ SetVariable(pset.vars, "ERROR", "false");
+ SetVariable(pset.vars, "SQLSTATE", "00000");
+ snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+ SetVariable(pset.vars, "ROW_COUNT", buf);
+ }
+ }
ClearOrSaveResult(result);
result = next_result;
@@ -1677,17 +1808,7 @@ ExecQueryAndProcessResults(const char *query,
}
}
- /* close \g file if we opened it */
- if (gfile_fout)
- {
- if (gfile_is_pipe)
- {
- SetShellResultVariables(pclose(gfile_fout));
- restore_sigpipe_trap();
- }
- else
- fclose(gfile_fout);
- }
+ CloseGOutput(gfile_fout, gfile_is_pipe);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
@@ -1700,274 +1821,6 @@ ExecQueryAndProcessResults(const char *query,
}
-/*
- * ExecQueryUsingCursor: run a SELECT-like query using a cursor
- *
- * This feature allows result sets larger than RAM to be dealt with.
- *
- * Returns true if the query executed successfully, false otherwise.
- *
- * If pset.timing is on, total query time (exclusive of result-printing) is
- * stored into *elapsed_msec.
- */
-static bool
-ExecQueryUsingCursor(const char *query, double *elapsed_msec)
-{
- bool OK = true;
- PGresult *result;
- PQExpBufferData buf;
- printQueryOpt my_popt = pset.popt;
- bool timing = pset.timing;
- FILE *fout;
- bool is_pipe;
- bool is_pager = false;
- bool started_txn = false;
- int64 total_tuples = 0;
- int ntuples;
- int fetch_count;
- char fetch_cmd[64];
- instr_time before,
- after;
- int flush_error;
-
- *elapsed_msec = 0;
-
- /* initialize print options for partial table output */
- my_popt.topt.start_table = true;
- my_popt.topt.stop_table = false;
- my_popt.topt.prior_records = 0;
-
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
- else
- INSTR_TIME_SET_ZERO(before);
-
- /* if we're not in a transaction, start one */
- if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
- {
- result = PQexec(pset.db, "BEGIN");
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- if (!OK)
- return false;
- started_txn = true;
- }
-
- /* Send DECLARE CURSOR */
- initPQExpBuffer(&buf);
- appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
- query);
-
- result = PQexec(pset.db, buf.data);
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- if (!OK)
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- termPQExpBuffer(&buf);
- if (!OK)
- goto cleanup;
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- /*
- * In \gset mode, we force the fetch count to be 2, so that we will throw
- * the appropriate error if the query returns more than one row.
- */
- if (pset.gset_prefix)
- fetch_count = 2;
- else
- fetch_count = pset.fetch_count;
-
- snprintf(fetch_cmd, sizeof(fetch_cmd),
- "FETCH FORWARD %d FROM _psql_cursor",
- fetch_count);
-
- /* prepare to write output to \g argument, if any */
- if (pset.gfname)
- {
- if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe))
- {
- OK = false;
- goto cleanup;
- }
- if (is_pipe)
- disable_sigpipe_trap();
- }
- else
- {
- fout = pset.queryFout;
- is_pipe = false; /* doesn't matter */
- }
-
- /* clear any pre-existing error indication on the output stream */
- clearerr(fout);
-
- for (;;)
- {
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /* get fetch_count tuples at a time */
- result = PQexec(pset.db, fetch_cmd);
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- if (PQresultStatus(result) != PGRES_TUPLES_OK)
- {
- /* shut down pager before printing error message */
- if (is_pager)
- {
- ClosePager(fout);
- is_pager = false;
- }
-
- OK = AcceptResult(result, true);
- Assert(!OK);
- SetResultVariables(result, OK);
- ClearOrSaveResult(result);
- break;
- }
-
- if (pset.gset_prefix)
- {
- /* StoreQueryTuple will complain if not exactly one row */
- OK = StoreQueryTuple(result);
- ClearOrSaveResult(result);
- break;
- }
-
- /*
- * Note we do not deal with \gdesc, \gexec or \crosstabview modes here
- */
-
- ntuples = PQntuples(result);
- total_tuples += ntuples;
-
- if (ntuples < fetch_count)
- {
- /* this is the last result set, so allow footer decoration */
- my_popt.topt.stop_table = true;
- }
- else if (fout == stdout && !is_pager)
- {
- /*
- * If query requires multiple result sets, hack to ensure that
- * only one pager instance is used for the whole mess
- */
- fout = PageOutput(INT_MAX, &(my_popt.topt));
- is_pager = true;
- }
-
- printQuery(result, &my_popt, fout, is_pager, pset.logfile);
-
- ClearOrSaveResult(result);
-
- /* after the first result set, disallow header decoration */
- my_popt.topt.start_table = false;
- my_popt.topt.prior_records += ntuples;
-
- /*
- * Make sure to flush the output stream, so intermediate results are
- * visible to the client immediately. We check the results because if
- * the pager dies/exits/etc, there's no sense throwing more data at
- * it.
- */
- flush_error = fflush(fout);
-
- /*
- * Check if we are at the end, if a cancel was pressed, or if there
- * were any errors either trying to flush out the results, or more
- * generally on the output stream at all. If we hit any errors
- * writing things to the stream, we presume $PAGER has disappeared and
- * stop bothering to pull down more data.
- */
- if (ntuples < fetch_count || cancel_pressed || flush_error ||
- ferror(fout))
- break;
- }
-
- if (pset.gfname)
- {
- /* close \g argument file/pipe */
- if (is_pipe)
- {
- SetShellResultVariables(pclose(fout));
- restore_sigpipe_trap();
- }
- else
- fclose(fout);
- }
- else if (is_pager)
- {
- /* close transient pager */
- ClosePager(fout);
- }
-
- if (OK)
- {
- /*
- * We don't have a PGresult here, and even if we did it wouldn't have
- * the right row count, so fake SetResultVariables(). In error cases,
- * we already set the result variables above.
- */
- char buf[32];
-
- SetVariable(pset.vars, "ERROR", "false");
- SetVariable(pset.vars, "SQLSTATE", "00000");
- snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
- SetVariable(pset.vars, "ROW_COUNT", buf);
- }
-
-cleanup:
- if (timing)
- INSTR_TIME_SET_CURRENT(before);
-
- /*
- * We try to close the cursor on either success or failure, but on failure
- * ignore the result (it's probably just a bleat about being in an aborted
- * transaction)
- */
- result = PQexec(pset.db, "CLOSE _psql_cursor");
- if (OK)
- {
- OK = AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
- else
- PQclear(result);
-
- if (started_txn)
- {
- result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
- OK &= AcceptResult(result, true) &&
- (PQresultStatus(result) == PGRES_COMMAND_OK);
- ClearOrSaveResult(result);
- }
-
- if (timing)
- {
- INSTR_TIME_SET_CURRENT(after);
- INSTR_TIME_SUBTRACT(after, before);
- *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
- }
-
- return OK;
-}
-
-
/*
* Advance the given char pointer over white space and SQL comments.
*/
@@ -2247,43 +2100,6 @@ command_no_begin(const char *query)
}
-/*
- * Check whether the specified command is a SELECT (or VALUES).
- */
-static bool
-is_select_command(const char *query)
-{
- int wordlen;
-
- /*
- * First advance over any whitespace, comments and left parentheses.
- */
- for (;;)
- {
- query = skip_white_space(query);
- if (query[0] == '(')
- query++;
- else
- break;
- }
-
- /*
- * Check word length (since "selectx" is not "select").
- */
- wordlen = 0;
- while (isalpha((unsigned char) query[wordlen]))
- wordlen += PQmblenBounded(&query[wordlen], pset.encoding);
-
- if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0)
- return true;
-
- if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
- return true;
-
- return false;
-}
-
-
/*
* Test if the current user is a database superuser.
*/
diff --git a/src/bin/psql/t/001_basic.pl b/src/bin/psql/t/001_basic.pl
index 9f0b6cf8ca..b5fedbc091 100644
--- a/src/bin/psql/t/001_basic.pl
+++ b/src/bin/psql/t/001_basic.pl
@@ -161,7 +161,7 @@ psql_like(
'\errverbose with no previous error');
# There are three main ways to run a query that might affect
-# \errverbose: The normal way, using a cursor by setting FETCH_COUNT,
+# \errverbose: The normal way, piecemeal retrieval using FETCH_COUNT,
# and using \gdesc. Test them all.
like(
@@ -184,10 +184,10 @@ like(
"\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose",
on_error_stop => 0))[2],
qr/\A^psql:<stdin>:2: ERROR: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
^ *^.*$
^psql:<stdin>:3: error: ERROR: [0-9A-Z]{5}: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
^ *^.*$
^LOCATION: .*$/m,
'\errverbose after FETCH_COUNT query with error');
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 69060fe3c0..8580db7c00 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -4755,7 +4755,7 @@ number of rows: 0
last error message: syntax error at end of input
\echo 'last error code:' :LAST_ERROR_SQLSTATE
last error code: 42601
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
unique2
@@ -4787,7 +4787,7 @@ error: false
error code: 00000
\echo 'number of rows:' :ROW_COUNT
number of rows: 19
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
?column?
----------
@@ -4801,6 +4801,11 @@ select 1/(15-unique2) from tenk1 order by unique2 limit 19;
0
0
0
+ 0
+ 0
+ 0
+ 0
+ 1
ERROR: division by zero
\echo 'error:' :ERROR
error: true
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
index 129f853353..33076cad79 100644
--- a/src/test/regress/sql/psql.sql
+++ b/src/test/regress/sql/psql.sql
@@ -1161,14 +1161,14 @@ SELECT 4 AS \gdesc
\echo 'last error message:' :LAST_ERROR_MESSAGE
\echo 'last error code:' :LAST_ERROR_SQLSTATE
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
\echo 'number of rows:' :ROW_COUNT
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
--
2.34.1
"Daniel Verite" <daniel@manitou-mail.org> writes:
Updated patches are attached.
I started to look through this, and almost immediately noted
- <sect1 id="libpq-single-row-mode">
- <title>Retrieving Query Results Row-by-Row</title>
+ <sect1 id="libpq-chunked-results-modes">
+ <title>Retrieving Query Results in chunks</title>
This is a bit problematic, because changing the sect1 ID will
change the page's URL, eg
https://www.postgresql.org/docs/current/libpq-single-row-mode.html
Aside from possibly breaking people's bookmarks, I'm pretty sure this
will cause the web docs framework to not recognize any cross-version
commonality of the page. How ugly would it be if we left the ID
alone? Another idea could be to leave the whole page alone and add
a new <sect1> for chunked mode.
But ... TBH I'm not convinced that we need the chunked mode at all.
We explicitly rejected that idea back when single-row mode was
designed, see around here:
/messages/by-id/50173BF7.1070801@Yahoo.com
and I'm still very skeptical that there's much win to be had.
I do not buy that psql's FETCH_COUNT mode is a sufficient reason
to add it. FETCH_COUNT mode is not something you'd use
non-interactively, and there is enough overhead elsewhere in psql
(notably in result-set formatting) that it doesn't seem worth
micro-optimizing the part about fetching from libpq.
(I see that there was some discussion in that old thread about
micro-optimizing single-row mode internally to libpq by making
PGresult creation cheaper, which I don't think anyone ever got
back to doing. Maybe we should resurrect that.)
regards, tom lane
Tom Lane wrote:
I do not buy that psql's FETCH_COUNT mode is a sufficient reason
to add it. FETCH_COUNT mode is not something you'd use
non-interactively
I should say that I've noticed significant latency improvements with
FETCH_COUNT retrieving large resultsets, such that it would benefit
non-interactive use cases.
For instance, with the current v7 patch, a query like the OP's initial
case and batches of 1000 rows:
$ cat fetchcount-test.sql
select repeat('a', 100) || '-' ||
i || '-' || repeat('b', 500) as total_pat
from generate_series(1, 5000000) as i
\g /dev/null
$ export TIMEFORMAT=%R
$ for s in $(seq 1 10); do time /usr/local/pgsql/bin/psql -At \
-v FETCH_COUNT=1000 -f fetchcount-test.sql; done
3.597
3.413
3.362
3.612
3.377
3.416
3.346
3.368
3.504
3.413
=> Average elapsed time = 3.44s
Now without FETCH_COUNT, fetching the 5 million rows in one resultset:
$ for s in $(seq 1 10); do time /usr/local/pgsql/bin/psql -At \
-f fetchcount-test.sql; done
4.200
4.178
4.200
4.169
4.195
4.217
4.197
4.234
4.225
4.242
=> Average elapsed time = 4.20s
By comparison the unpatched version (cursor-based method)
gives these execution times with FETCH_COUNT=1000:
4.458
4.448
4.476
4.455
4.450
4.466
4.395
4.429
4.387
4.473
=> Average elapsed time = 4.43s
Now that's just one test, but don't these numbers look good?
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
"Daniel Verite" <daniel@manitou-mail.org> writes:
Tom Lane wrote:
I do not buy that psql's FETCH_COUNT mode is a sufficient reason
to add it. FETCH_COUNT mode is not something you'd use
non-interactively
I should say that I've noticed significant latency improvements with
FETCH_COUNT retrieving large resultsets, such that it would benefit
non-interactive use cases.
Do you have a theory for why that is? It's pretty counterintuitive
that it would help at all.
regards, tom lane
Tom Lane wrote:
I should say that I've noticed significant latency improvements with
FETCH_COUNT retrieving large resultsets, such that it would benefit
non-interactive use cases.Do you have a theory for why that is? It's pretty counterintuitive
that it would help at all.
I've been thinking that it's a kind of pipeline/parallelism effect.
When libpq accumulates all rows in one resultset, if the network
or the server are not fast enough, it spends a certain amount of
time waiting for the data to come in.
But when it accumulates fewer rows and gives back control
to the app to display intermediate results, during that time the
network buffers can fill in, resulting, I assume, in less time waiting
overall.
I think the benefit is similar to what we get with \copy. In fact
with the above-mentioned test, the execution times with
FETCH_COUNT=1000 look very close to \copy of the same query.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
So what was really bothering me about this patchset was that I
didn't think marginal performance gains were a sufficient reason
to put a whole different operating mode into libpq. However,
I've reconsidered after realizing that implementing FETCH_COUNT
atop traditional single-row mode would require either merging
single-row results into a bigger PGresult or persuading psql's
results-printing code to accept an array of PGresults not just
one. Either of those would be expensive and ugly, not to mention
needing chunks of code we don't have today.
Also, it doesn't really need to be a whole different operating mode.
There's no reason that single-row mode shouldn't be exactly equivalent
to chunk mode with chunk size 1, except for the result status code.
(We've got to keep PGRES_SINGLE_TUPLE for the old behavior, but
using that for a chunked result would be too confusing.)
So I whacked the patch around till I liked it better, and pushed it.
I hope my haste will not come back to bite me, but we are getting
pretty hard up against the feature-freeze deadline.
regards, tom lane
Tom Lane wrote:
I've reconsidered after realizing that implementing FETCH_COUNT
atop traditional single-row mode would require either merging
single-row results into a bigger PGresult or persuading psql's
results-printing code to accept an array of PGresults not just
one. Either of those would be expensive and ugly, not to mention
needing chunks of code we don't have today.
Yes, we must accumulate results because the aligned format needs to
know the columns widths for a entire "page", and the row-by-row logic
does not fit that well in that case.
One of the posted patches implemented this with an array of PGresult
in single-row mode [1]/messages/by-id/092583fb-97c5-428f-8d99-fd31be4a5290@manitou-mail.org but I'm confident that the newer version you
pushed with the libpq changes is a better approach.
So I whacked the patch around till I liked it better, and pushed it.
Thanks for taking care of this!
[1]: /messages/by-id/092583fb-97c5-428f-8d99-fd31be4a5290@manitou-mail.org
/messages/by-id/092583fb-97c5-428f-8d99-fd31be4a5290@manitou-mail.org
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Hello Daniel and Tom,
08.04.2024 17:25, Daniel Verite wrote:
So I whacked the patch around till I liked it better, and pushed it.
Thanks for taking care of this!
Now that ExecQueryUsingCursor() is gone, it's not clear, what does
the following comment mean:?
* We must turn off gexec_flag to avoid infinite recursion. Note that
* this allows ExecQueryUsingCursor to be applied to the individual query
* results.
Shouldn't it be removed?
Best regards,
Alexander
Alexander Lakhin <exclusion@gmail.com> writes:
Now that ExecQueryUsingCursor() is gone, it's not clear, what does
the following comment mean:?
* We must turn off gexec_flag to avoid infinite recursion. Note that
* this allows ExecQueryUsingCursor to be applied to the individual query
* results.
Hmm, the point about recursion is still valid isn't it? I agree the
reference to ExecQueryUsingCursor is obsolete, but I think we need to
reconstruct what this comment is actually talking about. It's
certainly pretty obscure ...
regards, tom lane
08.04.2024 18:08, Tom Lane wrote:
Alexander Lakhin <exclusion@gmail.com> writes:
Now that ExecQueryUsingCursor() is gone, it's not clear, what does
the following comment mean:?
* We must turn off gexec_flag to avoid infinite recursion. Note that
* this allows ExecQueryUsingCursor to be applied to the individual query
* results.Hmm, the point about recursion is still valid isn't it? I agree the
reference to ExecQueryUsingCursor is obsolete, but I think we need to
reconstruct what this comment is actually talking about. It's
certainly pretty obscure ...
Sorry, I wasn't clear enough, I meant to remove only that reference, not
the quoted comment altogether.
Best regards,
Alexander
Alexander Lakhin wrote:
Now that ExecQueryUsingCursor() is gone, it's not clear, what does
the following comment mean:?
* We must turn off gexec_flag to avoid infinite recursion. Note that
* this allows ExecQueryUsingCursor to be applied to the individual query
* results.Hmm, the point about recursion is still valid isn't it? I agree the
reference to ExecQueryUsingCursor is obsolete, but I think we need to
reconstruct what this comment is actually talking about. It's
certainly pretty obscure ...Sorry, I wasn't clear enough, I meant to remove only that reference, not
the quoted comment altogether.
The comment might want to stress the fact that psql honors
FETCH_COUNT "on top of" \gset, so if the user issues for instance:
select 'select ' || i from generate_series(1,<N>) as i \gexec
what's going to be sent to the server is a series of:
BEGIN
DECLARE _psql_cursor NO SCROLL CURSOR FOR
select <i>
FETCH FORWARD <FETCH_COUNT> FROM _psql_cursor (possibly repeated)
CLOSE _psql_cursor
COMMIT
Another choice would be to ignore FETCH_COUNT and send exactly the
queries that \gset produces, with the assumption that it better
matches the user's expectation. Maybe that alternative was considered
and the comment reflects the decision.
Since the new implementation doesn't rewrite the user-supplied queries,
the point is moot, and this part should be removed:
"Note that this allows ExecQueryUsingCursor to be applied to the
individual query results"
I'll wait a bit for other comments and submit a patch.
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
Alexander Lakhin <exclusion@gmail.com> writes:
08.04.2024 18:08, Tom Lane wrote:
Hmm, the point about recursion is still valid isn't it? I agree the
reference to ExecQueryUsingCursor is obsolete, but I think we need to
reconstruct what this comment is actually talking about. It's
certainly pretty obscure ...
Sorry, I wasn't clear enough, I meant to remove only that reference, not
the quoted comment altogether.
After looking at it, I realized that the comment's last sentence was
also out of date, since SendQuery() isn't where the check of
gexec_flag happens any more. I concluded that documenting the
behavior of other functions here isn't such a hot idea, and removed
both sentences in favor of expanding the relevant comments in
ExecQueryAndProcessResults.
While doing that, I compared the normal and chunked-fetch code paths
in ExecQueryAndProcessResults more carefully, and realized that the
patch was a few other bricks shy of a load:
* it didn't honor pset.queryFout;
* it ignored the passed-in "printQueryOpt *opt" (maybe that's always
NULL, but doesn't seem like a great assumption);
* it failed to call PrintQueryStatus, so that INSERT RETURNING
and the like would print a status line only in non-FETCH_COUNT
mode.
I cleaned all that up at c21d4c416.
BTW, I had to reverse-engineer the exact reasoning for the cases
where we don't honor FETCH_COUNT. Most of them are clear enough,
but I'm not totally sure about \watch. I wrote:
+ * * We're doing \watch: users probably don't want us to force use of the
+ * pager for that, plus chunking could break the min_rows check.
It would not be terribly hard to make the chunked-fetch code path
handle min_rows correctly, and AFAICS the only other thing that
is_watch does differently is to not do SetResultVariables, which
we could match easily enough. So this is really down to whether
forcing pager mode is okay for a \watch'd query. I wonder if
that was actually Daniel's reasoning for excluding \watch, and
how strong that argument really is.
regards, tom lane