From 0630f5d00e21e91c0457b6442c6fa23827241d86 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 24 Jun 2022 18:13:58 +0200
Subject: [PATCH v7 5/6] add PIPELINE_IDLE state

---
 src/interfaces/libpq/fe-connect.c   |  1 +
 src/interfaces/libpq/fe-exec.c      | 58 ++++++++++++++++++++---------
 src/interfaces/libpq/fe-protocol3.c | 12 ------
 src/interfaces/libpq/libpq-int.h    |  3 +-
 4 files changed, 44 insertions(+), 30 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 709ba15220..afd0bc809a 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -6751,6 +6751,7 @@ PQtransactionStatus(const PGconn *conn)
 {
 	if (!conn || conn->status != CONNECTION_OK)
 		return PQTRANS_UNKNOWN;
+	/* XXX what should we do here if status is PGASYNC_PIPELINE_IDLE? */
 	if (conn->asyncStatus != PGASYNC_IDLE)
 		return PQTRANS_ACTIVE;
 	return conn->xactStatus;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ed26bab033..7cb803de94 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1667,11 +1668,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -2047,19 +2050,22 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next.  Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query, if any, when we're called next.  If there's
+			 * no next element in the command queue, this gets us in IDLE
+			 * state.
+			 */
+			resetPQExpBuffer(&conn->errorMessage);
+			pqPipelineProcessQueue(conn);
+			res = NULL;			/* query is complete */
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2086,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query.  Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -2939,6 +2945,7 @@ PQexitPipelineMode(PGconn *conn)
 	{
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* there are some uncollected results */
 			appendPQExpBufferStr(&conn->errorMessage,
 								 libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
@@ -3014,15 +3021,31 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_BUSY:
 			/* client still has to process current query or results */
 			return;
+
 		case PGASYNC_IDLE:
+			/*
+			 * When in IDLE mode, there are no further commands to process,
+			 * and no further action to take on the queue, since it must
+			 * be empty.
+			 */
+			Assert(conn->cmd_queue_head == NULL);
+			return;
+
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
 			/* next query please */
 			break;
 	}
 
-	/* Nothing to do if not in pipeline mode, or queue is empty */
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
-		conn->cmd_queue_head == NULL)
+	/*
+	 * If there are no further commands to process in the queue, get us in
+	 * "real idle" mode now.
+	 */
+	if (conn->cmd_queue_head == NULL)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3109,6 +3132,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..bab8926a63 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..e40a657f55 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,		/* "Idle" between commands in pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
-- 
2.30.2

