libpq pipelining

Started by Matt Newellabout 11 years ago11 messages
#1Matt Newell
newellm@blur.com
2 attachment(s)

Hi,

The recent discussion about pipelining in the jodbc driver prompted me to look
at what it would take for libpq.

I have a proof of concept patch working. The results are even more promising
than I expected.

While it's true that many applications and frameworks won't easily benefit, it
amazes me that this hasn't been explored before.

I developed a simple test application that creates a table with a single auto
increment primary key column, then runs a 4 simple queries x times each:

"INSERT INTO test() VALUES ()"
"SELECT * FROM test LIMIT 1"
"SELECT * FROM test"
"DELETE FROM test"

The parameters to testPipelinedSeries are (number of times to execute each
query, maximum number of queued queries).

Results against local server:

testPipelinedSeries(10,1) took 0.020884
testPipelinedSeries(10,3) took 0.020630, speedup 1.01
testPipelinedSeries(10,10) took 0.006265, speedup 3.33
testPipelinedSeries(100,1) took 0.042731
testPipelinedSeries(100,3) took 0.043035, speedup 0.99
testPipelinedSeries(100,10) took 0.037222, speedup 1.15
testPipelinedSeries(100,25) took 0.031223, speedup 1.37
testPipelinedSeries(100,50) took 0.032482, speedup 1.32
testPipelinedSeries(100,100) took 0.031356, speedup 1.36

Results against remote server through ssh tunnel(30-40ms rtt):

testPipelinedSeries(10,1) took 3.2461736
testPipelinedSeries(10,3) took 1.1008443, speedup 2.44
testPipelinedSeries(10,10) took 0.342399, speedup 7.19
testPipelinedSeries(100,1) took 26.25882588
testPipelinedSeries(100,3) took 8.8509234, speedup 3.04
testPipelinedSeries(100,10) took 3.2866285, speedup 9.03
testPipelinedSeries(100,25) took 2.1472847, speedup 17.57
testPipelinedSeries(100,50) took 1.957510, speedup 27.03
testPipelinedSeries(100,100) took 0.690682, speedup 37.47

I plan to write documentation, add regression testing, and do general cleanup
before asking for feedback on the patch itself. Any suggestions about
performance testing or api design would be nice. I haven't played with
changing the sync logic yet, but I'm guessing that an api to allow manual sync
instead of a sync per PQsendQuery will be needed. That could make things
tricky though with multi-statement queries, because currently the only way to
detect when results change from one query to the next are a ReadyForQuery
message.

Matt Newell

Attachments:

testlibpqpipeline.ctext/x-csrc; charset=UTF-8; name=testlibpqpipeline.cDownload
libpq.pipeline.difftext/x-patch; charset=UTF-8; name=libpq.pipeline.diffDownload
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 93da50d..6bbc6b4 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -165,3 +165,6 @@ lo_lseek64                162
 lo_tell64                 163
 lo_truncate64             164
 PQconninfo                165
+PQgetFirstQuery           166
+PQgetLastQuery            167
+PQgetNextQuery            168
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3af222b..31fa437 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2893,8 +2893,6 @@ freePGconn(PGconn *conn)
 		free(conn->gsslib);
 #endif
 	/* Note that conn->Pfdebug is not ours to close or free */
-	if (conn->last_query)
-		free(conn->last_query);
 	if (conn->inBuffer)
 		free(conn->inBuffer);
 	if (conn->outBuffer)
@@ -2956,6 +2954,29 @@ closePGconn(PGconn *conn)
 										 * absent */
 	conn->asyncStatus = PGASYNC_IDLE;
 	pqClearAsyncResult(conn);	/* deallocate result */
+	
+	/*
+	 * Link active queries into the free list so we can free them
+	 */
+	if (conn->queryTail)
+	{
+		conn->queryTail->next = conn->queryFree;
+		conn->queryFree = conn->queryHead;
+	}
+	conn->queryHead = conn->queryTail = NULL;
+	
+	/*
+	 * Free all query objects
+	 */
+	while (conn->queryFree)
+	{
+		PGquery * prev = conn->queryFree;
+		conn->queryFree = prev->next;
+		if (prev->querycmd)
+			free(prev->querycmd);
+		free(prev);
+	}
+
 	resetPQExpBuffer(&conn->errorMessage);
 	pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
 	conn->addrlist = NULL;
@@ -3135,7 +3156,7 @@ PQresetPoll(PGconn *conn)
 }
 
 /*
- * PQcancelGet: get a PGcancel structure corresponding to a connection.
+ * PQgetCancel: get a PGcancel structure corresponding to a connection.
  *
  * A copy is needed to be able to cancel a running query from a different
  * thread. If the same structure is used all structure members would have
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4075e51..48fc278 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1020,7 +1020,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * row; the original conn->result is left unchanged so that it can be used
 	 * again as the template for future rows.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Copy everything that should be in the result at this point */
 		res = PQcopyResult(res,
@@ -1080,7 +1080,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * Success.  In single-row mode, make the result available to the client
 	 * immediately.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Change result status to special single-row value */
 		res->resultStatus = PGRES_SINGLE_TUPLE;
@@ -1088,6 +1088,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
+		/* TODO: Still correct ? */
 		conn->asyncStatus = PGASYNC_READY;
 	}
 
@@ -1132,14 +1133,12 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* remember we are using simple query protocol */
-	conn->queryclass = PGQUERY_SIMPLE;
-
+	conn->queryTail->queryclass = PGQUERY_SIMPLE;
+	
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
-
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
+	
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
@@ -1151,7 +1150,9 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if( conn->asyncStatus == PGASYNC_IDLE )
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 }
 
@@ -1272,13 +1273,11 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	conn->queryTail->queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1288,6 +1287,7 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
+	/* TODO: Check status first! */
 	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
@@ -1344,6 +1344,8 @@ PQsendQueryPrepared(PGconn *conn,
 static bool
 PQsendQueryStart(PGconn *conn)
 {
+	PGquery * query;
+	
 	if (!conn)
 		return false;
 
@@ -1357,21 +1359,46 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while in copy mode, either. */
+	switch (conn->asyncStatus)
 	{
-		printfPQExpBuffer(&conn->errorMessage,
+		case PGASYNC_IDLE:
+		case PGASYNC_BUSY:
+		case PGASYNC_READY:
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
-		return false;
+			return false;
 	}
 
-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
-
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
-
+	if( !conn->queryFree )
+	{
+		query = (PGquery*) malloc(sizeof(PGquery));
+		query->querycmd = 0;
+		query->singleRowMode = false;
+		query->next = 0;
+	}
+	else
+	{
+		query = conn->queryFree;
+		conn->queryFree = query->next;
+		if (query->querycmd)
+			free(query->querycmd);
+		query->querycmd = NULL;
+		query->next = NULL;
+	}
+	
+	if( conn->queryTail )
+		conn->queryTail->next = query;
+	else
+		conn->queryHead = query;
+	
+	conn->queryTail = query;
+	
 	/* ready to send command message */
 	return true;
 }
@@ -1522,16 +1549,12 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	conn->queryTail->queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	/* if insufficient memory, querycmd just winds up NULL */
 	if (command)
-		conn->last_query = strdup(command);
-	else
-		conn->last_query = NULL;
+		conn->queryTail->querycmd = strdup(command);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1541,6 +1564,7 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
+	/* TODO: Check status first! */
 	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
@@ -1576,7 +1600,7 @@ pqHandleSendFailure(PGconn *conn)
 }
 
 /*
- * Select row-by-row processing mode
+ * Select row-by-row processing mode for the last launched query
  */
 int
 PQsetSingleRowMode(PGconn *conn)
@@ -1585,18 +1609,16 @@ PQsetSingleRowMode(PGconn *conn)
 	 * Only allow setting the flag when we have launched a query and not yet
 	 * received any results.
 	 */
-	if (!conn)
-		return 0;
-	if (conn->asyncStatus != PGASYNC_BUSY)
+	if (!conn || !conn->queryTail)
 		return 0;
-	if (conn->queryclass != PGQUERY_SIMPLE &&
-		conn->queryclass != PGQUERY_EXTENDED)
+	if (conn->asyncStatus != PGASYNC_BUSY && conn->queryTail == conn->queryHead)
 		return 0;
-	if (conn->result)
+	if (conn->queryTail->queryclass != PGQUERY_SIMPLE &&
+		conn->queryTail->queryclass != PGQUERY_EXTENDED)
 		return 0;
 
 	/* OK, set flag */
-	conn->singleRowMode = true;
+	conn->queryTail->singleRowMode = true;
 	return 1;
 }
 
@@ -1670,6 +1692,40 @@ PQisBusy(PGconn *conn)
 
 
 /*
+ * PQgetFirstQuery
+ */
+PGquery *
+PQgetFirstQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	
+	return conn->queryHead;
+}
+
+/*
+ * PQgetLastQuery
+ */
+PGquery *
+PQgetLastQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	return conn->queryTail;
+}
+
+/*
+ * PQgetNextQuery
+ */
+PGquery *
+PQgetNextQuery(PGquery *query)
+{
+	if (!query)
+		return 0;
+	return query->next;
+}
+
+/*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
@@ -2132,14 +2188,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
-
-	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
-	{
-		free(conn->last_query);
-		conn->last_query = NULL;
-	}
+	conn->queryTail->queryclass = PGQUERY_DESCRIBE;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -2301,7 +2350,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index c514ca5..c839244 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -55,7 +55,26 @@ static void reportErrorPosition(PQExpBuffer msg, const char *query,
 					int loc, int encoding);
 static int build_startup_packet(const PGconn *conn, char *packet,
 					 const PQEnvironmentOption *options);
+static void pqQueryAdvance(PGconn *conn);
 
+void
+pqQueryAdvance(PGconn *conn)
+{
+	PGquery * query;
+	
+	query = conn->queryHead;
+	if (query == NULL)
+		return;
+	
+	/* Advance queryHead */
+	conn->queryHead = query->next;
+	/* Push last query onto free stack */
+	query->next = conn->queryFree;
+	conn->queryFree = query;
+	
+	if (conn->queryHead == NULL)
+		conn->queryTail = NULL;
+}
 
 /*
  * parseInput: if appropriate, parse input data from backend
@@ -68,7 +87,7 @@ pqParseInput3(PGconn *conn)
 	char		id;
 	int			msgLength;
 	int			avail;
-
+	
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
@@ -218,7 +237,15 @@ pqParseInput3(PGconn *conn)
 				case 'Z':		/* backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+
+					pqQueryAdvance(conn);
+					/* initialize async result-accumulation state */
+					conn->result = NULL;
+					conn->next_result = NULL;
+					if( conn->queryHead != NULL )
+						conn->asyncStatus = PGASYNC_BUSY;
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -232,7 +259,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case '1':		/* Parse Complete */
 					/* If we're doing PQprepare, we're done; else ignore */
-					if (conn->queryclass == PGQUERY_PREPARE)
+					if (conn->queryHead->queryclass == PGQUERY_PREPARE)
 					{
 						if (conn->result == NULL)
 						{
@@ -266,7 +293,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case 'T':		/* Row Description */
 					if (conn->result == NULL ||
-						conn->queryclass == PGQUERY_DESCRIBE)
+						conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						/* First 'T' in a query sequence */
 						if (getRowDescriptions(conn, msgLength))
@@ -299,7 +326,7 @@ pqParseInput3(PGconn *conn)
 					 * instead of TUPLES_OK.  Otherwise we can just ignore
 					 * this message.
 					 */
-					if (conn->queryclass == PGQUERY_DESCRIBE)
+					if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						if (conn->result == NULL)
 						{
@@ -422,6 +449,8 @@ pqParseInput3(PGconn *conn)
 static void
 handleSyncLoss(PGconn *conn, char id, int msgLength)
 {
+	PGquery * query;
+	
 	printfPQExpBuffer(&conn->errorMessage,
 					  libpq_gettext(
 	"lost synchronization with server: got message type \"%c\", length %d\n"),
@@ -430,6 +459,15 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
 	pqSaveErrorResult(conn);
 	conn->asyncStatus = PGASYNC_READY;	/* drop out of GetResult wait loop */
 
+	/* All queries are canceled, move them to the free list and free the query commands */
+	while ((query = conn->queryHead) != 0)
+	{
+		free(query->querycmd);
+		query->querycmd = 0;
+		conn->queryHead = query->next;
+		query->next = conn->queryFree;
+	}
+	
 	pqDropConnection(conn);
 	conn->status = CONNECTION_BAD;		/* No more connection to backend */
 }
@@ -455,7 +493,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * PGresult created by getParamDescriptions, and we should fill data into
 	 * that.  Otherwise, create a new, empty PGresult.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		if (conn->result)
 			result = conn->result;
@@ -562,7 +600,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * If we're doing a Describe, we're done, and ready to pass the result
 	 * back to the client.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		conn->asyncStatus = PGASYNC_READY;
 		return 0;
@@ -865,10 +903,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
 	if (val)
 	{
-		if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL)
+		if (conn->verbosity != PQERRORS_TERSE && conn->queryHead->querycmd != NULL)
 		{
 			/* emit position as a syntax cursor display */
-			querytext = conn->last_query;
+			querytext = conn->queryHead->querycmd;
 			querypos = atoi(val);
 		}
 		else
@@ -1696,7 +1734,7 @@ pqEndcopy3(PGconn *conn)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b81dc16..750a8b4 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -141,6 +141,13 @@ typedef struct pg_result PGresult;
  */
 typedef struct pg_cancel PGcancel;
 
+/* PGquery encapsulates the progress of a single query command issued
+ * to the async api functions
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_query PGquery;
+
+
 /* PGnotify represents the occurrence of a NOTIFY message.
  * Ideally this would be an opaque typedef, but it's so simple that it's
  * unlikely to change.
@@ -404,6 +411,10 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+extern PGquery *PQgetFirstQuery(PGconn *conn);
+extern PGquery *PQgetLastQuery(PGconn *conn);
+extern PGquery *PQgetNextQuery(PGquery *query);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4ef46ff..fb9bd61 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -291,6 +291,16 @@ typedef struct pgDataValue
 	const char *value;			/* data value, without zero-termination */
 } PGdataValue;
 
+typedef struct pg_query
+{
+	PGQueryClass queryclass;
+	char	   *querycmd;		/* last SQL command, or NULL if unknown */
+	bool		singleRowMode;	/* return query result row-by-row? */
+	struct pg_query * next;
+	void	   *userptr;        /* convenience for the user */
+} PGquery;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -350,13 +360,19 @@ struct pg_conn
 	ConnStatusType status;
 	PGAsyncStatusType asyncStatus;
 	PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
-	PGQueryClass queryclass;
-	char	   *last_query;		/* last SQL command, or NULL if unknown */
+	
+	/* queryHead and queryTail form a FIFO representing queries sent
+	 * to the backend.  queryHead is the first query sent, and is the
+	 * query we are receiving results from, or have received results from */
+	PGquery *queryHead;
+	PGquery *queryTail;
+	PGquery *queryFree; /* Reuse PGQuery allocations */
+	int nQueries;
+	
 	char		last_sqlstate[6];		/* last reported SQLSTATE */
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
-	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;		/* # bytes already returned in COPY
 										 * OUT */
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index aee5c04..5d3f317 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqpipeline
 
 all: $(PROGS)
 
#2Heikki Linnakangas
hlinnakangas@vmware.com
In reply to: Matt Newell (#1)
Re: libpq pipelining

On 12/04/2014 03:11 AM, Matt Newell wrote:

The recent discussion about pipelining in the jodbc driver prompted me to look
at what it would take for libpq.

Great!

I have a proof of concept patch working. The results are even more promising
than I expected.

While it's true that many applications and frameworks won't easily benefit, it
amazes me that this hasn't been explored before.

I developed a simple test application that creates a table with a single auto
increment primary key column, then runs a 4 simple queries x times each:
...

I plan to write documentation, add regression testing, and do general cleanup
before asking for feedback on the patch itself. Any suggestions about
performance testing or api design would be nice. I haven't played with
changing the sync logic yet, but I'm guessing that an api to allow manual sync
instead of a sync per PQsendQuery will be needed. That could make things
tricky though with multi-statement queries, because currently the only way to
detect when results change from one query to the next are a ReadyForQuery
message.

A good API is crucial for this. It should make it easy to write an
application that does pipelining, and to handle all the error conditions
in a predictable way. I'd suggest that you write the documentation
first, before writing any code, so that we can discuss the API. It
doesn't have to be in SGML format yet, a plain-text description of the
API will do.

- Heikki

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Craig Ringer
craig@2ndquadrant.com
In reply to: Heikki Linnakangas (#2)
Re: libpq pipelining

On 12/04/2014 05:08 PM, Heikki Linnakangas wrote:

A good API is crucial for this. It should make it easy to write an
application that does pipelining, and to handle all the error conditions
in a predictable way. I'd suggest that you write the documentation
first, before writing any code, so that we can discuss the API. It
doesn't have to be in SGML format yet, a plain-text description of the
API will do.

I strongly agree.

Applications need to be able to reliably predict what will happen if
there's an error in the middle of a pipeline.

Consideration of implicit transactions (autocommit), the whole pipeline
being one transaction, or multiple transactions is needed.

Apps need to be able to wait for the result of a query partway through a
pipeline, e.g. scheduling four queries, then waiting for the result of
the 2nd.

There are probably plenty of other wrinkly bits to think about.

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4Matt Newell
newellm@blur.com
In reply to: Craig Ringer (#3)
4 attachment(s)
Re: libpq pipelining

On Thursday, December 04, 2014 10:30:46 PM Craig Ringer wrote:

On 12/04/2014 05:08 PM, Heikki Linnakangas wrote:

A good API is crucial for this. It should make it easy to write an
application that does pipelining, and to handle all the error conditions
in a predictable way. I'd suggest that you write the documentation
first, before writing any code, so that we can discuss the API. It
doesn't have to be in SGML format yet, a plain-text description of the
API will do.

I strongly agree.

First pass at the documentation changes attached, along with a new example
that demonstrates pipelining 3 queries, with the middle one resulting in a
PGRES_FATAL_ERROR response.

With the API i am proposing, only 2 new functions (PQgetFirstQuery,
PQgetLastQuery) are required to be able to match each result to the query that
caused it. Another function, PQgetNextQuery allows iterating through the
pending queries, and PQgetQueryCommand permits getting the original query
text.

Adding the ability to set a user supplied pointer on the PGquery struct might
make it much easier for some frameworks, and other users might want a
callback, but I don't think either are required.

Applications need to be able to reliably predict what will happen if
there's an error in the middle of a pipeline.

Yes, the API i am proposing makes it easy to get results for each submitted
query independently of the success or failure of previous queries in the
pipeline.

Consideration of implicit transactions (autocommit), the whole pipeline
being one transaction, or multiple transactions is needed.

The more I think about this the more confident I am that no extra work is
needed.

Unless we start doing some preliminary processing of the query inside of
libpq, our hands are tied wrt sending a sync at the end of each query. The
reason for this is that we rely on the ReadyForQuery message to indicate the
end of a query, so without the sync there is no way to tell if the next result
is from another statement in the current query, or the first statement in the
next query.

I also don't see a reason to need multiple queries without a sync statement.
If the user wants all queries to succeed or fail together it should be no
problem to start the pipeline with begin and complete it commit. But I may be
missing some detail...

Apps need to be able to wait for the result of a query partway through a
pipeline, e.g. scheduling four queries, then waiting for the result of
the 2nd.

Right. With the api i am proposing the user does have to process each result
until it gets to the one it wants, but it's no problem doing that. It would
also be trivial to add a function

PGresult * PQgetNextQueryResult(PQquery *query);

that discards all results from previous queries. Very similar to how a PQexec
disregards all results from previous async queries.

It would also be possible to queue the results and be able to retrieve them
out of order, but I think that add unnecessary complexity and might also make
it easy for users to never retrieve and free some results.

There are probably plenty of other wrinkly bits to think about.

Yup, I'm sure i'm still missing some significant things at this point...

Matt Newell

Attachments:

testlibpqpipeline2.ctext/x-csrc; charset=UTF-8; name=testlibpqpipeline2.cDownload
libpq.pipeline.docs.patchtext/x-patch; charset=UTF-8; name=libpq.pipeline.docs.patchDownload
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index d829a4b..96e3b2a 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3947,9 +3947,13 @@ int PQsendQuery(PGconn *conn, const char *command);
 
        After successfully calling <function>PQsendQuery</function>, call
        <function>PQgetResult</function> one or more times to obtain the
-       results.  <function>PQsendQuery</function> cannot be called again
-       (on the same connection) until <function>PQgetResult</function>
-       has returned a null pointer, indicating that the command is done.
+       results.  <function>PQsendQuery</function> may be called multiple
+       times (on the same connection), allowing query pipelining. Call
+       <function>PQgetLastQuery</function> to get a <structname>PGquery</structname>
+       which can be used to identify which results obtained from
+       <function>PQgetResult</function> belong to each pipelined query dispatch.
+       If only one query is dispatched at a time, you can call <function>PQgetResult</function>
+       until a NULL value is returned to indicate the end of the query.
       </para>
      </listitem>
     </varlistentry>
@@ -4133,8 +4137,8 @@ PGresult *PQgetResult(PGconn *conn);
 
       <para>
        <function>PQgetResult</function> must be called repeatedly until
-       it returns a null pointer, indicating that the command is done.
-       (If called when no command is active,
+       it returns a null pointer, indicating that all dispatched commands
+       are done. (If called when no command is active,
        <function>PQgetResult</function> will just return a null pointer
        at once.) Each non-null result from
        <function>PQgetResult</function> should be processed using the
@@ -4144,14 +4148,19 @@ PGresult *PQgetResult(PGconn *conn);
        <function>PQgetResult</function> will block only if a command is
        active and the necessary response data has not yet been read by
        <function>PQconsumeInput</function>.
+       If query pipelining is being used, call 
+       <function>PQgetFirstQuery</function> on a non-busy connection
+       (<function>PQisBusy</function> returns false) to determine which
+       query the next result belongs to.
       </para>
 
       <note>
        <para>
         Even when <function>PQresultStatus</function> indicates a fatal
-        error, <function>PQgetResult</function> should be called until it
-        returns a null pointer, to allow <application>libpq</> to
-        process the error information completely.
+        error, <function>PQgetResult</function> should be called until the
+        query has no more results (null pointer return if not using query
+        pipelining, otherwise see <function>PQgetFirstQuery</function>),
+        to allow <application>libpq</> to process the error information completely.
        </para>
       </note>
      </listitem>
@@ -4385,6 +4394,109 @@ int PQflush(PGconn *conn);
    read-ready and then read the response as described above.
   </para>
 
+ <variablelist>
+  <varlistentry id="libpq-pqgetquerycommand">
+   <term>
+    <function>PQgetQueryCommand</function>
+    <indexterm>
+     <primary>PQgetQueryCommand</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the query string associated with the <structure>PGquery</structure>.
+<synopsis>
+const char * PQgetQueryCommand(PGquery *query);
+</synopsis>
+    </para>
+
+    <para>
+     When using query pipelining this function can be used to retrieve the command
+     that created the query object.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetfirstquery">
+   <term>
+    <function>PQgetFirstQuery</function>
+    <indexterm>
+     <primary>PQgetFirstQuery</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the first async query to recieve results, or NULL if no
+     async queries are active.
+<synopsis>
+PGquery * PQgetFirstQuery(PGconn *conn);
+</synopsis>
+    </para>
+
+    <para>
+     When using query pipelining this function can be used to indicate
+     which query the next result of <function>PQgetResult</function>
+     belongs. The result is only guaranteed to be valid for the next call to
+     <function>PQgetResult</function> if <function>PQisBusy</function>
+     is false, otherwise the returned <structure>PGquery</structure>
+     may or may not have more results.  The <structure>PGquery</structure>
+     remains valid after <function>PQgetResult</function> until the next
+     libpq call that consumes input.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetnextquery">
+   <term>
+    <function>PQgetNextQuery</function>
+    <indexterm>
+     <primary>PQgetNextQuery</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the next <structure>PGquery</structure> in the list of
+     dispatched async queries waiting for results.
+<synopsis>
+PGquery * PQgetNextQuery(PGquery *query);
+</synopsis>
+    </para>
+
+    <para>
+     This function can be used to iterate each pending async query starting
+     with the result of <function>PQgetFirstQuery</function>, and ending
+     with the result of <function>PQgetLastQuery</function>, then null.
+    </para>
+   </varlistentry>
+
+   <varlistentry>
+    <term>
+     <function>PQgetLastQuery</function>
+     <indexterm>
+      <primary>PQgetLastQuery</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+      Returns the last <structure>PGquery</structure> in the list of
+      dispatched async queries waiting for results.
+<synopsis>
+PGquery * PQgetLastQuery(PGquery *query);
+</synopsis>
+     </para>
+
+     <para>
+      Call this function after dispatching an async query to get
+      a <structure>PGquery</structure> that can be used to identify
+      the originating query for each result obtained by
+      <function>PGgetResult</function>.
+     </para>
+   </varlistentry>
+  </variablelist>
  </sect1>
 
  <sect1 id="libpq-single-row-mode">
@@ -4411,7 +4523,7 @@ int PQflush(PGconn *conn);
    immediately after a successful call of <function>PQsendQuery</function>
    (or a sibling function).  This mode selection is effective only for the
    currently executing query.  Then call <function>PQgetResult</function>
-   repeatedly, until it returns null, as documented in <xref
+   repeatedly, until the last query result is returned, as documented in <xref
    linkend="libpq-async">.  If the query returns any rows, they are returned
    as individual <structname>PGresult</structname> objects, which look like
    normal query results except for having status code
@@ -4420,8 +4532,8 @@ int PQflush(PGconn *conn);
    the query returns zero rows, a zero-row object with status
    <literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
    more rows will arrive.  (But note that it is still necessary to continue
-   calling <function>PQgetResult</function> until it returns null.)  All of
-   these <structname>PGresult</structname> objects will contain the same row
+   calling <function>PQgetResult</function> until the last query result is returned.)
+   All of these <structname>PGresult</structname> objects will contain the same row
    description data (column names, types, etc) that an ordinary
    <structname>PGresult</structname> object for the query would have.
    Each object should be freed with <function>PQclear</function> as usual.
libpq.pipeline.src.patchtext/x-patch; charset=UTF-8; name=libpq.pipeline.src.patchDownload
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 93da50d..74d76f3 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -165,3 +165,7 @@ lo_lseek64                162
 lo_tell64                 163
 lo_truncate64             164
 PQconninfo                165
+PQgetFirstQuery           166
+PQgetLastQuery            167
+PQgetNextQuery            168
+PQgetQueryCommand         169
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3af222b..31fa437 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2893,8 +2893,6 @@ freePGconn(PGconn *conn)
 		free(conn->gsslib);
 #endif
 	/* Note that conn->Pfdebug is not ours to close or free */
-	if (conn->last_query)
-		free(conn->last_query);
 	if (conn->inBuffer)
 		free(conn->inBuffer);
 	if (conn->outBuffer)
@@ -2956,6 +2954,29 @@ closePGconn(PGconn *conn)
 										 * absent */
 	conn->asyncStatus = PGASYNC_IDLE;
 	pqClearAsyncResult(conn);	/* deallocate result */
+	
+	/*
+	 * Link active queries into the free list so we can free them
+	 */
+	if (conn->queryTail)
+	{
+		conn->queryTail->next = conn->queryFree;
+		conn->queryFree = conn->queryHead;
+	}
+	conn->queryHead = conn->queryTail = NULL;
+	
+	/*
+	 * Free all query objects
+	 */
+	while (conn->queryFree)
+	{
+		PGquery * prev = conn->queryFree;
+		conn->queryFree = prev->next;
+		if (prev->querycmd)
+			free(prev->querycmd);
+		free(prev);
+	}
+
 	resetPQExpBuffer(&conn->errorMessage);
 	pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
 	conn->addrlist = NULL;
@@ -3135,7 +3156,7 @@ PQresetPoll(PGconn *conn)
 }
 
 /*
- * PQcancelGet: get a PGcancel structure corresponding to a connection.
+ * PQgetCancel: get a PGcancel structure corresponding to a connection.
  *
  * A copy is needed to be able to cancel a running query from a different
  * thread. If the same structure is used all structure members would have
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4075e51..2a421a8 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1020,7 +1020,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * row; the original conn->result is left unchanged so that it can be used
 	 * again as the template for future rows.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Copy everything that should be in the result at this point */
 		res = PQcopyResult(res,
@@ -1080,7 +1080,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * Success.  In single-row mode, make the result available to the client
 	 * immediately.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Change result status to special single-row value */
 		res->resultStatus = PGRES_SINGLE_TUPLE;
@@ -1088,6 +1088,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
+		/* TODO: Still correct ? */
 		conn->asyncStatus = PGASYNC_READY;
 	}
 
@@ -1132,14 +1133,12 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* remember we are using simple query protocol */
-	conn->queryclass = PGQUERY_SIMPLE;
-
+	conn->queryTail->queryclass = PGQUERY_SIMPLE;
+	
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
-
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
+	
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
@@ -1151,7 +1150,9 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if( conn->asyncStatus == PGASYNC_IDLE )
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 }
 
@@ -1272,13 +1273,11 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	conn->queryTail->queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1288,6 +1287,7 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
+	/* TODO: Check status first! */
 	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
@@ -1344,6 +1344,8 @@ PQsendQueryPrepared(PGconn *conn,
 static bool
 PQsendQueryStart(PGconn *conn)
 {
+	PGquery * query;
+	
 	if (!conn)
 		return false;
 
@@ -1357,21 +1359,46 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while in copy mode, either. */
+	switch (conn->asyncStatus)
 	{
-		printfPQExpBuffer(&conn->errorMessage,
+		case PGASYNC_IDLE:
+		case PGASYNC_BUSY:
+		case PGASYNC_READY:
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
-		return false;
+			return false;
 	}
 
-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
-
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
-
+	if( !conn->queryFree )
+	{
+		query = (PGquery*) malloc(sizeof(PGquery));
+		query->querycmd = 0;
+		query->singleRowMode = false;
+		query->next = 0;
+	}
+	else
+	{
+		query = conn->queryFree;
+		conn->queryFree = query->next;
+		if (query->querycmd)
+			free(query->querycmd);
+		query->querycmd = NULL;
+		query->next = NULL;
+	}
+	
+	if( conn->queryTail )
+		conn->queryTail->next = query;
+	else
+		conn->queryHead = query;
+	
+	conn->queryTail = query;
+	
 	/* ready to send command message */
 	return true;
 }
@@ -1522,16 +1549,12 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	conn->queryTail->queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	/* if insufficient memory, querycmd just winds up NULL */
 	if (command)
-		conn->last_query = strdup(command);
-	else
-		conn->last_query = NULL;
+		conn->queryTail->querycmd = strdup(command);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1541,6 +1564,7 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
+	/* TODO: Check status first! */
 	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
@@ -1576,7 +1600,7 @@ pqHandleSendFailure(PGconn *conn)
 }
 
 /*
- * Select row-by-row processing mode
+ * Select row-by-row processing mode for the last launched query
  */
 int
 PQsetSingleRowMode(PGconn *conn)
@@ -1585,18 +1609,16 @@ PQsetSingleRowMode(PGconn *conn)
 	 * Only allow setting the flag when we have launched a query and not yet
 	 * received any results.
 	 */
-	if (!conn)
-		return 0;
-	if (conn->asyncStatus != PGASYNC_BUSY)
+	if (!conn || !conn->queryTail)
 		return 0;
-	if (conn->queryclass != PGQUERY_SIMPLE &&
-		conn->queryclass != PGQUERY_EXTENDED)
+	if (conn->asyncStatus != PGASYNC_BUSY && conn->queryTail == conn->queryHead)
 		return 0;
-	if (conn->result)
+	if (conn->queryTail->queryclass != PGQUERY_SIMPLE &&
+		conn->queryTail->queryclass != PGQUERY_EXTENDED)
 		return 0;
 
 	/* OK, set flag */
-	conn->singleRowMode = true;
+	conn->queryTail->singleRowMode = true;
 	return 1;
 }
 
@@ -1670,6 +1692,51 @@ PQisBusy(PGconn *conn)
 
 
 /*
+ * PQgetQueryCommand
+ */
+const char *
+PQgetQueryCommand(PGquery *query)
+{
+	if (!query)
+		return NULL;
+	return query->querycmd;
+}
+
+/*
+ * PQgetFirstQuery
+ */
+PGquery *
+PQgetFirstQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	
+	return conn->queryHead;
+}
+
+/*
+ * PQgetLastQuery
+ */
+PGquery *
+PQgetLastQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	return conn->queryTail;
+}
+
+/*
+ * PQgetNextQuery
+ */
+PGquery *
+PQgetNextQuery(PGquery *query)
+{
+	if (!query)
+		return 0;
+	return query->next;
+}
+
+/*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
@@ -2132,14 +2199,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
-
-	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
-	{
-		free(conn->last_query);
-		conn->last_query = NULL;
-	}
+	conn->queryTail->queryclass = PGQUERY_DESCRIBE;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -2301,7 +2361,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index c514ca5..f8a262c 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -55,7 +55,26 @@ static void reportErrorPosition(PQExpBuffer msg, const char *query,
 					int loc, int encoding);
 static int build_startup_packet(const PGconn *conn, char *packet,
 					 const PQEnvironmentOption *options);
+static void pqQueryAdvance(PGconn *conn);
 
+void
+pqQueryAdvance(PGconn *conn)
+{
+	PGquery * query;
+	
+	query = conn->queryHead;
+	if (query == NULL)
+		return;
+	
+	/* Advance queryHead */
+	conn->queryHead = query->next;
+	/* Push last query onto free stack */
+	query->next = conn->queryFree;
+	conn->queryFree = query;
+	
+	if (conn->queryHead == NULL)
+		conn->queryTail = NULL;
+}
 
 /*
  * parseInput: if appropriate, parse input data from backend
@@ -68,7 +87,7 @@ pqParseInput3(PGconn *conn)
 	char		id;
 	int			msgLength;
 	int			avail;
-
+	
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
@@ -218,7 +237,15 @@ pqParseInput3(PGconn *conn)
 				case 'Z':		/* backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+
+					pqQueryAdvance(conn);
+					/* initialize async result-accumulation state */
+					conn->result = NULL;
+					conn->next_result = NULL;
+					if( conn->queryHead != NULL )
+						conn->asyncStatus = PGASYNC_BUSY;
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -232,7 +259,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case '1':		/* Parse Complete */
 					/* If we're doing PQprepare, we're done; else ignore */
-					if (conn->queryclass == PGQUERY_PREPARE)
+					if (conn->queryHead->queryclass == PGQUERY_PREPARE)
 					{
 						if (conn->result == NULL)
 						{
@@ -266,7 +293,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case 'T':		/* Row Description */
 					if (conn->result == NULL ||
-						conn->queryclass == PGQUERY_DESCRIBE)
+						conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						/* First 'T' in a query sequence */
 						if (getRowDescriptions(conn, msgLength))
@@ -299,7 +326,7 @@ pqParseInput3(PGconn *conn)
 					 * instead of TUPLES_OK.  Otherwise we can just ignore
 					 * this message.
 					 */
-					if (conn->queryclass == PGQUERY_DESCRIBE)
+					if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						if (conn->result == NULL)
 						{
@@ -422,6 +449,8 @@ pqParseInput3(PGconn *conn)
 static void
 handleSyncLoss(PGconn *conn, char id, int msgLength)
 {
+	PGquery * query;
+	
 	printfPQExpBuffer(&conn->errorMessage,
 					  libpq_gettext(
 	"lost synchronization with server: got message type \"%c\", length %d\n"),
@@ -430,6 +459,15 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
 	pqSaveErrorResult(conn);
 	conn->asyncStatus = PGASYNC_READY;	/* drop out of GetResult wait loop */
 
+	/* All queries are canceled, move them to the free list and free the query commands */
+	while ((query = conn->queryHead) != NULL)
+	{
+		free(query->querycmd);
+		query->querycmd = NULL;
+		conn->queryHead = query->next;
+		query->next = conn->queryFree;
+	}
+	
 	pqDropConnection(conn);
 	conn->status = CONNECTION_BAD;		/* No more connection to backend */
 }
@@ -455,7 +493,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * PGresult created by getParamDescriptions, and we should fill data into
 	 * that.  Otherwise, create a new, empty PGresult.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		if (conn->result)
 			result = conn->result;
@@ -562,7 +600,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * If we're doing a Describe, we're done, and ready to pass the result
 	 * back to the client.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		conn->asyncStatus = PGASYNC_READY;
 		return 0;
@@ -865,10 +903,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
 	if (val)
 	{
-		if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL)
+		if (conn->verbosity != PQERRORS_TERSE && conn->queryHead->querycmd != NULL)
 		{
 			/* emit position as a syntax cursor display */
-			querytext = conn->last_query;
+			querytext = conn->queryHead->querycmd;
 			querypos = atoi(val);
 		}
 		else
@@ -1696,7 +1734,7 @@ pqEndcopy3(PGconn *conn)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b81dc16..163a5e0 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -141,6 +141,13 @@ typedef struct pg_result PGresult;
  */
 typedef struct pg_cancel PGcancel;
 
+/* PGquery encapsulates the progress of a single query command issued
+ * to the async api functions
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_query PGquery;
+
+
 /* PGnotify represents the occurrence of a NOTIFY message.
  * Ideally this would be an opaque typedef, but it's so simple that it's
  * unlikely to change.
@@ -404,6 +411,11 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+extern const char * PQgetQueryCommand(PGquery *query);
+extern PGquery *PQgetFirstQuery(PGconn *conn);
+extern PGquery *PQgetLastQuery(PGconn *conn);
+extern PGquery *PQgetNextQuery(PGquery *query);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4ef46ff..fb9bd61 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -291,6 +291,16 @@ typedef struct pgDataValue
 	const char *value;			/* data value, without zero-termination */
 } PGdataValue;
 
+typedef struct pg_query
+{
+	PGQueryClass queryclass;
+	char	   *querycmd;		/* last SQL command, or NULL if unknown */
+	bool		singleRowMode;	/* return query result row-by-row? */
+	struct pg_query * next;
+	void	   *userptr;        /* convenience for the user */
+} PGquery;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -350,13 +360,19 @@ struct pg_conn
 	ConnStatusType status;
 	PGAsyncStatusType asyncStatus;
 	PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
-	PGQueryClass queryclass;
-	char	   *last_query;		/* last SQL command, or NULL if unknown */
+	
+	/* queryHead and queryTail form a FIFO representing queries sent
+	 * to the backend.  queryHead is the first query sent, and is the
+	 * query we are receiving results from, or have received results from */
+	PGquery *queryHead;
+	PGquery *queryTail;
+	PGquery *queryFree; /* Reuse PGQuery allocations */
+	int nQueries;
+	
 	char		last_sqlstate[6];		/* last reported SQLSTATE */
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
-	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;		/* # bytes already returned in COPY
 										 * OUT */
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index aee5c04..3996760 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqpipeline testlibpqpipeline2
 
 all: $(PROGS)
 
diff --git a/src/test/examples/testlibpqpipeline.c b/src/test/examples/testlibpqpipeline.c
index 725aad5..da0466f 100644
--- a/src/test/examples/testlibpqpipeline.c
+++ b/src/test/examples/testlibpqpipeline.c
@@ -101,7 +101,7 @@ int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int b
 #define TEST_P(q) \
 	if( (result = testPipelined(conn,totalQueries,totalQueued,q)) != 0 ) \
 		return result;
-	TEST_P("INSERT INTO test() VALUES ()");
+	TEST_P("INSERT INTO test(id) VALUES (DEFAULT)");
 	TEST_P("SELECT * FROM test LIMIT 1");
 	TEST_P("SELECT * FROM test");
 	TEST_P("DELETE FROM test");
@@ -137,7 +137,7 @@ main(int argc, char **argv)
 
 	PQsetnonblocking(conn,1);
 	
-	PQexec(conn,"CREATE TABLE test ( id PRIMARY KEY AUTOINCREMENT )");
+	PQclear(PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"));
 
 	baseline = testPipelinedSeries(conn,10,1,0);
 	testPipelinedSeries(conn,10,3,baseline);
@@ -150,7 +150,7 @@ main(int argc, char **argv)
 	testPipelinedSeries(conn,100,50,baseline);
 	testPipelinedSeries(conn,100,100,baseline);
 	
-	PQexec(conn,"DROP TABLE test");
+	PQclear(PQexec(conn,"DROP TABLE test"));
 	
 	return 0;
 }
\ No newline at end of file
testlibpqpipeline.ctext/x-csrc; charset=UTF-8; name=testlibpqpipeline.cDownload
#5Claudio Freire
klaussfreire@gmail.com
In reply to: Matt Newell (#4)
Re: libpq pipelining

On Thu, Dec 4, 2014 at 4:11 PM, Matt Newell <newellm@blur.com> wrote:

With the API i am proposing, only 2 new functions (PQgetFirstQuery,
PQgetLastQuery) are required to be able to match each result to the query that
caused it. Another function, PQgetNextQuery allows iterating through the
pending queries, and PQgetQueryCommand permits getting the original query
text.

Adding the ability to set a user supplied pointer on the PGquery struct might
make it much easier for some frameworks, and other users might want a
callback, but I don't think either are required.

With a pointer on PGquery you wouldn't need any of the above. Who
whants the query text sets it as a pointer, who wants some other
struct sets it as a pointer.

You would only need to be careful about the lifetime of the pointed
struct, but that onus is on the application I'd say. The API only
needs to provide some guarantees about how long or short it holds onto
that pointer.

I'm thinking this would be somewhat necessary for a python wrapper,
like psycopg2 (the wrapper could build a dictionary based on query
text, but there's no guarantee that query text will be unique so it'd
be very tricky).

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#6Matt Newell
newellm@blur.com
In reply to: Claudio Freire (#5)
Re: libpq pipelining

On Thursday, December 04, 2014 04:30:27 PM Claudio Freire wrote:

On Thu, Dec 4, 2014 at 4:11 PM, Matt Newell <newellm@blur.com> wrote:

With the API i am proposing, only 2 new functions (PQgetFirstQuery,
PQgetLastQuery) are required to be able to match each result to the query
that caused it. Another function, PQgetNextQuery allows iterating
through the pending queries, and PQgetQueryCommand permits getting the
original query text.

Adding the ability to set a user supplied pointer on the PGquery struct
might make it much easier for some frameworks, and other users might want
a callback, but I don't think either are required.

With a pointer on PGquery you wouldn't need any of the above. Who
whants the query text sets it as a pointer, who wants some other
struct sets it as a pointer.

libpq already stores the (current) query text as it's used in some error
cases, so that's not really optional without breaking backwards compatibility.
Adding another pointer for the user to optional utilize should be no big deal
though if everyone agrees it's a good thing.

You would only need to be careful about the lifetime of the pointed
struct, but that onus is on the application I'd say. The API only
needs to provide some guarantees about how long or short it holds onto
that pointer.

Agreed.

I'm thinking this would be somewhat necessary for a python wrapper,
like psycopg2 (the wrapper could build a dictionary based on query
text, but there's no guarantee that query text will be unique so it'd
be very tricky).

While it might make some things simpler, i really don't think it absolutely
necessary since the wrapper can maintain a queue that corresponds to libpq's
internal queue of PGquery's. ie, each time you call a PQsendQuery* function
you push your required state, and each time the return value of
PQgetFirstQuery changes you pop from the queue.

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#7Heikki Linnakangas
hlinnakangas@vmware.com
In reply to: Matt Newell (#4)
Re: libpq pipelining

On 12/04/2014 09:11 PM, Matt Newell wrote:

With the API i am proposing, only 2 new functions (PQgetFirstQuery,
PQgetLastQuery) are required to be able to match each result to the query that
caused it. Another function, PQgetNextQuery allows iterating through the
pending queries, and PQgetQueryCommand permits getting the original query
text.

Adding the ability to set a user supplied pointer on the PGquery struct might
make it much easier for some frameworks, and other users might want a
callback, but I don't think either are required.

I don't like exposing the PGquery struct to the application like that.
Access to all other libpq objects is done via functions. The application
can't (or shouldn't, anyway) directly access the fields of PGresult, for
example. It has to call PQnfields(), PQntuples() etc.

The user-supplied pointer seems quite pointless. It would make sense if
the pointer was passed to PQsendquery(), and you'd get it back in
PGquery. You could then use it to tag the query when you send it with
whatever makes sense for the application, and use the tag in the result
to match it with the original query. But as it stands, I don't see the
point.

The original query string might be handy for some things, but for others
it's useless. It's not enough as a general method to identify the query
the result belongs to. A common use case for this is to execute the same
query many times with different parameters.

So I don't think you've quite nailed the problem of how to match the
results to the commands that originated them, yet. One idea is to add a
function that can be called after PQgetResult(), to get some identifier
of the original command. But there needs to be a mechanism to tag the
PQsendQuery() calls. Or you can assign each call a unique ID
automatically, and have a way to ask for that ID after calling
PQsendQuery().

The explanation of PQgetFirstQuery makes it sound pretty hard to match
up the result with the query. You have to pay attention to PQisBusy.

It would be good to make it explicit when you start a pipelined
operation. Currently, you get an error if you call PQsendQuery() twice
in a row, without reading the result inbetween. That's a good thing, to
catch application errors, when you're not trying to do pipelining.
Otherwise, if you forget to get the result of a query you've sent, and
then send another query, you'll merrily read the result of the first
query and think that it belongs to the second.

Are you trying to support "continous pipelining", where you send new
queries all the time, and read results as they arrive, without ever
draining the pipe? Or are you just trying to do "batches", where you
send a bunch of queries, and wait for all the results to arrive, before
sending more? A batched API would be easier to understand and work with,
although a "continuous" pipeline could be more efficient for an
application that can take advantage of it.

Consideration of implicit transactions (autocommit), the whole pipeline
being one transaction, or multiple transactions is needed.

The more I think about this the more confident I am that no extra work is
needed.

Unless we start doing some preliminary processing of the query inside of
libpq, our hands are tied wrt sending a sync at the end of each query. The
reason for this is that we rely on the ReadyForQuery message to indicate the
end of a query, so without the sync there is no way to tell if the next result
is from another statement in the current query, or the first statement in the
next query.

I also don't see a reason to need multiple queries without a sync statement.
If the user wants all queries to succeed or fail together it should be no
problem to start the pipeline with begin and complete it commit. But I may be
missing some detail...

True. It makes me a bit uneasy, though, to not be sure that the whole
batch is committed or rolled back as one unit. There are many ways the
user can shoot himself in the foot with that. Error handling would be a
lot simpler if you would only send one Sync for the whole batch. Tom
explained it better on this recent thread:
/messages/by-id/32086.1415063405@sss.pgh.pa.us.

Another thought is that for many applications, it would actually be OK
to not know which query each result belongs to. For example, if you
execute a bunch of inserts, you often just want to get back the total
number of inserted, or maybe not even that. Or if you execute a "CREATE
TEMPORARY TABLE ... ON COMMIT DROP", followed by some insertions to it,
some more data manipulations, and finally a SELECT to get the results
back. All you want is the last result set.

If we could modify the wire protocol, we'd want to have a MiniSync
message that is like Sync except that it wouldn't close the current
transaction. The server would respond to it with a ReadyForQuery message
(which could carry an ID number, to match it up with the MiniSync
command). But I really wish we'd find a way to do this without changing
the wire protocol.

- Heikki

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#8Matt Newell
newellm@blur.com
In reply to: Heikki Linnakangas (#7)
Re: libpq pipelining

On Thursday, December 04, 2014 11:39:02 PM Heikki Linnakangas wrote:

Adding the ability to set a user supplied pointer on the PGquery struct
might make it much easier for some frameworks, and other users might want
a callback, but I don't think either are required.

I don't like exposing the PGquery struct to the application like that.
Access to all other libpq objects is done via functions. The application
can't (or shouldn't, anyway) directly access the fields of PGresult, for
example. It has to call PQnfields(), PQntuples() etc.

Right, my patch doesn't expose it. I was thinking of adding two new functions
to get/set the user tag/pointer.

The user-supplied pointer seems quite pointless. It would make sense if
the pointer was passed to PQsendquery(), and you'd get it back in
PGquery. You could then use it to tag the query when you send it with
whatever makes sense for the application, and use the tag in the result
to match it with the original query.

That's exactly what I envisioned, but with a separate call to avoid having to
modify/duplicate the PQsendQuery functions:

PQsendQuery(conn,...)
query = PQgetLastQuery(conn);
PQquerySetUserPointer(query,userPtr);

...
result = PQgetResult(conn);
query = PQgetFirstQuery(conn);
userPtr = PQqueryGetUserPointer(query);

But as it stands, I don't see the
point.

I don't need it since it should be easy to keep track without it. It was just
an idea.

The original query string might be handy for some things, but for others
it's useless. It's not enough as a general method to identify the query
the result belongs to. A common use case for this is to execute the same
query many times with different parameters.

Right, I'm only saving the query text because that's how things were done
already. Since it's already there I didn't see a reason not to expose it.

So I don't think you've quite nailed the problem of how to match the
results to the commands that originated them, yet. One idea is to add a
function that can be called after PQgetResult(), to get some identifier
of the original command. But there needs to be a mechanism to tag the
PQsendQuery() calls. Or you can assign each call a unique ID
automatically, and have a way to ask for that ID after calling
PQsendQuery().

PGquery IS the unique ID, and it is available after calling PQsendQuery by
calling PQgetLastQuery.

The explanation of PQgetFirstQuery makes it sound pretty hard to match
up the result with the query. You have to pay attention to PQisBusy.

It's not hard at all and is very natural to use since the whole point of an
async api is to avoid blocking, so it's natural to only call PQgetResult when
it's not going to block. PQgetFirstQuery should also be valid after calling
PQgetResult and then you don't have to worry about PQisBusy, so I should
probably change the documentation to indicate that is the preferred usage, or
maybe make that the only guaranteed usage, and say the results are undefined if
you call it before calling PQgetResult. That usage also makes it consistent
with PQgetLastQuery being called immediately after PQsendQuery.

Another option would be a function to get the PGquery for any PGresult. This
would make things a bit more straightforward for the user, but more
complicated in the implementation since multiple PGresults will share the same
PGquery. However it's nothing that a reference count wouldn't solve.

It would be good to make it explicit when you start a pipelined
operation. Currently, you get an error if you call PQsendQuery() twice
in a row, without reading the result inbetween. That's a good thing, to
catch application errors, when you're not trying to do pipelining.
Otherwise, if you forget to get the result of a query you've sent, and
then send another query, you'll merrily read the result of the first
query and think that it belongs to the second.

Agreed, and I think this is the only behavior change currently. An easy fix to
restore existing behavior by default:

PQsetPipelining(PGconn *conn, int arg);

should work.

Are you trying to support "continous pipelining", where you send new
queries all the time, and read results as they arrive, without ever
draining the pipe? Or are you just trying to do "batches", where you
send a bunch of queries, and wait for all the results to arrive, before
sending more? A batched API would be easier to understand and work with,
although a "continuous" pipeline could be more efficient for an
application that can take advantage of it.

I don't see any reason to limit it to batches, though it can certainly be used
that way. My first test case does continuous pipelining and it provides a huge
throughput gain when there's any latency in the connection. I can envision a
lot of uses for the continuous approach.

Consideration of implicit transactions (autocommit), the whole pipeline
being one transaction, or multiple transactions is needed.

The more I think about this the more confident I am that no extra work is
needed.

Unless we start doing some preliminary processing of the query inside of
libpq, our hands are tied wrt sending a sync at the end of each query.
The
reason for this is that we rely on the ReadyForQuery message to indicate
the end of a query, so without the sync there is no way to tell if the
next result is from another statement in the current query, or the first
statement in the next query.

I also don't see a reason to need multiple queries without a sync
statement. If the user wants all queries to succeed or fail together it
should be no problem to start the pipeline with begin and complete it
commit. But I may be missing some detail...

True. It makes me a bit uneasy, though, to not be sure that the whole
batch is committed or rolled back as one unit. There are many ways the
user can shoot himself in the foot with that. Error handling would be a
lot simpler if you would only send one Sync for the whole batch. Tom
explained it better on this recent thread:
/messages/by-id/32086.1415063405@sss.pgh.pa.us.

I've read tom's email and every other one i could find related to pipelining
and I simply don't see the problem.

What's the advantage of being able to pipeline multiple queries without sync
and without an explicit transaction, vs an explicit transaction with a sync
per query?

The usage I have in mind for pipelining needs each query to succeed or fail
independently of any query before or after it, unless the caller explicitly
uses a transaction.

Another thought is that for many applications, it would actually be OK
to not know which query each result belongs to. For example, if you
execute a bunch of inserts, you often just want to get back the total
number of inserted, or maybe not even that. Or if you execute a "CREATE
TEMPORARY TABLE ... ON COMMIT DROP", followed by some insertions to it,
some more data manipulations, and finally a SELECT to get the results
back. All you want is the last result set.

Easy, grab the PGquery from only the last select, call PQgetResult until you
get a matching result, PQclear the rest. We could provide a function to do
just that as a convenience to the user, if it's going to be a common use-case.
It would be no different than how PQexec processes and discards results.

Matt

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#9Matt Newell
newellm@blur.com
In reply to: Matt Newell (#8)
Re: libpq pipelining

The explanation of PQgetFirstQuery makes it sound pretty hard to match
up the result with the query. You have to pay attention to PQisBusy.

PQgetFirstQuery should also be valid after
calling PQgetResult and then you don't have to worry about PQisBusy, so I
should probably change the documentation to indicate that is the preferred
usage, or maybe make that the only guaranteed usage, and say the results
are undefined if you call it before calling PQgetResult. That usage also
makes it consistent with PQgetLastQuery being called immediately after
PQsendQuery.

I changed my second example to call PQgetFirstQuery after PQgetResult instead
of before, and that removes the need to call PQconsumeInput and PQisBusy when
you don't mind blocking. It makes the example super simple:

PQsendQuery(conn, "INSERT INTO test(id) VALUES (DEFAULT),(DEFAULT)
RETURNING id");
query1 = PQgetLastQuery(conn);

/* Duplicate primary key error */
PQsendQuery(conn, "UPDATE test SET id=2 WHERE id=1");
query2 = PQgetLastQuery(conn);

PQsendQuery(conn, "SELECT * FROM test");
query3 = PQgetLastQuery(conn);

while( (result = PQgetResult(conn)) != NULL )
{
curQuery = PQgetFirstQuery(conn);

if (curQuery == query1)
checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
if (curQuery == query2)
checkResult(conn,result,curQuery,PGRES_FATAL_ERROR);
if (curQuery == query3)
checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
}

Note that the curQuery == queryX check will work no matter how many results a
query produces.

Matt Newell

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#10Heikki Linnakangas
hlinnakangas@vmware.com
In reply to: Matt Newell (#9)
Re: libpq pipelining

On 12/05/2014 02:30 AM, Matt Newell wrote:

The explanation of PQgetFirstQuery makes it sound pretty hard to match
up the result with the query. You have to pay attention to PQisBusy.

PQgetFirstQuery should also be valid after
calling PQgetResult and then you don't have to worry about PQisBusy, so I
should probably change the documentation to indicate that is the preferred
usage, or maybe make that the only guaranteed usage, and say the results
are undefined if you call it before calling PQgetResult. That usage also
makes it consistent with PQgetLastQuery being called immediately after
PQsendQuery.

I changed my second example to call PQgetFirstQuery after PQgetResult instead
of before, and that removes the need to call PQconsumeInput and PQisBusy when
you don't mind blocking. It makes the example super simple:

PQsendQuery(conn, "INSERT INTO test(id) VALUES (DEFAULT),(DEFAULT)
RETURNING id");
query1 = PQgetLastQuery(conn);

/* Duplicate primary key error */
PQsendQuery(conn, "UPDATE test SET id=2 WHERE id=1");
query2 = PQgetLastQuery(conn);

PQsendQuery(conn, "SELECT * FROM test");
query3 = PQgetLastQuery(conn);

while( (result = PQgetResult(conn)) != NULL )
{
curQuery = PQgetFirstQuery(conn);

if (curQuery == query1)
checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
if (curQuery == query2)
checkResult(conn,result,curQuery,PGRES_FATAL_ERROR);
if (curQuery == query3)
checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
}

Note that the curQuery == queryX check will work no matter how many results a
query produces.

Oh, that's what the PQgetLastQuery/PQgetNextQuery functions work! I
didn't understand that before. I'd suggest renaming them to something
like PQgetSentQuery() and PQgetResultQuery(). The first/last/next names
made me think that they're used to iterate a list of queries, but in
fact they're supposed to be used at very different stages.

- Heikki

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#11Matt Newell
newellm@blur.com
In reply to: Heikki Linnakangas (#10)
4 attachment(s)
Re: libpq pipelining

On Friday, December 05, 2014 12:22:38 PM Heikki Linnakangas wrote:

Oh, that's what the PQgetLastQuery/PQgetNextQuery functions work! I
didn't understand that before. I'd suggest renaming them to something
like PQgetSentQuery() and PQgetResultQuery(). The first/last/next names
made me think that they're used to iterate a list of queries, but in
fact they're supposed to be used at very different stages.

- Heikki

Okay, I have renamed them with your suggestions, and added
PQsetPipelining/PQgetPipelining, defaulting to pipelining off. There should be
no behavior change unless pipelining is enabled.

Documentation should be mostly complete except the possible addition of an
example and maybe a general pipelining overview paragraph.

I have implemented async query support (that takes advantage of pipelining) in
Qt, along with a couple test cases. This led to me discovering a bug with my
last patch where a PGquery object could be reused twice in a row. I have fixed
that. I contemplated not reusing the PGquery objects at all, but that
wouldn't solve the problem because it's very possible that malloc will return
a recent free of the same size anyway. Making the guarantee that a PGquery
won't be reused twice in a row should be sufficient, and the only alternative is
to add a unique id, but that will add further complexity that I don't think is
warranted.

Feedback is very welcome and appreciated.

Thanks,
Matt Newell

Attachments:

libpq.pipeline.docs.patchtext/x-patch; charset=UTF-8; name=libpq.pipeline.docs.patchDownload
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index d829a4b..4e0431e 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3947,9 +3947,14 @@ int PQsendQuery(PGconn *conn, const char *command);
 
        After successfully calling <function>PQsendQuery</function>, call
        <function>PQgetResult</function> one or more times to obtain the
-       results.  <function>PQsendQuery</function> cannot be called again
-       (on the same connection) until <function>PQgetResult</function>
-       has returned a null pointer, indicating that the command is done.
+       results.  If pipelining is enabled <function>PQsendQuery</function>
+       may be called multiple times before reading the results. See 
+       <function>PQsetPipelining</function> and <function>PQisPipelining</function>.
+       Call <function>PQgetSentQuery</function> to get a <structname>PGquery</structname>
+       which can be used to identify which results obtained from
+       <function>PQgetResult</function> belong to each pipelined query.
+       If only one query is dispatched at a time, you can call <function>PQgetResult</function>
+       until a NULL value is returned to indicate the end of the query.
       </para>
      </listitem>
     </varlistentry>
@@ -4133,8 +4138,8 @@ PGresult *PQgetResult(PGconn *conn);
 
       <para>
        <function>PQgetResult</function> must be called repeatedly until
-       it returns a null pointer, indicating that the command is done.
-       (If called when no command is active,
+       it returns a null pointer, indicating that all dispatched commands
+       are done. (If called when no command is active,
        <function>PQgetResult</function> will just return a null pointer
        at once.) Each non-null result from
        <function>PQgetResult</function> should be processed using the
@@ -4144,14 +4149,17 @@ PGresult *PQgetResult(PGconn *conn);
        <function>PQgetResult</function> will block only if a command is
        active and the necessary response data has not yet been read by
        <function>PQconsumeInput</function>.
+       If query pipelining is being used, <function>PQgetResultQuery</function>
+       can be called after PQgetResult to match the result to the query.
       </para>
 
       <note>
        <para>
         Even when <function>PQresultStatus</function> indicates a fatal
-        error, <function>PQgetResult</function> should be called until it
-        returns a null pointer, to allow <application>libpq</> to
-        process the error information completely.
+        error, <function>PQgetResult</function> should be called until the
+        query has no more results (null pointer return if not using query
+        pipelining, otherwise see <function>PQgetResultQuery</function>),
+        to allow <application>libpq</> to process the error information completely.
        </para>
       </note>
      </listitem>
@@ -4385,6 +4393,158 @@ int PQflush(PGconn *conn);
    read-ready and then read the response as described above.
   </para>
 
+ <variablelist>
+  <varlistentry id="libpq-pqsetpipelining">
+   <term>
+    <function>PQsetPipelining</function>
+    <indexterm>
+     <primary>PQsetPipelining</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Enables or disables query pipelining.
+<synopsis>
+int PQsetPipelining(PGconn *conn, int arg);
+</synopsis>
+    </para>
+
+    <para>
+     Enables pipelining for the connectino if arg is 1, or disables it
+     if arg is 0.  When pipelining is enabled multiple async queries can
+     be sent before processing the results of the first.  If pipelining
+     is disabled an error will be raised an async query is attempted
+     while another is active.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqispipelining">
+   <term>
+    <function>PQisPipelining</function>
+    <indexterm>
+     <primary>PQisPipelining</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the pipelining status of the connection
+<synopsis>
+int PQisPipelining(PGconn *conn);
+</synopsis>
+    </para>
+
+    <para>
+     Returns 1 if pipelining is enabled, or 0 if pipeling is disabled.
+     Query pipelining is disabled unless enabled with a call to
+     <function>PQsetPipelining</function>.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetquerycommand">
+   <term>
+    <function>PQgetQueryCommand</function>
+    <indexterm>
+     <primary>PQgetQueryCommand</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the query string associated with the <structure>PGquery</structure>.
+<synopsis>
+const char * PQgetQueryCommand(PGquery *query);
+</synopsis>
+    </para>
+
+    <para>
+     When using query pipelining this function can be used to retrieve the command
+     that created the query object.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetresultquery">
+   <term>
+    <function>PQgetResultQuery</function>
+    <indexterm>
+     <primary>PQgetResultQuery</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the first async query to recieve results, or NULL if no
+     async queries are active.
+<synopsis>
+PGquery * PQgetResultQuery(PGconn *conn);
+</synopsis>
+    </para>
+
+    <para>
+     When pipelining queries this function indicates which query the
+     result of <function>PQgetResult</function> results from.
+     Call this function immediately after calling 
+     <function>PQgetResult</function>, or immediately before if a result
+     is ready to read, indicated by <function>PQisBusy</function>
+     being false.  The <structure>PGquery</structure> remains valid
+     until the next libpq call that consumes input.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetnextquery">
+   <term>
+    <function>PQgetNextQuery</function>
+    <indexterm>
+     <primary>PQgetNextQuery</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the next <structure>PGquery</structure> in the list of
+     pipelined queries.
+<synopsis>
+PGquery * PQgetNextQuery(PGquery *query);
+</synopsis>
+    </para>
+
+    <para>
+     This function can be used to iterate each pending async query,
+     starting with <function>PQgetResultQuery</function>
+     and ending with <function>PQgetSentQuery</function>.
+    </para>
+   </varlistentry>
+
+   <varlistentry>
+    <term>
+     <function>PQgetSentQuery</function>
+     <indexterm>
+      <primary>PQgetSentQuery</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+      Returns the last <structure>PGquery</structure> in the list of
+      dispatched async queries waiting for results.
+<synopsis>
+PGquery * PQgetSentQuery(PGquery *query);
+</synopsis>
+     </para>
+
+     <para>
+      Call this function after dispatching an async query to get
+      a <structure>PGquery</structure> that can be used to identify
+      the originating query for each result obtained by
+      <function>PGgetResult</function>.
+     </para>
+   </varlistentry>
+  </variablelist>
  </sect1>
 
  <sect1 id="libpq-single-row-mode">
@@ -4411,7 +4571,7 @@ int PQflush(PGconn *conn);
    immediately after a successful call of <function>PQsendQuery</function>
    (or a sibling function).  This mode selection is effective only for the
    currently executing query.  Then call <function>PQgetResult</function>
-   repeatedly, until it returns null, as documented in <xref
+   repeatedly, until the last query result is returned, as documented in <xref
    linkend="libpq-async">.  If the query returns any rows, they are returned
    as individual <structname>PGresult</structname> objects, which look like
    normal query results except for having status code
@@ -4420,8 +4580,8 @@ int PQflush(PGconn *conn);
    the query returns zero rows, a zero-row object with status
    <literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
    more rows will arrive.  (But note that it is still necessary to continue
-   calling <function>PQgetResult</function> until it returns null.)  All of
-   these <structname>PGresult</structname> objects will contain the same row
+   calling <function>PQgetResult</function> until the last query result is returned.)
+   All of these <structname>PGresult</structname> objects will contain the same row
    description data (column names, types, etc) that an ordinary
    <structname>PGresult</structname> object for the query would have.
    Each object should be freed with <function>PQclear</function> as usual.
libpq.pipeline.src.patchtext/x-patch; charset=UTF-8; name=libpq.pipeline.src.patchDownload
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 93da50d..050bf05 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -165,3 +165,9 @@ lo_lseek64                162
 lo_tell64                 163
 lo_truncate64             164
 PQconninfo                165
+PQgetResultQuery          166
+PQgetSentQuery            167
+PQgetNextQuery            168
+PQgetQueryCommand         169
+PQsetPipelining           170
+PQisPipelining            171
\ No newline at end of file
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3af222b..fc72605 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2893,8 +2893,6 @@ freePGconn(PGconn *conn)
 		free(conn->gsslib);
 #endif
 	/* Note that conn->Pfdebug is not ours to close or free */
-	if (conn->last_query)
-		free(conn->last_query);
 	if (conn->inBuffer)
 		free(conn->inBuffer);
 	if (conn->outBuffer)
@@ -2956,6 +2954,29 @@ closePGconn(PGconn *conn)
 										 * absent */
 	conn->asyncStatus = PGASYNC_IDLE;
 	pqClearAsyncResult(conn);	/* deallocate result */
+
+	/*
+	 * Link active queries into the free list so we can free them
+	 */
+	if (conn->queryTail)
+	{
+		conn->queryTail->next = conn->queryFree;
+		conn->queryFree = conn->queryHead;
+	}
+	conn->queryHead = conn->queryTail = NULL;
+
+	/*
+	 * Free all query objects
+	 */
+	while (conn->queryFree)
+	{
+		PGquery * prev = conn->queryFree;
+		conn->queryFree = prev->next;
+		if (prev->querycmd)
+			free(prev->querycmd);
+		free(prev);
+	}
+
 	resetPQExpBuffer(&conn->errorMessage);
 	pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
 	conn->addrlist = NULL;
@@ -3135,7 +3156,7 @@ PQresetPoll(PGconn *conn)
 }
 
 /*
- * PQcancelGet: get a PGcancel structure corresponding to a connection.
+ * PQgetCancel: get a PGcancel structure corresponding to a connection.
  *
  * A copy is needed to be able to cancel a running query from a different
  * thread. If the same structure is used all structure members would have
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4075e51..379c38c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1020,7 +1020,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * row; the original conn->result is left unchanged so that it can be used
 	 * again as the template for future rows.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Copy everything that should be in the result at this point */
 		res = PQcopyResult(res,
@@ -1080,7 +1080,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * Success.  In single-row mode, make the result available to the client
 	 * immediately.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Change result status to special single-row value */
 		res->resultStatus = PGRES_SINGLE_TUPLE;
@@ -1132,13 +1132,11 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* remember we are using simple query protocol */
-	conn->queryclass = PGQUERY_SIMPLE;
+	conn->queryTail->queryclass = PGQUERY_SIMPLE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1151,7 +1149,9 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->asyncStatus == PGASYNC_IDLE)
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 }
 
@@ -1272,13 +1272,11 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	conn->queryTail->queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1288,7 +1286,9 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->asyncStatus == PGASYNC_IDLE)
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
@@ -1344,6 +1344,8 @@ PQsendQueryPrepared(PGconn *conn,
 static bool
 PQsendQueryStart(PGconn *conn)
 {
+	PGquery * query;
+
 	if (!conn)
 		return false;
 
@@ -1357,20 +1359,59 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Check if we are in a valid state to send an async query */
+	switch (conn->asyncStatus)
 	{
-		printfPQExpBuffer(&conn->errorMessage,
+		case PGASYNC_IDLE:
+			break;
+		/* Can only send a query during busy or ready state if
+		 * pipelining is enabled */
+		case PGASYNC_BUSY:
+		case PGASYNC_READY:
+			if (conn->pipelining)
+				break;
+			/* Fall through to error */
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
-		return false;
+			return false;
 	}
 
-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
+	query = 0;
 
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+	/* Check if we have a free PGquery to use if not we create one
+	 * We have to make sure we don't use the same PGquery twice
+	 * in a row, so we will try both the first and second free
+	 * entries, if not create a new one. */
+	if (conn->queryFree && conn->queryFree != conn->queryLast)
+	{
+		query = conn->queryFree;
+		conn->queryFree = query->next;
+		query->next = NULL;
+	}
+	else if(conn->queryFree && conn->queryFree->next)
+	{
+		query = conn->queryFree->next;
+		conn->queryFree->next = query->next;
+		query->next = NULL;
+		conn->queryLast = NULL; /* First is fine to use again now */
+	} else
+	{
+		query = (PGquery*) malloc(sizeof(PGquery));
+		query->querycmd = 0;
+		query->singleRowMode = false;
+		query->next = 0;
+	}
+
+	if( conn->queryTail )
+		conn->queryTail->next = query;
+	else
+		conn->queryHead = query;
+
+	conn->queryTail = query;
 
 	/* ready to send command message */
 	return true;
@@ -1522,16 +1563,12 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	conn->queryTail->queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	/* if insufficient memory, querycmd just winds up NULL */
 	if (command)
-		conn->last_query = strdup(command);
-	else
-		conn->last_query = NULL;
+		conn->queryTail->querycmd = strdup(command);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1541,7 +1578,9 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->asyncStatus == PGASYNC_IDLE)
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
@@ -1576,7 +1615,7 @@ pqHandleSendFailure(PGconn *conn)
 }
 
 /*
- * Select row-by-row processing mode
+ * Select row-by-row processing mode for the last sent query
  */
 int
 PQsetSingleRowMode(PGconn *conn)
@@ -1585,18 +1624,16 @@ PQsetSingleRowMode(PGconn *conn)
 	 * Only allow setting the flag when we have launched a query and not yet
 	 * received any results.
 	 */
-	if (!conn)
+	if (!conn || !conn->queryTail)
 		return 0;
-	if (conn->asyncStatus != PGASYNC_BUSY)
+	if (conn->asyncStatus != PGASYNC_BUSY && conn->queryTail == conn->queryHead)
 		return 0;
-	if (conn->queryclass != PGQUERY_SIMPLE &&
-		conn->queryclass != PGQUERY_EXTENDED)
-		return 0;
-	if (conn->result)
+	if (conn->queryTail->queryclass != PGQUERY_SIMPLE &&
+		conn->queryTail->queryclass != PGQUERY_EXTENDED)
 		return 0;
 
 	/* OK, set flag */
-	conn->singleRowMode = true;
+	conn->queryTail->singleRowMode = true;
 	return 1;
 }
 
@@ -1670,6 +1707,50 @@ PQisBusy(PGconn *conn)
 
 
 /*
+ * PQgetQueryCommand
+ */
+const char *
+PQgetQueryCommand(PGquery *query)
+{
+	if (!query)
+		return NULL;
+	return query->querycmd;
+}
+
+/*
+ * PQgetFirstQuery
+ */
+PGquery *
+PQgetResultQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	return conn->queryHead;
+}
+
+/*
+ * PQgetLastQuery
+ */
+PGquery *
+PQgetSentQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	return conn->queryTail;
+}
+
+/*
+ * PQgetNextQuery
+ */
+PGquery *
+PQgetNextQuery(PGquery *query)
+{
+	if (!query)
+		return 0;
+	return query->next;
+}
+
+/*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
@@ -2132,14 +2213,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
-
-	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
-	{
-		free(conn->last_query);
-		conn->last_query = NULL;
-	}
+	conn->queryTail->queryclass = PGQUERY_DESCRIBE;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -2301,7 +2375,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
@@ -3112,6 +3186,31 @@ PQisnonblocking(const PGconn *conn)
 	return pqIsnonblocking(conn);
 }
 
+int
+PQsetPipelining(PGconn *conn, int arg)
+{
+	bool barg;
+
+	if (!conn)
+		return -1;
+
+	barg = (arg ? TRUE : FALSE);
+
+	/* Return error if they are trying to turn pipelining off and
+	 * multiple queries are pending */
+	if (!barg && conn->queryHead && conn->queryHead != conn->queryTail)
+		return -1;
+
+	conn->pipelining = barg;
+	return 0;
+}
+
+int
+PQisPipelining(PGconn *conn)
+{
+	return conn->pipelining ? 1 : 0;
+}
+
 /* libpq is thread-safe? */
 int
 PQisthreadsafe(void)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index c514ca5..d0c5110 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -55,7 +55,29 @@ static void reportErrorPosition(PQExpBuffer msg, const char *query,
 					int loc, int encoding);
 static int build_startup_packet(const PGconn *conn, char *packet,
 					 const PQEnvironmentOption *options);
+static void pqQueryAdvance(PGconn *conn);
 
+void
+pqQueryAdvance(PGconn *conn)
+{
+	PGquery * query;
+
+	query = conn->queryHead;
+	if (query == NULL)
+		return;
+
+	conn->queryLast = query;
+	/* Advance queryHead */
+	conn->queryHead = query->next;
+	/* Push last query onto free stack */
+	query->next = conn->queryFree;
+	conn->queryFree = query;
+	free(query->querycmd);
+	query->querycmd = NULL;
+
+	if (conn->queryHead == NULL)
+		conn->queryTail = NULL;
+}
 
 /*
  * parseInput: if appropriate, parse input data from backend
@@ -218,7 +240,15 @@ pqParseInput3(PGconn *conn)
 				case 'Z':		/* backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+
+					pqQueryAdvance(conn);
+					/* initialize async result-accumulation state */
+					conn->result = NULL;
+					conn->next_result = NULL;
+					if (conn->queryHead != NULL)
+						conn->asyncStatus = PGASYNC_BUSY;
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -232,7 +262,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case '1':		/* Parse Complete */
 					/* If we're doing PQprepare, we're done; else ignore */
-					if (conn->queryclass == PGQUERY_PREPARE)
+					if (conn->queryHead->queryclass == PGQUERY_PREPARE)
 					{
 						if (conn->result == NULL)
 						{
@@ -266,7 +296,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case 'T':		/* Row Description */
 					if (conn->result == NULL ||
-						conn->queryclass == PGQUERY_DESCRIBE)
+						conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						/* First 'T' in a query sequence */
 						if (getRowDescriptions(conn, msgLength))
@@ -299,7 +329,7 @@ pqParseInput3(PGconn *conn)
 					 * instead of TUPLES_OK.  Otherwise we can just ignore
 					 * this message.
 					 */
-					if (conn->queryclass == PGQUERY_DESCRIBE)
+					if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						if (conn->result == NULL)
 						{
@@ -422,6 +452,8 @@ pqParseInput3(PGconn *conn)
 static void
 handleSyncLoss(PGconn *conn, char id, int msgLength)
 {
+	PGquery * query;
+
 	printfPQExpBuffer(&conn->errorMessage,
 					  libpq_gettext(
 	"lost synchronization with server: got message type \"%c\", length %d\n"),
@@ -430,6 +462,15 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
 	pqSaveErrorResult(conn);
 	conn->asyncStatus = PGASYNC_READY;	/* drop out of GetResult wait loop */
 
+	/* All queries are canceled, move them to the free list and free the query commands */
+	while ((query = conn->queryHead) != NULL)
+	{
+		free(query->querycmd);
+		query->querycmd = NULL;
+		conn->queryHead = query->next;
+		query->next = conn->queryFree;
+	}
+
 	pqDropConnection(conn);
 	conn->status = CONNECTION_BAD;		/* No more connection to backend */
 }
@@ -455,7 +496,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * PGresult created by getParamDescriptions, and we should fill data into
 	 * that.  Otherwise, create a new, empty PGresult.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		if (conn->result)
 			result = conn->result;
@@ -562,7 +603,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * If we're doing a Describe, we're done, and ready to pass the result
 	 * back to the client.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		conn->asyncStatus = PGASYNC_READY;
 		return 0;
@@ -865,10 +906,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
 	if (val)
 	{
-		if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL)
+		if (conn->verbosity != PQERRORS_TERSE && conn->queryHead && conn->queryHead->querycmd != NULL)
 		{
 			/* emit position as a syntax cursor display */
-			querytext = conn->last_query;
+			querytext = conn->queryHead->querycmd;
 			querypos = atoi(val);
 		}
 		else
@@ -1696,7 +1737,7 @@ pqEndcopy3(PGconn *conn)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead && conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b81dc16..ca54116 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -141,6 +141,13 @@ typedef struct pg_result PGresult;
  */
 typedef struct pg_cancel PGcancel;
 
+/* PGquery encapsulates the progress of a single query command issued
+ * to the async api functions
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_query PGquery;
+
+
 /* PGnotify represents the occurrence of a NOTIFY message.
  * Ideally this would be an opaque typedef, but it's so simple that it's
  * unlikely to change.
@@ -404,6 +411,14 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+extern int PQsetPipelining(PGconn *conn, int arg);
+extern int PQisPipelining(PGconn *conn);
+
+extern const char * PQgetQueryCommand(PGquery *query);
+extern PGquery *PQgetResultQuery(PGconn *conn);
+extern PGquery *PQgetSentQuery(PGconn *conn);
+extern PGquery *PQgetNextQuery(PGquery *query);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4ef46ff..7d84d89 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -291,6 +291,16 @@ typedef struct pgDataValue
 	const char *value;			/* data value, without zero-termination */
 } PGdataValue;
 
+typedef struct pg_query
+{
+	PGQueryClass queryclass;
+	char	   *querycmd;		/* last SQL command, or NULL if unknown */
+	bool		singleRowMode;	/* return query result row-by-row? */
+	struct pg_query * next;
+	void	   *userptr;        /* convenience for the user */
+} PGquery;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -350,13 +360,20 @@ struct pg_conn
 	ConnStatusType status;
 	PGAsyncStatusType asyncStatus;
 	PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
-	PGQueryClass queryclass;
-	char	   *last_query;		/* last SQL command, or NULL if unknown */
+
+	/* queryHead and queryTail form a FIFO representing queries sent
+	 * to the backend.  queryHead is the first query sent, and is the
+	 * query we are receiving results from, or have received results from */
+	bool pipelining;
+	PGquery *queryHead;
+	PGquery *queryTail;
+	PGquery *queryFree; /* Reuse PGQuery allocations */
+	PGquery *queryLast; /* Ensure we never use a query twice in a row */
+
 	char		last_sqlstate[6];		/* last reported SQLSTATE */
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
-	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;		/* # bytes already returned in COPY
 										 * OUT */
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index aee5c04..3996760 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqpipeline testlibpqpipeline2
 
 all: $(PROGS)
 
testlibpqpipeline2.ctext/x-csrc; charset=UTF-8; name=testlibpqpipeline2.cDownload
testlibpqpipeline.ctext/x-csrc; charset=UTF-8; name=testlibpqpipeline.cDownload