Pipeline mode and PQpipelineSync()

Started by Boris Kolpackovover 4 years ago41 messages
#1Boris Kolpackov
boris@codesynthesis.com

I am trying to add bulk operation support to ODB (a C++ ORM) using
the new pipeline mode added to libpq in PostgreSQL 14. However, things
don't seem to be working according to the documentation (or perhaps I
am misunderstanding something). Specifically, the documentation[1]https://www.postgresql.org/docs/14/libpq-pipeline-mode.html
makes it sound like the use of PQpipelineSync() is optional (34.5.1.1
"Issuing Queries"):

"After entering pipeline mode, the application dispatches requests using
PQsendQuery, PQsendQueryParams, or its prepared-query sibling
PQsendQueryPrepared. These requests are queued on the client-side until
flushed to the server; this occurs when PQpipelineSync is used to establish a
synchronization point in the pipeline, or when PQflush is called. [...]

The server executes statements, and returns results, in the order the client
sends them. The server will begin executing the commands in the pipeline
immediately, not waiting for the end of the pipeline. [...]"

Based on this I expect to be able to queue a single prepared INSERT
statement with PQsendQueryPrepared() and then call PQflush() and
PQconsumeInput() to send/receive the data. This, however, does not
work: the client gets blocked because there is no data to read. Here
is the call sequence:

select() # socket is writable
PQsendQueryPrepared() # success
PQflush() # returns 0 (queue is now empty)
select() # blocked here indefinitely

In contrast, if I add the PQpipelineSync() call after PQsendQueryPrepared(),
then everything starts functioning as expected:

select() # socket is writable
PQsendQueryPrepared() # success
PQpipelineSync() # success
PQflush() # returns 0 (queue is now empty)
select() # socket is readable
PQconsumeInput() # success
PQgetResult() # INSERT result
PQgetResult() # NULL
PQgetResult() # PGRES_PIPELINE_SYNC

So to me it looks like, contrary to the documentation, the server does
not start executing the statements immediately, instead waiting for the
synchronization point. Or am I missing something here?

The above tests were performed using libpq from 14beta1 running against
PostgreSQL server version 9.5. If you would like to take a look at the
actual code, you can find it here[2]https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n771 (the PIPELINE_SYNC macro controls
whether PQpipelineSync() is used).

On a related note, I've been using libpq_pipeline.c[3]https://doxygen.postgresql.org/libpq__pipeline_8c_source.html as a reference
and I believe it has a busy loop calling PQflush() repeatedly on line
721 since once everything has been sent and we are waiting for the
result, select() will keep returning with an indication that the socket
is writable (you can find one way to fix this in [2]https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n771).

[1]: https://www.postgresql.org/docs/14/libpq-pipeline-mode.html
[2]: https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n771
[3]: https://doxygen.postgresql.org/libpq__pipeline_8c_source.html

#2Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#1)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-16, Boris Kolpackov wrote:

Specifically, the documentation[1]
makes it sound like the use of PQpipelineSync() is optional (34.5.1.1
"Issuing Queries"):

Hmm. My intention here was to indicate that you should have
PQpipelineSync *somewhere*, but that the server was free to start
executing some commands even before that, if the buffered commands
happened to reach the server somehow -- but not necessarily that the
results from those commands would reach the client immediately.

I'll experiment a bit more to be sure that what I'm saying is correct.
But if it is, then I think the documentation you quote is misleading:

"After entering pipeline mode, the application dispatches requests using
PQsendQuery, PQsendQueryParams, or its prepared-query sibling
PQsendQueryPrepared. These requests are queued on the client-side until
flushed to the server; this occurs when PQpipelineSync is used to establish a
synchronization point in the pipeline, or when PQflush is called. [...]

The server executes statements, and returns results, in the order the client
sends them. The server will begin executing the commands in the pipeline
immediately, not waiting for the end of the pipeline. [...]"

... because it'll lead people to do what you've done, only to discover
that it doesn't really work.

I think I should rephrase this to say that PQpipelineSync() is needed
where the user needs the server to start executing commands; and
separately indicate that it is possible (but not promised) that the
server would start executing commands ahead of time because $reasons.

Do I have it right that other than this documentation problem, you've
been able to use pipeline mode successfully?

So to me it looks like, contrary to the documentation, the server does
not start executing the statements immediately, instead waiting for the
synchronization point. Or am I missing something here?

I don't think you are.

The above tests were performed using libpq from 14beta1 running against
PostgreSQL server version 9.5. If you would like to take a look at the
actual code, you can find it here[2] (the PIPELINE_SYNC macro controls
whether PQpipelineSync() is used).

Thanks.

On a related note, I've been using libpq_pipeline.c[3] as a reference
and I believe it has a busy loop calling PQflush() repeatedly on line
721 since once everything has been sent and we are waiting for the
result, select() will keep returning with an indication that the socket
is writable

Oops, thanks, will look at fixing this too.

(you can find one way to fix this in [2]).
[2] https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n771

Neat, can do likewise I suppose.

--
�lvaro Herrera 39�49'30"S 73�17'W

#3Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#2)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

I think I should rephrase this to say that PQpipelineSync() is needed
where the user needs the server to start executing commands; and
separately indicate that it is possible (but not promised) that the
server would start executing commands ahead of time because $reasons.

I think always requiring PQpipelineSync() is fine since it also serves
as an error recovery boundary. But the fact that the server waits until
the sync message to start executing the pipeline is surprising. To me
this seems to go contrary to the idea of a "pipeline".

In fact, I see the following ways the server could behave:

1. The server starts executing queries and sending their results before
receiving the sync message.

2. The server starts executing queries before receiving the sync message
but buffers the results until it receives the sync message.

3. The server buffers the queries and only starts executing them and
sending the results after receiving the sync message.

My observations suggest that the server behaves as (3) but it could
also be (2).

While it can be tempting to say that this is an implementation detail,
this affects the way one writes a client. For example, I currently have
the following comment in my code:

// Send queries until we get blocked. This feels like a better
// overall strategy to keep the server busy compared to sending one
// query at a time and then re-checking if there is anything to read
// because the results of INSERT/UPDATE/DELETE are presumably small
// and quite a few of them can get buffered before the server gets
// blocked.

This would be a good strategy for behavior (1) but not (3) (where it
would make more sense to queue the queries on the client side). So I
think it would be useful to clarify the server behavior and specify
it in the documentation.

Do I have it right that other than this documentation problem, you've
been able to use pipeline mode successfully?

So far I've only tried it in a simple prototype (single INSERT statement).
But I am busy plugging it into ODB's bulk operation support (that we
already have for Oracle and MSSQL) and once that's done I should be
able to exercise things in more meaningful ways.

#4Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#3)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-21, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

I think I should rephrase this to say that PQpipelineSync() is needed
where the user needs the server to start executing commands; and
separately indicate that it is possible (but not promised) that the
server would start executing commands ahead of time because $reasons.

I think always requiring PQpipelineSync() is fine since it also serves
as an error recovery boundary. But the fact that the server waits until
the sync message to start executing the pipeline is surprising. To me
this seems to go contrary to the idea of a "pipeline".

But does that actually happen? There's a very easy test we can do by
sending queries that sleep. If my libpq program sends a "SELECT
pg_sleep(2)", then PQflush(), then sleep in the client program two more
seconds without sending the sync; and *then* send the sync, I find that
the program takes 2 seconds, not four. This shows that both client and
server slept in parallel, even though I didn't send the Sync until after
the client was done sleeping.

In order to see this, I patched libpq_pipeline.c with the attached, and
ran it under time:

time ./libpq_pipeline simple_pipeline -t simple.trace
simple pipeline... sent and flushed the sleep. Sleeping 2s here:
client sleep done
ok

real 0m2,008s
user 0m0,000s
sys 0m0,003s

So I see things happening as you describe in (1):

In fact, I see the following ways the server could behave:

1. The server starts executing queries and sending their results before
receiving the sync message.

I am completely at a loss on how to explain a server that behaves in any
other way, given how the protocol is designed. There is no buffering on
the server side.

While it can be tempting to say that this is an implementation detail,
this affects the way one writes a client. For example, I currently have
the following comment in my code:

// Send queries until we get blocked. This feels like a better
// overall strategy to keep the server busy compared to sending one
// query at a time and then re-checking if there is anything to read
// because the results of INSERT/UPDATE/DELETE are presumably small
// and quite a few of them can get buffered before the server gets
// blocked.

This would be a good strategy for behavior (1) but not (3) (where it
would make more sense to queue the queries on the client side).

Agreed, that's the kind of strategy I would have thought was the most
reasonable, given my understanding of how the protocol works.

I wonder if your program is being affected by something else. Maybe the
socket is nonblocking (though I don't quite understand how that would
affect the client behavior in just this way), or your program is
buffering elsewhere. I don't do C++ much so I can't help you with that.

So I think it would be useful to clarify the server behavior and
specify it in the documentation.

I'll see about improving the docs on these points.

Do I have it right that other than this documentation problem, you've
been able to use pipeline mode successfully?

So far I've only tried it in a simple prototype (single INSERT statement).
But I am busy plugging it into ODB's bulk operation support (that we
already have for Oracle and MSSQL) and once that's done I should be
able to exercise things in more meaningful ways.

Fair enough.

--
�lvaro Herrera 39�49'30"S 73�17'W

#5Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Alvaro Herrera (#4)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-22, Alvaro Herrera wrote:

So I think it would be useful to clarify the server behavior and
specify it in the documentation.

I'll see about improving the docs on these points.

So I started to modify the second paragraph to indicate that the client
would send data on PQflush/buffer full/PQpipelineSync, only to realize
that the first paragraph already explains this. So I'm not sure if any
changes are needed.

Maybe your complaint is only based on disagreement about what does libpq
do regarding queueing commands; and as far as I can tell in quick
experimentation with libpq, it works as the docs state already.

--
�lvaro Herrera Valdivia, Chile

#6Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#4)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

I think always requiring PQpipelineSync() is fine since it also serves
as an error recovery boundary. But the fact that the server waits until
the sync message to start executing the pipeline is surprising. To me
this seems to go contrary to the idea of a "pipeline".

But does that actually happen? There's a very easy test we can do by
sending queries that sleep. If my libpq program sends a "SELECT
pg_sleep(2)", then PQflush(), then sleep in the client program two more
seconds without sending the sync; and *then* send the sync, I find that
the program takes 2 seconds, not four. This shows that both client and
server slept in parallel, even though I didn't send the Sync until after
the client was done sleeping.

Thanks for looking into it. My experiments were with INSERT and I now
was able to try things with larger pipelines. I can now see the server
starts sending results after ~400 statements. So I think you are right,
the server does start executing the pipeline before receiving the sync
message, though there is still something strange going on (but probably
on the client side):

I have a pipeline of say 500 INSERTs. If I "execute" this pipeline by first
sending all the statements and then reading the results, then everything
works as expected. This is the call sequence I am talking about:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #500
PQpipelineSync()
PQflush()
PQconsumeInput()
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL
...
PQgetResult() # INSERT #500
PQgetResult() # NULL
PQgetResult() # PGRES_PIPELINE_SYNC

If, however, I execute it by checking for results before sending the
next INSERT, I get the following call sequence:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #~400
PQflush()
PQconsumeInput() # At this point select() indicates we can read.
PQgetResult() # NULL (???)
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL
...

What's strange here is that the first PQgetResult() call (marked with ???)
returns NULL instead of result for INSERT #1 as in the first call sequence.
Interestingly, if I skip it, the rest seems to progress as expected.

Any idea what might be going on here? My hunch is that there is an issue
with libpq's state machine. In particular, in the second case, PQgetResult()
is called before the sync message is sent. Did you have a chance to test
such a scenario (i.e., a large pipeline where the first result is processed
before the PQpipelineSync() call)? Of course, this could very well be a bug
on my side or me misunderstanding something.

#7Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#5)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

On 2021-Jun-22, Alvaro Herrera wrote:

So I think it would be useful to clarify the server behavior and
specify it in the documentation.

I'll see about improving the docs on these points.

So I started to modify the second paragraph to indicate that the client
would send data on PQflush/buffer full/PQpipelineSync, only to realize
that the first paragraph already explains this. So I'm not sure if any
changes are needed.

Maybe your complaint is only based on disagreement about what does libpq
do regarding queueing commands; and as far as I can tell in quick
experimentation with libpq, it works as the docs state already.

I think one change that is definitely needed is to make it clear that
the PQpipelineSync() call is not optional.

I would also add a note saying that while the server starts processing
the pipeline immediately, it may buffer the results and the only way
to flush them out is to call PQpipelineSync().

#8Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#7)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-23, Boris Kolpackov wrote:

I think one change that is definitely needed is to make it clear that
the PQpipelineSync() call is not optional.

I would also add a note saying that while the server starts processing
the pipeline immediately, it may buffer the results and the only way
to flush them out is to call PQpipelineSync().

Curious -- I just noticed that the server understands a message 'H' that
requests a flush of the server buffer. However, libpq has no way to
generate that message as far as I can see. I think you could use that
to request results from the pipeline, without the sync point.

I wonder if it's worth adding an entry point to libpq to allow access to
this. PQrequestFlush() or something like that ... Prior to pipeline
mode this has no use (since everything ends with ReadyForQuery which
involves a flush) but it does seem to have use in pipeline mode.

--
�lvaro Herrera 39�49'30"S 73�17'W

#9Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#7)
1 attachment(s)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-23, Boris Kolpackov wrote:

I think one change that is definitely needed is to make it clear that
the PQpipelineSync() call is not optional.

I would also add a note saying that while the server starts processing
the pipeline immediately, it may buffer the results and the only way
to flush them out is to call PQpipelineSync().

Aren't those two things one and the same? I propose the attached.

--
�lvaro Herrera Valdivia, Chile
"El hombre nunca sabe de lo que es capaz hasta que lo intenta" (C. Dickens)

Attachments:

0001-Clarify-that-pipeline-sync-is-mandatory.patchtext/x-diff; charset=us-asciiDownload
From d9c2b07d7ef8d85ae387fdf6c9a04bf2c712b6fc Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 23 Jun 2021 12:40:15 -0400
Subject: [PATCH] Clarify that pipeline sync is mandatory

---
 doc/src/sgml/libpq.sgml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 441cc0da3a..0217f8d8c7 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -5103,10 +5103,12 @@ int PQflush(PGconn *conn);
      The server executes statements, and returns results, in the order the
      client sends them.  The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
+     Do note that results are buffered on the server side; a synchronization
+     point, establshied with <function>PQpipelineSync</function>, is necessary
+     in order for all results to be flushed to the client.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
-     until the next synchronization point established by
-     <function>PQpipelineSync</function>;
+     until the next synchronization point;
      a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
      each such command.
      (This remains true even if the commands in the pipeline would rollback
-- 
2.20.1

#10Boris Kolpackov
boris@codesynthesis.com
In reply to: Boris Kolpackov (#6)
Re: Pipeline mode and PQpipelineSync()

Boris Kolpackov <boris@codesynthesis.com> writes:

What's strange here is that the first PQgetResult() call (marked with ???)
returns NULL instead of result for INSERT #1 as in the first call sequence.

I've hit another similar case except now an unexpected NULL result is
returned in the middle of PGRES_PIPELINE_ABORTED result sequence. The
call sequence is as follows:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #251 -- insert duplicate PK
PQflush()
...
PQsendQueryPrepared() # INSERT #343
PQflush()
PQconsumeInput() # At this point select() indicates we can read.
PQgetResult() # NULL -- unexpected but skipped (see prev. email)
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL
...
PQgetResult() # INSERT #251 error result, SQLSTATE 23505
PQgetResult() # NULL
PQgetResult() # INSERT #252 PGRES_PIPELINE_ABORTED
PQgetResult() # NULL
PQgetResult() # INSERT #253 PGRES_PIPELINE_ABORTED
PQgetResult() # NULL
...
PQgetResult() # INSERT #343 NULL (???)

Notice that result #343 corresponds to the last PQsendQueryPrepared()
call made before the socket became readable (it's not always 343 but
around there).

For completeness, the statement in question is:

INSERT INTO pgsql_bulk_object (id, idata, sdata) VALUES ($1, $2, $3)

The table:

CREATE TABLE pgsql_bulk_object (
id BIGINT NOT NULL PRIMARY KEY,
idata BIGINT NOT NULL,
sdata TEXT NOT NULL);

And the data inserted is in the form:

1, 1, "1"
2, 2, "2"
...

#11Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#8)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Curious -- I just noticed that the server understands a message 'H' that
requests a flush of the server buffer. However, libpq has no way to
generate that message as far as I can see. I think you could use that
to request results from the pipeline, without the sync point.

I wonder if it's worth adding an entry point to libpq to allow access to
this.

Yes, I think this can be useful. For example, an application may wish
to receive the result as soon as possible in case it's used as input
to some further computation.

PQrequestFlush() or something like that ...

I think I would prefer PQflushResult() or something along these
lines ("request" is easy to misinterpret as "client request").

#12Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#9)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Subject: [PATCH] Clarify that pipeline sync is mandatory

---
doc/src/sgml/libpq.sgml | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 441cc0da3a..0217f8d8c7 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -5103,10 +5103,12 @@ int PQflush(PGconn *conn);
The server executes statements, and returns results, in the order the
client sends them.  The server will begin executing the commands in the
pipeline immediately, not waiting for the end of the pipeline.
+     Do note that results are buffered on the server side; a synchronization
+     point, establshied with <function>PQpipelineSync</function>, is necessary
+     in order for all results to be flushed to the client.

s/establshied/established/

Otherwise, LGTM, thanks!

#13Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#6)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-23, Boris Kolpackov wrote:

If, however, I execute it by checking for results before sending the
next INSERT, I get the following call sequence:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #~400
PQflush()
PQconsumeInput() # At this point select() indicates we can read.
PQgetResult() # NULL (???)
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL
...

What's strange here is that the first PQgetResult() call (marked with ???)
returns NULL instead of result for INSERT #1 as in the first call sequence.
Interestingly, if I skip it, the rest seems to progress as expected.

Yeah, I agree that there's a problem in the libpq state machine. I'm
looking into it now.

--
�lvaro Herrera 39�49'30"S 73�17'W

#14Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#6)
1 attachment(s)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-23, Boris Kolpackov wrote:

If, however, I execute it by checking for results before sending the
next INSERT, I get the following call sequence:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #~400
PQflush()
PQconsumeInput() # At this point select() indicates we can read.
PQgetResult() # NULL (???)
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL

IIUC the problem is that PQgetResult is indeed not prepared to deal with
a result the first time until after the queue has been "prepared", and
this happens on calling PQpipelineSync. But I think the formulation in
the attached patch works too, and the resulting code is less surprising.

I wrote a test case that works as you describe, and indeed with the
original code it gets a NULL initially; that disappears with the
attached patch. Can you give it a try?

Thanks

--
�lvaro Herrera Valdivia, Chile
"Escucha y olvidar�s; ve y recordar�s; haz y entender�s" (Confucio)

Attachments:

0001-Fix-libpq-state-machine.patchtext/x-diff; charset=us-asciiDownload
From 4073abc0ef8b4404915b949b3058ba47b7c51ba9 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 24 Jun 2021 18:24:04 -0400
Subject: [PATCH] Fix libpq state machine

---
 src/interfaces/libpq/fe-exec.c | 19 +++++--------------
 1 file changed, 5 insertions(+), 14 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 7bd5b3a7b9..8403a9bbca 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1375,8 +1375,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -1513,8 +1512,7 @@ PQsendPrepare(PGconn *conn,
 
 	pqAppendCmdQueueEntry(conn, entry);
 
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 
 	/*
 	 * Give the data a push (in pipeline mode, only if we're past the size
@@ -1817,8 +1815,7 @@ PQsendQueryGuts(PGconn *conn,
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -2448,8 +2445,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -3084,12 +3080,7 @@ PQpipelineSync(PGconn *conn)
 	 */
 	if (PQflush(conn) < 0)
 		goto sendFailed;
-
-	/*
-	 * Call pqPipelineProcessQueue so the user can call start calling
-	 * PQgetResult.
-	 */
-	pqPipelineProcessQueue(conn);
+	conn->asyncStatus = PGASYNC_BUSY;
 
 	return 1;
 
-- 
2.30.2

#15Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#14)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

IIUC the problem is that PQgetResult is indeed not prepared to deal with
a result the first time until after the queue has been "prepared", and
this happens on calling PQpipelineSync. But I think the formulation in
the attached patch works too, and the resulting code is less surprising.

I wrote a test case that works as you describe, and indeed with the
original code it gets a NULL initially; that disappears with the
attached patch. Can you give it a try?

Yes, I can confirm this appears to have addressed the first issue,
thanks! The second issue [1]/messages/by-id/boris.20210624103805@codesynthesis.com, however, is still there even with
this patch.

[1]: /messages/by-id/boris.20210624103805@codesynthesis.com

#16Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#10)
4 attachment(s)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-24, Boris Kolpackov wrote:

Boris Kolpackov <boris@codesynthesis.com> writes:

What's strange here is that the first PQgetResult() call (marked with ???)
returns NULL instead of result for INSERT #1 as in the first call sequence.

I've hit another similar case except now an unexpected NULL result is
returned in the middle of PGRES_PIPELINE_ABORTED result sequence. The
call sequence is as follows:

I haven't been able to get this to break for me yet, and I probably
won't today. In the meantime, here's patches for the first one. The
test added by 0003 fails, and then 0004 fixes it.

--
�lvaro Herrera 39�49'30"S 73�17'W

Attachments:

v2-0001-Clarify-that-pipeline-sync-is-mandatory.patchtext/x-diff; charset=us-asciiDownload
From 40023992dd73edaf7947796e0e4e36065fcd908c Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 23 Jun 2021 12:40:15 -0400
Subject: [PATCH v2 1/4] Clarify that pipeline sync is mandatory

---
 doc/src/sgml/libpq.sgml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 641970f2a6..7f8a4db089 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -5102,10 +5102,12 @@ int PQflush(PGconn *conn);
      The server executes statements, and returns results, in the order the
      client sends them.  The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
+     Do note that results are buffered on the server side; a synchronization
+     point, established with <function>PQpipelineSync</function>, is necessary
+     in order for all results to be flushed to the client.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
-     until the next synchronization point established by
-     <function>PQpipelineSync</function>;
+     until the next synchronization point;
      a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
      each such command.
      (This remains true even if the commands in the pipeline would rollback
-- 
2.30.2

v2-0002-Add-PQrequestFlush.patchtext/x-diff; charset=us-asciiDownload
From 071757645ee0f9f15f57e43447d7c234deb062c0 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 25 Jun 2021 16:02:00 -0400
Subject: [PATCH v2 2/4] Add PQrequestFlush()

---
 doc/src/sgml/libpq.sgml          | 17 +++++++++++++++
 src/interfaces/libpq/exports.txt |  1 +
 src/interfaces/libpq/fe-exec.c   | 37 ++++++++++++++++++++++++++++++++
 src/interfaces/libpq/libpq-fe.h  |  1 +
 4 files changed, 56 insertions(+)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 7f8a4db089..4d203f51e1 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -5401,6 +5401,23 @@ int PQpipelineSync(PGconn *conn);
       </para>
      </listitem>
     </varlistentry>
+
+    <varlistentry id="libpq-PQrequestFlush">
+     <term><function>PQrequestFlush</function><indexterm><primary>PQrequestFlush</primary></indexterm></term>
+
+      <listitem>
+       <para>
+        Requests the server to flush its output buffer.  The server does
+        that automatically upon <function>PQpipelineSync</function> or
+        any request when not in pipeline mode; this function is useful
+        if results are expected without establishing a synchronization
+        point.  Returns 1 for success and 0 on failure.
+<synopsis>
+int PQrequestFlush(PGconn *conn);
+</synopsis>
+       </para>
+      </listitem>
+     </varlistentry>
    </variablelist>
   </sect2>
 
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 824a03ffbd..c616059f58 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -185,3 +185,4 @@ PQpipelineSync            182
 PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded              185
+PQrequestFlush            186
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 7bd5b3a7b9..00d744eaa7 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -3696,6 +3696,43 @@ PQflush(PGconn *conn)
 	return pqFlush(conn);
 }
 
+/*
+ * Send request for server to flush its buffer
+ */
+int
+PQrequestFlush(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	/* Don't try to send if we know there's no live connection. */
+	if (conn->status != CONNECTION_OK)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("no connection to the server\n"));
+		return 0;
+	}
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE &&
+		conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("another command is already in progress\n"));
+		return false;
+	}
+
+	if (pqPutMsgStart('H', conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+	{
+		return 0;
+	}
+	/* XXX useless without a flush ...? */
+	pqFlush(conn);
+
+	return 1;
+}
+
 /*
  * pqPipelineFlush
  *
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index cc6032b15b..cf7cbe942e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -496,6 +496,7 @@ extern PGPing PQpingParams(const char *const *keywords,
 
 /* Force the write buffer to be written (or at least try) */
 extern int	PQflush(PGconn *conn);
+extern int	PQrequestFlush(PGconn *conn);
 
 /*
  * "Fast path" interface --- not really recommended for application
-- 
2.30.2

v2-0003-test-nosync.patchtext/x-diff; charset=us-asciiDownload
From 035781482d139face53c81f70ca4591b5292c4b4 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 25 Jun 2021 13:26:43 -0400
Subject: [PATCH v2 3/4] test nosync

---
 .../modules/libpq_pipeline/libpq_pipeline.c   | 92 +++++++++++++++++++
 1 file changed, 92 insertions(+)

diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 71eedb6dbb..00d6b9f401 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -230,6 +230,95 @@ test_multi_pipelines(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/*
+ * Test behavior when a pipeline dispatches a number of commands that are
+ * not flushed by a sync point.
+ */
+static void
+test_nosync(PGconn *conn)
+{
+	int		numqueries = 0;
+	int		results = 0;
+	int		sock = PQsocket(conn);
+
+	fprintf(stderr, "nosync... ");
+
+	if (sock < 0)
+		pg_fatal("invalid socket");
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("could not enter pipeline mode");
+	for (;;)
+	{
+		fd_set		input_mask;
+		struct timeval	tv;
+
+		if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
+							  0, NULL, NULL, NULL, NULL, 0) != 1)
+			pg_fatal("error sending select: %s", PQerrorMessage(conn));
+		PQflush(conn);
+
+		numqueries++;
+
+		/*
+		 * If the server has written anything to us, read (some of) it now.
+		 */
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		tv.tv_sec = 0;
+		tv.tv_usec = 0;
+		if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+		if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
+			pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
+
+		if (numqueries >= 500)
+			break;
+	}
+
+	PQrequestFlush(conn);
+
+	/* Now read all results */
+	for (;;)
+	{
+		PGresult *res;
+
+		res = PQgetResult(conn);
+
+		/* NULL results are only expected after a TUPLES_OK */
+		if (res == NULL)
+			pg_fatal("got unexpected NULL result after %d results", results);
+
+		/* We expect exacly one TUPLES_OK result for each query we sent */
+		if (PQresultStatus(res) == PGRES_TUPLES_OK)
+		{
+			PGresult *res2;
+
+			/* and one NULL result should follow each */
+			res2 = PQgetResult(conn);
+			if (res2 != NULL)
+				pg_fatal("expected NULL, got %s",
+						 PQresStatus(PQresultStatus(res2)));
+			PQclear(res);
+			results++;
+
+			/* if we're done, we're done */
+			if (results == numqueries)
+				break;
+
+			continue;
+		}
+
+		/* anything else is unexpected */
+		pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
+	}
+
+	fprintf(stderr, "ok\n");
+}
+
 /*
  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
  * still have to get results for each pipeline item, but the item will just be
@@ -1237,6 +1326,7 @@ print_test_list(void)
 {
 	printf("disallowed_in_pipeline\n");
 	printf("multi_pipelines\n");
+	printf("nosync\n");
 	printf("pipeline_abort\n");
 	printf("pipelined_insert\n");
 	printf("prepared\n");
@@ -1334,6 +1424,8 @@ main(int argc, char **argv)
 		test_disallowed_in_pipeline(conn);
 	else if (strcmp(testname, "multi_pipelines") == 0)
 		test_multi_pipelines(conn);
+	else if (strcmp(testname, "nosync") == 0)
+		test_nosync(conn);
 	else if (strcmp(testname, "pipeline_abort") == 0)
 		test_pipeline_abort(conn);
 	else if (strcmp(testname, "pipelined_insert") == 0)
-- 
2.30.2

v2-0004-Fix-libpq-state-machine.patchtext/x-diff; charset=us-asciiDownload
From 4e26c887512609935aa3251cc6b31c234507c855 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 24 Jun 2021 18:24:04 -0400
Subject: [PATCH v2 4/4] Fix libpq state machine

---
 src/interfaces/libpq/fe-exec.c | 19 +++++--------------
 1 file changed, 5 insertions(+), 14 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 00d744eaa7..cb5734c3cd 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1375,8 +1375,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -1513,8 +1512,7 @@ PQsendPrepare(PGconn *conn,
 
 	pqAppendCmdQueueEntry(conn, entry);
 
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 
 	/*
 	 * Give the data a push (in pipeline mode, only if we're past the size
@@ -1817,8 +1815,7 @@ PQsendQueryGuts(PGconn *conn,
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -2448,8 +2445,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -3084,12 +3080,7 @@ PQpipelineSync(PGconn *conn)
 	 */
 	if (PQflush(conn) < 0)
 		goto sendFailed;
-
-	/*
-	 * Call pqPipelineProcessQueue so the user can call start calling
-	 * PQgetResult.
-	 */
-	pqPipelineProcessQueue(conn);
+	conn->asyncStatus = PGASYNC_BUSY;
 
 	return 1;
 
-- 
2.30.2

#17Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Alvaro Herrera (#16)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-25, Alvaro Herrera wrote:

From 071757645ee0f9f15f57e43447d7c234deb062c0 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 25 Jun 2021 16:02:00 -0400
Subject: [PATCH v2 2/4] Add PQrequestFlush()

I forgot to mention:

+/*
+ * Send request for server to flush its buffer
+ */
+int
+PQrequestFlush(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	/* Don't try to send if we know there's no live connection. */
+	if (conn->status != CONNECTION_OK)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("no connection to the server\n"));
+		return 0;
+	}
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE &&
+		conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("another command is already in progress\n"));
+		return false;
+	}
+
+	if (pqPutMsgStart('H', conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+	{
+		return 0;
+	}
+	/* XXX useless without a flush ...? */
+	pqFlush(conn);
+
+	return 1;
+}

I'm not sure if it's a good idea for PQrequestFlush to itself flush
libpq's buffer. We can just document that PQflush is required ...
opinions?

(I didn't try PQrequestFlush in any scenarios other than the test case I
added.)

--
�lvaro Herrera Valdivia, Chile
Voy a acabar con todos los humanos / con los humanos yo acabar�
voy a acabar con todos (bis) / con todos los humanos acabar� �acabar�! (Bender)

#18Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Alvaro Herrera (#16)
Re: Pipeline mode and PQpipelineSync()

It hadn't occurred to me that I should ask the release management team
about adding a new function to libpq this late in the cycle.

Please do note that the message type used in the new routine is currenly
unused and uncovered -- see line 4660 here:

https://coverage.postgresql.org/src/backend/tcop/postgres.c.gcov.html

--
�lvaro Herrera Valdivia, Chile
"I'm impressed how quickly you are fixing this obscure issue. I came from
MS SQL and it would be hard for me to put into words how much of a better job
you all are doing on [PostgreSQL]."
Steve Midgley, http://archives.postgresql.org/pgsql-sql/2008-08/msg00000.php

#19Michael Paquier
michael@paquier.xyz
In reply to: Alvaro Herrera (#18)
Re: Pipeline mode and PQpipelineSync()

On Sat, Jun 26, 2021 at 05:40:15PM -0400, Alvaro Herrera wrote:

It hadn't occurred to me that I should ask the release management team
about adding a new function to libpq this late in the cycle.

I have not followed the thread in details, but if you think that this
improves the feature in the long term even for 14, I have no
personally no objections to the addition of a new function, or even a
change of behavior in one of the existing functions. The beta cycle
is here for such adjustments.
--
Michael

#20Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#17)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

I forgot to mention:

+	/* XXX useless without a flush ...? */
+	pqFlush(conn);
+
+	return 1;
+}

I'm not sure if it's a good idea for PQrequestFlush to itself flush
libpq's buffer. We can just document that PQflush is required ...
opinions?

Yes, I think not calling PQflush() gives more flexibility. For example,
an application may "insert" them periodically after a certain number of
queries but call PQflush() at different intervals.

#21Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#10)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-24, Boris Kolpackov wrote:

I've hit another similar case except now an unexpected NULL result is
returned in the middle of PGRES_PIPELINE_ABORTED result sequence. The
call sequence is as follows:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #251 -- insert duplicate PK
PQflush()
...
PQsendQueryPrepared() # INSERT #343
PQflush()
PQconsumeInput() # At this point select() indicates we can read.
PQgetResult() # NULL -- unexpected but skipped (see prev. email)
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL
...
PQgetResult() # INSERT #251 error result, SQLSTATE 23505
PQgetResult() # NULL
PQgetResult() # INSERT #252 PGRES_PIPELINE_ABORTED
PQgetResult() # NULL
PQgetResult() # INSERT #253 PGRES_PIPELINE_ABORTED
PQgetResult() # NULL
...
PQgetResult() # INSERT #343 NULL (???)

Notice that result #343 corresponds to the last PQsendQueryPrepared()
call made before the socket became readable (it's not always 343 but
around there).

No luck reproducing any problems with this sequence as yet.

--
�lvaro Herrera 39�49'30"S 73�17'W

#22Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#21)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

No luck reproducing any problems with this sequence as yet.

Can you try to recreate the call flow as implemented here (it's
pretty much plain old C if you ignore error handling):

https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n789

Except replacing `continue` on line 966 with `break` (that will
make the code read-biased which I find triggers the error more
readily, though I was able to trigger it both ways).

Then in an explicit transaction send 500 prepared insert statements
(see previous email for details) with 250'th having a duplicate
primary key.

#23Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#22)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-29, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

No luck reproducing any problems with this sequence as yet.

Can you try to recreate the call flow as implemented here (it's
pretty much plain old C if you ignore error handling):

https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n789

Hmm, I can't see what's different there than what I get on my test
program. Can you please do PQtrace() on the connection and send the
resulting trace file? Maybe I can compare the traffic to understand
what's the difference.

(I do see that you're doing PQisBusy that I'm not. Going to try adding
it next.)

Thanks

--
�lvaro Herrera 39�49'30"S 73�17'W

#24Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Alvaro Herrera (#23)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-29, Alvaro Herrera wrote:

(I do see that you're doing PQisBusy that I'm not. Going to try adding
it next.)

Ah, yes it does. I can reproduce this now. I thought PQconsumeInput
was sufficient, but it's not: you have to have the PQgetResult in there
too. Looking ...

--
�lvaro Herrera 39�49'30"S 73�17'W

#25Ranier Vilela
ranier.vf@gmail.com
In reply to: Alvaro Herrera (#24)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-29, Alvaro Herrera wrote:

Ah, yes it does. I can reproduce this now. I thought PQconsumeInput
was sufficient, but it's not: you have to have the PQgetResult in there
too. Looking ...

I think that has an oversight with a719232

return false shouldn't be return 0?

regards,

Ranier Vilela

#26Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Ranier Vilela (#25)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jun-29, Ranier Vilela wrote:

On 2021-Jun-29, Alvaro Herrera wrote:

Ah, yes it does. I can reproduce this now. I thought PQconsumeInput
was sufficient, but it's not: you have to have the PQgetResult in there
too. Looking ...

I think that has an oversight with a719232

return false shouldn't be return 0?

Hah, yeah, it should. Will fix

--
�lvaro Herrera Valdivia, Chile

#27Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#24)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Ah, yes it does. I can reproduce this now. I thought PQconsumeInput
was sufficient, but it's not: you have to have the PQgetResult in there
too. Looking ...

Any progress on fixing this?

#28Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#27)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jul-06, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Ah, yes it does. I can reproduce this now. I thought PQconsumeInput
was sufficient, but it's not: you have to have the PQgetResult in there
too. Looking ...

Any progress on fixing this?

Yeah ... the problem as I understand it is that the state transition in
libpq when the connection is in pipeline aborted state is bogus. I'll
post a patch in a bit.

--
Álvaro Herrera Valdivia, Chile — https://www.EnterpriseDB.com/
#error "Operator lives in the wrong universe"
("Use of cookies in real-time system development", M. Gleixner, M. Mc Guire)

#29Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#27)
1 attachment(s)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jul-06, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Ah, yes it does. I can reproduce this now. I thought PQconsumeInput
was sufficient, but it's not: you have to have the PQgetResult in there
too. Looking ...

Any progress on fixing this?

Can you please try with this patch?

Thanks

--
Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/

Attachments:

0001-fix-libpq-state-machine.patchtext/x-diff; charset=utf-8Download
From c84b66821249631ba1654b22866ca54c49f238c4 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Tue, 6 Jul 2021 12:46:42 -0400
Subject: [PATCH] fix libpq state machine

---
 src/interfaces/libpq/fe-exec.c | 46 ++++++++++++++++++++++++++--------
 1 file changed, 35 insertions(+), 11 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b13ddab393..1295a417c1 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn)
 
 /*
  * pqAppendCmdQueueEntry
- *		Append a caller-allocated command queue entry to the queue.
+ *		Append a caller-allocated command queue entry to the queue, and update
+ *		conn->asyncStatus as needed to account for it.
  *
  * The query itself must already have been put in the output buffer by the
  * caller.
@@ -1239,6 +1240,33 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 		conn->cmd_queue_tail->next = entry;
 
 	conn->cmd_queue_tail = entry;
+
+	switch (conn->pipelineStatus)
+	{
+		case PQ_PIPELINE_OFF:
+		case PQ_PIPELINE_ON:
+			/*
+			 * If there's a result ready to be consumed, let it be so (that is,
+			 * don't change away from READY or READY_MORE); otherwise we wait
+			 * for some result to arrive from the server.
+			 */
+			if (conn->asyncStatus == PGASYNC_IDLE)
+				conn->asyncStatus = PGASYNC_BUSY;
+			break;
+
+			/*
+			 * If in aborted pipeline state, we don't expect anything from the
+			 * server, so we have to let PQgetResult consume from the aborted
+			 * queue right away.
+			 */
+		case PQ_PIPELINE_ABORTED:
+			if (conn->asyncStatus == PGASYNC_IDLE)
+			{
+				resetPQExpBuffer(&conn->errorMessage);
+				pqPipelineProcessQueue(conn);
+			}
+			break;
+	}
 }
 
 /*
@@ -1375,7 +1403,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -1510,10 +1537,6 @@ PQsendPrepare(PGconn *conn,
 	/* if insufficient memory, query just winds up NULL */
 	entry->query = strdup(query);
 
-	pqAppendCmdQueueEntry(conn, entry);
-
-	conn->asyncStatus = PGASYNC_BUSY;
-
 	/*
 	 * Give the data a push (in pipeline mode, only if we're past the size
 	 * threshold).  In nonblock mode, don't complain if we're unable to send
@@ -1522,6 +1545,8 @@ PQsendPrepare(PGconn *conn,
 	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
+	pqAppendCmdQueueEntry(conn, entry);
+
 	return 1;
 
 sendFailed:
@@ -1815,7 +1840,7 @@ PQsendQueryGuts(PGconn *conn,
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
@@ -2445,7 +2470,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
@@ -3072,15 +3097,14 @@ PQpipelineSync(PGconn *conn)
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	pqAppendCmdQueueEntry(conn, entry);
-
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
 	if (PQflush(conn) < 0)
 		goto sendFailed;
-	conn->asyncStatus = PGASYNC_BUSY;
+
+	pqAppendCmdQueueEntry(conn, entry);
 
 	return 1;
 
-- 
2.20.1

#30Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#29)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Can you please try with this patch?

I don't get any difference in behavior with this patch. That is, I
still get the unexpected NULL result. Does it make a difference for
your reproducer?

#31Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#30)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jul-07, Boris Kolpackov wrote:

I don't get any difference in behavior with this patch. That is, I
still get the unexpected NULL result. Does it make a difference for
your reproducer?

Yes, the behavior changes for my repro. Is it possible for you to
share a full program I can compile and run, plesse? Thanks

--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
"Most hackers will be perfectly comfortable conceptualizing users as entropy
sources, so let's move on." (Nathaniel Smith)

#32Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#31)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

On 2021-Jul-07, Boris Kolpackov wrote:

I don't get any difference in behavior with this patch. That is, I
still get the unexpected NULL result. Does it make a difference for
your reproducer?

Yes, the behavior changes for my repro. Is it possible for you to
share a full program I can compile and run, plesse?

Here is the test sans the connection setup:

-----------------------------------------------------------------------

#include <libpq-fe.h>

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stddef.h>
#include <assert.h>
#include <sys/select.h>

// Note: hack.
//
#include <arpa/inet.h>
#define htonll(x) ((((long long)htonl(x)) << 32) + htonl((x) >> 32))

static const size_t columns = 3;

struct data
{
long long id;
long long idata;
const char* sdata;
};

static char* values[columns];
static int lengths[columns];
static int formats[columns] = {1, 1, 1};

static const unsigned int types[columns] = {
20, // int8
20, // int8
25 // text
};

static void
init (const struct data* d)
{
values[0] = (char*)&d->id;
lengths[0] = sizeof (d->id);

values[1] = (char*)&d->idata;
lengths[1] = sizeof (d->idata);

values[2] = (char*)d->sdata;
lengths[2] = strlen (d->sdata);
}

static void
execute (PGconn* conn, const struct data* ds, size_t n)
{
int sock = PQsocket (conn);
assert (sock != -1);

if (PQsetnonblocking (conn, 1) == -1 ||
PQenterPipelineMode (conn) == 0)
assert (false);

// True if we've written and read everything, respectively.
//
bool wdone = false;
bool rdone = false;

size_t wn = 0;
size_t rn = 0;

while (!rdone)
{
fd_set wds;
if (!wdone)
{
FD_ZERO (&wds);
FD_SET (sock, &wds);
}

fd_set rds;
FD_ZERO (&rds);
FD_SET (sock, &rds);

if (select (sock + 1, &rds, wdone ? NULL : &wds, NULL, NULL) == -1)
{
if (errno == EINTR)
continue;

assert (false);
}

// Try to minimize the chance of blocking the server by first processing
// the result and then sending more queries.
//
if (FD_ISSET (sock, &rds))
{
if (PQconsumeInput (conn) == 0)
assert (false);

while (PQisBusy (conn) == 0)
{
//fprintf (stderr, "PQgetResult %zu\n", rn);

PGresult* res = PQgetResult (conn);
assert (res != NULL);
ExecStatusType stat = PQresultStatus (res);

if (stat == PGRES_PIPELINE_SYNC)
{
assert (wdone && rn == n);
PQclear (res);
rdone = true;
break;
}

if (stat == PGRES_FATAL_ERROR)
{
const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);

if (strcmp (s, "23505") == 0)
fprintf (stderr, "duplicate id at %zu\n", rn);
}

PQclear (res);
assert (rn != n);
++rn;

// We get a NULL result after each query result.
//
{
PGresult* end = PQgetResult (conn);
assert (end == NULL);
}
}
}

if (!wdone && FD_ISSET (sock, &wds))
{
// Send queries until we get blocked (write-biased). This feels like
// a better overall strategy to keep the server busy compared to
// sending one query at a time and then re-checking if there is
// anything to read because the results of INSERT/UPDATE/DELETE are
// presumably small and quite a few of them can get buffered before
// the server gets blocked.
//
for (;;)
{
if (wn != n)
{
//fprintf (stderr, "PQsendQueryPrepared %zu\n", wn);

init (ds + wn);

if (PQsendQueryPrepared (conn,
"persist_object",
(int)(columns),
values,
lengths,
formats,
1) == 0)
assert (false);

if (++wn == n)
{
if (PQpipelineSync (conn) == 0)
assert (false);
}
}

// PQflush() result:
//
// 0 -- success (queue is now empty)
// 1 -- blocked
// -1 -- error
//
int r = PQflush (conn);
assert (r != -1);

if (r == 0)
{
if (wn != n)
{
// If we continue here, then we are write-biased. And if we
// break, then we are read-biased.
//
#if 1
break;
#else
continue;
#endif
}

wdone = true;
}

break; // Blocked or done.
}
}
}

if (PQexitPipelineMode (conn) == 0 ||
PQsetnonblocking (conn, 0) == -1)
assert (false);
}

static void
test (PGconn* conn)
{
const size_t batch = 500;
struct data ds[batch];

for (size_t i = 0; i != batch; ++i)
{
ds[i].id = htonll (i == batch / 2 ? i - 1 : i); // Cause duplicate PK.
ds[i].idata = htonll (i);
ds[i].sdata = "abc";
}

// Prepare the statement.
//
{
PGresult* res = PQprepare (
conn,
"persist_object",
"INSERT INTO \"pgsql_bulk_object\" "
"(\"id\", "
"\"idata\", "
"\"sdata\") "
"VALUES "
"($1, $2, $3)",
(int)(columns),
types);
assert (PQresultStatus (res) == PGRES_COMMAND_OK);
PQclear (res);
}

// Begin transaction.
//
{
PGresult* res = PQexec (conn, "begin");
assert (PQresultStatus (res) == PGRES_COMMAND_OK);
PQclear (res);
}

execute (conn, ds, batch);

// Commit transaction.
//
{
PGresult* res = PQexec (conn, "commit");
assert (PQresultStatus (res) == PGRES_COMMAND_OK);
PQclear (res);
}
}

-----------------------------------------------------------------------

Use the following statements to (re)create the table:

DROP TABLE IF EXISTS "pgsql_bulk_object" CASCADE;

CREATE TABLE "pgsql_bulk_object" (
"id" BIGINT NOT NULL PRIMARY KEY,
"idata" BIGINT NOT NULL,
"sdata" TEXT NOT NULL);

It fails consistently for me when running against the local PostgreSQL
9.5 server (connecting via the UNIX socket):

duplicate id at 250
driver: driver.cxx:105: void execute(PGconn*, const data*, size_t): Assertion `res != NULL' failed.

#33Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#32)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jul-07, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

On 2021-Jul-07, Boris Kolpackov wrote:

I don't get any difference in behavior with this patch. That is, I
still get the unexpected NULL result. Does it make a difference for
your reproducer?

Yes, the behavior changes for my repro. Is it possible for you to
share a full program I can compile and run, plesse?

Here is the test sans the connection setup:

Thanks, looking now. (I was trying to compile libodb and everything, and
I went a down rabbit hole of configure failing with mysterious m4 errors ...)

--
Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/

#34Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#32)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jul-07, Boris Kolpackov wrote:

// Try to minimize the chance of blocking the server by first processing
// the result and then sending more queries.
//
if (FD_ISSET (sock, &rds))
{
if (PQconsumeInput (conn) == 0)
assert (false);

while (PQisBusy (conn) == 0)
{
//fprintf (stderr, "PQgetResult %zu\n", rn);

PGresult* res = PQgetResult (conn);
assert (res != NULL);
ExecStatusType stat = PQresultStatus (res);

Hmm ... aren't you trying to read more results than you sent queries? I
think there should be a break out of that block when that happens (which
means the read of the PGRES_PIPELINE_SYNC needs to be out of there too).
With this patch, the program seems to work well for me.

***************
*** 94,112 ****
while (PQisBusy (conn) == 0)
{
//fprintf (stderr, "PQgetResult %zu\n", rn);

PGresult* res = PQgetResult (conn);
assert (res != NULL);
ExecStatusType stat = PQresultStatus (res);

-         if (stat == PGRES_PIPELINE_SYNC)
-         {
-           assert (wdone && rn == n);
-           PQclear (res);
-           rdone = true;
-           break;
-         }
- 
          if (stat == PGRES_FATAL_ERROR)
          {
            const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);
--- 94,110 ----
        while (PQisBusy (conn) == 0)
        {
          //fprintf (stderr, "PQgetResult %zu\n", rn);
+         if (rn >= wn)
+         {
+           if (wdone)
+             rdone = true;
+           break;
+         }

PGresult* res = PQgetResult (conn);
assert (res != NULL);
ExecStatusType stat = PQresultStatus (res);

          if (stat == PGRES_FATAL_ERROR)
          {
            const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);
***************
*** 190,195 ****
--- 188,201 ----
          break; // Blocked or done.
        }
      }
+ 
+     if (rdone)
+     {
+       PGresult *res = PQgetResult(conn);
+       assert(PQresultStatus(res) == PGRES_PIPELINE_SYNC);
+       PQclear(res);
+       break;
+     }
    }
    if (PQexitPipelineMode (conn) == 0 ||
***************
*** 246,248 ****
--- 252,269 ----
      PQclear (res);
    }
  }
+ 
+ int main(int argc, char **argv)
+ {
+   PGconn *conn = PQconnectdb("");
+   if (PQstatus(conn) != CONNECTION_OK)
+   {
+     fprintf(stderr, "connection failed: %s\n",
+             PQerrorMessage(conn));
+     return 1;
+   }
+ 
+   test(conn);
+ }
+ 
+ 

--
Álvaro Herrera Valdivia, Chile — https://www.EnterpriseDB.com/

#35Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#34)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Hmm ... aren't you trying to read more results than you sent queries?

Hm, but should I be able to? Or, to put another way, should PQisBusy()
indicate there is a result available without me sending a query for it?
That sounds very counter-intuitive to me.

#36Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#35)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jul-08, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Hmm ... aren't you trying to read more results than you sent queries?

Hm, but should I be able to? Or, to put another way, should PQisBusy()
indicate there is a result available without me sending a query for it?
That sounds very counter-intuitive to me.

That seems a fair complaint, but I think PQisBusy is doing the right
thing per its charter. It is documented as "would PQgetResult block?"
and it is returning correctly that PQgetResult would not block in that
situation, because no queries are pending. I think we would regret
changing PQisBusy in the way you suggest.

I think your expectation is that we would have an entry point for easy
iteration; a way to say "if there's a result set to be had, can I have
it please, otherwise I'm done iterating". That seems a reasonable ask,
but PQisBusy is not that. Maybe it would be PQisResultPending() or
something like that. I again have to ask the RMT what do they think of
adding such a thing to libpq this late in the cycle.

--
Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/

#37Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#36)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

On 2021-Jul-08, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

Hmm ... aren't you trying to read more results than you sent queries?

Hm, but should I be able to? Or, to put another way, should PQisBusy()
indicate there is a result available without me sending a query for it?
That sounds very counter-intuitive to me.

That seems a fair complaint, but I think PQisBusy is doing the right
thing per its charter. It is documented as "would PQgetResult block?"
and it is returning correctly that PQgetResult would not block in that
situation, because no queries are pending.

Well, that's one way to view it. But in this case one can say that
the entire pipeline is still "busy" since we haven't seen the
PQpipelineSync() call. So maybe we could change the charter only
for this special situation (that is, inside the pipeline)?

But I agree, it may not be worth the trouble and a note in the
documentation may be an acceptable "solution".

I am happy to go either way, just let me know what it will be. And
also if the latest patch to libpq that you have shared[1]/messages/by-id/202107061747.tlss7f2somqf@alvherre.pgsql is still
necessary.

[1]: /messages/by-id/202107061747.tlss7f2somqf@alvherre.pgsql

#38Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Boris Kolpackov (#37)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jul-08, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

That seems a fair complaint, but I think PQisBusy is doing the right
thing per its charter. It is documented as "would PQgetResult block?"
and it is returning correctly that PQgetResult would not block in that
situation, because no queries are pending.

Well, that's one way to view it. But in this case one can say that
the entire pipeline is still "busy" since we haven't seen the
PQpipelineSync() call. So maybe we could change the charter only
for this special situation (that is, inside the pipeline)?

To be honest, I am hesitant to changing the charter in that way; I fear
it may have consequences I don't foresee. I think the workaround is not
*that* bad. On the other hand, since we explicitly made PQpipelineSync
not mandatory, it would be confusing to say that PQisBusy requires
PQpipelineSync to work properly.

But I agree, it may not be worth the trouble and a note in the
documentation may be an acceptable "solution".

I'm having a bit of trouble documenting this. I modified the paragraph in the
pipeline mode docs to read:

<para>
<function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
operate as normal when processing pipeline results. Note that if no
queries are pending receipt of the corresponding results,
<function>PQisBusy</function> returns 0.
</para>

This seems a bit silly/obvious to me, but it may be enlightening to
people writing apps to use pipeline mode. Do you find this sufficient?
(I tried to add something to the PQisBusy description, but it sounded
sillier.)

I am happy to go either way, just let me know what it will be. And
also if the latest patch to libpq that you have shared[1] is still
necessary.

[1] /messages/by-id/202107061747.tlss7f2somqf@alvherre.pgsql

Yes, this patch (or some version thereof) is still needed. I didn't
test the modified version of your program without it, but my repro
definitely misbehaved without it.

--
Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/

#39Alvaro Herrera
alvaro.herrera@2ndquadrant.com
In reply to: Alvaro Herrera (#38)
Re: Pipeline mode and PQpipelineSync()

Looking at this again, I noticed that I could probably do away with the
switch on pipelineStatus, and just call pqPipelineProcessQueue in all
cases when appending commands to the queue; I *think* that will do the
right thing in all cases. *Except* that I don't know what will happen
if the program is in the middle of processing a result in single-row
mode, and then sends another query: that would wipe out the pending
results of the query being processed ... but maybe that problem can
already occur in some other way.

I'll have to write some more tests in libpq_pipeline to verify this.

--
Álvaro Herrera Valdivia, Chile — https://www.EnterpriseDB.com/
"Java is clearly an example of money oriented programming" (A. Stepanov)

#40Boris Kolpackov
boris@codesynthesis.com
In reply to: Alvaro Herrera (#38)
Re: Pipeline mode and PQpipelineSync()

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

To be honest, I am hesitant to changing the charter in that way; I fear
it may have consequences I don't foresee. I think the workaround is not
*that* bad.

Ok, fair enough. I've updated my code to account for this and it seems
to be working fine now.

I'm having a bit of trouble documenting this. I modified the paragraph in the
pipeline mode docs to read:

<para>
<function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
operate as normal when processing pipeline results. Note that if no
queries are pending receipt of the corresponding results,
<function>PQisBusy</function> returns 0.
</para>

How about the following for the second sentence:

"In particular, a call to <function>PQisBusy</function> in the middle
of a pipeline returns 0 if all the results for queries issued so far
have been consumed."

#41Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Boris Kolpackov (#40)
Re: Pipeline mode and PQpipelineSync()

On 2021-Jul-08, Boris Kolpackov wrote:

Alvaro Herrera <alvaro.herrera@2ndquadrant.com> writes:

To be honest, I am hesitant to changing the charter in that way; I fear
it may have consequences I don't foresee. I think the workaround is not
*that* bad.

Ok, fair enough. I've updated my code to account for this and it seems
to be working fine now.

Great, thanks. I have pushed the fix, so beta3 (when it is released)
should work well for you.
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=ab09679429009bfed4bd894a6187afde0b7bdfcd

How about the following for the second sentence:

"In particular, a call to <function>PQisBusy</function> in the middle
of a pipeline returns 0 if all the results for queries issued so far
have been consumed."

I used this wording, thanks.

On 2021-Jul-08, Alvaro Herrera wrote:

Looking at this again, I noticed that I could probably do away with the
switch on pipelineStatus, and just call pqPipelineProcessQueue in all
cases when appending commands to the queue; I *think* that will do the
right thing in all cases. *Except* that I don't know what will happen
if the program is in the middle of processing a result in single-row
mode, and then sends another query: that would wipe out the pending
results of the query being processed ... but maybe that problem can
already occur in some other way.

I tried this and it doesn't work. It doesn't seem interesting to
pursue anyway, so I'll just drop the idea. (I did notice that the
comment on single-row mode was wrong, though, since
pqPipelineProcessQueue does nothing in READY_MORE state, which is what
it is in the middle of processing a result.)

Thanks for all the help in testing and reviewing,

--
Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/
"El hombre nunca sabe de lo que es capaz hasta que lo intenta" (C. Dickens)