[Patch proposal] libpq portal support

Started by Sergei Fedorov about 6 years ago, 5 messages
#1 Sergei Fedorov
sergei.a.fedorov@gmail.com

Hello everybody,

Our company was in desperate need of portals in the async interface of
libpq, so we patched it.

We would be happy to upstream the changes.

The description of changes:

Two functions in libpq-fe.h:
PQsendPortalBindParams for sending a command to bind a portal to a
previously prepared statement;
PQsendPortalExecute for executing a previously bound portal with a given
number of rows.

A patch to pqParseInput3 in fe-protocol3.c to handle the `portal suspended`
message tag.

The patch is ready for review, but it lacks documentation, tests, and usage
examples.

There are no functions for sending a Bind without parameters and none for
the sync interface, but they can easily be added to the feature.
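
For readers unfamiliar with the extended query protocol, here is a minimal, self-contained sketch of what executing a portal with a row limit looks like on the wire. It is not code from the patch: encode_execute, put_be32 and portal_has_more_rows are hypothetical names, and the layout assumed is that of the Execute and PortalSuspended messages in protocol version 3.0.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Store a 32-bit value in network (big-endian) byte order. */
void put_be32(unsigned char *dst, uint32_t v)
{
	dst[0] = (unsigned char) (v >> 24);
	dst[1] = (unsigned char) (v >> 16);
	dst[2] = (unsigned char) (v >> 8);
	dst[3] = (unsigned char) v;
}

/*
 * Hypothetical helper, not part of libpq: encode an Execute message for
 * portalName with a row limit of maxRows (0 means "no limit").
 * Returns the number of bytes written, or 0 if buf is too small.
 */
size_t encode_execute(unsigned char *buf, size_t bufsize,
					  const char *portalName, uint32_t maxRows)
{
	size_t		namelen;
	size_t		total;

	assert(buf != NULL && portalName != NULL);

	namelen = strlen(portalName) + 1;	/* name plus terminating NUL */
	total = 1 + 4 + namelen + 4;		/* tag + length + name + row limit */
	if (total > bufsize)
		return 0;

	buf[0] = 'E';
	/* the length word counts itself but not the tag byte */
	put_be32(buf + 1, (uint32_t) (total - 1));
	memcpy(buf + 5, portalName, namelen);
	put_be32(buf + 5 + namelen, maxRows);
	return total;
}

/* A backend 's' tag (PortalSuspended) means the row limit was reached. */
int portal_has_more_rows(char backendTag)
{
	return backendTag == 's';
}
```

A caller would keep sending Execute for the same portal for as long as the reply ends with the 's' tag, which is the case the pqParseInput3 change has to recognise.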

--
Thank you,
Sergei Fedorov

#2 Craig Ringer
craig@2ndquadrant.com
In reply to: Sergei Fedorov (#1)
Re: [Patch proposal] libpq portal support

If you are happy to put it under The PostgreSQL License, then sending it as
an attachment here is the first step.

If possible, please rebase it on top of git master.

Some explanation for why you have this need and what problems this solves
for you would be helpful as well.

--
Craig Ringer http://www.2ndQuadrant.com/
2ndQuadrant - PostgreSQL Solutions for the Enterprise

#3 Sergei Fedorov
sergei.a.fedorov@gmail.com
In reply to: Craig Ringer (#2)
1 attachment(s)
Re: [Patch proposal] libpq portal support

Hello everybody,

Yes, we will be happy to put our patch under the PostgreSQL License.

The patch is attached to this email; master was rebased to head prior to
creating the patch.

We use a C++ wrapper on top of libpq for database connections in
multithreaded asynchronous applications. For security reasons (and
partially because we are too lazy to escape query parameters) we use
prepared queries and parameter binding for execution. There are situations
when we need to fetch the query results not in one batch but in a `paged`
way, and the most convenient way to do that is to use the portals feature
of the PostgreSQL protocol.
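
To make the `paged` access pattern concrete, here is a toy model of it in plain C. It does not talk to a server and is not part of the patch: FakePortal, fake_execute and fetch_paged are hypothetical names, and the rule used to decide between PortalSuspended ('s') and CommandComplete ('C') is a simplifying assumption about how the backend behaves.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for a server-side portal: it only counts rows. */
typedef struct FakePortal
{
	size_t		totalRows;		/* rows the query would produce */
	size_t		position;		/* rows already returned */
} FakePortal;

/*
 * Simulate one Execute with a row limit.  Stores the number of rows
 * "returned" in *nReturned and gives back the terminating message tag:
 * 's' (PortalSuspended) if the limit was hit, 'C' (CommandComplete) if
 * the portal ran to completion.  maxRows == 0 means no limit.
 */
char fake_execute(FakePortal *portal, size_t maxRows, size_t *nReturned)
{
	size_t		remaining;
	size_t		n;

	assert(portal != NULL && nReturned != NULL);
	assert(portal->position <= portal->totalRows);

	remaining = portal->totalRows - portal->position;
	n = (maxRows == 0 || remaining < maxRows) ? remaining : maxRows;
	portal->position += n;
	*nReturned = n;

	/* assumed rule: suspended whenever a nonzero limit was fully used */
	return (maxRows != 0 && n == maxRows) ? 's' : 'C';
}

/*
 * Fetch everything page by page.  Returns the number of Execute round
 * trips needed; *rowsSeen receives the total number of rows.
 */
size_t fetch_paged(FakePortal *portal, size_t pageSize, size_t *rowsSeen)
{
	size_t		rounds = 0;
	char		tag;

	*rowsSeen = 0;
	do
	{
		size_t		n;

		tag = fake_execute(portal, pageSize, &n);
		*rowsSeen += n;
		rounds++;
	} while (tag == 's');

	return rounds;
}
```

Under this model a result whose size is an exact multiple of the page size costs one extra round trip that returns no rows, because the portal only reports completion once it comes up short of the limit.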

--
Thank you,
Sergei Fedorov

Attachments:

libpq-portals.patch (application/octet-stream)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 051f548594..f49b5acf99 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1670,6 +1670,233 @@ sendFailed:
 	return 0;
 }
 
+/*
+ * PQsendPortalBindParams
+ *
+ *   Bind a portal to a previously prepared statement with parameters
+ */
+int
+PQsendPortalBindParams(PGconn* conn,
+								  const char *stmtName,
+								  const char *portalName,
+								  int nParams,
+								  const char *const *paramValues,
+								  const int *paramLengths,
+								  const int *paramFormats,
+								  int resultFormat)
+{
+	int			i;
+
+	if (!PQsendQueryStart(conn))
+		return 0;
+
+	/* This isn't gonna work on a 2.0 server */
+	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("function requires at least protocol version 3.0\n"));
+		return 0;
+	}
+
+	/* check the arguments */
+	if (!stmtName)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("statement name is a null pointer\n"));
+		return 0;
+	}
+	if (!portalName)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("portal name is a null pointer\n"));
+		return 0;
+	}
+	if (nParams < 0 || nParams > 65535)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("number of parameters must be between 0 and 65535\n"));
+		return 0;
+	}
+
+	if (conn->xactStatus != PQTRANS_INTRANS)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("a portal must be bound inside a transaction block\n"));
+		return 0;
+	}
+
+	/* Construct the Bind message */
+	if (pqPutMsgStart('B', false, conn) < 0 ||
+		pqPuts(portalName, conn) < 0 ||
+		pqPuts(stmtName, conn) < 0)
+		goto sendFailed;
+
+	/* Send parameter formats */
+	if (nParams > 0 && paramFormats)
+	{
+		if (pqPutInt(nParams, 2, conn) < 0)
+			goto sendFailed;
+		for (i = 0; i < nParams; i++)
+		{
+			if (pqPutInt(paramFormats[i], 2, conn) < 0)
+				goto sendFailed;
+		}
+	}
+	else
+	{
+		if (pqPutInt(0, 2, conn) < 0)
+			goto sendFailed;
+	}
+
+	if (pqPutInt(nParams, 2, conn) < 0)
+		goto sendFailed;
+
+	/* Send parameters */
+	for (i = 0; i < nParams; i++)
+	{
+		if (paramValues && paramValues[i])
+		{
+			int			nbytes;
+
+			if (paramFormats && paramFormats[i] != 0)
+			{
+				/* binary parameter */
+				if (paramLengths)
+					nbytes = paramLengths[i];
+				else
+				{
+					printfPQExpBuffer(&conn->errorMessage,
+									  libpq_gettext("length must be given for binary parameter\n"));
+					goto sendFailed;
+				}
+			}
+			else
+			{
+				/* text parameter, do not use paramLengths */
+				nbytes = strlen(paramValues[i]);
+			}
+			if (pqPutInt(nbytes, 4, conn) < 0 ||
+				pqPutnchar(paramValues[i], nbytes, conn) < 0)
+				goto sendFailed;
+		}
+		else
+		{
+			/* take the param as NULL */
+			if (pqPutInt(-1, 4, conn) < 0)
+				goto sendFailed;
+		}
+	}
+	if (pqPutInt(1, 2, conn) < 0 ||
+		pqPutInt(resultFormat, 2, conn) < 0)
+		goto sendFailed;
+	if (pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	/* remember we are using extended query protocol */
+	conn->queryclass = PGQUERY_EXTENDED;
+
+	/* we don't have query text here, so just clean up the old one */
+	if (conn->last_query)
+		free(conn->last_query);
+	/* always reset, so the pointer does not dangle after free() */
+	conn->last_query = NULL;
+
+	/*
+	 * Give the data a push.  In nonblock mode, don't complain if we're unable
+	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (pqFlush(conn) < 0)
+		goto sendFailed;
+
+	/* OK, it's launched! */
+	return 1;
+
+sendFailed:
+	/* error message should be set up already */
+	return 0;
+}
+
+/*
+ * PQsendPortalExecute
+ *
+ *   Execute a portal previously bound with PQsendPortalBindParams.
+ *   Will send a command to the backend to execute portal and return
+ *   nRows rows.
+ */
+int PQsendPortalExecute(PGconn* conn,
+							   const char* portalName,
+							   int nRows)
+{
+	if (!PQsendQueryStart(conn))
+		return 0;
+
+	/* This isn't gonna work on a 2.0 server */
+	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("function requires at least protocol version 3.0\n"));
+		return 0;
+	}
+
+	/* check the arguments */
+	if (!portalName)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("portal name is a null pointer\n"));
+		return 0;
+	}
+
+	/* TODO Some sanity check for nRows */
+
+	/* construct the Describe Portal message */
+	if (pqPutMsgStart('D', false, conn) < 0 ||
+		pqPutc('P', conn) < 0 ||
+		pqPuts(portalName, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	/* construct the Execute message */
+	if (pqPutMsgStart('E', false, conn) < 0 ||
+		pqPuts(portalName, conn) < 0 ||
+		pqPutInt(nRows, 4, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+
+	/* remember we are using extended query protocol */
+	conn->queryclass = PGQUERY_EXTENDED;
+
+	/* we don't have query text here, so just clean up the old one */
+	if (conn->last_query)
+		free(conn->last_query);
+	/* always reset, so the pointer does not dangle after free() */
+	conn->last_query = NULL;
+
+	/*
+	 * Give the data a push.  In nonblock mode, don't complain if we're unable
+	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (pqFlush(conn) < 0)
+		goto sendFailed;
+
+	/* OK, it's launched! */
+	return 1;
+
+sendFailed:
+	/* error message should be set up already */
+	return 0;
+}
+
 /*
  * Select row-by-row processing mode
  */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index c97841eb13..922319b67e 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -403,6 +403,14 @@ pqParseInput3(PGconn *conn)
 					 * the COPY command.
 					 */
 					break;
+				case 's':		/* Portal Suspended */
+					/*
+					 * We see this message only when an application issued an
+					 * execute portal command with row limit. We can effectively
+					 * ignore the message
+					 */
+					conn->asyncStatus = PGASYNC_READY;
+					break;
 				default:
 					printfPQExpBuffer(&conn->errorMessage,
 									  libpq_gettext(
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 5f65db30e4..8323fea492 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -428,6 +428,17 @@ extern int	PQsendQueryPrepared(PGconn *conn,
 								const int *paramLengths,
 								const int *paramFormats,
 								int resultFormat);
+extern int PQsendPortalBindParams(PGconn* conn,
+								  const char *stmtName,
+								  const char *portalName,
+								  int nParams,
+								  const char *const *paramValues,
+								  const int *paramLengths,
+								  const int *paramFormats,
+								  int resultFormat);
+extern int PQsendPortalExecute(PGconn* conn,
+							   const char* portalName,
+							   int nRows);
 extern int	PQsetSingleRowMode(PGconn *conn);
 extern PGresult *PQgetResult(PGconn *conn);
 
#4 Craig Ringer
craig@2ndquadrant.com
In reply to: Sergei Fedorov (#3)
Re: [Patch proposal] libpq portal support

Thanks. That's a really good reason. It'd also bring libpq closer to
feature-parity with PgJDBC.

Please add it to the commitfest app https://commitfest.postgresql.org/

--
Craig Ringer http://www.2ndQuadrant.com/
2ndQuadrant - PostgreSQL Solutions for the Enterprise

#5 Craig Ringer
craig@2ndquadrant.com
In reply to: Sergei Fedorov (#3)
Re: [Patch proposal] libpq portal support

By way of initial patch review: there's a lot of copy/paste here that
should be avoided if possible. It looks like the added function
PQsendPortalBindParams() is heavily based on PQsendQueryGuts(), which is
the common implementation shared by the existing PQsendQueryParams()
and PQsendQueryPrepared().

The same goes for PQsendPortalExecute().

I'd like to see the common code factored out, perhaps by adding the needed
functionality into PQsendQueryGuts() etc.
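
As a rough illustration of the kind of factoring meant here, the parameter section of a Bind message could be emitted by one helper that every caller shares. The sketch below is standalone and only an assumption about the shape such a helper might take: OutBuf, out_bytes, out_int and put_bind_params are hypothetical names, and the fixed-size buffer stands in for libpq's real send buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Minimal fixed-size output buffer standing in for libpq's send buffer. */
typedef struct OutBuf
{
	unsigned char data[256];
	size_t		len;
} OutBuf;

/* Append raw bytes; returns 0 on success, -1 if the buffer is full. */
int out_bytes(OutBuf *out, const void *src, size_t n)
{
	if (n > sizeof(out->data) - out->len)
		return -1;
	if (n > 0)
		memcpy(out->data + out->len, src, n);
	out->len += n;
	return 0;
}

/* Append an integer of 2 or 4 bytes in network byte order. */
int out_int(OutBuf *out, uint32_t v, size_t width)
{
	unsigned char tmp[4];
	size_t		i;

	assert(width == 2 || width == 4);
	for (i = 0; i < width; i++)
		tmp[i] = (unsigned char) (v >> (8 * (width - 1 - i)));
	return out_bytes(out, tmp, width);
}

/*
 * Hypothetical shared helper: emit the format codes and parameter values
 * of a Bind message.  Returns 0 on success, -1 on any failure.
 */
int put_bind_params(OutBuf *out, int nParams,
					const char *const *paramValues,
					const int *paramLengths,
					const int *paramFormats)
{
	int			i;

	/* format codes: one per parameter, or a count of zero for "all text" */
	if (nParams > 0 && paramFormats)
	{
		if (out_int(out, (uint32_t) nParams, 2) < 0)
			return -1;
		for (i = 0; i < nParams; i++)
			if (out_int(out, (uint32_t) paramFormats[i], 2) < 0)
				return -1;
	}
	else if (out_int(out, 0, 2) < 0)
		return -1;

	if (out_int(out, (uint32_t) nParams, 2) < 0)
		return -1;

	for (i = 0; i < nParams; i++)
	{
		const char *val = paramValues ? paramValues[i] : NULL;
		size_t		nbytes;

		if (val == NULL)
		{
			/* NULL parameter: length -1 and no data bytes */
			if (out_int(out, 0xFFFFFFFFu, 4) < 0)
				return -1;
			continue;
		}
		if (paramFormats && paramFormats[i] != 0)
		{
			/* a binary parameter needs an explicit length */
			if (!paramLengths || paramLengths[i] < 0)
				return -1;
			nbytes = (size_t) paramLengths[i];
		}
		else
			nbytes = strlen(val);	/* text parameter */

		if (out_int(out, (uint32_t) nbytes, 4) < 0 ||
			out_bytes(out, val, nbytes) < 0)
			return -1;
	}
	return 0;
}
```

With something along these lines, both the existing prepared-statement path and a new portal-bind path would call the same routine and differ only in the portal and statement names they write before it.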

The patch is also missing documentation; please add it
to doc/src/sgml/libpq.sgml in DocBook XML format. See the existing function
examples.

I'd ask you to add test coverage, but we don't really have a useful test
suite for libpq yet, so there's not much you can do there. It definitely
won't fly without the docs and copy/paste reduction though.

--
Craig Ringer http://www.2ndQuadrant.com/
2ndQuadrant - PostgreSQL Solutions for the Enterprise