Using PQexecQuery in pipeline mode produces unexpected Close messages

Started by Daniele Varrazzoover 3 years ago23 messages
#1Daniele Varrazzo
daniele.varrazzo@gmail.com

Hello,

Experimenting with pipeline mode, with libpq 14.2, sometimes we
receive the notice "message type 0x33 arrived from server while idle".
Tested with Postgres server 12 and 14.

This notice is generated by libpq upon receiving messages after using
PQsendQuery(). The libpq trace shows:

F 101 Parse "" "INSERT INTO pq_pipeline_demo(itemno,
int8filler) VALUES (1, 4611686018427387904) RETURNING id" 0
F 12 Bind "" "" 0 0 0
F 6 Describe P ""
F 9 Execute "" 0
F 6 Close P ""
F 4 Flush
B 4 ParseComplete
B 4 BindComplete
B 27 RowDescription 1 "id" 561056 1 23 4 -1 0
B 11 DataRow 1 1 '3'
B 15 CommandComplete "INSERT 0 1"
B 4 CloseComplete
F 4 Sync
B 5 ReadyForQuery I

in the state the server messages are received, CloseComplete is unexpected.

For comparison, PQsendQueryParams() produces the trace:

F 93 Parse "" "INSERT INTO pq_pipeline_demo(itemno,
int8filler) VALUES ($1, $2) RETURNING id" 2 21 20
F 36 Bind "" "" 2 1 1 2 2 '\x00\x01' 8
'@\x00\x00\x00\x00\x00\x00\x00' 1 0
F 6 Describe P ""
F 9 Execute "" 0
F 4 Flush
B 4 ParseComplete
B 4 BindComplete
B 27 RowDescription 1 "id" 561056 1 23 4 -1 0
B 11 DataRow 1 1 '4'
B 15 CommandComplete "INSERT 0 1"
F 4 Sync
B 5 ReadyForQuery I

where no Close is sent.

Is this a problem with PQexecQuery which should not send the Close, or
with receiving in IDLE mode which should expect a CloseComplete?

Should we avoid using PQexecQuery in pipeline mode altogether?

A playground to reproduce the issue is available at
https://github.com/psycopg/psycopg/issues/314

Cheers

-- Daniele

#2Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Daniele Varrazzo (#1)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

On 2022-Jun-08, Daniele Varrazzo wrote:

Is this a problem with PQexecQuery which should not send the Close, or
with receiving in IDLE mode which should expect a CloseComplete?

Interesting.

What that Close message is doing is closing the unnamed portal, which
is otherwise closed implicitly when the next one is opened. That's how
single-query mode works: if you run a single portal, it'll be kept open.

I believe that the right fix is to not send that Close message in
PQsendQuery.

--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/
"Para tener más hay que desear menos"

#3Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Alvaro Herrera (#2)
1 attachment(s)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

(Moved to -hackers)

At Wed, 8 Jun 2022 17:08:47 +0200, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote in

What that Close message is doing is closing the unnamed portal, which
is otherwise closed implicitly when the next one is opened. That's how
single-query mode works: if you run a single portal, it'll be kept open.

I believe that the right fix is to not send that Close message in
PQsendQuery.

Agreed. At least Close message in that context is useless and
PQsendQueryGuts doesn't send it. And removes the Close message surely
fixes the issue.

The doc [1]https://www.postgresql.org/docs/14/protocol-flow.html says:

[1]: https://www.postgresql.org/docs/14/protocol-flow.html

The simple Query message is approximately equivalent to the series
Parse, Bind, portal Describe, Execute, Close, Sync, using the
unnamed prepared statement and portal objects and no parameters. One

The current implement of PQsendQueryInternal looks like the result of
a misunderstanding of the doc. In the regression tests, that path is
excercised only for an error case, where no CloseComplete comes.

The attached adds a test for the normal-path of pipelined
PQsendQuery() to simple_pipeline test then modifies that function not
to send Close message. Without the fix, the test fails by "unexpected
notice" even if the trace matches the "expected" content.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

0001-Remove-useless-Close-message-from-PQsendQuery-in-pip.patchtext/x-patch; charset=us-asciiDownload
From bbc556548cefe1b7a2fa63a33362829a137d91e7 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Fri, 10 Jun 2022 15:03:52 +0900
Subject: [PATCH] Remove useless Close message from PQsendQuery() in pipleline
 mode

PQsendQuery() switch to the extended query protocol while in pipeline
mode. The message sequence mistakenly contained a Close message for
unnamed portal, which may lead to an out-of-sync CloseComplete
message.  That Close message is useless and removing it resolves the
issue. A test for this case is added.

Still PQsendQuery() in pipeline-mode prompts ParseComplete and
BindComplete messages that are not seen while non-pipeline mode but
they are in-sync and correctly ignored.
---
 src/interfaces/libpq/fe-exec.c                |  5 --
 .../modules/libpq_pipeline/libpq_pipeline.c   | 56 +++++++++++++++++++
 .../traces/pipeline_abort.trace               |  2 -
 .../traces/simple_pipeline.trace              | 12 ++++
 4 files changed, 68 insertions(+), 7 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 919cf5741d..321579319e 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1501,11 +1501,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 			pqPutInt(0, 4, conn) < 0 ||
 			pqPutMsgEnd(conn) < 0)
 			goto sendFailed;
-		if (pqPutMsgStart('C', conn) < 0 ||
-			pqPutc('P', conn) < 0 ||
-			pqPuts("", conn) < 0 ||
-			pqPutMsgEnd(conn) < 0)
-			goto sendFailed;
 
 		entry->queryclass = PGQUERY_EXTENDED;
 		entry->query = strdup(query);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 0ff563f59a..0b443a611a 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,27 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+static int n_notice;
+static void
+notice_processor(void *arg, const char *message)
+{
+	n_notice++;
+	fprintf(stderr, "%s", message);
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
 	PGresult   *res = NULL;
 	const char *dummy_params[1] = {"1"};
 	Oid			dummy_param_oids[1] = {INT4OID};
+	PQnoticeProcessor oldproc;
 
 	fprintf(stderr, "simple pipeline... ");
 
+	n_notice = 0;
+	oldproc = PQsetNoticeProcessor(conn, notice_processor, NULL);
+
 	/*
 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
 	 * process the results of as they come in.
@@ -1052,6 +1064,50 @@ test_simple_pipeline(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("Exiting pipeline mode didn't seem to work");
 
+	if (n_notice > 0)
+		pg_fatal("unexpected notice");
+
+	/* Try the same thing with PQsendQuery */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1;") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res =NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (n_notice > 0)
+		pg_fatal("unexpected notice");
+
+	PQsetNoticeProcessor(conn, oldproc, NULL);
+
 	fprintf(stderr, "ok\n");
 }
 
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace b/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace
index 3fce548b99..254e485997 100644
--- a/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace
+++ b/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace
@@ -38,7 +38,6 @@ F	26	Parse	 "" "SELECT 1; SELECT 2" 0
 F	12	Bind	 "" "" 0 0 0
 F	6	Describe	 P ""
 F	9	Execute	 "" 0
-F	6	Close	 P ""
 F	4	Sync
 B	NN	ErrorResponse	 S "ERROR" V "ERROR" C "42601" M "cannot insert multiple commands into a prepared statement" F "SSSS" L "SSSS" R "SSSS" \x00
 B	5	ReadyForQuery	 I
@@ -46,7 +45,6 @@ F	54	Parse	 "" "SELECT 1.0/g FROM generate_series(3, -1, -1) g" 0
 F	12	Bind	 "" "" 0 0 0
 F	6	Describe	 P ""
 F	9	Execute	 "" 0
-F	6	Close	 P ""
 F	4	Sync
 B	4	ParseComplete
 B	4	BindComplete
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..00710bc6bc 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,16 @@ B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
 B	11	DataRow	 1 1 '1'
 B	13	CommandComplete	 "SELECT 1"
 B	5	ReadyForQuery	 I
+F	17	Parse	 "" "SELECT 1;" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	5	ReadyForQuery	 I
 F	4	Terminate
-- 
2.31.1

#4Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#3)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Fri, 10 Jun 2022 15:25:44 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

The current implement of PQsendQueryInternal looks like the result of
a misunderstanding of the doc. In the regression tests, that path is
excercised only for an error case, where no CloseComplete comes.

The attached adds a test for the normal-path of pipelined
PQsendQuery() to simple_pipeline test then modifies that function not
to send Close message. Without the fix, the test fails by "unexpected
notice" even if the trace matches the "expected" content.

And, as a matter of course, this fix should be back-patched to 14.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#5Álvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Kyotaro Horiguchi (#3)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

On Fri, Jun 10, 2022, at 8:25 AM, Kyotaro Horiguchi wrote:

The current implement of PQsendQueryInternal looks like the result of
a misunderstanding of the doc. In the regression tests, that path is
excercised only for an error case, where no CloseComplete comes.

The attached adds a test for the normal-path of pipelined
PQsendQuery() to simple_pipeline test then modifies that function not
to send Close message. Without the fix, the test fails by "unexpected
notice" even if the trace matches the "expected" content.

Hah, the patch I wrote is almost identical to yours, down to the notice processor counting the number of notices received. The only difference is that I put my test in pipeline_abort.

Sadly, it looks like I won't be able to get this patched pushed for 14.4.

#6Tom Lane
tgl@sss.pgh.pa.us
In reply to: Álvaro Herrera (#5)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

=?UTF-8?Q?=C3=81lvaro_Herrera?= <alvherre@alvh.no-ip.org> writes:

Sadly, it looks like I won't be able to get this patched pushed for 14.4.

I think that's a good thing actually; this isn't urgent enough to
risk a last-minute commit. Please wait till the release freeze
lifts.

regards, tom lane

#7Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Kyotaro Horiguchi (#3)
1 attachment(s)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

On 2022-Jun-10, Kyotaro Horiguchi wrote:

(Moved to -hackers)

At Wed, 8 Jun 2022 17:08:47 +0200, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote in

What that Close message is doing is closing the unnamed portal, which
is otherwise closed implicitly when the next one is opened. That's how
single-query mode works: if you run a single portal, it'll be kept open.

I believe that the right fix is to not send that Close message in
PQsendQuery.

Agreed. At least Close message in that context is useless and
PQsendQueryGuts doesn't send it. And removes the Close message surely
fixes the issue.

So, git archaeology led me to this thread
/messages/by-id/202106072107.d4i55hdscxqj@alvherre.pgsql
which is why we added that message in the first place.

I was about to push the attached patch (a merged version of Kyotaro's
and mine), but now I'm wondering if that's the right approach.

Alternatives:

- Have the client not complain if it gets CloseComplete in idle state.
(After all, it's a pretty useless message, since we already do nothing
with it if we get it in BUSY state.)

- Have the server not send CloseComplete at all in the cases where
the client is not expecting it. Not sure how this would be
implemented.

- Other ideas?

--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/
"That sort of implies that there are Emacs keystrokes which aren't obscure.
I've been using it daily for 2 years now and have yet to discover any key
sequence which makes any sense." (Paul Thomas)

Attachments:

v2-0001-PQsendQuery-Don-t-send-Close-in-pipeline-mode.patchtext/x-diff; charset=us-asciiDownload
From d9f0ff57fe8aa7f963a9411741bb1d68082cc31a Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:56:41 +0200
Subject: [PATCH v2] PQsendQuery: Don't send Close in pipeline mode

In commit 4efcf47053ea, we modified PQsendQuery to send a Close message
when in pipeline mode.  But now we discover that that's not a good
thing: under certain circumstances, it causes the server to deliver a
CloseComplete message at a time when the client is not expecting it.  We
failed to noticed it because the tests don't have any scenario where the
problem is hit.  Remove the offending Close, and add a test case that
tickles the problematic scenario.

Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
---
 src/interfaces/libpq/fe-exec.c                |  5 --
 .../modules/libpq_pipeline/libpq_pipeline.c   | 62 +++++++++++++++++++
 .../traces/pipeline_abort.trace               |  2 -
 .../traces/simple_pipeline.trace              | 12 ++++
 4 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..e2df3a3480 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1403,11 +1403,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 			pqPutInt(0, 4, conn) < 0 ||
 			pqPutMsgEnd(conn) < 0)
 			goto sendFailed;
-		if (pqPutMsgStart('C', conn) < 0 ||
-			pqPutc('P', conn) < 0 ||
-			pqPuts("", conn) < 0 ||
-			pqPutMsgEnd(conn) < 0)
-			goto sendFailed;
 
 		entry->queryclass = PGQUERY_EXTENDED;
 		entry->query = strdup(query);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..e24fbfe1cc 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,29 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print them, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int	   *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
 	PGresult   *res = NULL;
 	const char *dummy_params[1] = {"1"};
 	Oid			dummy_param_oids[1] = {INT4OID};
+	PQnoticeProcessor oldproc;
+	int			n_notices = 0;
 
 	fprintf(stderr, "simple pipeline... ");
 
+	oldproc = PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
 	/*
 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
 	 * process the results of as they come in.
@@ -1052,6 +1066,54 @@ test_simple_pipeline(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("Exiting pipeline mode didn't seem to work");
 
+	if (n_notices > 0)
+		pg_fatal("unexpected notice");
+
+	/* Try the same thing with PQsendQuery */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+
+	PQsetNoticeProcessor(conn, oldproc, NULL);
+
 	fprintf(stderr, "ok\n");
 }
 
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace b/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace
index 3fce548b99..254e485997 100644
--- a/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace
+++ b/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace
@@ -38,7 +38,6 @@ F	26	Parse	 "" "SELECT 1; SELECT 2" 0
 F	12	Bind	 "" "" 0 0 0
 F	6	Describe	 P ""
 F	9	Execute	 "" 0
-F	6	Close	 P ""
 F	4	Sync
 B	NN	ErrorResponse	 S "ERROR" V "ERROR" C "42601" M "cannot insert multiple commands into a prepared statement" F "SSSS" L "SSSS" R "SSSS" \x00
 B	5	ReadyForQuery	 I
@@ -46,7 +45,6 @@ F	54	Parse	 "" "SELECT 1.0/g FROM generate_series(3, -1, -1) g" 0
 F	12	Bind	 "" "" 0 0 0
 F	6	Describe	 P ""
 F	9	Execute	 "" 0
-F	6	Close	 P ""
 F	4	Sync
 B	4	ParseComplete
 B	4	BindComplete
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..349034dda6 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,16 @@ B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
 B	11	DataRow	 1 1 '1'
 B	13	CommandComplete	 "SELECT 1"
 B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	5	ReadyForQuery	 I
 F	4	Terminate
-- 
2.30.2

#8Tom Lane
tgl@sss.pgh.pa.us
In reply to: Alvaro Herrera (#7)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

Alvaro Herrera <alvherre@alvh.no-ip.org> writes:

So, git archaeology led me to this thread
/messages/by-id/202106072107.d4i55hdscxqj@alvherre.pgsql
which is why we added that message in the first place.

Um. Good thing you looked. I doubt we want to revert that change now.

Alternatives:
- Have the client not complain if it gets CloseComplete in idle state.
(After all, it's a pretty useless message, since we already do nothing
with it if we get it in BUSY state.)

ISTM the actual problem here is that we're reverting to IDLE state too
soon. I didn't try to trace down exactly where that's happening, but
I notice that in the non-pipeline case we don't go to IDLE till we've
seen 'Z' (Sync). Something in the pipeline logic must be jumping the
gun on that state transition.

regards, tom lane

#9Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Tom Lane (#8)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Wed, 15 Jun 2022 14:56:42 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in

Alvaro Herrera <alvherre@alvh.no-ip.org> writes:

So, git archaeology led me to this thread
/messages/by-id/202106072107.d4i55hdscxqj@alvherre.pgsql
which is why we added that message in the first place.

Um. Good thing you looked. I doubt we want to revert that change now.

Alternatives:
- Have the client not complain if it gets CloseComplete in idle state.
(After all, it's a pretty useless message, since we already do nothing
with it if we get it in BUSY state.)

ISTM the actual problem here is that we're reverting to IDLE state too
soon. I didn't try to trace down exactly where that's happening, but

Yes. I once visited that fact but also I thought that in the
comparison with non-pipelined PQsendQuery, the three messages look
extra. Thus I concluded (at the time) that removing Close is enough
here.

I notice that in the non-pipeline case we don't go to IDLE till we've
seen 'Z' (Sync). Something in the pipeline logic must be jumping the
gun on that state transition.

PQgetResult() resets the state to IDLE when not in pipeline mode.

fe-exec.c:2171

if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
conn->asyncStatus = PGASYNC_IDLE;

And actually that code let the connection state enter to IDLE before
CloseComplete. In the test case I posted, the following happens.

PQsendQuery(conn, "SELECT 1;");
PQsendFlushRequest(conn);
PQgetResult(conn); // state enters IDLE, reads down to <CommandComplete>
PQgetResult(conn); // reads <CloseComplete comes>
PQpipelineSync(conn); // sync too late

Pipeline feature seems intending to allow PQgetResult called before
PQpipelineSync. And also seems allowing to call QPpipelineSync() after
PQgetResult().

I haven't come up with a valid *fix* of this flow..

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#10Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#9)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Thu, 16 Jun 2022 10:34:22 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

At Wed, 15 Jun 2022 14:56:42 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in

Alvaro Herrera <alvherre@alvh.no-ip.org> writes:

So, git archaeology led me to this thread
/messages/by-id/202106072107.d4i55hdscxqj@alvherre.pgsql
which is why we added that message in the first place.

Um. Good thing you looked. I doubt we want to revert that change now.

Alternatives:
- Have the client not complain if it gets CloseComplete in idle state.
(After all, it's a pretty useless message, since we already do nothing
with it if we get it in BUSY state.)

ISTM the actual problem here is that we're reverting to IDLE state too
soon. I didn't try to trace down exactly where that's happening, but

Yes. I once visited that fact but also I thought that in the
comparison with non-pipelined PQsendQuery, the three messages look
extra. Thus I concluded (at the time) that removing Close is enough
here.

I notice that in the non-pipeline case we don't go to IDLE till we've
seen 'Z' (Sync). Something in the pipeline logic must be jumping the
gun on that state transition.

- PQgetResult() resets the state to IDLE when not in pipeline mode.

D... the "not" should not be there.

+ PQgetResult() resets the state to IDLE while in pipeline mode.

fe-exec.c:2171

if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
conn->asyncStatus = PGASYNC_IDLE;

And actually that code let the connection state enter to IDLE before
CloseComplete. In the test case I posted, the following happens.

PQsendQuery(conn, "SELECT 1;");
PQsendFlushRequest(conn);
PQgetResult(conn); // state enters IDLE, reads down to <CommandComplete>
PQgetResult(conn); // reads <CloseComplete comes>
PQpipelineSync(conn); // sync too late

Pipeline feature seems intending to allow PQgetResult called before
PQpipelineSync. And also seems allowing to call QPpipelineSync() after
PQgetResult().

I haven't come up with a valid *fix* of this flow..

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#11Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#9)
2 attachment(s)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Thu, 16 Jun 2022 10:34:22 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

PQgetResult() resets the state to IDLE while in pipeline mode.

fe-exec.c:2171

if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
conn->asyncStatus = PGASYNC_IDLE;

And actually that code let the connection state enter to IDLE before
CloseComplete. In the test case I posted, the following happens.

PQsendQuery(conn, "SELECT 1;");
PQsendFlushRequest(conn);
PQgetResult(conn); // state enters IDLE, reads down to <CommandComplete>
PQgetResult(conn); // reads <CloseComplete comes>
PQpipelineSync(conn); // sync too late

Pipeline feature seems intending to allow PQgetResult called before
PQpipelineSync. And also seems allowing to call QPpipelineSync() after
PQgetResult().

I haven't come up with a valid *fix* of this flow..

The attached is a crude patch to separate the state for PIPELINE-IDLE
from PGASYNC_IDLE. I haven't found a better way..

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

libpqpline-test.difftext/x-patch; charset=us-asciiDownload
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 0ff563f59a..261ccc3ed4 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,27 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+static int n_notice;
+static void
+notice_processor(void *arg, const char *message)
+{
+	n_notice++;
+	fprintf(stderr, "%s", message);
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
 	PGresult   *res = NULL;
 	const char *dummy_params[1] = {"1"};
 	Oid			dummy_param_oids[1] = {INT4OID};
-
+	PQnoticeProcessor oldproc;
+ 
 	fprintf(stderr, "simple pipeline... ");
 
+	n_notice = 0;
+	oldproc = PQsetNoticeProcessor(conn, notice_processor, NULL);
+
 	/*
 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
 	 * process the results of as they come in.
@@ -1052,6 +1064,51 @@ test_simple_pipeline(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("Exiting pipeline mode didn't seem to work");
 
+	if (n_notice > 0)
+		pg_fatal("unexpected notice");
+
+	/* Try the same thing with PQsendQuery */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1;") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res =NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (n_notice > 0)
+		pg_fatal("unexpected notice");
+
+	PQsetNoticeProcessor(conn, oldproc, NULL);
+
 	fprintf(stderr, "ok\n");
 }
 
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..1612c371c0 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,18 @@ B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
 B	11	DataRow	 1 1 '1'
 B	13	CommandComplete	 "SELECT 1"
 B	5	ReadyForQuery	 I
+F	17	Parse	 "" "SELECT 1;" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	4	CloseComplete
+B	5	ReadyForQuery	 I
 F	4	Terminate
libpqpline-separte-pipelineidle-state.difftext/x-patch; charset=us-asciiDownload
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 919cf5741d..a8999911d7 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1380,7 +1380,7 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 				pqPipelineProcessQueue(conn);
 			break;
 	}
@@ -1778,6 +1778,10 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 				appendPQExpBufferStr(&conn->errorMessage,
 									 libpq_gettext("cannot queue commands during COPY\n"));
 				return false;
+			case PGASYNC_PIPELINE_IDLE:
+				appendPQExpBufferStr(&conn->errorMessage,
+									 libpq_gettext("unexpected PGASYNC_PIPELINE_IDLE state during COPY\n"));
+				return false;
 		}
 	}
 	else
@@ -2144,15 +2148,17 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next.
-				 */
-				pqPipelineProcessQueue(conn);
-			}
+			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert (conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			res = NULL;			/* query is complete */
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query when we're called next.
+			 */
+			pqPipelineProcessQueue(conn);
 			break;
 		case PGASYNC_READY:
 
@@ -2174,7 +2180,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query.  Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -3090,9 +3096,10 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
+		case PGASYNC_IDLE:
 			/* client still has to process current query or results */
 			return;
-		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* next query please */
 			break;
 	}
@@ -3100,7 +3107,10 @@ pqPipelineProcessQueue(PGconn *conn)
 	/* Nothing to do if not in pipeline mode, or queue is empty */
 	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
 		conn->cmd_queue_head == NULL)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
 
 	/*
 	 * Reset the error state.  This and the next couple of steps correspond to
@@ -3193,6 +3203,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 10c76daf6e..0d60e8c5c0 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 3db6a17db4..7fcad1dd41 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -225,7 +225,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
#12Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Kyotaro Horiguchi (#11)
2 attachment(s)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

On 2022-Jun-16, Kyotaro Horiguchi wrote:

The attached is a crude patch to separate the state for PIPELINE-IDLE
from PGASYNC_IDLE. I haven't found a better way..

Ah, yeah, this might be a way to fix this.

Something very similar to a PIPELINE_IDLE mode was present in Craig's
initial patch for pipeline mode. However, I fought very hard to remove
it, because it seemed to me that failing to handle it correctly
everywhere would lead to more bugs than not having it. (Indeed, there
were some.)

However, I see now that your patch would not only fix this bug, but also
let us remove the ugly "notionally not-idle" bit in fe-protocol3.c,
which makes me ecstatic. So let's push forward with this. However,
this means we'll have to go over all places that use asyncStatus to
ensure that they all handle the new value correctly.

I did found one bug in your patch: in the switch for asyncStatus in
PQsendQueryStart, you introduce a new error message. With the current
tests, that never fires, which is telling us that our coverage is not
complete. But with the right sequence of libpq calls, which the
attached adds (note that it's for REL_14_STABLE), that can be hit, and
it's easy to see that throwing an error there is a mistake. The right
action to take there is to let the action through.

Others to think about:

PQisBusy (I think no changes are needed),
PQfn (I think it should accept a call in PGASYNC_PIPELINE_IDLE mode;
fully untested in pipeline mode),
PQexitPipelineMode (I think it needs to return error; needs test case),
PQsendFlushRequest (I think it should let through; ditto).

I also attach a patch to make the test suite use Test::Differences, if
available. It makes the diffs of the traces much easier to read, when
they fail. (I wish for a simple way to set the context size, but that
would need a shim routine that I'm currently too lazy to write.)

--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/

Attachments:

v5-0001-Avoid-going-IDLE-in-pipeline-mode.patchtext/x-diff; charset=us-asciiDownload
From f96058efc70c3f72bc910308a9c2f64e4d3c7d8e Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:56:41 +0200
Subject: [PATCH v5 1/2] Avoid going IDLE in pipeline mode

Introduce a new PGASYNC_PIPELINE_IDLE state, which helps PQgetResult
distinguish the case of "really idle".

This fixes the problem that ReadyForQuery is sent too soon, which caused
a CloseComplete to be received when in idle state.

XXX -- this is still WIP.

Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
---
 src/interfaces/libpq/fe-connect.c             |  1 +
 src/interfaces/libpq/fe-exec.c                | 45 +++++----
 src/interfaces/libpq/fe-protocol3.c           | 12 ---
 src/interfaces/libpq/libpq-int.h              |  3 +-
 .../modules/libpq_pipeline/libpq_pipeline.c   | 99 +++++++++++++++++++
 .../traces/simple_pipeline.trace              | 37 +++++++
 6 files changed, 166 insertions(+), 31 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 709ba15220..afd0bc809a 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -6751,6 +6751,7 @@ PQtransactionStatus(const PGconn *conn)
 {
 	if (!conn || conn->status != CONNECTION_OK)
 		return PQTRANS_UNKNOWN;
+	/* XXX what should we do here if status is PGASYNC_PIPELINE_IDLE? */
 	if (conn->asyncStatus != PGASYNC_IDLE)
 		return PQTRANS_ACTIVE;
 	return conn->xactStatus;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..59f2e7f724 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1642,9 +1643,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		return false;
 	}
 
-	/* Can't send while already busy, either, unless enqueuing for later */
-	if (conn->asyncStatus != PGASYNC_IDLE &&
-		conn->pipelineStatus == PQ_PIPELINE_OFF)
+	/* In non-pipeline mode, we can't send anything while already busy */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+		conn->asyncStatus != PGASYNC_IDLE)
 	{
 		appendPQExpBufferStr(&conn->errorMessage,
 							 libpq_gettext("another command is already in progress\n"));
@@ -1667,11 +1668,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -1881,6 +1884,7 @@ PQsetSingleRowMode(PGconn *conn)
 	 */
 	if (!conn)
 		return 0;
+	/* XXX modify this? */
 	if (conn->asyncStatus != PGASYNC_BUSY)
 		return 0;
 	if (!conn->cmd_queue_head ||
@@ -2047,19 +2051,19 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next.  Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			res = NULL;			/* query is complete */
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query when we're called next.
+			 */
+			pqPipelineProcessQueue(conn);
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2084,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query.  Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -3008,9 +3012,10 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
+		case PGASYNC_IDLE:
 			/* client still has to process current query or results */
 			return;
-		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* next query please */
 			break;
 	}
@@ -3018,7 +3023,10 @@ pqPipelineProcessQueue(PGconn *conn)
 	/* Nothing to do if not in pipeline mode, or queue is empty */
 	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
 		conn->cmd_queue_head == NULL)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3105,6 +3113,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..bab8926a63 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..44a65e41b7 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* Pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..2bdd4e308f 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,29 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print them, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int	   *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
 	PGresult   *res = NULL;
 	const char *dummy_params[1] = {"1"};
 	Oid			dummy_param_oids[1] = {INT4OID};
+	PQnoticeProcessor oldproc;
+	int			n_notices = 0;
 
 	fprintf(stderr, "simple pipeline... ");
 
+	oldproc = PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
 	/*
 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
 	 * process the results of as they come in.
@@ -1052,6 +1066,91 @@ test_simple_pipeline(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("Exiting pipeline mode didn't seem to work");
 
+	if (n_notices > 0)
+		pg_fatal("unexpected notice");
+
+
+	/* Try the same thing with PQsendQuery */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+
+	PQsetNoticeProcessor(conn, oldproc, NULL);
+
+	/*
+	 * Send a second command when libpq is in "pipeline-idle" state.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	/* read terminating null from first query */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
 	fprintf(stderr, "ok\n");
 }
 
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..e1c0cba07e 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,41 @@ B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
 B	11	DataRow	 1 1 '1'
 B	13	CommandComplete	 "SELECT 1"
 B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	4	CloseComplete
+B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	16	Parse	 "" "SELECT 2" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	CloseComplete
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
 F	4	Terminate
-- 
2.30.2

v5-0002-Use-Test-Differences-if-available.patchtext/x-diff; charset=us-asciiDownload
From 48ff379f2688ec4237d863a16ff0e23e668bc3a4 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:42:23 +0200
Subject: [PATCH v5 2/2] Use Test::Differences if available

---
 src/test/modules/libpq_pipeline/README              |  3 +++
 .../modules/libpq_pipeline/t/001_libpq_pipeline.pl  | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README
index d8174dd579..59c6ea8109 100644
--- a/src/test/modules/libpq_pipeline/README
+++ b/src/test/modules/libpq_pipeline/README
@@ -1 +1,4 @@
 Test programs and libraries for libpq
+
+If you have Test::Differences installed, any differences in the trace files
+are displayed in a format that's easier to read than the standard format.
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index d8d496c995..49eec8a63a 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -9,6 +9,17 @@ use PostgresNode;
 use TestLib;
 use Test::More;
 
+# Use Test::Differences if installed, and select unified diff output.
+# No decent way to select a context line count with this;
+# we could use a sub ref to allow that.
+BEGIN
+{
+	if (!eval q{ use Test::Differences; unified_diff(); 1 })
+	{
+		*eq_or_diff = \&is;
+	}
+}
+
 my $node = get_new_node('main');
 $node->init;
 $node->start;
@@ -54,7 +65,7 @@ for my $testname (@tests)
 		$result = slurp_file_eval($traceout);
 		next unless $result ne "";
 
-		is($result, $expected, "$testname trace match");
+		eq_or_diff($result, $expected, "$testname trace match");
 	}
 }
 
-- 
2.30.2

#13Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Alvaro Herrera (#12)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Fri, 17 Jun 2022 20:31:50 +0200, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote in

On 2022-Jun-16, Kyotaro Horiguchi wrote:

The attached is a crude patch to separate the state for PIPELINE-IDLE
from PGASYNC_IDLE. I haven't found a better way..

Ah, yeah, this might be a way to fix this.

Something very similar to a PIPELINE_IDLE mode was present in Craig's
initial patch for pipeline mode. However, I fought very hard to remove
it, because it seemed to me that failing to handle it correctly
everywhere would lead to more bugs than not having it. (Indeed, there
were some.)

However, I see now that your patch would not only fix this bug, but also
let us remove the ugly "notionally not-idle" bit in fe-protocol3.c,
which makes me ecstatic. So let's push forward with this. However,

Yey.

this means we'll have to go over all places that use asyncStatus to
ensure that they all handle the new value correctly.

Sure.

I did found one bug in your patch: in the switch for asyncStatus in
PQsendQueryStart, you introduce a new error message. With the current
tests, that never fires, which is telling us that our coverage is not
complete. But with the right sequence of libpq calls, which the
attached adds (note that it's for REL_14_STABLE), that can be hit, and

# (ah, I wondered why it failed to apply..)

it's easy to see that throwing an error there is a mistake. The right
action to take there is to let the action through.

Yeah.. Actulallly I really did it carelessly.. Thanks!

Others to think about:

PQisBusy (I think no changes are needed),

Agreed.

PQfn (I think it should accept a call in PGASYNC_PIPELINE_IDLE mode;
fully untested in pipeline mode),

Does a PQ_PIPELINE_OFF path need that? Rather I thought that we need
Assert(!conn->asyncStatus != PGASYNC_PIPELINE_IDLE) there. But anyway
we might need a test for this path.

PQexitPipelineMode (I think it needs to return error; needs test case),

Agreed about test case. Currently the function doesn't handle
PGASYNC_IDLE so I thought that PGASYNC_PIPELINE_IDLE also don't need a
care. If the function treats PGASYNC_PIPELINE_IDLE state as error,
the regression test fails (but I haven't examine it furtuer.)

PQsendFlushRequest (I think it should let through; ditto).

Does that mean exit without pushing 'H' message?

I also attach a patch to make the test suite use Test::Differences, if
available. It makes the diffs of the traces much easier to read, when
they fail. (I wish for a simple way to set the context size, but that
would need a shim routine that I'm currently too lazy to write.)

Yeah, it was annoying that the script prints expected and result trace
separately. It looks pretty good with the patch. I don't think
there's much use of context size here.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#14Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#13)
1 attachment(s)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Tue, 21 Jun 2022 11:42:59 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

At Fri, 17 Jun 2022 20:31:50 +0200, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote in

Others to think about:

PQisBusy (I think no changes are needed),

Agreed.

PQfn (I think it should accept a call in PGASYNC_PIPELINE_IDLE mode;
fully untested in pipeline mode),

Does a PQ_PIPELINE_OFF path need that? Rather I thought that we need
Assert(!conn->asyncStatus != PGASYNC_PIPELINE_IDLE) there. But anyway
we might need a test for this path.

In the attached, PQfn() is used while in pipeline mode and
PGASYNC_PIPELINE_IDLE. Both error out and effectivelly no-op.

PQexitPipelineMode (I think it needs to return error; needs test case),

Agreed about test case. Currently the function doesn't handle
PGASYNC_IDLE so I thought that PGASYNC_PIPELINE_IDLE also don't need a
care. If the function treats PGASYNC_PIPELINE_IDLE state as error,
the regression test fails (but I haven't examine it furtuer.)

PQexitPipelineMode() is called while PGASYNC_PIPELINE_IDLE.

PQsendFlushRequest (I think it should let through; ditto).

Does that mean exit without pushing 'H' message?

I didn't do anything on this in the sttached.

By the way, I noticed that "libpq_pipeline uniqviol" intermittently
fails for uncertain reasons.

result 574/575: pipeline aborted
...........................................................
done writing

libpq_pipeline:1531: got unexpected NULL

The "...........done writing" is printed too late in the error cases.

This causes the TAP test fail, but I haven't find what's happnening at
the time.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

libpq_pline_add_some_tests.difftext/x-patch; charset=us-asciiDownload
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 0ff563f59a..84734f5d41 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -53,6 +53,8 @@ static const char *const insert_sql =
 static const char *const insert_sql2 =
 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
 
+Oid	oid_pg_backend_pid;
+
 /* max char length of an int32/64, plus sign and null terminator */
 #define MAXINTLEN 12
 #define MAXINT8LEN 20
@@ -86,6 +88,24 @@ pg_fatal_impl(int line, const char *fmt,...)
 	exit(1);
 }
 
+/* returns true if PQfn succeeded */
+static bool
+PQfn_succeed(PGconn *conn)
+{
+	PGresult   *res;
+	int			res_buf;
+	int			res_len;
+
+	/* PQfn should fail in pipeline mode */
+	if ((res = PQfn(conn, oid_pg_backend_pid, &res_buf, &res_len, 1, NULL, 0))
+		!= NULL &&
+		PQresultStatus(res) == PGRES_COMMAND_OK)
+		return true;
+
+	return false;
+}
+
+
 static void
 test_disallowed_in_pipeline(PGconn *conn)
 {
@@ -93,6 +113,9 @@ test_disallowed_in_pipeline(PGconn *conn)
 
 	fprintf(stderr, "test error cases... ");
 
+	if (oid_pg_backend_pid == 0)
+		pg_fatal("Failed to obtain functino oid: %s", PQgetvalue(res, 0, 0));
+		
 	if (PQisnonblocking(conn))
 		pg_fatal("Expected blocking connection mode");
 
@@ -107,6 +130,10 @@ test_disallowed_in_pipeline(PGconn *conn)
 	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
 		pg_fatal("PQexec should fail in pipeline mode but succeeded");
 
+	/* PQfn should fail in pipeline mode */
+	if (PQfn_succeed(conn))
+		pg_fatal("PQfn succeeded while in pipeline mode");
+
 	/* Entering pipeline mode when already in pipeline mode is OK */
 	if (PQenterPipelineMode(conn) != 1)
 		pg_fatal("re-entering pipeline mode should be a no-op but failed");
@@ -1014,6 +1041,18 @@ test_simple_pipeline(PGconn *conn)
 	PQclear(res);
 	res = NULL;
 
+	/* PQfn shoud be error no-op while PGASYNC_PIPELINE_IDLE */
+	if (PQfn_succeed(conn))
+		pg_fatal("PQfn succeeded while in pipeline mode");
+
+	/*
+	 * Trial to getting out of pipeline mode while PGASYNC_PIPELINE_IDLE should
+	 * gracefully fail.
+	 */
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	/* This PQgetResult moves async status to PGASYNC_BUSY */
 	if (PQgetResult(conn) != NULL)
 		pg_fatal("PQgetResult returned something extra after first query result.");
 
@@ -1627,6 +1666,15 @@ main(int argc, char **argv)
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pg_fatal("failed to set force_parallel_mode: %s", PQerrorMessage(conn));
 
+	/* obtain function OID of pg_backend_pid for PQfn test */
+	res = PQexec(conn, "SELECT 'pg_backend_pid'::regproc::int");
+	if (!res ||
+		(PQresultStatus(res) != PGRES_COMMAND_OK &&
+		 PQresultStatus(res) != PGRES_TUPLES_OK))
+		pg_fatal("Failed to obtain functino oid");
+
+	oid_pg_backend_pid = atoi(PQgetvalue(res, 0, 0));
+
 	/* Set the trace file, if requested */
 	if (tracefile != NULL)
 	{
#15Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#14)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Tue, 21 Jun 2022 14:56:40 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

By the way, I noticed that "libpq_pipeline uniqviol" intermittently
fails for uncertain reasons.

result 574/575: pipeline aborted
...........................................................
done writing

libpq_pipeline:1531: got unexpected NULL

The "...........done writing" is printed too late in the error cases.

This causes the TAP test fail, but I haven't find what's happnening at
the time.

Just to make sure, I see this with the master branch

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#16Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#15)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Tue, 21 Jun 2022 14:59:07 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

At Tue, 21 Jun 2022 14:56:40 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

By the way, I noticed that "libpq_pipeline uniqviol" intermittently
fails for uncertain reasons.

result 574/575: pipeline aborted
...........................................................
done writing

libpq_pipeline:1531: got unexpected NULL

The "...........done writing" is printed too late in the error cases.

This causes the TAP test fail, but I haven't find what's happnening at
the time.

Just to make sure, I see this with the master branch

No. It *is* caused by the fix. Sorry for the mistake. The test module
linked to the wrong binary..

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#17Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#14)
1 attachment(s)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Tue, 21 Jun 2022 14:56:40 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

By the way, I noticed that "libpq_pipeline uniqviol" intermittently
fails for uncertain reasons.

result 574/575: pipeline aborted
...........................................................
done writing

libpq_pipeline:1531: got unexpected NULL

PQsendQueryPrepared() is called after the conection's state has moved
to PGASYNC_IDLE so PQgetResult returns NULL. But actually there are
results. So, if pqPipelineProcessorQueue() doesn't move the async
state to PGASYNC_IDLE when queue is emtpy, uniqviol can run till the
end. But that change breaks almost all of other test items.

Finally, I found that the change in pqPipelineProcessorQueue() as
attached fixes the uniqviol failure and doesn't break other tests.
However, I don't understand what I did by the change for now... X(
It seems to me something's wrong in the PQ_PIPELINE_ABORTED mode..

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

v6-0001-Avoid-going-IDLE-in-pipeline-mode.patchtext/x-patch; charset=us-asciiDownload
From f0b69fbc4f708d3b844b672989dbf15f84ed0c9b Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:56:41 +0200
Subject: [PATCH v6] Avoid going IDLE in pipeline mode

Introduce a new PGASYNC_PIPELINE_IDLE state, which helps PQgetResult
distinguish the case of "really idle".

This fixes the problem that ReadyForQuery is sent too soon, which caused
a CloseComplete to be received when in idle state.

XXX -- this is still WIP.

Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
---
 src/interfaces/libpq/fe-connect.c             |  1 +
 src/interfaces/libpq/fe-exec.c                | 56 +++++++----
 src/interfaces/libpq/fe-protocol3.c           | 12 ---
 src/interfaces/libpq/libpq-int.h              |  3 +-
 .../modules/libpq_pipeline/libpq_pipeline.c   | 99 +++++++++++++++++++
 .../traces/simple_pipeline.trace              | 37 +++++++
 6 files changed, 175 insertions(+), 33 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 709ba15220..afd0bc809a 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -6751,6 +6751,7 @@ PQtransactionStatus(const PGconn *conn)
 {
 	if (!conn || conn->status != CONNECTION_OK)
 		return PQTRANS_UNKNOWN;
+	/* XXX what should we do here if status is PGASYNC_PIPELINE_IDLE? */
 	if (conn->asyncStatus != PGASYNC_IDLE)
 		return PQTRANS_ACTIVE;
 	return conn->xactStatus;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..3cf59e45e1 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1642,9 +1643,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		return false;
 	}
 
-	/* Can't send while already busy, either, unless enqueuing for later */
-	if (conn->asyncStatus != PGASYNC_IDLE &&
-		conn->pipelineStatus == PQ_PIPELINE_OFF)
+	/* In non-pipeline mode, we can't send anything while already busy */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+		conn->asyncStatus != PGASYNC_IDLE)
 	{
 		appendPQExpBufferStr(&conn->errorMessage,
 							 libpq_gettext("another command is already in progress\n"));
@@ -1667,11 +1668,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -1881,6 +1884,7 @@ PQsetSingleRowMode(PGconn *conn)
 	 */
 	if (!conn)
 		return 0;
+	/* XXX modify this? */
 	if (conn->asyncStatus != PGASYNC_BUSY)
 		return 0;
 	if (!conn->cmd_queue_head ||
@@ -2047,19 +2051,19 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next.  Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			res = NULL;			/* query is complete */
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query when we're called next.
+			 */
+			pqPipelineProcessQueue(conn);
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2084,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query.  Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -3008,17 +3012,28 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
+		case PGASYNC_IDLE:
 			/* client still has to process current query or results */
 			return;
-		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* next query please */
 			break;
 	}
 
 	/* Nothing to do if not in pipeline mode, or queue is empty */
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
-		conn->cmd_queue_head == NULL)
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		if (conn->pipelineStatus != PQ_PIPELINE_ABORTED)
+			conn->asyncStatus = PGASYNC_IDLE;
+			
+		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3105,6 +3120,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..bab8926a63 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..44a65e41b7 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* Pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..2bdd4e308f 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,15 +968,29 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print them, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int	   *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
 	PGresult   *res = NULL;
 	const char *dummy_params[1] = {"1"};
 	Oid			dummy_param_oids[1] = {INT4OID};
+	PQnoticeProcessor oldproc;
+	int			n_notices = 0;
 
 	fprintf(stderr, "simple pipeline... ");
 
+	oldproc = PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
 	/*
 	 * Enter pipeline mode and dispatch a set of operations, which we'll then
 	 * process the results of as they come in.
@@ -1052,6 +1066,91 @@ test_simple_pipeline(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("Exiting pipeline mode didn't seem to work");
 
+	if (n_notices > 0)
+		pg_fatal("unexpected notice");
+
+
+	/* Try the same thing with PQsendQuery */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+
+	PQsetNoticeProcessor(conn, oldproc, NULL);
+
+	/*
+	 * Send a second command when libpq is in "pipeline-idle" state.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	/* read terminating null from first query */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
 	fprintf(stderr, "ok\n");
 }
 
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
index 5c94749bc1..e1c0cba07e 100644
--- a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -9,4 +9,41 @@ B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
 B	11	DataRow	 1 1 '1'
 B	13	CommandComplete	 "SELECT 1"
 B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	4	CloseComplete
+B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	16	Parse	 "" "SELECT 2" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	CloseComplete
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
 F	4	Terminate
-- 
2.31.1

#18Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Kyotaro Horiguchi (#17)
6 attachment(s)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

So I wrote some more test scenarios for this, and as I wrote in some
other thread, I realized that there are more problems than just some
NOTICE trouble. For instance, if you send a query, then read the result
but not the terminating NULL then send another query, everything gets
confused; the next thing you'll read is the result for the second query,
without having read the NULL that terminates the results of the first
query. Any application that expects the usual flow of results will be
confused. Kyotaro's patch to add PIPELINE_IDLE fixes this bit too, as
far as I can tell.

However, another problem case, not fixed by PIPELINE_IDLE, occurs if you
exit pipeline mode after PQsendQuery() and then immediately use
PQexec(). The CloseComplete will be received at the wrong time, and a
notice is emitted nevertheless.

I spent a lot of time trying to understand how to fix this last bit, and
the solution I came up with is that PQsendQuery() must add a second
entry to the command queue after the PGQUERY_EXTENDED one, to match the
CloseComplete message being expected; with that entry in the queue,
PQgetResult will really only get to IDLE mode after the Close has been
seen, which is what we want. I named it PGQUERY_CLOSE.

Sadly, some hacks are needed to make this work fully:

1. the client is never expecting that PQgetResult() would return
anything for the CloseComplete, so something needs to consume the
CloseComplete silently (including the queue entry for it) when it is
received; I chose to do this directly in pqParseInput3. I tried to
make PQgetResult itself do it, but it became a pile of hacks until I
was no longer sure what was going on. Putting it in fe-protocol3.c
ends up a lot cleaner. However, we still need PQgetResult to invoke
parseInput() at the point where Close is expected.

2. if an error occurs while executing the query, the CloseComplete will
of course never arrive, so we need to erase it from the queue
silently if we're returning an error.

I toyed with the idea of having parseInput() produce a PGresult that
encodes the Close message, and have PQgetResult consume and discard
that, then read some further message to have something to return. But
it seemed inefficient and equally ugly and I'm not sure that flow
control is any simpler.

I think another possibility would be to make PQexitPipelineMode
responsible for /something/, but I'm not sure what that would be.
Checking the queue and seeing if the next message is CloseComplete, then
eating that message before exiting pipeline mode? That seems way too
magical. I didn't attempt this.

I attach a patch series that implements the proposed fix (again for
REL_14_STABLE) in steps, to make it easy to read. I intend to squash
the whole lot into a single commit before pushing. But while writing
this email it occurred to me that I need to add at least one more test,
to receive a WARNING while waiting for CloseComplete. AFAICT it should
work, but better make sure.

I produced pipeline_idle.trace file by running the test in the fully
fixed tree, then used it to verify that all tests fail in different ways
when run without the fixes. The first fix with PIPELINE_IDLE fixes some
of these failures, and the PGQUERY_CLOSE one fixes the remaining one.
Reading the trace file, it looks correct to me.

--
Álvaro Herrera Breisgau, Deutschland — https://www.EnterpriseDB.com/
"Doing what he did amounts to sticking his fingers under the hood of the
implementation; if he gets his fingers burnt, it's his problem." (Tom Lane)

Attachments:

v7-0001-Use-Test-Differences-if-available.patchtext/x-diff; charset=us-asciiDownload
From 64fc6f56f88cf3d5e6c3eaada32887939ad3b49f Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:42:23 +0200
Subject: [PATCH v7 1/6] Use Test::Differences if available

---
 src/test/modules/libpq_pipeline/README              |  3 +++
 .../modules/libpq_pipeline/t/001_libpq_pipeline.pl  | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README
index d8174dd579..59c6ea8109 100644
--- a/src/test/modules/libpq_pipeline/README
+++ b/src/test/modules/libpq_pipeline/README
@@ -1 +1,4 @@
 Test programs and libraries for libpq
+
+If you have Test::Differences installed, any differences in the trace files
+are displayed in a format that's easier to read than the standard format.
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index d8d496c995..49eec8a63a 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -9,6 +9,17 @@ use PostgresNode;
 use TestLib;
 use Test::More;
 
+# Use Test::Differences if installed, and select unified diff output.
+# No decent way to select a context line count with this;
+# we could use a sub ref to allow that.
+BEGIN
+{
+	if (!eval q{ use Test::Differences; unified_diff(); 1 })
+	{
+		*eq_or_diff = \&is;
+	}
+}
+
 my $node = get_new_node('main');
 $node->init;
 $node->start;
@@ -54,7 +65,7 @@ for my $testname (@tests)
 		$result = slurp_file_eval($traceout);
 		next unless $result ne "";
 
-		is($result, $expected, "$testname trace match");
+		eq_or_diff($result, $expected, "$testname trace match");
 	}
 }
 
-- 
2.30.2

v7-0002-Allow-tracefile-to-be-stdout.patchtext/x-diff; charset=us-asciiDownload
From e0b67f7b20938ebc9d37ef0b1c34af1e536d978c Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Tue, 21 Jun 2022 19:55:12 +0200
Subject: [PATCH v7 2/6] Allow tracefile to be stdout

---
 src/test/modules/libpq_pipeline/libpq_pipeline.c | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..8f6f2d4b4b 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -1630,7 +1630,10 @@ main(int argc, char **argv)
 	/* Set the trace file, if requested */
 	if (tracefile != NULL)
 	{
-		trace = fopen(tracefile, "w");
+		if (strcmp(tracefile, "-") == 0)
+			trace = stdout;
+		else
+			trace = fopen(tracefile, "w");
 		if (trace == NULL)
 			pg_fatal("could not open file \"%s\": %m", tracefile);
 
-- 
2.30.2

v7-0003-if-the-queue-becomes-empty-reset-the-tail-too.patchtext/x-diff; charset=us-asciiDownload
From 435c04917e5081375b25f9f1ea638a3c9c5b1996 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Tue, 21 Jun 2022 19:35:26 +0200
Subject: [PATCH v7 3/6] if the queue becomes empty, reset the tail too

---
 src/interfaces/libpq/fe-exec.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..ed26bab033 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2988,6 +2988,10 @@ pqCommandQueueAdvance(PGconn *conn)
 	prevquery = conn->cmd_queue_head;
 	conn->cmd_queue_head = conn->cmd_queue_head->next;
 
+	/* If the queue is now empty, reset the tail too */
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_tail = NULL;
+
 	/* and make it recyclable */
 	prevquery->next = NULL;
 	pqRecycleCmdQueueEntry(conn, prevquery);
-- 
2.30.2

v7-0004-add-some-tests-maybe-wrong.patchtext/x-diff; charset=us-asciiDownload
From 25943c6a5e0df502839851e7f525f37a73182ac6 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 24 Jun 2022 17:43:16 +0200
Subject: [PATCH v7 4/6] add some tests, maybe wrong

---
 .../modules/libpq_pipeline/libpq_pipeline.c   | 197 ++++++++++++++++++
 .../libpq_pipeline/traces/pipeline_idle.trace |  38 ++++
 2 files changed, 235 insertions(+)
 create mode 100644 src/test/modules/libpq_pipeline/traces/pipeline_idle.trace

diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 8f6f2d4b4b..a3ac10ae4a 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,6 +968,198 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print notices, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int	   *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
+/* Verify behavior in "idle" state */
+static void
+test_pipeline_idle(PGconn *conn)
+{
+	PGresult   *res;
+	int			n_notices = 0;
+
+	fprintf(stderr, "\npipeline idle...\n");
+
+	PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
+	/*
+	 * Cause a Close message to be sent to the server, and watch libpq's
+	 * reaction to the resulting CloseComplete.  libpq must not get in IDLE
+	 * state until that has been received.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 1\n");
+
+	/*
+	 * Verify that we can send a query using simple query protocol after one
+	 * in pipeline mode.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	fprintf(stderr, "going to expect null-terminating result\n");
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("got unexpected non-null result");
+	fprintf(stderr, "received null-terminating result, exiting pipeline mode\n");
+	/* We can exit pipeline mode now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+	res = PQexec(conn, "SELECT 2");
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	if (res == NULL)
+		pg_fatal("PQexec returned NULL");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from non-pipeline query",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 2\n");
+
+	/*
+	 * Case 2: exiting pipeline mode is not OK if a second command is sent.
+	 */
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	/* read terminating null from first query */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/* Try to exit pipeline mode in pipeline-idle state */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (PQexitPipelineMode(conn) == 1)
+		pg_fatal("exiting pipeline succeeded when it shouldn't");
+	if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
+				strlen("cannot exit pipeline mode")) != 0)
+		pg_fatal("did not get expected error; got: %s",
+				 PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from second pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	if (PQexitPipelineMode(conn) == 1)
+		pg_fatal("exiting pipeline succeeded when it shouldn't");
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 3\n");
+}
+
+
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
@@ -1160,6 +1352,8 @@ test_singlerowmode(PGconn *conn)
 
 	if (PQexitPipelineMode(conn) != 1)
 		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
 }
 
 /*
@@ -1549,6 +1743,7 @@ print_test_list(void)
 	printf("multi_pipelines\n");
 	printf("nosync\n");
 	printf("pipeline_abort\n");
+	printf("pipeline_idle\n");
 	printf("pipelined_insert\n");
 	printf("prepared\n");
 	printf("simple_pipeline\n");
@@ -1653,6 +1848,8 @@ main(int argc, char **argv)
 		test_nosync(conn);
 	else if (strcmp(testname, "pipeline_abort") == 0)
 		test_pipeline_abort(conn);
+	else if (strcmp(testname, "pipeline_idle") == 0)
+		test_pipeline_idle(conn);
 	else if (strcmp(testname, "pipelined_insert") == 0)
 		test_pipelined_insert(conn, numrows);
 	else if (strcmp(testname, "prepared") == 0)
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
new file mode 100644
index 0000000000..7d07c296f7
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
@@ -0,0 +1,38 @@
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	4	CloseComplete
+B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	16	Parse	 "" "SELECT 2" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	CloseComplete
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Terminate
-- 
2.30.2

v7-0005-add-PIPELINE_IDLE-state.patchtext/x-diff; charset=us-asciiDownload
From 0630f5d00e21e91c0457b6442c6fa23827241d86 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 24 Jun 2022 18:13:58 +0200
Subject: [PATCH v7 5/6] add PIPELINE_IDLE state

---
 src/interfaces/libpq/fe-connect.c   |  1 +
 src/interfaces/libpq/fe-exec.c      | 58 ++++++++++++++++++++---------
 src/interfaces/libpq/fe-protocol3.c | 12 ------
 src/interfaces/libpq/libpq-int.h    |  3 +-
 4 files changed, 44 insertions(+), 30 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 709ba15220..afd0bc809a 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -6751,6 +6751,7 @@ PQtransactionStatus(const PGconn *conn)
 {
 	if (!conn || conn->status != CONNECTION_OK)
 		return PQTRANS_UNKNOWN;
+	/* XXX what should we do here if status is PGASYNC_PIPELINE_IDLE? */
 	if (conn->asyncStatus != PGASYNC_IDLE)
 		return PQTRANS_ACTIVE;
 	return conn->xactStatus;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ed26bab033..7cb803de94 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1667,11 +1668,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -2047,19 +2050,22 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next.  Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query, if any, when we're called next.  If there's
+			 * no next element in the command queue, this gets us in IDLE
+			 * state.
+			 */
+			resetPQExpBuffer(&conn->errorMessage);
+			pqPipelineProcessQueue(conn);
+			res = NULL;			/* query is complete */
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2086,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query.  Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -2939,6 +2945,7 @@ PQexitPipelineMode(PGconn *conn)
 	{
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* there are some uncollected results */
 			appendPQExpBufferStr(&conn->errorMessage,
 								 libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
@@ -3014,15 +3021,31 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_BUSY:
 			/* client still has to process current query or results */
 			return;
+
 		case PGASYNC_IDLE:
+			/*
+			 * When in IDLE mode, there are no further commands to process,
+			 * and no further action to take on the queue, since it must
+			 * be empty.
+			 */
+			Assert(conn->cmd_queue_head == NULL);
+			return;
+
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
 			/* next query please */
 			break;
 	}
 
-	/* Nothing to do if not in pipeline mode, or queue is empty */
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
-		conn->cmd_queue_head == NULL)
+	/*
+	 * If there are no further commands to process in the queue, get us in
+	 * "real idle" mode now.
+	 */
+	if (conn->cmd_queue_head == NULL)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3109,6 +3132,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..bab8926a63 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..e40a657f55 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* "Idle" between commands in pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
-- 
2.30.2

v7-0006-Add-the-CLOSE-message-fix.patchtext/x-diff; charset=us-asciiDownload
From f50dca0ee1f0a71111b583c7e3220997ad950ea2 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Sun, 26 Jun 2022 19:53:35 +0200
Subject: [PATCH v7 6/6] Add the CLOSE message fix

---
 src/interfaces/libpq/fe-exec.c      | 37 +++++++++++++++++++++++++++++
 src/interfaces/libpq/fe-protocol3.c | 18 +++++++++++++-
 src/interfaces/libpq/libpq-int.h    |  3 ++-
 3 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 7cb803de94..b1a3378b8f 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1339,6 +1339,7 @@ static int
 PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 {
 	PGcmdQueueEntry *entry = NULL;
+	PGcmdQueueEntry *entry2 = NULL;
 
 	if (!PQsendQueryStart(conn, newQuery))
 		return 0;
@@ -1354,6 +1355,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 	entry = pqAllocCmdQueueEntry(conn);
 	if (entry == NULL)
 		return 0;				/* error msg already set */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		entry2 = pqAllocCmdQueueEntry(conn);
+		if (entry2 == NULL)
+			goto sendFailed;
+	}
 
 	/* Send the query message(s) */
 	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1423,6 +1430,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
+
+	/*
+	 * When pipeline mode is in use, we need a second entry in the command
+	 * queue to represent Close Portal message.  This allows us later to wait
+	 * for the CloseComplete message to be received before getting in IDLE
+	 * state.
+	 */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		entry2->queryclass = PGQUERY_CLOSE;
+		entry2->query = NULL;
+		pqAppendCmdQueueEntry(conn, entry2);
+	}
+
 	return 1;
 
 sendFailed:
@@ -2130,6 +2151,22 @@ PQgetResult(PGconn *conn)
 			break;
 	}
 
+	/* If the next command we expect is CLOSE, read and consume it */
+	if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+		conn->cmd_queue_head &&
+		conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+	{
+		if (res && res->resultStatus != PGRES_FATAL_ERROR)
+		{
+			conn->asyncStatus = PGASYNC_BUSY;
+			parseInput(conn);
+			conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+		}
+		else
+			/* we won't ever see the Close */
+			pqCommandQueueAdvance(conn);
+	}
+
 	if (res)
 	{
 		int			i;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index bab8926a63..c33f904db4 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -284,8 +284,24 @@ pqParseInput3(PGconn *conn)
 					}
 					break;
 				case '2':		/* Bind Complete */
+					/* Nothing to do for this message type */
+					break;
 				case '3':		/* Close Complete */
-					/* Nothing to do for these message types */
+					/*
+					 * If we get CloseComplete when waiting for it, consume
+					 * the queue element and keep going.  A result is not
+					 * expected from this message; it is just there so that
+					 * we know to wait for it when PQsendQuery is used in
+					 * pipeline mode, before going in IDLE state.  Failing to
+					 * do this makes us receive CloseComplete when IDLE, which
+					 * creates problems.
+					 */
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+					{
+						pqCommandQueueAdvance(conn);
+					}
+
 					break;
 				case 'S':		/* parameter status */
 					if (getParameterStatus(conn))
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index e40a657f55..df2f17721c 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -311,7 +311,8 @@ typedef enum
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
 	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
-	PGQUERY_SYNC				/* Sync (at end of a pipeline) */
+	PGQUERY_SYNC,				/* Sync (at end of a pipeline) */
+	PGQUERY_CLOSE
 } PGQueryClass;
 
 /*
-- 
2.30.2

#19Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Alvaro Herrera (#18)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

Thanks for the further testing scenario.

At Wed, 29 Jun 2022 14:09:17 +0200, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote in

So I wrote some more test scenarios for this, and as I wrote in some
other thread, I realized that there are more problems than just some
NOTICE trouble. For instance, if you send a query, then read the result
but not the terminating NULL then send another query, everything gets
confused; the next thing you'll read is the result for the second query,
without having read the NULL that terminates the results of the first
query. Any application that expects the usual flow of results will be
confused. Kyotaro's patch to add PIPELINE_IDLE fixes this bit too, as
far as I can tell.

However, another problem case, not fixed by PIPELINE_IDLE, occurs if you
exit pipeline mode after PQsendQuery() and then immediately use
PQexec(). The CloseComplete will be received at the wrong time, and a
notice is emitted nevertheless.

Mmm. My patch moves the point of failure of the scenario a bit but
still a little short. However, as my understanding, it seems like the
task of the PQpipelineSync()-PQgetResult() pair to consume the
CloseComplete. If Iinserted PQpipelineSync() just after PQsendQuery()
and called PQgetResult() for PGRES_PIPELINE_SYNC before
PQexitPipelineMode(), the out-of-sync CloseComplete is not seen in the
scenario. But if it is right, I'd like to complain about the
obscure-but-stiff protocol of pipleline mode..

I spent a lot of time trying to understand how to fix this last bit, and
the solution I came up with is that PQsendQuery() must add a second
entry to the command queue after the PGQUERY_EXTENDED one, to match the
CloseComplete message being expected; with that entry in the queue,
PQgetResult will really only get to IDLE mode after the Close has been
seen, which is what we want. I named it PGQUERY_CLOSE.

Sadly, some hacks are needed to make this work fully:

1. the client is never expecting that PQgetResult() would return
anything for the CloseComplete, so something needs to consume the
CloseComplete silently (including the queue entry for it) when it is
received; I chose to do this directly in pqParseInput3. I tried to
make PQgetResult itself do it, but it became a pile of hacks until I
was no longer sure what was going on. Putting it in fe-protocol3.c
ends up a lot cleaner. However, we still need PQgetResult to invoke
parseInput() at the point where Close is expected.

2. if an error occurs while executing the query, the CloseComplete will
of course never arrive, so we need to erase it from the queue
silently if we're returning an error.

I toyed with the idea of having parseInput() produce a PGresult that
encodes the Close message, and have PQgetResult consume and discard
that, then read some further message to have something to return. But
it seemed inefficient and equally ugly and I'm not sure that flow
control is any simpler.

I think another possibility would be to make PQexitPipelineMode
responsible for /something/, but I'm not sure what that would be.
Checking the queue and seeing if the next message is CloseComplete, then
eating that message before exiting pipeline mode? That seems way too
magical. I didn't attempt this.

I attach a patch series that implements the proposed fix (again for
REL_14_STABLE) in steps, to make it easy to read. I intend to squash
the whole lot into a single commit before pushing. But while writing
this email it occurred to me that I need to add at least one more test,
to receive a WARNING while waiting for CloseComplete. AFAICT it should
work, but better make sure.

I produced pipeline_idle.trace file by running the test in the fully

By the perl script doesn't produce the trace file since the list in
$cmptrace line doesn't contain pipleline_idle..

fixed tree, then used it to verify that all tests fail in different ways
when run without the fixes. The first fix with PIPELINE_IDLE fixes some
of these failures, and the PGQUERY_CLOSE one fixes the remaining one.
Reading the trace file, it looks correct to me.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#20Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Kyotaro Horiguchi (#19)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

On 2022-Jul-04, Kyotaro Horiguchi wrote:

At Wed, 29 Jun 2022 14:09:17 +0200, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote in

However, another problem case, not fixed by PIPELINE_IDLE, occurs if you
exit pipeline mode after PQsendQuery() and then immediately use
PQexec(). The CloseComplete will be received at the wrong time, and a
notice is emitted nevertheless.

Mmm. My patch moves the point of failure of the scenario a bit but
still a little short. However, as my understanding, it seems like the
task of the PQpipelineSync()-PQgetResult() pair to consume the
CloseComplete. If Iinserted PQpipelineSync() just after PQsendQuery()
and called PQgetResult() for PGRES_PIPELINE_SYNC before
PQexitPipelineMode(), the out-of-sync CloseComplete is not seen in the
scenario. But if it is right, I'd like to complain about the
obscure-but-stiff protocol of pipleline mode..

Yeah, if you introduce PQpipelineSync then I think it'll work okay, but
my point here was to make it work without requiring that; that's why I
wrote the test to use PQsendFlushRequest instead.

BTW I patch for the problem with uniqviol also (not fixed by v7). I'll
send an updated patch in a little while.

I produced pipeline_idle.trace file by running the test in the fully

By the perl script doesn't produce the trace file since the list in
$cmptrace line doesn't contain pipleline_idle..

Ouch, of course, thanks for noticing.

--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/

#21Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Alvaro Herrera (#20)
1 attachment(s)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

On 2022-Jul-04, Alvaro Herrera wrote:

BTW I patch for the problem with uniqviol also (not fixed by v7). I'll
send an updated patch in a little while.

Here it is. I ran "libpq_pipeline uniqviol" in a tight loop a few
thousand times and didn't get any error. Before these fixes, it would
fail in half a dozen iterations.

--
Álvaro Herrera Breisgau, Deutschland — https://www.EnterpriseDB.com/

Attachments:

v8-0001-libpq-Improve-idle-state-handling-in-pipeline-mod.patchtext/x-diff; charset=us-asciiDownload
From d4d446531d52a34115fe7446732d61ae8f61d8bb Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 1 Jul 2022 18:11:10 +0200
Subject: [PATCH v8] libpq: Improve idle state handling in pipeline mode

We were going into IDLE state too soon when executing queries via
PQsendQuery in pipeline mode, causing several scenarios to misbehave in
different ways -- most notably, as reported by Daniele Varrazzo, that a
warning message is produced by libpq:
  message type 0x33 arrived from server while idle
But it is also possible, if queries are sent and results consumed not in
lockstep, for the expected mediating NULL result values from PQgetResult
to be lost (a problem which has not been reported, but which is more
serious).

Fix this by introducing two new concepts: one is a command queue element
PGQUERY_CLOSE to tell libpq to wait for the CloseComplete server
response to the Close message that is sent by PQsendQuery.  Because the
application is not expecting any PGresult from this, the mechanism to
consume it is a bit hackish.

The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLE
state for libpq's state machine to differentiate "really idle" from
merely "the idle state that occurs in between reading results from the
server for elements in the pipeline".  This makes libpq not go fully
IDLE when the libpq command queue contains entries; in normal cases, we
only go IDLE once at the end of the pipeline, when the server response
to the final SYNC message is received.  (However, there are corner cases
it doesn't fix, such as terminating the query sequence by
PQsendFlushRequest instead of PQpipelineSync; this sort of scenario is
what requires PGQUERY_CLOSE bit above.)

This last bit helps make the libpq state machine clearer; in particular
we can get rid of an ugly hack in pqParseInput3 to avoid considering
IDLE as such when the command queue contains entries.

A new test mode is added to libpq_pipeline.c to tickle some related
problematic cases.

Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
---
 src/interfaces/libpq/fe-exec.c                | 116 ++++++++--
 src/interfaces/libpq/fe-protocol3.c           |  30 +--
 src/interfaces/libpq/libpq-int.h              |   6 +-
 .../modules/libpq_pipeline/libpq_pipeline.c   | 215 +++++++++++++++++-
 .../libpq_pipeline/t/001_libpq_pipeline.pl    |   3 +-
 .../libpq_pipeline/traces/pipeline_idle.trace |  93 ++++++++
 6 files changed, 425 insertions(+), 38 deletions(-)
 create mode 100644 src/test/modules/libpq_pipeline/traces/pipeline_idle.trace

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..e22d0814f0 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1338,6 +1339,7 @@ static int
 PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 {
 	PGcmdQueueEntry *entry = NULL;
+	PGcmdQueueEntry *entry2 = NULL;
 
 	if (!PQsendQueryStart(conn, newQuery))
 		return 0;
@@ -1353,6 +1355,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 	entry = pqAllocCmdQueueEntry(conn);
 	if (entry == NULL)
 		return 0;				/* error msg already set */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		entry2 = pqAllocCmdQueueEntry(conn);
+		if (entry2 == NULL)
+			goto sendFailed;
+	}
 
 	/* Send the query message(s) */
 	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1422,6 +1430,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
+
+	/*
+	 * When pipeline mode is in use, we need a second entry in the command
+	 * queue to represent Close Portal message.  This allows us later to wait
+	 * for the CloseComplete message to be received before getting in IDLE
+	 * state.
+	 */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		entry2->queryclass = PGQUERY_CLOSE;
+		entry2->query = NULL;
+		pqAppendCmdQueueEntry(conn, entry2);
+	}
+
 	return 1;
 
 sendFailed:
@@ -1667,11 +1689,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -2047,19 +2071,22 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next.  Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query, if any, when we're called next.  If there's
+			 * no next element in the command queue, this gets us in IDLE
+			 * state.
+			 */
+			resetPQExpBuffer(&conn->errorMessage);
+			pqPipelineProcessQueue(conn);
+			res = NULL;			/* query is complete */
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2107,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query.  Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -2124,6 +2151,22 @@ PQgetResult(PGconn *conn)
 			break;
 	}
 
+	/* If the next command we expect is CLOSE, read and consume it */
+	if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+		conn->cmd_queue_head &&
+		conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+	{
+		if (res && res->resultStatus != PGRES_FATAL_ERROR)
+		{
+			conn->asyncStatus = PGASYNC_BUSY;
+			parseInput(conn);
+			conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+		}
+		else
+			/* we won't ever see the Close */
+			pqCommandQueueAdvance(conn);
+	}
+
 	if (res)
 	{
 		int			i;
@@ -2932,7 +2975,10 @@ PQexitPipelineMode(PGconn *conn)
 	if (!conn)
 		return 0;
 
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+		(conn->asyncStatus == PGASYNC_IDLE ||
+		 conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
+		conn->cmd_queue_head == NULL)
 		return 1;
 
 	switch (conn->asyncStatus)
@@ -2949,9 +2995,16 @@ PQexitPipelineMode(PGconn *conn)
 								 libpq_gettext("cannot exit pipeline mode while busy\n"));
 			return 0;
 
-		default:
+		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK */
 			break;
+
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("cannot exit pipeline mode while in COPY\n"));
 	}
 
 	/* still work to process */
@@ -2988,6 +3041,10 @@ pqCommandQueueAdvance(PGconn *conn)
 	prevquery = conn->cmd_queue_head;
 	conn->cmd_queue_head = conn->cmd_queue_head->next;
 
+	/* If the queue is now empty, reset the tail too */
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_tail = NULL;
+
 	/* and make it recyclable */
 	prevquery->next = NULL;
 	pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3010,15 +3067,35 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_BUSY:
 			/* client still has to process current query or results */
 			return;
+
 		case PGASYNC_IDLE:
+			/*
+			 * If we're in IDLE mode and there's some command in the queue,
+			 * get us into PIPELINE_IDLE mode and process normally.  Otherwise
+			 * there's nothing for us to do.
+			 */
+			if (conn->cmd_queue_head != NULL)
+			{
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+				break;
+			}
+			return;
+
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
 			/* next query please */
 			break;
 	}
 
-	/* Nothing to do if not in pipeline mode, or queue is empty */
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
-		conn->cmd_queue_head == NULL)
+	/*
+	 * If there are no further commands to process in the queue, get us in
+	 * "real idle" mode now.
+	 */
+	if (conn->cmd_queue_head == NULL)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3105,6 +3182,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..c33f904db4 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
@@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
 					}
 					break;
 				case '2':		/* Bind Complete */
+					/* Nothing to do for this message type */
+					break;
 				case '3':		/* Close Complete */
-					/* Nothing to do for these message types */
+					/*
+					 * If we get CloseComplete when waiting for it, consume
+					 * the queue element and keep going.  A result is not
+					 * expected from this message; it is just there so that
+					 * we know to wait for it when PQsendQuery is used in
+					 * pipeline mode, before going in IDLE state.  Failing to
+					 * do this makes us receive CloseComplete when IDLE, which
+					 * creates problems.
+					 */
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+					{
+						pqCommandQueueAdvance(conn);
+					}
+
 					break;
 				case 'S':		/* parameter status */
 					if (getParameterStatus(conn))
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..df2f17721c 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* "Idle" between commands in pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
@@ -310,7 +311,8 @@ typedef enum
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
 	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
-	PGQUERY_SYNC				/* Sync (at end of a pipeline) */
+	PGQUERY_SYNC,				/* Sync (at end of a pipeline) */
+	PGQUERY_CLOSE
 } PGQueryClass;
 
 /*
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..dfab924965 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -581,8 +581,6 @@ test_pipeline_abort(PGconn *conn)
 	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
 		pg_fatal("exiting pipeline mode didn't seem to work");
 
-	fprintf(stderr, "ok\n");
-
 	/*-
 	 * Since we fired the pipelines off without a surrounding xact, the results
 	 * should be:
@@ -614,6 +612,8 @@ test_pipeline_abort(PGconn *conn)
 	}
 
 	PQclear(res);
+
+	fprintf(stderr, "ok\n");
 }
 
 /* State machine enum for test_pipelined_insert */
@@ -968,6 +968,207 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print notices, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int	   *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
+/* Verify behavior in "idle" state */
+static void
+test_pipeline_idle(PGconn *conn)
+{
+	PGresult   *res;
+	int			n_notices = 0;
+
+	fprintf(stderr, "\npipeline idle...\n");
+
+	PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
+	/*
+	 * Cause a Close message to be sent to the server, and watch libpq's
+	 * reaction to the resulting CloseComplete.  libpq must not get in IDLE
+	 * state until that has been received.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 1\n");
+
+	/*
+	 * Verify that we can send a query using simple query protocol after one
+	 * in pipeline mode.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("got unexpected non-null result");
+	/* We can exit pipeline mode now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+	res = PQexec(conn, "SELECT 2");
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	if (res == NULL)
+		pg_fatal("PQexec returned NULL");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from non-pipeline query",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 2\n");
+
+	/*
+	 * Case 2: exiting pipeline mode is not OK if a second command is sent.
+	 */
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	/* read terminating null from first query */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/* Try to exit pipeline mode in pipeline-idle state */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (PQexitPipelineMode(conn) == 1)
+		pg_fatal("exiting pipeline succeeded when it shouldn't");
+	if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
+				strlen("cannot exit pipeline mode")) != 0)
+		pg_fatal("did not get expected error; got: %s",
+				 PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from second pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 3\n");
+
+	/* Have a WARNING in the middle of a resultset */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("unexpected NULL result received");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
+	fprintf(stderr, "ok - 4\n");
+}
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
@@ -1160,6 +1361,8 @@ test_singlerowmode(PGconn *conn)
 
 	if (PQexitPipelineMode(conn) != 1)
 		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
 }
 
 /*
@@ -1549,6 +1752,7 @@ print_test_list(void)
 	printf("multi_pipelines\n");
 	printf("nosync\n");
 	printf("pipeline_abort\n");
+	printf("pipeline_idle\n");
 	printf("pipelined_insert\n");
 	printf("prepared\n");
 	printf("simple_pipeline\n");
@@ -1630,7 +1834,10 @@ main(int argc, char **argv)
 	/* Set the trace file, if requested */
 	if (tracefile != NULL)
 	{
-		trace = fopen(tracefile, "w");
+		if (strcmp(tracefile, "-") == 0)
+			trace = stdout;
+		else
+			trace = fopen(tracefile, "w");
 		if (trace == NULL)
 			pg_fatal("could not open file \"%s\": %m", tracefile);
 
@@ -1650,6 +1857,8 @@ main(int argc, char **argv)
 		test_nosync(conn);
 	else if (strcmp(testname, "pipeline_abort") == 0)
 		test_pipeline_abort(conn);
+	else if (strcmp(testname, "pipeline_idle") == 0)
+		test_pipeline_idle(conn);
 	else if (strcmp(testname, "pipelined_insert") == 0)
 		test_pipelined_insert(conn, numrows);
 	else if (strcmp(testname, "prepared") == 0)
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index d8d496c995..b02928cad2 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -26,7 +26,8 @@ for my $testname (@tests)
 	my @extraargs = ('-r', $numrows);
 	my $cmptrace = grep(/^$testname$/,
 		qw(simple_pipeline nosync multi_pipelines prepared singlerow
-		  pipeline_abort transaction disallowed_in_pipeline)) > 0;
+		  pipeline_abort pipeline_idle transaction
+		  disallowed_in_pipeline)) > 0;
 
 	# For a bunch of tests, generate a libpq trace file too.
 	my $traceout = "$TestLib::tmp_check/traces/$testname.trace";
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
new file mode 100644
index 0000000000..3957ee4dfe
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
@@ -0,0 +1,93 @@
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	4	CloseComplete
+F	4	Sync
+B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	4	CloseComplete
+F	13	Query	 "SELECT 2"
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
+B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	4	CloseComplete
+F	16	Parse	 "" "SELECT 2" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
+B	4	CloseComplete
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	4	CloseComplete
+F	16	Parse	 "" "SELECT 2" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
+B	4	CloseComplete
+F	49	Parse	 "" "SELECT pg_catalog.pg_advisory_unlock(1,1)" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	43	RowDescription	 1 "pg_advisory_unlock" NNNN 0 NNNN 1 -1 0
+B	NN	NoticeResponse	 S "WARNING" V "WARNING" C "01000" M "you don't own a lock of type ExclusiveLock" F "SSSS" L "SSSS" R "SSSS" \x00
+B	11	DataRow	 1 1 'f'
+B	13	CommandComplete	 "SELECT 1"
+B	4	CloseComplete
+F	4	Terminate
-- 
2.30.2

#22Alvaro Herrera
alvherre@alvh.no-ip.org
In reply to: Alvaro Herrera (#21)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

I have pushed this to all three branches. Thanks!

--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
"It takes less than 2 seconds to get to 78% complete; that's a good sign.
A few seconds later it's at 90%, but it seems to have stuck there. Did
somebody make percentages logarithmic while I wasn't looking?"
http://smylers.hates-software.com/2005/09/08/1995c749.html

#23Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Alvaro Herrera (#20)
Re: Using PQexecQuery in pipeline mode produces unexpected Close messages

At Mon, 4 Jul 2022 10:49:33 +0200, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote in

Mmm. My patch moves the point of failure of the scenario a bit but
still a little short. However, as my understanding, it seems like the
task of the PQpipelineSync()-PQgetResult() pair to consume the
CloseComplete. If Iinserted PQpipelineSync() just after PQsendQuery()
and called PQgetResult() for PGRES_PIPELINE_SYNC before
PQexitPipelineMode(), the out-of-sync CloseComplete is not seen in the
scenario. But if it is right, I'd like to complain about the
obscure-but-stiff protocol of pipleline mode..

Yeah, if you introduce PQpipelineSync then I think it'll work okay, but
my point here was to make it work without requiring that; that's why I
wrote the test to use PQsendFlushRequest instead.

A bit too late, but it is good to make state-transition simpler.

--
Kyotaro Horiguchi
NTT Open Source Software Center