From f50dca0ee1f0a71111b583c7e3220997ad950ea2 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Sun, 26 Jun 2022 19:53:35 +0200
Subject: [PATCH v7 6/6] Add the CLOSE message fix

---
 src/interfaces/libpq/fe-exec.c      | 37 +++++++++++++++++++++++++++++
 src/interfaces/libpq/fe-protocol3.c | 18 +++++++++++++-
 src/interfaces/libpq/libpq-int.h    |  3 ++-
 3 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 7cb803de94..b1a3378b8f 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1339,6 +1339,7 @@ static int
 PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 {
 	PGcmdQueueEntry *entry = NULL;
+	PGcmdQueueEntry *entry2 = NULL;
 
 	if (!PQsendQueryStart(conn, newQuery))
 		return 0;
@@ -1354,6 +1355,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 	entry = pqAllocCmdQueueEntry(conn);
 	if (entry == NULL)
 		return 0;				/* error msg already set */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		entry2 = pqAllocCmdQueueEntry(conn);
+		if (entry2 == NULL)
+			goto sendFailed;
+	}
 
 	/* Send the query message(s) */
 	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1423,6 +1430,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
+
+	/*
+	 * When pipeline mode is in use, we need a second entry in the command
+	 * queue to represent Close Portal message.  This allows us later to wait
+	 * for the CloseComplete message to be received before getting in IDLE
+	 * state.
+	 */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		entry2->queryclass = PGQUERY_CLOSE;
+		entry2->query = NULL;
+		pqAppendCmdQueueEntry(conn, entry2);
+	}
+
 	return 1;
 
 sendFailed:
@@ -2130,6 +2151,22 @@ PQgetResult(PGconn *conn)
 			break;
 	}
 
+	/* If the next command we expect is CLOSE, read and consume it */
+	if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+		conn->cmd_queue_head &&
+		conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+	{
+		if (res && res->resultStatus != PGRES_FATAL_ERROR)
+		{
+			conn->asyncStatus = PGASYNC_BUSY;
+			parseInput(conn);
+			conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+		}
+		else
+			/* we won't ever see the Close */
+			pqCommandQueueAdvance(conn);
+	}
+
 	if (res)
 	{
 		int			i;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index bab8926a63..c33f904db4 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -284,8 +284,24 @@ pqParseInput3(PGconn *conn)
 					}
 					break;
 				case '2':		/* Bind Complete */
+					/* Nothing to do for this message type */
+					break;
 				case '3':		/* Close Complete */
-					/* Nothing to do for these message types */
+					/*
+					 * If we get CloseComplete when waiting for it, consume
+					 * the queue element and keep going.  A result is not
+					 * expected from this message; it is just there so that
+					 * we know to wait for it when PQsendQuery is used in
+					 * pipeline mode, before going in IDLE state.  Failing to
+					 * do this makes us receive CloseComplete when IDLE, which
+					 * creates problems.
+					 */
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+					{
+						pqCommandQueueAdvance(conn);
+					}
+
 					break;
 				case 'S':		/* parameter status */
 					if (getParameterStatus(conn))
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index e40a657f55..df2f17721c 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -311,7 +311,8 @@ typedef enum
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
 	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
-	PGQUERY_SYNC				/* Sync (at end of a pipeline) */
+	PGQUERY_SYNC,				/* Sync (at end of a pipeline) */
+	PGQUERY_CLOSE
 } PGQueryClass;
 
 /*
-- 
2.30.2

