foreign table batch inserts
Hi,
I realized that inserts into foreign tables are only done row by row.
Consider copying data from one local table to a foreign table with
INSERT INTO foreign_table(a,b,c) SELECT a,b,c FROM local_table;
When the foreign server is for example in another datacenter with long latency,
this has an enormous performance cost.
Wouldn’t it make sense to do the inserts in batches, e.g. 100 rows at a time?
Are there any plans to do that, or am I missing something?
regards
Manuel Kniep
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, May 18, 2016 at 6:00 AM, Manuel Kniep <m.kniep@web.de> wrote:
I realized that inserts into foreign tables are only done row by row.
Consider copying data from one local table to a foreign table with
INSERT INTO foreign_table(a,b,c) SELECT a,b,c FROM local_table;
When the foreign server is for example in another datacenter with long latency,
this as an enormous performance trade off.
Wouldn’t it make sense to do the insert batch wise e.g. 100 rows ?
Using a single query string with multiple VALUES rows, perhaps, but then
the query string size limit comes into consideration, particularly for
large text values... The query used for the insertion has been a prepared
statement since writable foreign tables were added in 9.3, which makes the
code quite simple actually.
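A minimal sketch of the multi-row query string idea, for illustration only. build_multirow_insert is a hypothetical helper, not postgres_fdw code; it just generates the statement text with numbered placeholders. Besides the string size concern, the Bind message carries its parameter count as a 16-bit value, so rows times columns has to stay at or below 65535.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Build "INSERT INTO <table> VALUES ($1,$2),($3,$4),..." for nrows rows of
 * ncols columns. Returns the string length, or -1 if buf is too small.
 * Hypothetical helper for illustration; not part of postgres_fdw or libpq.
 */
static int
build_multirow_insert(char *buf, size_t bufsize, const char *table,
                      int ncols, int nrows)
{
    size_t used;
    int    n, row, col, param = 1;

    assert(ncols > 0 && nrows > 0);

    n = snprintf(buf, bufsize, "INSERT INTO %s VALUES ", table);
    if (n < 0 || (size_t) n >= bufsize)
        return -1;
    used = (size_t) n;

    for (row = 0; row < nrows; row++)
    {
        for (col = 0; col < ncols; col++)
        {
            /* "(" opens the first row, ",(" opens later rows, "," separates columns */
            const char *prefix = (col == 0) ? (row == 0 ? "(" : ",(") : ",";

            n = snprintf(buf + used, bufsize - used, "%s$%d", prefix, param++);
            if (n < 0 || (size_t) n >= bufsize - used)
                return -1;
            used += (size_t) n;
        }
        n = snprintf(buf + used, bufsize - used, ")");
        if (n < 0 || (size_t) n >= bufsize - used)
            return -1;
        used += (size_t) n;
    }
    return (int) used;
}
```

The statement text changes with the number of rows, so a final partial batch would need its own prepared statement.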
Are there any plans doing that or am I miss something?
Not that I know of. I am adding Fujita-san in the loop here, he is
quite involved with postgres_fdw these days so perhaps he has some
input to offer.
--
Michael
On 18 May 2016 at 06:08, Michael Paquier <michael.paquier@gmail.com> wrote:
Wouldn’t it make sense to do the insert batch wise e.g. 100 rows ?
Using a single query string with multiple values, perhaps, but after
that comes into consideration query string limit particularly for
large text values... The query used for the insertion is a prepared
statement since writable queries are supported in 9.3, which makes the
code quite simple actually.
This should be done how PgJDBC does batches. It'd require a libpq
enhancement, but it's one we IMO need anyway: allow pipelined query
execution from libpq.
[design follows]
What this should be doing is:
- send Parse to create an unnamed prepared statement; then
- loop, and:
  - send a Bind & an Execute for the query with values if the send buffer
    is not full
  - If there are no more values to send, send a Sync message
  - Receive and process results if the receive buffer is not empty
  - Check each result and mark it off against the list of dispatched queries
  - If an ERROR is received, bail out
  - If a Sync is received, check that all results have been retrieved as
    expected then return OK
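The bookkeeping in that loop can be sketched without any networking. This is a toy model, not libpq code: run_batch and its window parameter are hypothetical, with the window standing in for "the send buffer is not full" and the row_ok array standing in for the server's replies.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Simulate the dispatch loop: send Bind/Execute pairs while the window has
 * room, send Sync once all rows are out, and check results off in dispatch
 * order. Returns the number of confirmed rows, or -(index + 1) of the first
 * failed row. Hypothetical model for illustration only.
 */
static int
run_batch(const bool *row_ok, int nrows, int window)
{
    int  sent = 0;
    int  confirmed = 0;
    bool sync_sent = false;

    assert(window > 0);

    for (;;)
    {
        /* Send side: dispatch Bind/Execute while the window has room. */
        while (sent < nrows && sent - confirmed < window)
            sent++;
        if (sent == nrows && !sync_sent)
            sync_sent = true;            /* no more values: send Sync */

        /* Receive side: mark results off against what was dispatched. */
        if (confirmed < sent)
        {
            if (!row_ok[confirmed])
                return -(confirmed + 1); /* ERROR received: bail out */
            confirmed++;
        }
        else if (sync_sent)
        {
            /* Reply to Sync: every dispatched row must be accounted for. */
            assert(confirmed == nrows);
            return confirmed;
        }
    }
}
```

A real client would still have to drain input up to the Sync after an error; the model stops at the first failure to keep the control flow visible.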
This would require libpq to be smarter about how it tracks queries. Right
now it keeps track of current query, query results, etc directly in the
connection object, and it sends a Sync after each operation then expects to
wait in a busy state until it gets the results from that operation.
Instead we'd have to have a FIFO queue of messages libpq expects responses
for. Variants of
PQsendPrepare, PQsendQueryPrepared, PQsendDescribePrepared, etc would not
send a Sync message and would append an entry to the expected result queue
instead of setting the current query, etc on the connection. They'd still
mark the connection as busy, so no non-queue-aware calls could be run until
the queue is consumed and empty.
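As a rough illustration of the expected-result FIFO, here is a fixed-size ring buffer. All names (ExpectedQueue, queue_push, queue_pop, queue_busy) are made up for this sketch, and a fixed capacity is an assumption chosen for brevity; a real implementation would more likely grow dynamically.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAP 8

/* What kind of response the client is waiting for (hypothetical names). */
typedef enum { EXPECT_PREPARE, EXPECT_QUERY, EXPECT_SYNC } ExpectedKind;

typedef struct
{
    ExpectedKind items[QUEUE_CAP];
    size_t       head;      /* index of the oldest entry */
    size_t       count;     /* number of entries in use */
} ExpectedQueue;

static void
queue_init(ExpectedQueue *q)
{
    assert(q != NULL);
    q->head = 0;
    q->count = 0;
}

/* Record that a message was dispatched; false if the queue is full. */
static bool
queue_push(ExpectedQueue *q, ExpectedKind kind)
{
    if (q->count == QUEUE_CAP)
        return false;
    q->items[(q->head + q->count) % QUEUE_CAP] = kind;
    q->count++;
    return true;
}

/* Start processing the oldest outstanding message; false if none is left. */
static bool
queue_pop(ExpectedQueue *q, ExpectedKind *kind)
{
    if (q->count == 0)
        return false;
    *kind = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    return true;
}

/* The connection stays "busy" while any response is still expected. */
static bool
queue_busy(const ExpectedQueue *q)
{
    return q->count > 0;
}
```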
These functions might return some kind of handle value that can be used to
identify the queue entry they created; it'd be pretty useless at the
moment, but needed if we ever get "cancel queries up to X" functionality on
the protocol or if we later added buffering of multiple query results.
A new PQsendSync or similar would be added to send a synchronisation point,
which would go into the FIFO. Clients would call that after enqueueing a
batch of work, e.g. after sending a commit for a batched xact. That's
required for error recovery.
Clients would use PQgetResult as before. When it returns null, they'd call
a new PQnextResult(...) function to initiate processing of the next
operation's input; this would pop the next operation from the FIFO, or
return null if there's nothing more in the queue. PQisBusy returns true
until there are no items left in the queue.
We'd still use the connection object for result sets, fetching rows, etc,
as there can still only be one "current" query for which a response is
being received from the server. Nothing much would change with PQgetResult
etc. There wouldn't be any PQgetResult variant to wait for results of the
nth query or for some kind of query handle, because we need the client to
consume the results of all prior queries. The client must process query
results in FIFO order. We could have per-query result buffers, etc, but it
seems pretty pointless; the client can do this for itself if it wants.
If the server sends an error, libpq would keep popping queries off the queue
until it gets to the Sync there, and consume input on the socket until it
gets to a Sync on the wire. PQgetResult for each queued operation so skipped
would return a state indicating that it didn't execute because of an error
in a prior operation.
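A small sketch of that error rule, assuming each queued operation is either a query or a Sync. The names (OpKind, OpStatus, resolve_batch) are hypothetical, and whether the Sync itself reports "aborted" or something else is a design choice this sketch simply picks.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { OP_QUERY, OP_SYNC } OpKind;
typedef enum { ST_OK, ST_ERROR, ST_ABORTED } OpStatus;

/*
 * Fill st[] with the status each queued operation would report when the
 * operation at index "failed" raises an error (pass -1 for no failure).
 * Everything after the failure, up to and including the next Sync, is
 * marked aborted; operations queued after that Sync run normally again.
 */
static void
resolve_batch(const OpKind *ops, int nops, int failed, OpStatus *st)
{
    bool skipping = false;
    int  i;

    assert(failed < nops);
    assert(failed < 0 || ops[failed] == OP_QUERY);

    for (i = 0; i < nops; i++)
    {
        if (i == failed)
        {
            st[i] = ST_ERROR;
            skipping = true;            /* discard until the next Sync */
        }
        else if (skipping)
        {
            st[i] = ST_ABORTED;
            if (ops[i] == OP_SYNC)
                skipping = false;       /* Sync ends the error state */
        }
        else
            st[i] = ST_OK;
    }
}
```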
Such an API would benefit immensely from the "cancel up to" functionality
we discussed here recently; without it, it's hard to cancel anything
reliably and know what exactly you're cancelling, but it doesn't need it.
The cancel problem isn't much worse than before.
If we wanted to allow batch execution from the sync API we'd need a new
function that takes a prepared query and an array of values and manages the
send and receive buffer polling using the async API internally, since we
need to use nonblocking sockets to avoid deadlocking.
I don't think this would look that different to current libpq code to the
user, ignoring the details about error handling on command dispatch, etc.
The app would just call a series of PQqueuePrepare, PQqueueQueryPrepared,
etc (bikeshed as desired) then PQsendSync(...). Then it'd call PQgetResult
until it returns null, call PQnextResult(...) and resume calling
PQgetResult(...). Repeat until PQnextResult(...) returns null, and
check that the most recent result was a PGRES_SYNC_OK, which is what we'll
return from processing a PQsendSync(...) result.
If the client wants to be totally nonblocking it can do the PQconsumeInput
and PQflush dance as normal. It's strongly preferable for the client to use
non-blocking writes, because if it doesn't then it risks creating a
deadlock where the server and client are both blocked on writes. The client
is trying to write to its send buffer, but the server will never consume it
because the server's blocked writing results to its own send buffer, which
the client won't consume because it's blocked. It's still safe to pipeline
writes in blocking mode if you know you'll never write anything close to
the send buffer size before you send a sync and switch to reading results,
though.
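The deadlock can be reproduced in a toy model with two bounded buffers, one per direction. Everything here is hypothetical (Link, server_step, run_client), buffer sizes are counted in whole messages rather than bytes, and the server is assumed to produce one result per query.

```c
#include <assert.h>
#include <stdbool.h>

/* Messages sitting in each direction's buffer, plus the shared capacity. */
typedef struct { int c2s; int s2c; int cap; } Link;

/* Server: turn one queued query into one result if there's room for it. */
static bool
server_step(Link *l)
{
    if (l->c2s > 0 && l->s2c < l->cap)
    {
        l->c2s--;
        l->s2c++;
        return true;
    }
    return false;
}

/*
 * Send n queries and read n results. With interleave_reads false the client
 * behaves like a blocking writer: it reads nothing until every query has
 * been written. Returns true on completion, false if nobody can progress.
 */
static bool
run_client(int n, int cap, bool interleave_reads)
{
    Link l = { 0, 0, 0 };
    int  sent = 0;
    int  received = 0;

    assert(cap > 0);
    l.cap = cap;

    while (received < n)
    {
        bool progress = false;

        if (sent < n && l.c2s < l.cap)
        {
            l.c2s++;
            sent++;
            progress = true;
        }
        if ((interleave_reads || sent == n) && l.s2c > 0)
        {
            l.s2c--;
            received++;
            progress = true;
        }
        if (server_step(&l))
            progress = true;
        if (!progress)
            return false;   /* both sides blocked on writes: deadlock */
    }
    return true;
}
```

In this model a blocking writer with 10 queries and buffers of 2 messages stalls, a batch of 3 completes, and a client that interleaves reads with writes completes regardless of batch size.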
If we had this in libpq, FDWs could just prepare an insert then send the
data values in an efficient, pipelined manner. It's not quite as fast as
COPY, but it's a whole lot faster than the current round-trip-heavy
approach, and unlike COPY it can be used for update/delete too.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Wed, May 18, 2016 at 12:27 PM, Craig Ringer <craig@2ndquadrant.com> wrote:
On 18 May 2016 at 06:08, Michael Paquier <michael.paquier@gmail.com> wrote:
Wouldn’t it make sense to do the insert batch wise e.g. 100 rows ?
Using a single query string with multiple values, perhaps, but after
that comes into consideration query string limit particularly for
large text values... The query used for the insertion is a prepared
statement since writable queries are supported in 9.3, which makes the
code quite simple actually.
This should be done how PgJDBC does batches. It'd require a libpq
enhancement, but it's one we IMO need anyway: allow pipelined query
execution from libpq.
That's also something that would be useful for the ODBC driver. Since
it is using libpq as a hard dependency and does not speak the protocol
directly, it is doing additional round trips to the server for this
exact reason when preparing a statement.
[design follows]
This would require libpq to be smarter about how it tracks queries. Right
now it keeps track of current query, query results, etc directly in the
connection object, and it sends a Sync after each operation then expects to
wait in a busy state until it gets the results from that operation.
Yep.
Instead we'd have to have a FIFO queue of messages libpq expects responses
for. Variants of PQsendPrepare, PQsendQueryPrepared, PQsendDescribePrepared,
etc would not send a Sync message and would append an entry to the expected
result queue instead of setting the current query, etc on the connection.
They'd still mark the connection as busy, so no non-queue-aware calls could
be run until the queue is consumed and empty.
Yep. That's exactly the ODBC regression, which becomes a huge problem
as latency increases.
--
Michael
On 19 May 2016 at 01:39, Michael Paquier <michael.paquier@gmail.com> wrote:
On Wed, May 18, 2016 at 12:27 PM, Craig Ringer <craig@2ndquadrant.com> wrote:
On 18 May 2016 at 06:08, Michael Paquier <michael.paquier@gmail.com> wrote:
Wouldn’t it make sense to do the insert batch wise e.g. 100 rows ?
Using a single query string with multiple values, perhaps, but after
that comes into consideration query string limit particularly for
large text values... The query used for the insertion is a prepared
statement since writable queries are supported in 9.3, which makes the
code quite simple actually.
This should be done how PgJDBC does batches. It'd require a libpq
enhancement, but it's one we IMO need anyway: allow pipelined query
execution from libpq.
That's also something that would be useful for the ODBC driver. Since
it is using libpq as a hard dependency and does not speak the protocol
directly, it is doing additional round trips to the server for this
exact reason when preparing a statement.
Good to know. The extra round trips will hurt especially badly when
statement-level rollback is enabled, since psqlODBC issues savepoints then;
with pipelining it'd be able to get rid of an extra pair of round trips.
It looks like there's plenty of use for this. FDWs, psqlODBC, client
applications doing batches, and postgres XL would benefit from it too.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
From: pgsql-hackers-owner@postgresql.org [mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of Craig Ringer
On 19 May 2016 at 01:39, Michael Paquier <michael.paquier@gmail.com> wrote:
That's also something that would be useful for the ODBC driver. Since
it is using libpq as a hard dependency and does not speak the protocol
directly, it is doing additional round trips to the server for this
exact reason when preparing a statement.
Yes, I want FE-BE protocol-level batch inserts/updates/deletes, too. I was just about to start thinking about how to implement it because of a recent user question on pgsql-odbc. The OP uses Microsoft SQL Server Integration Services (SSIS) to migrate data to PostgreSQL. He asked for a method to speed up multi-row inserts, because the ODBC multi-row insert API takes as long as performing the single-row inserts separately. This may prevent the migration to PostgreSQL.
And it's also useful for ECPG. Our customer wanted ECPG to support multi-row insert in order to migrate to PostgreSQL, because their embedded-SQL apps use the feature with a commercial database.
If you take on this feature, I can help you by reviewing and testing, implementing the ODBC and ECPG sides, etc.
Regards
Takayuki Tsunakawa
On 19 May 2016 at 14:08, Tsunakawa, Takayuki <tsunakawa.takay@jp.fujitsu.com> wrote:
Yes, I want FE-BE protocol-level batch inserts/updates/deletes, too. I
was just about to start thinking of how to implement it because of recent
user question in pgsql-odbc. The OP uses Microsoft SQL Server Integration
Service (SSIS) to migrate data to PostgreSQL. He asked for a method to
speed up multi-row inserts, because the ODBC's multi-row insert API takes
as long a time as when performing single-row inserts separately. This may
prevent the migration to PostgreSQL.
Well, there's FE/BE level batching/pipelining already. Just no access to it
from libpq.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
From: pgsql-hackers-owner@postgresql.org [mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of Craig Ringer
Well, there's FE/BE level batching/pipelining already. Just no access to it from libpq.
Oh, really. The Bind ('B') appears to take one set of parameter values, not multiple sets (array). Anyway, I should have said "I want a batch update API in libpq" so it can be used in ODBC and ECPG.
Regards
Takayuki Tsunakawa
On 20 May 2016 at 08:47, Tsunakawa, Takayuki <tsunakawa.takay@jp.fujitsu.com> wrote:
From: pgsql-hackers-owner@postgresql.org [mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of Craig Ringer
Well, there's FE/BE level batching/pipelining already. Just no access to
it from libpq.
Oh, really. The Bind ('B') appears to take one set of parameter values,
not multiple sets (array). Anyway, I had to say "I want batch update API
in libpq" to use it in ODBC and ECPG.
Right, and there's no protocol level support for array-valued batches.
You can, however, omit Sync from between messages and send a series of
protocol messages, like
Parse/Bind/Execute/Bind/Execute/Bind/Execute/Sync
to avoid round-trip overheads.
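To make the difference concrete, here is a sketch that writes out the message type bytes for both styles and counts the Sync messages, each of which is a point where the client normally waits for the server. build_sequence and append_msg are hypothetical helpers; the type bytes 'P', 'B', 'E' and 'S' are the protocol's Parse, Bind, Execute and Sync, and the Describe that libpq normally adds is left out for brevity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Append one message type byte, keeping the buffer NUL-terminated. */
static bool
append_msg(char *out, size_t outsize, size_t *len, char type)
{
    if (*len + 1 >= outsize)
        return false;
    out[(*len)++] = type;
    out[*len] = '\0';
    return true;
}

/*
 * Write the message sequence for inserting nrows rows into out and return
 * the number of Sync messages, or -1 if out is too small. With pipelined
 * false, every step is followed by its own Sync.
 */
static int
build_sequence(char *out, size_t outsize, int nrows, bool pipelined)
{
    size_t len = 0;
    int    syncs = 0;
    int    i;

    assert(nrows >= 0);
    if (outsize == 0)
        return -1;
    out[0] = '\0';

    if (!append_msg(out, outsize, &len, 'P'))
        return -1;
    if (!pipelined)
    {
        if (!append_msg(out, outsize, &len, 'S'))
            return -1;
        syncs++;
    }
    for (i = 0; i < nrows; i++)
    {
        if (!append_msg(out, outsize, &len, 'B') ||
            !append_msg(out, outsize, &len, 'E'))
            return -1;
        if (!pipelined)
        {
            if (!append_msg(out, outsize, &len, 'S'))
                return -1;
            syncs++;
        }
    }
    if (pipelined)
    {
        if (!append_msg(out, outsize, &len, 'S'))
            return -1;
        syncs++;
    }
    return syncs;
}
```

For three rows this gives PBEBEBES with one Sync, against PSBESBESBES with four.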
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 20 May 2016 at 15:35, Craig Ringer <craig@2ndquadrant.com> wrote:
You can, however, omit Sync from between messages and send a series of
protocol messages, like
Parse/Bind/Execute/Bind/Execute/Bind/Execute/Sync
to avoid round-trip overheads.
I implemented what I think is a pretty solid proof of concept of this for
kicks this evening. Patch attached, including a basic test program.
The performance difference over higher latency links is huge, see below.
Demo/test program in src/test/examples/testlibpqbatch.c.
github: https://github.com/2ndQuadrant/postgres/tree/dev/libpq-async-batch
I still need to add the logic for handling an error during a batch by
discarding all input until the next Sync, but otherwise I think it's pretty
reasonable.
The time difference for 10k inserts on the local host over a unix socket
shows a solid improvement:
batch insert elapsed: 0.244293s
sequential insert elapsed: 0.375402s
... but over, say, a connection to a random AWS RDS instance fired up for
the purpose that lives about 320ms away the difference is huge:
batch insert elapsed: 9.029995s
sequential insert elapsed: (I got bored after 10 minutes; it should take a
bit less than an hour based on the latency numbers)
With 500 rows on the remote AWS RDS instance, once the I/O quota is already
saturated:
batch insert elapsed: 1.229024s
sequential insert elapsed: 156.962180s
which is an improvement by a factor of over 120
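A quick sanity check of these numbers, assuming each sequential insert costs roughly one 320 ms round trip. That is an assumption for the sketch: the mail only says the instance is about 320 ms away. The helper names are made up.

```c
#include <assert.h>

/* Estimated wall time when every row waits for its own round trip. */
static double
sequential_estimate(int rows, double round_trip_seconds)
{
    assert(rows >= 0 && round_trip_seconds >= 0.0);
    return rows * round_trip_seconds;
}

/* How many times faster the batched run was than the sequential one. */
static double
speedup(double sequential_seconds, double batched_seconds)
{
    assert(batched_seconds > 0.0);
    return sequential_seconds / batched_seconds;
}
```

Under that assumption 10,000 rows come to about 3200 s, a bit over 53 minutes, and 500 rows to about 160 s, close to the measured 156.96 s. The measured 500-row times give a ratio of roughly 127.7.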
I didn't compare vs COPY. I'm sure COPY will be faster, but COPY doesn't
let you do INSERT ... ON CONFLICT, do UPDATE, do DELETE, etc. Not without
temp tables and a bunch of hoop jumping anyway. If COPY solved everything
there'd be no point having pipelining.
No docs yet, but if folks think the interface is reasonable I can add them
easily since the comments on each of the new functions should be easy to
adapt into the SGML docs.
With a bit of polishing I think this can probably go in the next CF, though
I only wrote it as an experiment. Can I get opinions on the API?
The TL;DR API, using the usual async libpq routines, is:
PQbeginBatchMode(conn);
PQsendQueryParams(conn, "BEGIN", 0, NULL, NULL, NULL, NULL, 0);
PQsendPrepare(conn, "my_update", "UPDATE ...", 0, NULL);
PQsetnonblocking(conn, 1);
while (!all_responses_received)
{
    select(...)
    if (can-write)
    {
        if (app-has-more-data-to-send)
        {
            PQsendQueryPrepared(conn, "my_update", params-go-here);
        }
        else if (havent-sent-commit-yet)
        {
            PQsendQueryParams(conn, "COMMIT", ...);
        }
        else if (havent-sent-endbatch-yet)
        {
            /* sends the protocol Sync that terminates this batch */
            PQendBatch(conn);
        }
        PQflush(conn);
    }
    if (can-read)
    {
        PQconsumeInput(conn);
        if (PQisBusy(conn))
            continue;
        res = PQgetResult(conn);
        if (res == NULL)
        {
            /* current query is done, advance to the next queued one */
            PQgetNextQuery(conn);
            continue;
        }
        /* process results in the same order we sent the commands */
        /* client keeps track of that, libpq just supplies the results */
        ...
    }
}
/* all results processed: leave batch mode */
PQendBatchMode(conn);
Note that:
* PQsendQuery cannot be used as it uses simple query protocol, use
PQsendQueryParams instead;
* Batch supports PQsendQueryParams, PQsendPrepare, PQsendQueryPrepared,
PQsendDescribePrepared, PQsendDescribePortal;
* You don't call PQgetResult after dispatching each query
* Multiple batches may be pipelined, you don't have to wait for one to end
to start another (an advantage over JDBC's API)
* non-blocking mode isn't required, but is strongly advised
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Draft-of-libpq-async-pipelining-support.patch (text/x-patch; charset=US-ASCII)
From f0ca25bdc2bacf65530e4f180fdfc7c219866541 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Fri, 20 May 2016 12:45:18 +0800
Subject: [PATCH] Draft of libpq async pipelining support
Now with test in src/test/examples/testlibpqbatch.c
---
src/interfaces/libpq/exports.txt | 6 +
src/interfaces/libpq/fe-connect.c | 16 +
src/interfaces/libpq/fe-exec.c | 540 ++++++++++++++++++++++++++--
src/interfaces/libpq/fe-protocol2.c | 6 +
src/interfaces/libpq/fe-protocol3.c | 13 +-
src/interfaces/libpq/libpq-fe.h | 12 +-
src/interfaces/libpq/libpq-int.h | 36 +-
src/test/examples/Makefile | 2 +-
src/test/examples/testlibpqbatch.c | 690 ++++++++++++++++++++++++++++++++++++
9 files changed, 1278 insertions(+), 43 deletions(-)
create mode 100644 src/test/examples/testlibpqbatch.c
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 21dd772..e297c4b 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -171,3 +171,9 @@ PQsslAttributeNames 168
PQsslAttribute 169
PQsetErrorContextVisibility 170
PQresultVerboseErrorMessage 171
+PQisInBatchMode 172
+PQqueriesInBatch 173
+PQbeginBatchMode 174
+PQendBatchMode 175
+PQendBatch 176
+PQgetNextQuery 177
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 9b2839b..78154e2 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2949,6 +2949,7 @@ static void
closePGconn(PGconn *conn)
{
PGnotify *notify;
+ PGcommandQueueEntry *queue;
pgParameterStatus *pstatus;
/*
@@ -2995,6 +2996,21 @@ closePGconn(PGconn *conn)
free(prev);
}
conn->notifyHead = conn->notifyTail = NULL;
+ queue = conn->cmd_queue_head;
+ while (queue != NULL)
+ {
+ PGcommandQueueEntry *prev = queue;
+ queue = queue->next;
+ free(prev);
+ }
+ conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+ queue = conn->cmd_queue_recycle;
+ {
+ PGcommandQueueEntry *prev = queue;
+ queue = queue->next;
+ free(prev);
+ }
+ conn->cmd_queue_recycle = NULL;
pstatus = conn->pstatus;
while (pstatus != NULL)
{
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 2621767..c12cb2c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char *const pgresStatus[] = {
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR",
"PGRES_COPY_BOTH",
- "PGRES_SINGLE_TUPLE"
+ "PGRES_SINGLE_TUPLE",
+ "PGRES_BATCH_OK",
+ "PGRES_BATCH_ABORTED"
};
/*
@@ -69,6 +71,9 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry* PQmakePipelinedCommand(PGconn *conn);
+static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
+static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
/* ----------------
@@ -1090,7 +1095,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->next_result = conn->result;
conn->result = res;
/* And mark the result ready to return */
- conn->asyncStatus = PGASYNC_READY;
+ conn->asyncStatus = PGASYNC_READY_MORE;
}
return 1;
@@ -1113,6 +1118,13 @@ fail:
int
PQsendQuery(PGconn *conn, const char *query)
{
+ if (conn->in_batch)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+ return false;
+ }
+
if (!PQsendQueryStart(conn))
return 0;
@@ -1211,9 +1223,29 @@ PQsendPrepare(PGconn *conn,
const char *stmtName, const char *query,
int nParams, const Oid *paramTypes)
{
+ PGcommandQueueEntry *pipeCmd = NULL;
+ char **last_query;
+ PGQueryClass *queryclass;
+
if (!PQsendQueryStart(conn))
return 0;
+ if (conn->in_batch)
+ {
+ pipeCmd = PQmakePipelinedCommand(conn);
+
+ if (pipeCmd == NULL)
+ return 0; /* error msg already set */
+
+ last_query = &pipeCmd->query;
+ queryclass = &pipeCmd->queryclass;
+ }
+ else
+ {
+ last_query = &conn->last_query;
+ queryclass = &conn->queryclass;
+ }
+
/* check the arguments */
if (!stmtName)
{
@@ -1269,18 +1301,21 @@ PQsendPrepare(PGconn *conn,
goto sendFailed;
/* construct the Sync message */
- if (pqPutMsgStart('S', false, conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ if (!conn->in_batch)
+ {
+ if (pqPutMsgStart('S', false, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are doing just a Parse */
- conn->queryclass = PGQUERY_PREPARE;
+ *queryclass = PGQUERY_PREPARE;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
- conn->last_query = strdup(query);
+ if (*last_query)
+ free(*last_query);
+ *last_query = strdup(query);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
@@ -1290,10 +1325,14 @@ PQsendPrepare(PGconn *conn,
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->in_batch)
+ PQappendPipelinedCommand(conn, pipeCmd);
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ PQrecyclePipelinedCommand(conn, pipeCmd);
pqHandleSendFailure(conn);
return 0;
}
@@ -1340,6 +1379,81 @@ PQsendQueryPrepared(PGconn *conn,
resultFormat);
}
+/* Get a new command queue entry, allocating it if required. Doesn't add it to
+ * the tail of the queue yet, use PQappendPipelinedCommand once the command has
+ * been written for that. If a command fails once it's called this, it should
+ * use PQrecyclePipelinedCommand to put it on the freelist or release it.
+ *
+ * If allocation fails sets the error message and returns null.
+ */
+static PGcommandQueueEntry*
+PQmakePipelinedCommand(PGconn *conn)
+{
+ PGcommandQueueEntry *entry;
+
+ if (conn->cmd_queue_recycle == NULL)
+ {
+ entry = (PGcommandQueueEntry*) malloc(sizeof(PGcommandQueueEntry));
+ if (entry == NULL)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("out of memory\n"));
+ return NULL;
+ }
+ }
+ else
+ {
+ entry = conn->cmd_queue_recycle;
+ conn->cmd_queue_recycle = entry->next;
+ }
+ entry->next = NULL;
+ entry->query = NULL;
+
+ return entry;
+}
+
+/* Append a precreated command queue entry to the queue after it's been
+ * sent successfully.
+ */
+static void
+PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+ if (conn->cmd_queue_head == NULL)
+ conn->cmd_queue_head = entry;
+ else
+ conn->cmd_queue_tail->next = entry;
+ conn->cmd_queue_tail = entry;
+}
+
+/* Push a command queue entry onto the freelist. It must be a dangling entry
+ * with null next pointer and not referenced by any other entry's next pointer.
+ */
+static void
+PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+ if (entry == NULL)
+ return;
+ if (entry->next != NULL)
+ {
+ fprintf(stderr, "tried to recycle non-dangling command queue entry");
+ abort();
+ }
+ entry->next = conn->cmd_queue_recycle;
+ conn->cmd_queue_recycle = entry;
+}
+
+/* Set up for processing a new query's results */
+static void
+PQstartProcessingNewQuery(PGconn *conn)
+{
+ /* initialize async result-accumulation state */
+ conn->result = NULL;
+ conn->next_result = NULL;
+
+ /* reset single-row processing mode */
+ conn->singleRowMode = false;
+}
+
/*
* Common startup code for PQsendQuery and sibling routines
*/
@@ -1359,20 +1473,52 @@ PQsendQueryStart(PGconn *conn)
libpq_gettext("no connection to the server\n"));
return false;
}
- /* Can't send while already busy, either. */
- if (conn->asyncStatus != PGASYNC_IDLE)
+
+ /* Can't send while already busy, either, unless enqueuing for later */
+ if (conn->asyncStatus != PGASYNC_IDLE && !conn->in_batch)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
return false;
}
- /* initialize async result-accumulation state */
- conn->result = NULL;
- conn->next_result = NULL;
-
- /* reset single-row processing mode */
- conn->singleRowMode = false;
+ if (conn->in_batch)
+ {
+ /* When enqueuing a message we don't change much of the connection
+ * state since it's already in use for the current command. The
+ * connection state will get updated when PQgetNextQuery(...) advances
+ * to start processing the queued message.
+ *
+ * Just make sure we can safely enqueue given the current connection
+ * state. We can enqueue behind another queue item, or behind a
+ * non-queue command (one that sends its own sync), but we can't
+ * enqueue if the connection is in a copy state.
+ */
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_QUEUED:
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* ok to queue */
+ break;
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot queue commands during COPY\n"));
+ return false;
+ case PGASYNC_IDLE:
+ fprintf(stderr, "internal error, idle state in batch mode");
+ abort();
+ break;
+ }
+ }
+ else
+ {
+ /* This command's results will come in immediately */
+ PQstartProcessingNewQuery(conn);
+ }
/* ready to send command message */
return true;
@@ -1397,6 +1543,10 @@ PQsendQueryGuts(PGconn *conn,
int resultFormat)
{
int i;
+ PGcommandQueueEntry *pipeCmd = NULL;
+ char **last_query;
+ PGQueryClass *queryclass;
+
/* This isn't gonna work on a 2.0 server */
if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1406,6 +1556,23 @@ PQsendQueryGuts(PGconn *conn,
return 0;
}
+ if (conn->in_batch)
+ {
+ pipeCmd = PQmakePipelinedCommand(conn);
+
+ if (pipeCmd == NULL)
+ return 0; /* error msg already set */
+
+ last_query = &pipeCmd->query;
+ queryclass = &pipeCmd->queryclass;
+ }
+ else
+ {
+ last_query = &conn->last_query;
+ queryclass = &conn->queryclass;
+ }
+
+
/*
* We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
* using specified statement name and the unnamed portal.
@@ -1518,22 +1685,25 @@ PQsendQueryGuts(PGconn *conn,
pqPutMsgEnd(conn) < 0)
goto sendFailed;
- /* construct the Sync message */
- if (pqPutMsgStart('S', false, conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ if (!conn->in_batch)
+ {
+ /* construct the Sync message */
+ if (pqPutMsgStart('S', false, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are using extended query protocol */
- conn->queryclass = PGQUERY_EXTENDED;
+ *queryclass = PGQUERY_EXTENDED;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
+ if (*last_query)
+ free(*last_query);
if (command)
- conn->last_query = strdup(command);
+ *last_query = strdup(command);
else
- conn->last_query = NULL;
+ *last_query = NULL;
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
@@ -1543,10 +1713,15 @@ PQsendQueryGuts(PGconn *conn,
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->in_batch)
+ PQappendPipelinedCommand(conn, pipeCmd);
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
+
return 1;
sendFailed:
+ PQrecyclePipelinedCommand(conn, pipeCmd);
pqHandleSendFailure(conn);
return 0;
}
@@ -1578,6 +1753,9 @@ pqHandleSendFailure(PGconn *conn)
* but it prevents buffer bloat if there's a lot of data available.)
*/
parseInput(conn);
+
+ /* TODO: handle pipelined queries by sending a sync and flushing until
+ * the next sync?? */
}
/*
@@ -1673,6 +1851,245 @@ PQisBusy(PGconn *conn)
return conn->asyncStatus == PGASYNC_BUSY;
}
+/* PQisInBatchMode
+ * Return true if currently in batch mode
+ */
+int
+PQisInBatchMode(PGconn *conn)
+{
+ if (!conn)
+ return FALSE;
+
+ return conn->in_batch;
+}
+
+/* PQqueriesInBatch
+ * Return true if there are queries currently pending in batch mode
+ */
+int
+PQqueriesInBatch(PGconn *conn)
+{
+ if (!PQisInBatchMode(conn))
+ return false;
+
+ return conn->cmd_queue_head != NULL;
+}
+
+/* Put an idle connection in batch mode. Commands submitted after this
+ * can be pipelined on the connection, there's no requirement to wait for
+ * one to finish before the next is dispatched.
+ *
+ * COPY is not permitted in batch mode.
+ *
+ * A set of commands is terminated by a PQendBatch. Multiple sets of batched
+ * commands may be sent while in batch mode. Batch mode can be exited by
+ * calling PQendBatchMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQbeginBatchMode(PGconn *conn)
+{
+ if (!conn)
+ return false;
+
+ if (conn->in_batch)
+ return true;
+
+ if (conn->asyncStatus != PGASYNC_IDLE)
+ return false;
+
+ conn->in_batch = true;
+ conn->asyncStatus = PGASYNC_QUEUED;
+
+ return true;
+}
+
+/* End batch mode and return to normal command mode.
+ *
+ * Has no effect unless the client has processed all results
+ * from all outstanding batches and the connection is idle,
+ * i.e. PQisInBatch() &&
+ *
+ * Returns true if batch mode ended.
+ */
+int
+PQendBatchMode(PGconn *conn)
+{
+ if (!conn)
+ return false;
+
+ if (!conn->in_batch)
+ return true;
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_IDLE:
+ fprintf(stderr, "internal error, IDLE in batch mode");
+ abort();
+ break;
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ fprintf(stderr, "internal error, COPY in batch mode");
+ abort();
+ break;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* can't end batch while busy */
+ return false;
+ case PGASYNC_QUEUED:
+ break;
+ }
+
+ /* still work to process */
+ if (conn->cmd_queue_head != NULL)
+ return false;
+
+ conn->in_batch = false;
+ conn->asyncStatus = PGASYNC_IDLE;
+
+ return true;
+}
+
+/* End a batch submission by sending a protocol sync. The connection will
+ * remain in batch mode and unavailable for new non-batch commands until all
+ * results from the batch are processed by the client.
+ *
+ * It's legal to start submitting another batch immediately, without waiting
+ * for the results of the current batch. There's no need to end batch mode
+ * and start it again.
+ *
+ * If a command in a batch fails, every subsequent command up to and including
+ * the PQendBatch command result gets set to PGRES_BATCH_ABORTED state. If the
+ * whole batch is processed without error, a PGresult with PGRES_BATCH_OK is
+ * produced.
+ */
+int
+PQendBatch(PGconn *conn)
+{
+ PGcommandQueueEntry *entry;
+
+ if (!conn)
+ return false;
+
+ if (!conn->in_batch)
+ return false;
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_IDLE:
+ fprintf(stderr, "internal error, IDLE in batch mode\n");
+ abort();
+ break;
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ fprintf(stderr, "internal error, COPY in batch mode\n");
+ abort();
+ break;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ case PGASYNC_QUEUED:
+ /* can send sync to end this batch of cmds */
+ break;
+ }
+
+ entry = PQmakePipelinedCommand(conn);
+ if (entry == NULL)
+ return false; /* error msg already set */
+ entry->queryclass = PGQUERY_SYNC;
+ entry->query = NULL;
+
+ /* construct the Sync message */
+ if (pqPutMsgStart('S', false, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+
+ PQappendPipelinedCommand(conn, entry);
+
+ return true;
+
+sendFailed:
+ PQrecyclePipelinedCommand(conn, entry);
+ pqHandleSendFailure(conn);
+ return false;
+}
+
+/* PQgetNextQuery
+ * In batch mode, start processing the next query in the queue.
+ *
+ * Returns true if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns false if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+int
+PQgetNextQuery(PGconn *conn)
+{
+ PGcommandQueueEntry *next_query;
+
+ if (!conn)
+ return false;
+
+ if (!conn->in_batch)
+ return false;
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ fprintf(stderr, "internal error, COPY in batch mode\n");
+ abort();
+ break;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* client still has to process current query or results */
+ return false;
+ break;
+ case PGASYNC_IDLE:
+ fprintf(stderr, "internal error, idle in batch mode\n");
+ abort();
+ break;
+ case PGASYNC_QUEUED:
+ /* next query please */
+ break;
+ }
+
+ if (conn->cmd_queue_head == NULL)
+ {
+ /* In batch mode but nothing left on the queue; caller can submit
+ * more work or PQendBatchMode() now. */
+ return false;
+ }
+
+ /* Pop the next query from the queue and set up the connection state
+ * as if it'd just been dispatched from a non-batched call */
+ next_query = conn->cmd_queue_head;
+ conn->cmd_queue_head = next_query->next;
+ next_query->next = NULL;
+
+ PQstartProcessingNewQuery(conn);
+
+ conn->last_query = next_query->query;
+ next_query->query = NULL;
+ conn->queryclass = next_query->queryclass;
+
+ PQrecyclePipelinedCommand(conn, next_query);
+
+ /* Allow parsing of the query */
+ conn->asyncStatus = PGASYNC_BUSY;
+
+ /* Parse any available data */
+ parseInput(conn);
+
+ return true;
+}
+
/*
* PQgetResult
@@ -1718,6 +2135,8 @@ PQgetResult(PGconn *conn)
/*
* conn->errorMessage has been set by pqWait or pqReadData. We
* want to append it to any already-received error message.
+ *
+ * TODO: handle purging the queue here
*/
pqSaveErrorResult(conn);
conn->asyncStatus = PGASYNC_IDLE;
@@ -1732,13 +2151,33 @@ PQgetResult(PGconn *conn)
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
+ case PGASYNC_QUEUED:
res = NULL; /* query is complete */
break;
case PGASYNC_READY:
res = pqPrepareAsyncResult(conn);
+ if (conn->in_batch)
+ {
+ /* batched queries aren't followed by a Sync to put us back in
+ * PGASYNC_IDLE state, and when we do get a sync we could still
+ * have another batch coming after this one.
+ *
+ * The connection isn't idle since we can't submit new
+ * nonbatched commands. It also isn't busy since the current
+ * command is done and we need to process a new one.
+ */
+ conn->asyncStatus = PGASYNC_QUEUED;
+ }
+ else
+ {
+ /* Set the state back to BUSY, allowing parsing to proceed. */
+ conn->asyncStatus = PGASYNC_BUSY;
+ }
+ break;
+ case PGASYNC_READY_MORE:
+ res = pqPrepareAsyncResult(conn);
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
break;
case PGASYNC_COPY_IN:
res = getCopyResult(conn, PGRES_COPY_IN);
break;
@@ -1915,6 +2354,13 @@ PQexecStart(PGconn *conn)
if (!conn)
return false;
+ if (conn->asyncStatus == PGASYNC_QUEUED || conn->in_batch)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot PQexec in batch mode\n"));
+ return false;
+ }
+
/*
* Silently discard any prior query result that application didn't eat.
* This is probably poor design, but it's here for backward compatibility.
@@ -2109,6 +2555,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
static int
PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
{
+ PGcommandQueueEntry *pipeCmd = NULL;
+ PGQueryClass *queryclass;
+
/* Treat null desc_target as empty string */
if (!desc_target)
desc_target = "";
@@ -2124,6 +2573,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
return 0;
}
+ if (conn->in_batch)
+ {
+ pipeCmd = PQmakePipelinedCommand(conn);
+
+ if (pipeCmd == NULL)
+ return 0; /* error msg already set */
+
+ queryclass = &pipeCmd->queryclass;
+ }
+ else
+ {
+ queryclass = &conn->queryclass;
+ }
+
/* construct the Describe message */
if (pqPutMsgStart('D', false, conn) < 0 ||
pqPutc(desc_type, conn) < 0 ||
@@ -2132,15 +2595,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed;
/* construct the Sync message */
- if (pqPutMsgStart('S', false, conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ if (!conn->in_batch)
+ {
+ if (pqPutMsgStart('S', false, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are doing a Describe */
- conn->queryclass = PGQUERY_DESCRIBE;
+ *queryclass = PGQUERY_DESCRIBE;
/* reset last-query string (not relevant now) */
- if (conn->last_query)
+ if (conn->last_query && !conn->in_batch)
{
free(conn->last_query);
conn->last_query = NULL;
@@ -2154,10 +2620,14 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->in_batch)
+ PQappendPipelinedCommand(conn, pipeCmd);
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ PQrecyclePipelinedCommand(conn, pipeCmd);
pqHandleSendFailure(conn);
return 0;
}
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index f1b90f3..e824f36 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -412,6 +412,12 @@ pqParseInput2(PGconn *conn)
{
char id;
+ if (conn->asyncStatus == PGASYNC_QUEUED || conn->in_batch)
+ {
+ fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode\n");
+ abort();
+ }
+
/*
* Loop to parse successive complete messages available in the buffer.
*/
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 0b8c62f..30c8dfd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -220,10 +220,17 @@ pqParseInput3(PGconn *conn)
return;
conn->asyncStatus = PGASYNC_READY;
break;
- case 'Z': /* backend is ready for new query */
+ case 'Z': /* sync response, backend is ready for new query */
if (getReadyForQuery(conn))
return;
- conn->asyncStatus = PGASYNC_IDLE;
+ if (conn->in_batch)
+ {
+ conn->result = PQmakeEmptyPGresult(conn,
+ PGRES_BATCH_OK);
+ conn->asyncStatus = PGASYNC_READY;
+ }
+ else
+ conn->asyncStatus = PGASYNC_IDLE;
break;
case 'I': /* empty query */
if (conn->result == NULL)
@@ -305,7 +312,7 @@ pqParseInput3(PGconn *conn)
* parsing until the application accepts the current
* result.
*/
- conn->asyncStatus = PGASYNC_READY;
+ conn->asyncStatus = PGASYNC_READY_MORE;
return;
}
break;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 9ca0756..a210e3e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -91,7 +91,9 @@ typedef enum
PGRES_NONFATAL_ERROR, /* notice or warning message */
PGRES_FATAL_ERROR, /* query failed */
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
- PGRES_SINGLE_TUPLE /* single tuple from larger resultset */
+ PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
+ PGRES_BATCH_OK, /* successful end of a batch of commands */
+ PGRES_BATCH_ABORTED /* Command didn't run because of an abort earlier in a batch */
} ExecStatusType;
typedef enum
@@ -421,6 +423,14 @@ extern PGresult *PQgetResult(PGconn *conn);
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
+/* Routines for batch mode management */
+extern int PQisInBatchMode(PGconn *conn);
+extern int PQqueriesInBatch(PGconn *conn);
+extern int PQbeginBatchMode(PGconn *conn);
+extern int PQendBatchMode(PGconn *conn);
+extern int PQendBatch(PGconn *conn);
+extern int PQgetNextQuery(PGconn *conn);
+
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1183323..6caed9f 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,13 @@ typedef enum
{
PGASYNC_IDLE, /* nothing's happening, dude */
PGASYNC_BUSY, /* query in progress */
- PGASYNC_READY, /* result ready for PQgetResult */
+ PGASYNC_READY, /* query done, waiting for client to fetch result */
+ PGASYNC_READY_MORE, /* query done, waiting for client to fetch result,
+ More results expected from this query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
- PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
+ PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
+ PGASYNC_QUEUED /* Current query done, more in queue */
} PGAsyncStatusType;
/* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +232,8 @@ typedef enum
PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
- PGQUERY_DESCRIBE /* Describe Statement or Portal */
+ PGQUERY_DESCRIBE, /* Describe Statement or Portal */
+ PGQUERY_SYNC /* A protocol sync to end a batch */
} PGQueryClass;
/* PGSetenvStatusType defines the state of the PQSetenv state machine */
@@ -292,6 +296,22 @@ typedef struct pgDataValue
const char *value; /* data value, without zero-termination */
} PGdataValue;
+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+ PGQueryClass queryclass; /* Query type; PGQUERY_SYNC for sync msg */
+ char *query; /* SQL command, or NULL if unknown */
+ struct pgCommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
/*
* PGconn stores all the state data associated with a single connection
* to a backend.
@@ -356,6 +376,7 @@ struct pg_conn
bool options_valid; /* true if OK to attempt connection */
bool nonblocking; /* whether this connection is using nonblock
* sending semantics */
+ bool in_batch; /* connection is in batch (pipelined) mode */
bool singleRowMode; /* return current query result row-by-row? */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY
@@ -363,6 +384,15 @@ struct pg_conn
PGnotify *notifyHead; /* oldest unreported Notify msg */
PGnotify *notifyTail; /* newest unreported Notify msg */
+ /* The command queue
+ *
+ * head is the next pending cmd, tail is where we append new commands.
+ * Freed entries for recycling go on the recycle linked list.
+ */
+ PGcommandQueueEntry *cmd_queue_head;
+ PGcommandQueueEntry *cmd_queue_tail;
+ PGcommandQueueEntry *cmd_queue_recycle;
+
/* Connection data */
/* See PQconnectPoll() for how we use 'int' and not 'pgsocket'. */
pgsocket sock; /* FD for socket, PGINVALID_SOCKET if
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index 31da210..92a6faf 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
override LDLIBS := $(libpq_pgport) $(LDLIBS)
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqbatch
all: $(PROGS)
diff --git a/src/test/examples/testlibpqbatch.c b/src/test/examples/testlibpqbatch.c
new file mode 100644
index 0000000..f82bce2
--- /dev/null
+++ b/src/test/examples/testlibpqbatch.c
@@ -0,0 +1,690 @@
+/*
+ * src/test/examples/testlibpqbatch.c
+ *
+ *
+ * testlibpqbatch.c
+ * Test of batch execution functionality
+ */
+
+#ifdef WIN32
+#include <windows.h>
+#endif
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include "libpq-fe.h"
+
+static void exit_nicely(PGconn *conn);
+static void simple_batch(PGconn *conn);
+static void test_disallowed_in_batch(PGconn *conn);
+static void batch_insert_pipelined(PGconn *conn, int n_rows);
+static void batch_insert_sequential(PGconn *conn, int n_rows);
+
+#ifndef VERBOSE
+#define VERBOSE 0
+#endif
+
+static const Oid INT4OID = 23;
+
+static void
+exit_nicely(PGconn *conn)
+{
+ PQfinish(conn);
+ exit(1);
+}
+
+static void
+simple_batch(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+
+ /*
+ * Enter batch mode and dispatch a set of operations, which we'll then process
+ * the results of as they come in.
+ *
+ * For a simple case we should be able to do this without interim processing
+ * of results since our out buffer will give us enough slush to work with
+ * and we won't block on sending. So blocking mode is fine.
+ */
+ if (PQisnonblocking(conn))
+ {
+ fprintf(stderr, "Expected blocking connection mode\n");
+ goto fail;
+ }
+
+ if (!PQbeginBatchMode(conn))
+ {
+ fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ if (!PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0))
+ {
+ fprintf(stderr, "dispatching SELECT failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ if (PQendBatchMode(conn))
+ {
+ fprintf(stderr, "exiting batch mode with work in progress should fail, but succeeded\n");
+ goto fail;
+ }
+
+ if (!PQendBatch(conn))
+ {
+ fprintf(stderr, "Ending a batch failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ /* in batch mode we have to ask for the first result to be processed; until we
+ * do PQgetResult will return null: */
+ if (PQgetResult(conn) != NULL)
+ {
+ fprintf(stderr, "PQgetResult returned something in a batch before first PQgetNextQuery() call\n");
+ goto fail;
+ }
+
+ if (!PQgetNextQuery(conn))
+ {
+ fprintf(stderr, "PQgetNextQuery() failed at first batch entry: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ {
+ fprintf(stderr, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, "Unexpected result code %s from first batch item\n",
+ PQresStatus(PQresultStatus(res)));
+ goto fail;
+ }
+
+ PQclear(res);
+ res = NULL; /* avoid a double PQclear() if a later check jumps to fail */
+
+ if (PQgetResult(conn) != NULL)
+ {
+ fprintf(stderr, "PQgetResult returned something extra after first result before PQgetNextQuery() call\n");
+ goto fail;
+ }
+
+ /* Even though we've processed the result there's still a sync to come and we
+ * can't exit batch mode yet */
+ if (PQendBatchMode(conn))
+ {
+ fprintf(stderr, "exiting batch mode after query but before sync succeeded incorrectly\n");
+ goto fail;
+ }
+
+ /* should now get an explicit sync result */
+ if (!PQgetNextQuery(conn))
+ {
+ fprintf(stderr, "PQgetNextQuery() failed at sync after first batch entry: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ {
+ fprintf(stderr, "PQgetResult returned null when sync result expected: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ if (PQresultStatus(res) != PGRES_BATCH_OK)
+ {
+ fprintf(stderr, "Unexpected result code %s instead of sync result, error: %s\n",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ goto fail;
+ }
+
+ PQclear(res);
+ res = NULL; /* avoid a double PQclear() if a later check jumps to fail */
+
+ /* We're still in a batch... */
+ if (!PQisInBatchMode(conn))
+ {
+ fprintf(stderr, "Fell out of batch mode somehow\n");
+ goto fail;
+ }
+
+ /* until we end it, which we can safely do now */
+ if (!PQendBatchMode(conn))
+ {
+ fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ if (PQisInBatchMode(conn))
+ {
+ fprintf(stderr, "exiting batch mode didn't seem to work\n");
+ goto fail;
+ }
+
+ return;
+
+fail:
+ PQclear(res);
+ exit_nicely(conn);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+ PGresult *res = NULL;
+
+ if (PQisnonblocking(conn))
+ {
+ fprintf(stderr, "Expected blocking connection mode: %d\n", __LINE__);
+ goto fail;
+ }
+
+ if (!PQbeginBatchMode(conn))
+ {
+ fprintf(stderr, "Unable to enter batch mode\n");
+ goto fail;
+ }
+
+ if (!PQisInBatchMode(conn))
+ {
+ fprintf(stderr, "Batch mode not activated properly\n");
+ goto fail;
+ }
+
+ /* PQexec should fail in batch mode */
+ res = PQexec(conn, "SELECT 1");
+ if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+ {
+ fprintf(stderr, "PQexec should fail in batch mode but succeeded\n");
+ goto fail;
+ }
+
+ /* So should PQsendQuery */
+ if (PQsendQuery(conn, "SELECT 1") != 0)
+ {
+ fprintf(stderr, "PQsendQuery should fail in batch mode but succeeded\n");
+ goto fail;
+ }
+
+ /* Entering batch mode when already in batch mode is OK */
+ if (!PQbeginBatchMode(conn))
+ {
+ fprintf(stderr, "re-entering batch mode should be a no-op but failed\n");
+ goto fail;
+ }
+
+ if (PQisBusy(conn))
+ {
+ fprintf(stderr, "PQisBusy should return false when idle in batch, returned true\n");
+ goto fail;
+ }
+
+ /* ok, back to normal command mode */
+ if (!PQendBatchMode(conn))
+ {
+ fprintf(stderr, "couldn't exit idle empty batch mode\n");
+ goto fail;
+ }
+
+ if (PQisInBatchMode(conn))
+ {
+ fprintf(stderr, "Batch mode not terminated properly\n");
+ goto fail;
+ }
+
+ /* exiting batch mode when not in batch mode should be a no-op */
+ if (!PQendBatchMode(conn))
+ {
+ fprintf(stderr, "batch mode exit when not in batch mode should succeed but failed\n");
+ goto fail;
+ }
+
+ /* can now PQexec again */
+ res = PQexec(conn, "SELECT 1");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, "PQexec should succeed after exiting batch mode but failed with: %s\n",
+ PQerrorMessage(conn));
+ goto fail;
+ }
+
+ return;
+
+fail:
+ PQclear(res);
+ exit_nicely(conn);
+}
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+/* State machine enums for batch insert */
+typedef enum BatchInsertStep
+{
+ BI_BEGIN_TX,
+ BI_DROP_TABLE,
+ BI_CREATE_TABLE,
+ BI_PREPARE,
+ BI_INSERT_ROWS,
+ BI_COMMIT_TX,
+ BI_SYNC,
+ BI_DONE
+} BatchInsertStep;
+
+static const char * const drop_table_sql
+ = "DROP TABLE IF EXISTS batch_demo";
+static const char * const create_table_sql
+ = "CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);";
+static const char * const insert_sql
+ = "INSERT INTO batch_demo(itemno) VALUES ($1);";
+
+static void
+batch_insert_pipelined(PGconn *conn, int n_rows)
+{
+ PGresult *res = NULL;
+ const char *insert_params[1];
+ Oid insert_param_oids[1] = {INT4OID};
+ char insert_param_0[MAXINTLEN];
+ BatchInsertStep send_step = BI_BEGIN_TX, recv_step = BI_BEGIN_TX;
+ int rows_to_send, rows_to_receive;
+
+ insert_params[0] = &insert_param_0[0];
+
+ rows_to_send = rows_to_receive = n_rows;
+
+ /*
+ * Do a batched insert into a table created at the start of the batch
+ */
+ if (!PQbeginBatchMode(conn))
+ {
+ fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ if (!PQsendQueryParams(conn, "BEGIN",
+ 0, NULL, NULL, NULL, NULL, 0))
+ {
+ fprintf(stderr, "xact start failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ #if VERBOSE
+ fprintf(stdout, "sent BEGIN\n");
+ #endif
+ send_step = BI_DROP_TABLE;
+
+ if (!PQsendQueryParams(conn, drop_table_sql,
+ 0, NULL, NULL, NULL, NULL, 0))
+ {
+ fprintf(stderr, "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ #if VERBOSE
+ fprintf(stdout, "sent DROP\n");
+ #endif
+ send_step = BI_CREATE_TABLE;
+
+ if (!PQsendQueryParams(conn, create_table_sql,
+ 0, NULL, NULL, NULL, NULL, 0))
+ {
+ fprintf(stderr, "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ #if VERBOSE
+ fprintf(stdout, "sent CREATE\n");
+ #endif
+ send_step = BI_PREPARE;
+
+ if (!PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids))
+ {
+ fprintf(stderr, "dispatching PREPARE failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ #if VERBOSE
+ fprintf(stdout, "sent PREPARE\n");
+ #endif
+ send_step = BI_INSERT_ROWS;
+
+ /* Now we start inserting. We'll be sending enough data that we could fill
+ * our out buffer, so to avoid deadlocking we need to enter nonblocking
+ * mode and consume input while we send more output. As results of each query are
+ * processed we should pop them to allow processing of the next query. There's
+ * no need to finish the batch before processing results.
+ */
+ if (PQsetnonblocking(conn, 1) != 0)
+ {
+ fprintf(stderr, "failed to set nonblocking mode: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ while (recv_step != BI_DONE)
+ {
+ int sock;
+ fd_set input_mask;
+ fd_set output_mask;
+
+ sock = PQsocket(conn);
+
+ if (sock < 0)
+ break; /* shouldn't happen */
+
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ FD_ZERO(&output_mask);
+ FD_SET(sock, &output_mask);
+
+ if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+ {
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ exit_nicely(conn);
+ }
+
+ /* Process any results, so we keep the server's out buffer free flowing
+ * and it can continue to process input */
+ if (FD_ISSET(sock, &input_mask))
+ {
+ PQconsumeInput(conn);
+
+ /* Read until we'd block if we tried to read */
+ while (!PQisBusy(conn) && recv_step < BI_DONE)
+ {
+ const char * cmdtag;
+ const char * description = NULL;
+ int status;
+ BatchInsertStep next_step;
+
+
+ res = PQgetResult(conn);
+
+ if (res == NULL)
+ {
+ /* No more results from this query, advance to the next result */
+ if (!PQgetNextQuery(conn))
+ {
+ fprintf(stderr, "Expected next query result but unable to dequeue: %s\n",
+ PQerrorMessage(conn));
+ goto fail;
+ }
+ #if VERBOSE
+ fprintf(stdout, "next query!\n");
+ #endif
+ continue;
+ }
+
+ status = PGRES_COMMAND_OK;
+ next_step = recv_step + 1;
+ switch (recv_step)
+ {
+ case BI_BEGIN_TX:
+ cmdtag = "BEGIN";
+ break;
+ case BI_DROP_TABLE:
+ cmdtag = "DROP TABLE";
+ break;
+ case BI_CREATE_TABLE:
+ cmdtag = "CREATE TABLE";
+ break;
+ case BI_PREPARE:
+ cmdtag = "";
+ description = "PREPARE";
+ break;
+ case BI_INSERT_ROWS:
+ cmdtag = "INSERT";
+ rows_to_receive --;
+ if (rows_to_receive > 0)
+ next_step = BI_INSERT_ROWS;
+ break;
+ case BI_COMMIT_TX:
+ cmdtag = "COMMIT";
+ break;
+ case BI_SYNC:
+ cmdtag = "";
+ description = "SYNC";
+ status = PGRES_BATCH_OK;
+ break;
+ case BI_DONE:
+ /* unreachable */
+ abort();
+ }
+ if (description == NULL)
+ description = cmdtag;
+
+ #if VERBOSE
+ fprintf(stderr, "At state %d (%s) expect tag '%s', result code %s, expect %d more rows, transition to %d\n",
+ recv_step, description, cmdtag, PQresStatus(status), rows_to_receive, next_step);
+ #endif
+
+ if (PQresultStatus(res) != status)
+ {
+ fprintf(stderr, "%s reported status %s, expected %s. Error msg is [%s]\n",
+ description, PQresStatus(PQresultStatus(res)), PQresStatus(status), PQerrorMessage(conn));
+ goto fail;
+ }
+ if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+ {
+ fprintf(stderr, "%s expected command tag '%s', got '%s'\n",
+ description, cmdtag, PQcmdStatus(res));
+ goto fail;
+ }
+ #if VERBOSE
+ fprintf(stdout, "Got %s OK\n", cmdtag);
+ #endif
+ recv_step = next_step;
+
+ PQclear(res);
+ res = NULL; /* avoid a double PQclear() if a later check jumps to fail */
+ }
+ }
+
+ /* Write more rows and/or the end batch message, if needed */
+ if (FD_ISSET(sock, &output_mask))
+ {
+ PQflush(conn);
+
+ if (send_step == BI_INSERT_ROWS)
+ {
+ snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+ insert_param_0[MAXINTLEN-1] = '\0';
+
+ if (PQsendQueryPrepared(conn, "my_insert",
+ 1, insert_params, NULL, NULL, 0))
+ {
+ #if VERBOSE
+ fprintf(stdout, "sent row %d\n", rows_to_send);
+ #endif
+ rows_to_send --;
+ if (rows_to_send == 0)
+ send_step = BI_COMMIT_TX;
+ }
+ else
+ {
+ /* in nonblocking mode, so it's OK for an insert to fail to send */
+ fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+ rows_to_send, PQerrorMessage(conn));
+ }
+ }
+ else if (send_step == BI_COMMIT_TX)
+ {
+ if (PQsendQueryParams(conn, "COMMIT",
+ 0, NULL, NULL, NULL, NULL, 0))
+ {
+ #if VERBOSE
+ fprintf(stdout, "sent COMMIT\n");
+ #endif
+ send_step = BI_SYNC;
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: failed to send commit: %s\n",
+ PQerrorMessage(conn));
+ }
+ } else if (send_step == BI_SYNC)
+ {
+ if (PQendBatch(conn))
+ {
+ #if VERBOSE
+ fprintf(stdout, "Dispatched end batch message\n");
+ #endif
+ send_step = BI_DONE;
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: Ending a batch failed: %s\n",
+ PQerrorMessage(conn));
+ }
+ }
+ }
+
+ }
+
+ /* We've got the sync message and the batch should be done */
+ if (!PQendBatchMode(conn))
+ {
+ fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ if (PQsetnonblocking(conn, 0) != 0)
+ {
+ fprintf(stderr, "failed to clear nonblocking mode: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+
+ return;
+
+fail:
+ PQclear(res);
+ exit_nicely(conn);
+}
+
+
+static void
+batch_insert_sequential(PGconn *conn, int nrows)
+{
+ PGresult *res = NULL;
+ const char *insert_params[1];
+ Oid insert_param_oids[1] = {INT4OID};
+ char insert_param_0[MAXINTLEN];
+
+ insert_params[0] = &insert_param_0[0];
+
+ res = PQexec(conn, "BEGIN");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "BEGIN failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ PQclear(res);
+
+ res = PQexec(conn, drop_table_sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "DROP TABLE failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ PQclear(res);
+
+ res = PQexec(conn, create_table_sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ PQclear(res);
+
+ res = PQprepare(conn, "my_insert2", insert_sql, 1, insert_param_oids);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "prepare failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ PQclear(res);
+
+ while (nrows > 0)
+ {
+ snprintf(&insert_param_0[0], MAXINTLEN, "%d", nrows);
+ insert_param_0[MAXINTLEN-1] = '\0';
+
+ res = PQexecPrepared(conn, "my_insert2",
+ 1, insert_params, NULL, NULL, 0);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "INSERT failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ PQclear(res);
+ nrows --;
+ }
+
+ res = PQexec(conn, "COMMIT");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "COMMIT failed: %s\n", PQerrorMessage(conn));
+ goto fail;
+ }
+ PQclear(res);
+
+ return;
+
+fail:
+ PQclear(res);
+ exit_nicely(conn);
+}
+
+
+int
+main(int argc, char **argv)
+{
+ const char *conninfo;
+ PGconn *conn;
+ struct timeval start_time, end_time, elapsed_time;
+
+ /*
+ * If the user supplies a parameter on the command line, use it as the
+ * conninfo string; otherwise default to setting dbname=postgres and using
+ * environment variables or defaults for all other connection parameters.
+ */
+ if (argc > 1)
+ conninfo = argv[1];
+ else
+ conninfo = "dbname = postgres";
+
+ /* Make a connection to the database */
+ conn = PQconnectdb(conninfo);
+
+ /* Check to see that the backend connection was successfully made */
+ if (PQstatus(conn) != CONNECTION_OK)
+ {
+ fprintf(stderr, "Connection to database failed: %s\n",
+ PQerrorMessage(conn));
+ exit_nicely(conn);
+ }
+
+ test_disallowed_in_batch(conn);
+ simple_batch(conn);
+
+ gettimeofday(&start_time, NULL);
+ batch_insert_pipelined(conn, 10000);
+ gettimeofday(&end_time, NULL);
+ timersub(&end_time, &start_time, &elapsed_time);
+ printf("batch insert elapsed: %ld.%06lds\n", (long) elapsed_time.tv_sec, (long) elapsed_time.tv_usec);
+
+ gettimeofday(&start_time, NULL);
+ batch_insert_sequential(conn, 10000);
+ gettimeofday(&end_time, NULL);
+ timersub(&end_time, &start_time, &elapsed_time);
+ printf("sequential insert elapsed: %ld.%06lds\n", (long) elapsed_time.tv_sec, (long) elapsed_time.tv_usec);
+
+ fprintf(stderr, "Done.\n");
+
+
+ /* close the connection to the database and cleanup */
+ PQfinish(conn);
+
+ return 0;
+}
--
2.5.5
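As a reading aid for the command queue the patch adds to libpq-int.h (head is the next pending command, tail is where new commands are appended, popped entries are kept for reuse), here is a rough standalone sketch. It is not the patch's code: CmdEntry, CmdQueue, cq_push, cq_pop and cq_free are names invented for this example, and each entry carries only an integer tag instead of a query class and query string.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGcommandQueueEntry. */
typedef struct CmdEntry
{
    int         tag;
    struct CmdEntry *next;
} CmdEntry;

typedef struct CmdQueue
{
    CmdEntry   *head;       /* next pending command */
    CmdEntry   *tail;       /* where new commands are appended */
    CmdEntry   *recycle;    /* popped entries kept for reuse */
} CmdQueue;

/* Append a command, reusing a recycled entry when one is available.
 * Returns 1 on success, 0 on out-of-memory. */
static int
cq_push(CmdQueue *q, int tag)
{
    CmdEntry   *e;

    assert(q != NULL);
    e = q->recycle;
    if (e != NULL)
        q->recycle = e->next;
    else if ((e = malloc(sizeof(CmdEntry))) == NULL)
        return 0;

    e->tag = tag;
    e->next = NULL;
    if (q->tail != NULL)
        q->tail->next = e;
    else
        q->head = e;
    q->tail = e;
    return 1;
}

/* Pop the oldest pending command into *tag.
 * Returns 1 on success, 0 if the queue is empty. */
static int
cq_pop(CmdQueue *q, int *tag)
{
    CmdEntry   *e;

    assert(q != NULL && tag != NULL);
    e = q->head;
    if (e == NULL)
        return 0;
    q->head = e->next;
    if (q->head == NULL)
        q->tail = NULL;
    *tag = e->tag;
    e->next = q->recycle;   /* keep the entry for the next push */
    q->recycle = e;
    return 1;
}

/* Release both the pending list and the recycle list. */
static void
cq_free(CmdQueue *q)
{
    CmdEntry   *lists[2];
    int         i;

    assert(q != NULL);
    lists[0] = q->head;
    lists[1] = q->recycle;
    for (i = 0; i < 2; i++)
    {
        while (lists[i] != NULL)
        {
            CmdEntry   *next = lists[i]->next;

            free(lists[i]);
            lists[i] = next;
        }
    }
    q->head = q->tail = q->recycle = NULL;
}
```

Keeping popped entries on a separate list means a steady stream of pipelined commands settles into reusing the same few allocations rather than calling malloc() once per command.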
On 20 May 2016 at 23:18, Craig Ringer <craig@2ndquadrant.com> wrote:
On 20 May 2016 at 15:35, Craig Ringer <craig@2ndquadrant.com> wrote:
You can, however, omit Sync from between messages and send a series of
protocol messages, like
Parse/Bind/Execute/Bind/Execute/Bind/Execute/Sync
to avoid round-trip overheads.
I implemented what I think is a pretty solid proof of concept of this for
kicks this evening. Patch attached, including a basic test program.
The performance difference over higher-latency links is huge; see below.
I finished it off and submitted it.
/messages/by-id/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com
https://commitfest.postgresql.org/10/634/
I'll use the other thread for the patch from now on.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
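To put rough numbers on the round-trip argument above: when every statement is followed by its own Sync and the client waits for the reply, each row pays a full network round trip, whereas a pipelined batch pays roughly one per Sync. The sketch below is a back-of-the-envelope model only. The function names and the figures in the note after it are invented for illustration, it assumes the client waits for each Sync's results before sending the next batch, and it ignores server execution time, bandwidth and buffering.

```c
#include <assert.h>

/* Estimated milliseconds spent waiting on the network when each row is
 * sent and acknowledged before the next one goes out. */
static double
sequential_wait_ms(long n_rows, double rtt_ms)
{
    assert(n_rows >= 0 && rtt_ms >= 0.0);
    return (double) n_rows * rtt_ms;
}

/* Same estimate when rows are pipelined and a Sync is sent every
 * rows_per_sync rows: one round trip per Sync. */
static double
pipelined_wait_ms(long n_rows, long rows_per_sync, double rtt_ms)
{
    long        n_syncs;

    assert(n_rows >= 0 && rows_per_sync > 0 && rtt_ms >= 0.0);
    n_syncs = (n_rows + rows_per_sync - 1) / rows_per_sync;    /* ceiling */
    return (double) n_syncs * rtt_ms;
}
```

With 10000 rows over a hypothetical 50 ms link, sequential_wait_ms(10000, 50.0) gives 500000 ms, while pipelined_wait_ms(10000, 100, 50.0) gives 5000 ms.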
On 2016/05/18 7:08, Michael Paquier wrote:
On Wed, May 18, 2016 at 6:00 AM, Manuel Kniep <m.kniep@web.de> wrote:
I realized that inserts into foreign tables are only done row by row.
Consider copying data from one local table to a foreign table with
INSERT INTO foreign_table(a,b,c) SELECT a,b,c FROM local_table;
When the foreign server is for example in another datacenter with long latency,
this has an enormous performance trade off.
I am adding Fujita-san in the loop here, he is
quite involved with postgres_fdw these days so perhaps he has some
input to offer.
Honestly, I didn't have any idea for executing such an insert
efficiently, but I was thinking to execute an insert into a foreign
table efficiently, by sending the whole insert to the remote server, if
possible. For example, if the insert is of the form:
INSERT INTO foreign_table(a,b,c) VALUES (1, 2, 3), (4, 5, 6) or
INSERT INTO foreign_table(a,b,c) SELECT a,b,c FROM foreign_table2
where foreign_table and foreign_table2 belong to the same foreign
server, then we could send the whole insert to the remote server.
Wouldn't that make sense?
Best regards,
Etsuro Fujita
On Thu, May 26, 2016 at 4:25 AM, Etsuro Fujita
<fujita.etsuro@lab.ntt.co.jp> wrote:
On 2016/05/18 7:08, Michael Paquier wrote:
On Wed, May 18, 2016 at 6:00 AM, Manuel Kniep <m.kniep@web.de> wrote:
I realized that inserts into foreign tables are only done row by row.
Consider copying data from one local table to a foreign table with
INSERT INTO foreign_table(a,b,c) SELECT a,b,c FROM local_table;
When the foreign server is for example in another datacenter with long
latency,
this has an enormous performance trade-off.
I am adding Fujita-san in the loop here, he is
quite involved with postgres_fdw these days so perhaps he has some
input to offer.
Honestly, I didn't have any idea for executing such an insert efficiently,
but I was thinking to execute an insert into a foreign table efficiently, by
sending the whole insert to the remote server, if possible. For example, if
the insert is of the form:
INSERT INTO foreign_table(a,b,c) VALUES (1, 2, 3), (4, 5, 6) or
INSERT INTO foreign_table(a,b,c) SELECT a,b,c FROM foreign_table2
where foreign_table and foreign_table2 belong to the same foreign server,
then we could send the whole insert to the remote server.
Wouldn't that make sense?
Query strings have a limited length, and this assumption is true for
many code paths in the backend code, so doing that with a long string
would introduce more pain in the logic than anything else, as this
would become more data type sensitive.
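To illustrate the length concern: if rows were sent as one multi-VALUES string, something would have to split them so that no single statement grows past a limit. A minimal sketch follows, assuming the rows are already rendered as SQL literals. The chunk_rows name and the byte budget are hypothetical, and nothing like this exists in postgres_fdw as far as this thread says.

```python
def chunk_rows(rows, max_bytes):
    """Group pre-rendered row literals so each VALUES list stays near max_bytes.

    A single row larger than the budget is still emitted, alone in its chunk.
    """
    batch, size = [], 0
    for row in rows:
        row_bytes = len(row.encode("utf-8")) + 2  # +2 for the ", " separator
        if batch and size + row_bytes > max_bytes:
            yield batch
            batch, size = [], 0
        batch.append(row)
        size += row_bytes
    if batch:
        yield batch


rows = ["(1, 2, 3)", "(4, 5, 6)", "(7, 8, 9)"]
for chunk in chunk_rows(rows, max_bytes=25):
    print("INSERT INTO remote_table(a,b,c) VALUES " + ", ".join(chunk))
```

The number of rows per statement depends on the rendered size of each value, which is the data type sensitivity mentioned above: a handful of large text values can consume a budget that would hold thousands of integer rows.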
--
Michael
On 2016/05/27 8:49, Michael Paquier wrote:
On Thu, May 26, 2016 at 4:25 AM, Etsuro Fujita
<fujita.etsuro@lab.ntt.co.jp> wrote:
Honestly, I didn't have any idea for executing such an insert efficiently,
but I was thinking to execute an insert into a foreign table efficiently, by
sending the whole insert to the remote server, if possible. For example, if
the insert is of the form:
INSERT INTO foreign_table(a,b,c) VALUES (1, 2, 3), (4, 5, 6) or
INSERT INTO foreign_table(a,b,c) SELECT a,b,c FROM foreign_table2
where foreign_table and foreign_table2 belong to the same foreign server,
then we could send the whole insert to the remote server.
Wouldn't that make sense?
Query strings have a limited length, and this assumption is true for
many code paths in the backend code, so doing that with a long string
would introduce more pain in the logic than anything else, as this
would become more data type sensitive.
That's a good point, but the basic idea is to send the local query
almost-as-is to the remote server if possible. For example, if the
local query is "INSERT INTO foreign_table(a,b,c) VALUES (1, 2, 3), (4,
5, 6)", send the remote query "INSERT INTO remote_table(a,b,c) VALUES
(1, 2, 3), (4, 5, 6)" to the remote server where remote_table is the
table name for the foreign table on the remote server. So the query
string length wouldn't be a problem in many cases, would it? Maybe I'm
missing something, though.
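As a toy illustration of the "almost-as-is" idea, the remote statement could be rebuilt from the parsed pieces of the local one with only the table name swapped. The deparse_remote_insert helper is hypothetical and handles integer values only. Real deparsing would need proper quoting of identifiers and literals.

```python
def deparse_remote_insert(remote_table, columns, rows):
    """Rebuild a multi-row INSERT against the remote table name.

    Illustrative only: values are assumed to be integers, so no quoting
    or escaping is attempted here.
    """
    cols = ",".join(columns)
    values = ", ".join(
        "(" + ", ".join(str(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {remote_table}({cols}) VALUES {values}"


print(deparse_remote_insert("remote_table", ["a", "b", "c"],
                            [(1, 2, 3), (4, 5, 6)]))
```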
Best regards,
Etsuro Fujita
On 30 May 2016 at 16:17, Etsuro Fujita <fujita.etsuro@lab.ntt.co.jp> wrote:
That's a good point, but the basic idea is to send the local query
almost-as-is to the remote server if possible. For example, if the local
query is "INSERT INTO foreign_table(a,b,c) VALUES (1, 2, 3), (4, 5, 6)",
send the remote query "INSERT INTO remote_table(a,b,c) VALUES (1, 2, 3),
(4, 5, 6)" to the remote server where remote_table is the table name for
the foreign table on the remote server. So the query string length
wouldn't be a problem in many cases, would it? Maybe I'm missing something, though.
FDWs don't operate at that level. They don't see the original query string.
They're plan nodes that operate with a row-by-row push/pull model. The
foreign table node in question has no idea you're doing a multivalued
insert and doesn't care if it's INSERT INTO ... SELECT, INSERT INTO ...
VALUES, or COPY.
That's why I think using batching is the way to go here. Each operation
remains isolated, but you don't force a round trip for each one, you just
queue them up on the wire and you flush only at end-of-statement. A failure
will cause the statement to ERROR and abort the tx, so the effect is the
same, though the failure might be a bit later than if you forced a flush
each time.
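The queue-then-flush behaviour can be sketched in a few lines. This is a simulation, not libpq code: PipelinedInserter and fake_remote are hypothetical names, and the "remote" is just a local function that rejects rows whose first column is NULL. It shows that each row is still handled as its own operation, that only one flush happens per statement, and that a bad row is reported at flush time rather than when it was queued.

```python
class PipelinedInserter:
    """Queue row inserts locally and send them in one flush at end of statement."""

    def __init__(self, execute_batch):
        # Hypothetical callable standing in for "send everything, then Sync".
        self.execute_batch = execute_batch
        self.queue = []
        self.round_trips = 0

    def insert(self, row):
        self.queue.append(row)  # queued on the wire, no round trip yet

    def flush(self):
        if not self.queue:
            return
        self.round_trips += 1
        pending, self.queue = self.queue, []
        # May raise; the caller is expected to abort the transaction.
        self.execute_batch(pending)


def fake_remote(rows):
    """Stand-in for the remote server: rejects a NULL in the first column."""
    for i, row in enumerate(rows):
        if row[0] is None:
            raise ValueError(f"row {i}: null value in column a")


inserter = PipelinedInserter(fake_remote)
for row in [(1, 2, 3), (4, 5, 6)]:
    inserter.insert(row)
inserter.flush()
print(inserter.round_trips)  # a single flush covered both rows
```

If a queued row is invalid, insert() returns normally and the error only surfaces from flush(), which matches the "failure might be a bit later" caveat above.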
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 2016/05/30 22:59, Craig Ringer wrote:
On 30 May 2016 at 16:17, Etsuro Fujita <fujita.etsuro@lab.ntt.co.jp> wrote:
That's a good point, but the basic idea is to send the local query
almost-as-is to the remote server if possible. For example, if the local
query is "INSERT INTO foreign_table(a,b,c) VALUES (1, 2, 3), (4, 5, 6)",
send the remote query "INSERT INTO remote_table(a,b,c) VALUES (1, 2, 3),
(4, 5, 6)" to the remote server where remote_table is the table name for
the foreign table on the remote server. So the query string length
wouldn't be a problem in many cases, would it? Maybe I'm missing something, though.
FDWs don't operate at that level. They don't see the original query string.
They're plan nodes that operate with a row-by-row push/pull model. The
foreign table node in question has no idea you're doing a multivalued
insert and doesn't care if it's INSERT INTO ... SELECT, INSERT INTO ...
VALUES, or COPY.
IIUC, what Fujita-san seems to be referring to here is safe push-down of an
insert's query or values expression (and hence the whole insert itself)
considered during the *planning* step. That sounds like a different
optimization from what's being discussed on this thread, though. The
latter certainly seems to have its benefits when push-down is not possible,
which might well be the majority of cases.
Thanks,
Amit
On 2016/05/31 14:53, Amit Langote wrote:
On 2016/05/30 22:59, Craig Ringer wrote:
On 30 May 2016 at 16:17, Etsuro Fujita <fujita.etsuro@lab.ntt.co.jp> wrote:
That's a good point, but the basic idea is to send the local query
almost-as-is to the remote server if possible. For example, if the local
query is "INSERT INTO foreign_table(a,b,c) VALUES (1, 2, 3), (4, 5, 6)",
send the remote query "INSERT INTO remote_table(a,b,c) VALUES (1, 2, 3),
(4, 5, 6)" to the remote server where remote_table is the table name for
the foreign table on the remote server. So the query string length
wouldn't be a problem in many cases, would it? Maybe I'm missing something, though.
FDWs don't operate at that level. They don't see the original query string.
They're plan nodes that operate with a row-by-row push/pull model. The
foreign table node in question has no idea you're doing a multivalued
insert and doesn't care if it's INSERT INTO ... SELECT, INSERT INTO ...
VALUES, or COPY.
IIUC, what Fujita-san seems to be referring to here is safe push-down of an
insert's query or values expression (and hence the whole insert itself)
considered during the *planning* step.
That's really what I have in mind. Thanks for the explanation!
Although that sounds like a
different optimization from what's being discussed on this thread. The
latter certainly seems to have its benefits in case of push-down failure
and might as well be the majority of cases.
Agreed.
Best regards,
Etsuro Fujita