Patching dblink.c to avoid warning about open transaction

Started by Jonathan Beit-Aharon over 20 years ago, 16 messages
#1 Jonathan Beit-Aharon
jbeitaharon@intrusic.com
Hi,

I'm not a member of this list (yet), so please CC me on responses and
discussion.

The patch below seems to be completion of work already started, because
the boolean remoteTrFlag was already defined, and all I had to add was
its setting and two references.

I hope someone will find it useful,
Jonathan

--- dblink.c    Sat Jan  1 00:43:05 2005
+++ /home/jbeitaharon/dev/third/postgreSQL/contrib/dblink/dblink.c    Thu Sep 22 16:10:20 2005
@@ -329,12 +329,16 @@

        if (!conn)
                DBLINK_CONN_NOT_AVAIL;
+
+       if (rcon)
+               rcon->remoteTrFlag = (PQtransactionStatus(conn) != PQTRANS_IDLE);

-       res = PQexec(conn, "BEGIN");
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               DBLINK_RES_INTERNALERROR("begin error");
-
-       PQclear(res);
+       if ((!rcon) || (!(rcon->remoteTrFlag))) {
+               res = PQexec(conn, "BEGIN");
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       DBLINK_RES_INTERNALERROR("begin error");
+               PQclear(res);
+       }

        appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
        res = PQexec(conn, str->data);
@@ -424,12 +428,13 @@

        PQclear(res);

-       /* commit the transaction */
-       res = PQexec(conn, "COMMIT");
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               DBLINK_RES_INTERNALERROR("commit error");
-
-       PQclear(res);
+       if ((!rcon) || (!(rcon->remoteTrFlag))) {
+               /* commit the transaction */
+               res = PQexec(conn, "COMMIT");
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       DBLINK_RES_INTERNALERROR("commit error");
+               PQclear(res);
+       }

        PG_RETURN_TEXT_P(GET_TEXT("OK"));
 }
#2 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Jonathan Beit-Aharon (#1)
Re: Patching dblink.c to avoid warning about open transaction

Jonathan Beit-Aharon <jbeitaharon@intrusic.com> writes:

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (!conn)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; DBLINK_CONN_NOT_AVAIL;<br>
+<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (rcon)<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; rcon-&gt;remoteTrFlag = (PQtransactionStatus(conn) !=
PQTRANS_IDLE);<br>
<br>
-&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; res = PQexec(conn, "BEGIN");<br>
-&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (PQresultStatus(res) != PGRES_COMMAND_OK)<br>

[etc]

Could we see this in a less broken format?

regards, tom lane

#3 Bruce Momjian
pgman@candle.pha.pa.us
In reply to: Tom Lane (#2)
1 attachment(s)
Re: Patching dblink.c to avoid warning about open transaction

Tom Lane wrote:

Jonathan Beit-Aharon <jbeitaharon@intrusic.com> writes:

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (!conn)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; DBLINK_CONN_NOT_AVAIL;<br>

Could we see this in a less broken format?

Here is the patch in text format.

-- 
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073

Attachments:

/rtmp/1 (text/plain)
--- dblink.c    Sat Jan  1 00:43:05 2005
+++ /home/jbeitaharon/dev/third/postgreSQL/contrib/dblink/dblink.c    Thu Sep 22 16:10:20 2005
@@ -329,12 +329,16 @@
        if (!conn)
                DBLINK_CONN_NOT_AVAIL;
+
+       if (rcon)
+               rcon->remoteTrFlag = (PQtransactionStatus(conn) != PQTRANS_IDLE);
-       res = PQexec(conn, "BEGIN");
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               DBLINK_RES_INTERNALERROR("begin error");
-
-       PQclear(res);
+       if ((!rcon) || (!(rcon->remoteTrFlag))) {
+               res = PQexec(conn, "BEGIN");
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       DBLINK_RES_INTERNALERROR("begin error");
+               PQclear(res);
+       }
        appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
        res = PQexec(conn, str->data);
@@ -424,12 +428,13 @@
        PQclear(res);
-       /* commit the transaction */
-       res = PQexec(conn, "COMMIT");
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               DBLINK_RES_INTERNALERROR("commit error");
-
-       PQclear(res);
+       if ((!rcon) || (!(rcon->remoteTrFlag))) {
+               /* commit the transaction */
+               res = PQexec(conn, "COMMIT");
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       DBLINK_RES_INTERNALERROR("commit error");
+               PQclear(res);
+       }
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
 }
#4 Bruce Momjian
pgman@candle.pha.pa.us
In reply to: Bruce Momjian (#3)
Re: Patching dblink.c to avoid warning about open transaction

[ Joe, would you review this? ]

Your patch has been added to the PostgreSQL unapplied patches list at:

http://momjian.postgresql.org/cgi-bin/pgpatches

It will be applied as soon as one of the PostgreSQL committers reviews
and approves it.

#5 Joe Conway
mail@joeconway.com
In reply to: Bruce Momjian (#4)
Re: Patching dblink.c to avoid warning about open transaction

Bruce Momjian wrote:

[ Joe, would you review this? ]

Your patch has been added to the PostgreSQL unapplied patches list at:

http://momjian.postgresql.org/cgi-bin/pgpatches

It will be applied as soon as one of the PostgreSQL committers reviews
and approves it.

The patch itself is pretty simple, but I'm unclear on the use case.
Jonathan, can you elaborate a bit?

Thanks,

Joe

p.s. I'm on a business trip in Asia again, so my responses may be
delayed a bit.

#6 Jonathan Beit-Aharon
jbeitaharon@intrusic.com
In reply to: Joe Conway (#5)
Re: Patching dblink.c to avoid warning about open transaction

Joe Conway wrote:

The patch itself is pretty simple, but I'm unclear on the use case.
Jonathan, can you elaborate a bit?

Hi Joe,
We are using the dblink module on Sensor servers to provide
summarization services to a Central server. Sensors are in the business
of populating certain Postgres databases, and the Central is in the
business of populating a summary Postgres database. The issue in our
situation is that the Central server does an explicit BEGIN transaction
some time before it calls the dblink_open() routine. On the Sensors, we
were getting many "there is already a transaction in progress" warnings,
and overflowing our log storage. Is this patch the right way to go
about this?
Thanks,
Jonathan
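
The situation described above can be modelled without a live server. The
sketch below is a minimal, hypothetical stand-in (MockRemote, mock_exec,
open_guarded and the other names are illustrative and are not part of
dblink or libpq). It assumes the remote server answers a BEGIN issued
inside an open transaction block with a warning and leaves the block
open, and it compares an unconditional BEGIN with one that is skipped
when a transaction is already in progress.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a remote session; this is not a libpq type. */
typedef struct MockRemote
{
	bool	in_xact;	/* is a transaction block open on the remote side? */
	int		warnings;	/* "already a transaction in progress" warnings seen */
} MockRemote;

/* Mimic the assumed server reaction to BEGIN and COMMIT. */
void
mock_exec(MockRemote *r, const char *cmd)
{
	assert(r != NULL && cmd != NULL);

	if (strcmp(cmd, "BEGIN") == 0)
	{
		if (r->in_xact)
			r->warnings++;		/* nested BEGIN: warning, block stays open */
		else
			r->in_xact = true;
	}
	else if (strcmp(cmd, "COMMIT") == 0)
		r->in_xact = false;		/* ends whichever block is open */
}

/* Unguarded behaviour: opening a cursor always sends BEGIN. */
void
open_unconditional(MockRemote *r)
{
	mock_exec(r, "BEGIN");
}

/*
 * Guarded behaviour: send BEGIN only when no transaction is open.
 * Returns true when the caller's own transaction was already in progress.
 */
bool
open_guarded(MockRemote *r)
{
	bool	was_open = r->in_xact;

	if (!was_open)
		mock_exec(r, "BEGIN");
	return was_open;
}

/* COMMIT only the transaction that open_guarded() itself started. */
void
close_guarded(MockRemote *r, bool was_open)
{
	if (!was_open)
		mock_exec(r, "COMMIT");
}
```

With the guard in place, a caller that issued its own BEGIN sees no
warning, and its transaction is still open after the cursor is closed.
In real code the in_xact test would presumably be replaced by libpq's
PQtransactionStatus(conn) != PQTRANS_IDLE, as in the patch.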

#7 Bruce Momjian
pgman@candle.pha.pa.us
In reply to: Jonathan Beit-Aharon (#1)
1 attachment(s)
Re: [HACKERS] Patching dblink.c to avoid warning about open transaction

I'm not a member of this list (yet), so please CC me on responses and
discussion. The patch below seems to be completion of work already
started, because the boolean remoteTrFlag was already defined, and all I
had to add was its setting and two references. I hope someone will find
it useful,

Jonathan

I have worked on this issue and have an extensive patch to dblink to fix
it.

The reported problem is that dblink_open/dblink_close() (for cursor
reads) do a BEGIN/COMMIT regardless of the transaction state of the
remote connection. There was code in dblink.c to track the remote
transaction state (rconn), but it was not being maintained or used.

This patch fixes that by routing all connections through an rconn
structure and using the transaction status properly. I removed the
global persistent connection and function-local 'conn' structures in
favor of using rconn consistently. This cleans up a lot of error-prone
code that tried to track what type of connection was being used.
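
As a rough illustration of routing every connection through one
structure, the sketch below keeps named and unnamed connections in a
single lookup table, with the empty string standing for the unnamed one.
It is a model built on assumptions, not the dblink code: it uses a small
fixed array instead of the backend hash table, and RemoteConn,
registry_put, registry_get and registry_remove are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define EMPTY_NAME	""	/* hypothetical key for the unnamed connection */
#define MAX_CONNS	16
#define NAME_LEN	64

/* Hypothetical per-connection record; handle stands in for a real connection. */
typedef struct RemoteConn
{
	int		handle;
	bool	remote_xact_open;
} RemoteConn;

typedef struct Entry
{
	bool		used;
	char		name[NAME_LEN];
	RemoteConn *rconn;
} Entry;

static Entry table[MAX_CONNS];

/* Look up a connection by name; EMPTY_NAME finds the unnamed one. */
RemoteConn *
registry_get(const char *name)
{
	assert(name != NULL);

	for (size_t i = 0; i < MAX_CONNS; i++)
		if (table[i].used && strcmp(table[i].name, name) == 0)
			return table[i].rconn;
	return NULL;
}

/* Store a connection; fails on a duplicate name, an over-long name, or a full table. */
bool
registry_put(const char *name, RemoteConn *rconn)
{
	assert(name != NULL);

	if (strlen(name) >= NAME_LEN || registry_get(name) != NULL)
		return false;

	for (size_t i = 0; i < MAX_CONNS; i++)
	{
		if (!table[i].used)
		{
			strcpy(table[i].name, name);	/* length checked above */
			table[i].rconn = rconn;
			table[i].used = true;
			return true;
		}
	}
	return false;				/* table full */
}

/* Forget a connection; returns false when the name is unknown. */
bool
registry_remove(const char *name)
{
	assert(name != NULL);

	for (size_t i = 0; i < MAX_CONNS; i++)
	{
		if (table[i].used && strcmp(table[i].name, name) == 0)
		{
			table[i].used = false;
			table[i].rconn = NULL;
			return true;
		}
	}
	return false;
}
```

Because the unnamed connection is just another entry, callers need one
lookup path and one "not available" check instead of separate branches
for a global connection and a named one.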

I don't know if people want this for 8.1 or 8.2.

Attachments:

/pgpatches/dblink (text/plain)
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.43
diff -c -c -r1.43 dblink.c
*** contrib/dblink/dblink.c	30 May 2005 23:09:06 -0000	1.43
--- contrib/dblink/dblink.c	6 Oct 2005 04:18:34 -0000
***************
*** 60,67 ****
  
  typedef struct remoteConn
  {
! 	PGconn	   *con;			/* Hold the remote connection */
! 	bool		remoteTrFlag;	/* Indicates whether or not a transaction
  								 * on remote database is in progress */
  }	remoteConn;
  
--- 60,67 ----
  
  typedef struct remoteConn
  {
! 	PGconn	   *conn;			/* Hold the remote connection */
! 	bool		remoteXactOpen;	/* Indicates whether or not a transaction
  								 * on remote database is in progress */
  }	remoteConn;
  
***************
*** 86,110 ****
  /* Global */
  List	   *res_id = NIL;
  int			res_id_index = 0;
- PGconn	   *persistent_conn = NULL;
  static HTAB *remoteConnHash = NULL;
  
  /*
! Following is list that holds multiple remote connections.
! Calling convention of each dblink function changes to accept
! connection name as the first parameter. The connection list is
! much like ecpg e.g. a mapping between a name and a PGconn object.
  */
  
  typedef struct remoteConnHashEnt
  {
  	char		name[NAMEDATALEN];
! 	remoteConn *rcon;
  }	remoteConnHashEnt;
  
  /* initial number of connection hashes */
  #define NUMCONN 16
  
  /* general utility */
  #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
  #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
--- 86,115 ----
  /* Global */
  List	   *res_id = NIL;
  int			res_id_index = 0;
  static HTAB *remoteConnHash = NULL;
  
  /*
!  *	Following is list that holds multiple remote connections.
!  *	Calling convention of each dblink function changes to accept
!  *	connection name as the first parameter. The connection list is
!  *	much like ecpg e.g. a mapping between a name and a PGconn object.
  */
  
  typedef struct remoteConnHashEnt
  {
  	char		name[NAMEDATALEN];
! 	remoteConn *rconn;	/* EMPTY_CONNECTION_NAME also possible */
  }	remoteConnHashEnt;
  
  /* initial number of connection hashes */
  #define NUMCONN 16
  
+ /*
+  *	Because the argument protocol is V1, no connection name behaves
+  *	the same as a NULL-passed connection name
+  */
+ #define	EMPTY_CONNECTION_NAME	""
+ 
  /* general utility */
  #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
  #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
***************
*** 116,131 ****
  			var_ = NULL; \
  		} \
  	} while (0)
  #define DBLINK_RES_INTERNALERROR(p2) \
  	do { \
! 			msg = pstrdup(PQerrorMessage(conn)); \
  			if (res) \
  				PQclear(res); \
  			elog(ERROR, "%s: %s", p2, msg); \
  	} while (0)
  #define DBLINK_RES_ERROR(p2) \
  	do { \
! 			msg = pstrdup(PQerrorMessage(conn)); \
  			if (res) \
  				PQclear(res); \
  			ereport(ERROR, \
--- 121,138 ----
  			var_ = NULL; \
  		} \
  	} while (0)
+ 
  #define DBLINK_RES_INTERNALERROR(p2) \
  	do { \
! 			msg = pstrdup(PQerrorMessage(rconn->conn)); \
  			if (res) \
  				PQclear(res); \
  			elog(ERROR, "%s: %s", p2, msg); \
  	} while (0)
+ 
  #define DBLINK_RES_ERROR(p2) \
  	do { \
! 			msg = pstrdup(PQerrorMessage(rconn->conn)); \
  			if (res) \
  				PQclear(res); \
  			ereport(ERROR, \
***************
*** 133,141 ****
  					 errmsg("%s", p2), \
  					 errdetail("%s", msg))); \
  	} while (0)
  #define DBLINK_RES_ERROR_AS_NOTICE(p2) \
  	do { \
! 			msg = pstrdup(PQerrorMessage(conn)); \
  			if (res) \
  				PQclear(res); \
  			ereport(NOTICE, \
--- 140,149 ----
  					 errmsg("%s", p2), \
  					 errdetail("%s", msg))); \
  	} while (0)
+ 
  #define DBLINK_RES_ERROR_AS_NOTICE(p2) \
  	do { \
! 			msg = pstrdup(PQerrorMessage(rconn->conn)); \
  			if (res) \
  				PQclear(res); \
  			ereport(NOTICE, \
***************
*** 143,151 ****
  					 errmsg("%s", p2), \
  					 errdetail("%s", msg))); \
  	} while (0)
  #define DBLINK_CONN_NOT_AVAIL \
  	do { \
! 		if(conname) \
  			ereport(ERROR, \
  					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
  					 errmsg("connection \"%s\" not available", conname))); \
--- 151,160 ----
  					 errmsg("%s", p2), \
  					 errdetail("%s", msg))); \
  	} while (0)
+ 
  #define DBLINK_CONN_NOT_AVAIL \
  	do { \
! 		if (conname && strlen(conname) != 0) \
  			ereport(ERROR, \
  					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
  					 errmsg("connection \"%s\" not available", conname))); \
***************
*** 154,181 ****
  					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
  					 errmsg("connection not available"))); \
  	} while (0)
  #define DBLINK_GET_CONN \
  	do { \
! 			char *conname_or_str = GET_STR(PG_GETARG_TEXT_P(0)); \
! 			rcon = getConnectionByName(conname_or_str); \
! 			if(rcon) \
! 			{ \
! 				conn = rcon->con; \
! 			} \
! 			else \
  			{ \
! 				connstr = conname_or_str; \
! 				conn = PQconnectdb(connstr); \
! 				if (PQstatus(conn) == CONNECTION_BAD) \
  				{ \
! 					msg = pstrdup(PQerrorMessage(conn)); \
! 					PQfinish(conn); \
  					ereport(ERROR, \
  							(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
  							 errmsg("could not establish connection"), \
  							 errdetail("%s", msg))); \
  				} \
! 				freeconn = true; \
  			} \
  	} while (0)
  
--- 163,194 ----
  					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
  					 errmsg("connection not available"))); \
  	} while (0)
+ 
  #define DBLINK_GET_CONN \
  	do { \
! 			rconn = getConnectionByName(conname); \
! 			if(!rconn) \
  			{ \
! 				/* \
! 				 *	Does not match connection name, so must be conn string. \
! 				 *	Create an rconn structure that we will free before the \
! 				 *	function completes.  Don't bother storing it in the hash. \
! 				 */ \
! 				rconn = (remoteConn *) palloc(sizeof(remoteConn)); \
! 				rconn->conn = PQconnectdb(conname); \
! 				rconn->remoteXactOpen = false; \
! 				conname = EMPTY_CONNECTION_NAME; \
! 				if (PQstatus(rconn->conn) == CONNECTION_BAD) \
  				{ \
! 					msg = pstrdup(PQerrorMessage(rconn->conn)); \
! 					PQfinish(rconn->conn); \
! 					pfree(rconn); \
  					ereport(ERROR, \
  							(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
  							 errmsg("could not establish connection"), \
  							 errdetail("%s", msg))); \
  				} \
! 				rconn_is_local = true; \
  			} \
  	} while (0)
  
***************
*** 187,221 ****
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
! 	char	   *connstr = NULL;
! 	char	   *connname = NULL;
  	char	   *msg;
  	MemoryContext oldcontext;
! 	PGconn	   *conn = NULL;
! 	remoteConn *rcon = NULL;
  
  	if (PG_NARGS() == 2)
  	{
  		connstr = GET_STR(PG_GETARG_TEXT_P(1));
- 		connname = GET_STR(PG_GETARG_TEXT_P(0));
  	}
! 	else if (PG_NARGS() == 1)
  		connstr = GET_STR(PG_GETARG_TEXT_P(0));
! 
  	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
  
! 	if (connname)
! 		rcon = (remoteConn *) palloc(sizeof(remoteConn));
! 	conn = PQconnectdb(connstr);
  
  	MemoryContextSwitchTo(oldcontext);
  
! 	if (PQstatus(conn) == CONNECTION_BAD)
  	{
! 		msg = pstrdup(PQerrorMessage(conn));
! 		PQfinish(conn);
! 		if (rcon)
! 			pfree(rcon);
  
  		ereport(ERROR,
  		   (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
--- 200,236 ----
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
! 	char	   *connstr;
! 	char	   *conname;
  	char	   *msg;
  	MemoryContext oldcontext;
! 	remoteConn *rconn;
  
  	if (PG_NARGS() == 2)
  	{
+ 		conname = GET_STR(PG_GETARG_TEXT_P(0));
  		connstr = GET_STR(PG_GETARG_TEXT_P(1));
  	}
! 	else
! 	{
! 		Assert(PG_NARGS() == 1);
! 		conname = EMPTY_CONNECTION_NAME;
  		connstr = GET_STR(PG_GETARG_TEXT_P(0));
! 	}
! 		
  	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
  
! 	rconn = (remoteConn *) palloc(sizeof(remoteConn));
! 
! 	rconn->conn = PQconnectdb(connstr);
  
  	MemoryContextSwitchTo(oldcontext);
  
! 	if (PQstatus(rconn->conn) == CONNECTION_BAD)
  	{
! 		msg = pstrdup(PQerrorMessage(rconn->conn));
! 		PQfinish(rconn->conn);
! 		pfree(rconn);
  
  		ereport(ERROR,
  		   (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
***************
*** 223,235 ****
  			errdetail("%s", msg)));
  	}
  
! 	if (connname)
! 	{
! 		rcon->con = conn;
! 		createNewConnection(connname, rcon);
! 	}
! 	else
! 		persistent_conn = conn;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 238,244 ----
  			errdetail("%s", msg)));
  	}
  
! 	createNewConnection(conname, rconn);
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 241,271 ****
  Datum
  dblink_disconnect(PG_FUNCTION_ARGS)
  {
! 	char	   *conname = NULL;
! 	remoteConn *rcon = NULL;
! 	PGconn	   *conn = NULL;
  
  	if (PG_NARGS() == 1)
- 	{
  		conname = GET_STR(PG_GETARG_TEXT_P(0));
- 		rcon = getConnectionByName(conname);
- 		if (rcon)
- 			conn = rcon->con;
- 	}
  	else
! 		conn = persistent_conn;
  
! 	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	PQfinish(conn);
! 	if (rcon)
! 	{
! 		deleteConnection(conname);
! 		pfree(rcon);
! 	}
! 	else
! 		persistent_conn = NULL;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 250,275 ----
  Datum
  dblink_disconnect(PG_FUNCTION_ARGS)
  {
! 	char	   *conname;
! 	remoteConn *rconn;
  
  	if (PG_NARGS() == 1)
  		conname = GET_STR(PG_GETARG_TEXT_P(0));
  	else
! 	{
! 		Assert(PG_NARGS() == 0);
! 		conname = EMPTY_CONNECTION_NAME;
! 	}
! 	
! 	rconn = getConnectionByName(conname);
  
! 	if (!rconn || !rconn->conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	PQfinish(rconn->conn);
! 
! 	deleteConnection(conname);
! 	pfree(rconn);
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 277,342 ****
  Datum
  dblink_open(PG_FUNCTION_ARGS)
  {
  	char	   *msg;
! 	PGresult   *res = NULL;
! 	PGconn	   *conn = NULL;
! 	char	   *curname = NULL;
! 	char	   *sql = NULL;
! 	char	   *conname = NULL;
  	StringInfo	str = makeStringInfo();
- 	remoteConn *rcon = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
! 	if (PG_NARGS() == 2)
  	{
! 		/* text,text */
! 		curname = GET_STR(PG_GETARG_TEXT_P(0));
! 		sql = GET_STR(PG_GETARG_TEXT_P(1));
! 		conn = persistent_conn;
  	}
  	else if (PG_NARGS() == 3)
  	{
  		/* might be text,text,text or text,text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
  		{
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  			fail = PG_GETARG_BOOL(2);
- 			conn = persistent_conn;
  		}
  		else
  		{
  			conname = GET_STR(PG_GETARG_TEXT_P(0));
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  			sql = GET_STR(PG_GETARG_TEXT_P(2));
- 			rcon = getConnectionByName(conname);
- 			if (rcon)
- 				conn = rcon->con;
  		}
  	}
! 	else if (PG_NARGS() == 4)
  	{
! 		/* text,text,text,bool */
! 		conname = GET_STR(PG_GETARG_TEXT_P(0));
! 		curname = GET_STR(PG_GETARG_TEXT_P(1));
! 		sql = GET_STR(PG_GETARG_TEXT_P(2));
! 		fail = PG_GETARG_BOOL(3);
! 		rcon = getConnectionByName(conname);
! 		if (rcon)
! 			conn = rcon->con;
  	}
  
! 	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = PQexec(conn, "BEGIN");
! 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		DBLINK_RES_INTERNALERROR("begin error");
  
! 	PQclear(res);
  
  	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
! 	res = PQexec(conn, str->data);
  	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  	{
  		if (fail)
--- 281,345 ----
  Datum
  dblink_open(PG_FUNCTION_ARGS)
  {
+ 	char	   *curname;
+ 	char	   *conname;
+ 	remoteConn *rconn;
  	char	   *msg;
! 	PGresult   *res;
! 	char	   *sql;
  	StringInfo	str = makeStringInfo();
  	bool		fail = true;	/* default to backward compatible behavior */
  
! 	if (PG_NARGS() == 4)
  	{
! 		/* text,text,text,bool */
! 		conname = GET_STR(PG_GETARG_TEXT_P(0));
! 		curname = GET_STR(PG_GETARG_TEXT_P(1));
! 		sql = GET_STR(PG_GETARG_TEXT_P(2));
! 		fail = PG_GETARG_BOOL(3);
  	}
  	else if (PG_NARGS() == 3)
  	{
  		/* might be text,text,text or text,text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
  		{
+ 			conname = EMPTY_CONNECTION_NAME;
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  			fail = PG_GETARG_BOOL(2);
  		}
  		else
  		{
  			conname = GET_STR(PG_GETARG_TEXT_P(0));
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  			sql = GET_STR(PG_GETARG_TEXT_P(2));
  		}
  	}
! 	else
  	{
! 		/* text,text */
! 		Assert(PG_NARGS() == 2);
! 		conname = EMPTY_CONNECTION_NAME;
! 		curname = GET_STR(PG_GETARG_TEXT_P(0));
! 		sql = GET_STR(PG_GETARG_TEXT_P(1));
  	}
  
! 	rconn = getConnectionByName(conname);
! 	if (!rconn || !rconn->conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	rconn->remoteXactOpen = (PQtransactionStatus(rconn->conn) != PQTRANS_IDLE);
  
! 	if (!rconn->remoteXactOpen)
! 	{
! 		res = PQexec(rconn->conn, "BEGIN");
! 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 			DBLINK_RES_INTERNALERROR("begin error");
! 		PQclear(res);
! 	}
  
  	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
! 	res = PQexec(rconn->conn, str->data);
  	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  	{
  		if (fail)
***************
*** 359,415 ****
  Datum
  dblink_close(PG_FUNCTION_ARGS)
  {
! 	PGconn	   *conn = NULL;
! 	PGresult   *res = NULL;
! 	char	   *curname = NULL;
! 	char	   *conname = NULL;
  	StringInfo	str = makeStringInfo();
  	char	   *msg;
- 	remoteConn *rcon = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
! 	if (PG_NARGS() == 1)
  	{
! 		/* text */
! 		curname = GET_STR(PG_GETARG_TEXT_P(0));
! 		conn = persistent_conn;
  	}
  	else if (PG_NARGS() == 2)
  	{
  		/* might be text,text or text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  		{
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
- 			conn = persistent_conn;
  		}
  		else
  		{
  			conname = GET_STR(PG_GETARG_TEXT_P(0));
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
- 			rcon = getConnectionByName(conname);
- 			if (rcon)
- 				conn = rcon->con;
  		}
  	}
! 	if (PG_NARGS() == 3)
  	{
! 		/* text,text,bool */
! 		conname = GET_STR(PG_GETARG_TEXT_P(0));
! 		curname = GET_STR(PG_GETARG_TEXT_P(1));
! 		fail = PG_GETARG_BOOL(2);
! 		rcon = getConnectionByName(conname);
! 		if (rcon)
! 			conn = rcon->con;
  	}
  
! 	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
  	appendStringInfo(str, "CLOSE %s", curname);
  
  	/* close the cursor */
! 	res = PQexec(conn, str->data);
  	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  	{
  		if (fail)
--- 362,413 ----
  Datum
  dblink_close(PG_FUNCTION_ARGS)
  {
! 	char	   *curname;
! 	char	   *conname;
! 	remoteConn *rconn;
  	StringInfo	str = makeStringInfo();
+ 	PGresult   *res;
  	char	   *msg;
  	bool		fail = true;	/* default to backward compatible behavior */
  
! 	if (PG_NARGS() == 3)
  	{
! 		/* text,text,bool */
! 		conname = GET_STR(PG_GETARG_TEXT_P(0));
! 		curname = GET_STR(PG_GETARG_TEXT_P(1));
! 		fail = PG_GETARG_BOOL(2);
  	}
  	else if (PG_NARGS() == 2)
  	{
  		/* might be text,text or text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  		{
+ 			conname = EMPTY_CONNECTION_NAME;
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
  		}
  		else
  		{
  			conname = GET_STR(PG_GETARG_TEXT_P(0));
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  		}
  	}
! 	else
  	{
! 		/* text */
! 		Assert(PG_NARGS() == 1);
! 		conname = EMPTY_CONNECTION_NAME;
! 		curname = GET_STR(PG_GETARG_TEXT_P(0));
  	}
  
! 	rconn = getConnectionByName(conname);
! 	if (!rconn || !rconn->conn)
  		DBLINK_CONN_NOT_AVAIL;
  
  	appendStringInfo(str, "CLOSE %s", curname);
  
  	/* close the cursor */
! 	res = PQexec(rconn->conn, str->data);
  	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  	{
  		if (fail)
***************
*** 423,434 ****
  
  	PQclear(res);
  
! 	/* commit the transaction */
! 	res = PQexec(conn, "COMMIT");
! 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		DBLINK_RES_INTERNALERROR("commit error");
! 
! 	PQclear(res);
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 421,435 ----
  
  	PQclear(res);
  
! 	/* We are using the Xact status we recorded during the open */
! 	if (!rconn->remoteXactOpen)
! 	{
! 		/* commit the transaction */
! 		res = PQexec(rconn->conn, "COMMIT");
! 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 			DBLINK_RES_INTERNALERROR("commit error");
! 		PQclear(res);
! 	}
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 440,445 ****
--- 441,448 ----
  Datum
  dblink_fetch(PG_FUNCTION_ARGS)
  {
+ 	char	   *conname = NULL;
+ 	remoteConn *rconn = NULL;
  	FuncCallContext *funcctx;
  	TupleDesc	tupdesc = NULL;
  	int			call_cntr;
***************
*** 448,462 ****
  	char	   *msg;
  	PGresult   *res = NULL;
  	MemoryContext oldcontext;
- 	char	   *conname = NULL;
- 	remoteConn *rcon = NULL;
  
  	/* stuff done only on the first call of the function */
  	if (SRF_IS_FIRSTCALL())
  	{
! 		PGconn	   *conn = NULL;
  		StringInfo	str = makeStringInfo();
- 		char	   *curname = NULL;
  		int			howmany = 0;
  		bool		fail = true;	/* default to backward compatible */
  
--- 451,462 ----
  	char	   *msg;
  	PGresult   *res = NULL;
  	MemoryContext oldcontext;
  
  	/* stuff done only on the first call of the function */
  	if (SRF_IS_FIRSTCALL())
  	{
! 		char	   *curname;
  		StringInfo	str = makeStringInfo();
  		int			howmany = 0;
  		bool		fail = true;	/* default to backward compatible */
  
***************
*** 467,507 ****
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  			howmany = PG_GETARG_INT32(2);
  			fail = PG_GETARG_BOOL(3);
- 
- 			rcon = getConnectionByName(conname);
- 			if (rcon)
- 				conn = rcon->con;
  		}
  		else if (PG_NARGS() == 3)
  		{
  			/* text,text,int or text,int,bool */
  			if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
  			{
  				curname = GET_STR(PG_GETARG_TEXT_P(0));
  				howmany = PG_GETARG_INT32(1);
  				fail = PG_GETARG_BOOL(2);
- 				conn = persistent_conn;
  			}
  			else
  			{
  				conname = GET_STR(PG_GETARG_TEXT_P(0));
  				curname = GET_STR(PG_GETARG_TEXT_P(1));
  				howmany = PG_GETARG_INT32(2);
- 
- 				rcon = getConnectionByName(conname);
- 				if (rcon)
- 					conn = rcon->con;
  			}
  		}
! 		else if (PG_NARGS() == 2)
  		{
  			/* text,int */
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			howmany = PG_GETARG_INT32(1);
- 			conn = persistent_conn;
  		}
  
! 		if (!conn)
  			DBLINK_CONN_NOT_AVAIL;
  
  		/* create a function context for cross-call persistence */
--- 467,501 ----
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  			howmany = PG_GETARG_INT32(2);
  			fail = PG_GETARG_BOOL(3);
  		}
  		else if (PG_NARGS() == 3)
  		{
  			/* text,text,int or text,int,bool */
  			if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
  			{
+ 				conname = EMPTY_CONNECTION_NAME;
  				curname = GET_STR(PG_GETARG_TEXT_P(0));
  				howmany = PG_GETARG_INT32(1);
  				fail = PG_GETARG_BOOL(2);
  			}
  			else
  			{
  				conname = GET_STR(PG_GETARG_TEXT_P(0));
  				curname = GET_STR(PG_GETARG_TEXT_P(1));
  				howmany = PG_GETARG_INT32(2);
  			}
  		}
! 		else
  		{
  			/* text,int */
+ 			Assert(PG_NARGS() == 2);
+ 			conname = EMPTY_CONNECTION_NAME;
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			howmany = PG_GETARG_INT32(1);
  		}
  
! 		rconn = getConnectionByName(conname);
! 		if (!rconn || !rconn->conn)
  			DBLINK_CONN_NOT_AVAIL;
  
  		/* create a function context for cross-call persistence */
***************
*** 515,521 ****
  
  		appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);
  
! 		res = PQexec(conn, str->data);
  		if (!res ||
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 509,515 ----
  
  		appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);
  
! 		res = PQexec(rconn->conn, str->data);
  		if (!res ||
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 638,657 ****
  	int			max_calls;
  	AttInMetadata *attinmeta;
  	char	   *msg;
! 	PGresult   *res = NULL;
  	bool		is_sql_cmd = false;
  	char	   *sql_cmd_status = NULL;
  	MemoryContext oldcontext;
! 	bool		freeconn = false;
  
  	/* stuff done only on the first call of the function */
  	if (SRF_IS_FIRSTCALL())
  	{
! 		PGconn	   *conn = NULL;
! 		char	   *connstr = NULL;
  		char	   *sql = NULL;
- 		char	   *conname = NULL;
- 		remoteConn *rcon = NULL;
  		bool		fail = true;	/* default to backward compatible */
  
  		/* create a function context for cross-call persistence */
--- 632,649 ----
  	int			max_calls;
  	AttInMetadata *attinmeta;
  	char	   *msg;
! 	PGresult   *res;
  	bool		is_sql_cmd = false;
  	char	   *sql_cmd_status = NULL;
  	MemoryContext oldcontext;
! 	bool		rconn_is_local = false;
  
  	/* stuff done only on the first call of the function */
  	if (SRF_IS_FIRSTCALL())
  	{
! 		char	   *conname;
! 		remoteConn *rconn;
  		char	   *sql = NULL;
  		bool		fail = true;	/* default to backward compatible */
  
  		/* create a function context for cross-call persistence */
***************
*** 666,704 ****
  		if (PG_NARGS() == 3)
  		{
  			/* text,text,bool */
! 			DBLINK_GET_CONN;
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  			fail = PG_GETARG_BOOL(2);
  		}
  		else if (PG_NARGS() == 2)
  		{
  			/* text,text or text,bool */
  			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  			{
! 				conn = persistent_conn;
  				sql = GET_STR(PG_GETARG_TEXT_P(0));
  				fail = PG_GETARG_BOOL(1);
  			}
  			else
  			{
! 				DBLINK_GET_CONN;
  				sql = GET_STR(PG_GETARG_TEXT_P(1));
  			}
  		}
! 		else if (PG_NARGS() == 1)
  		{
  			/* text */
! 			conn = persistent_conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  		}
- 		else
- 			/* shouldn't happen */
- 			elog(ERROR, "wrong number of arguments");
  
! 		if (!conn)
  			DBLINK_CONN_NOT_AVAIL;
  
! 		res = PQexec(conn, sql);
  		if (!res ||
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 658,698 ----
  		if (PG_NARGS() == 3)
  		{
  			/* text,text,bool */
! 			conname = GET_STR(PG_GETARG_TEXT_P(0));
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  			fail = PG_GETARG_BOOL(2);
+ 			DBLINK_GET_CONN;
  		}
  		else if (PG_NARGS() == 2)
  		{
  			/* text,text or text,bool */
  			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  			{
! 				conname = EMPTY_CONNECTION_NAME;
! 				rconn = getConnectionByName(conname);
  				sql = GET_STR(PG_GETARG_TEXT_P(0));
  				fail = PG_GETARG_BOOL(1);
  			}
  			else
  			{
! 				conname = GET_STR(PG_GETARG_TEXT_P(0));
  				sql = GET_STR(PG_GETARG_TEXT_P(1));
+ 				DBLINK_GET_CONN;
  			}
  		}
! 		else
  		{
  			/* text */
! 			Assert(PG_NARGS() == 1);
! 			conname = EMPTY_CONNECTION_NAME;
! 			rconn = getConnectionByName(conname);
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  		}
  
! 		if (!rconn || !rconn->conn)
  			DBLINK_CONN_NOT_AVAIL;
  
! 		res = PQexec(rconn->conn, sql);
  		if (!res ||
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 708,715 ****
  			else
  			{
  				DBLINK_RES_ERROR_AS_NOTICE("sql error");
! 				if (freeconn)
! 					PQfinish(conn);
  				SRF_RETURN_DONE(funcctx);
  			}
  		}
--- 702,712 ----
  			else
  			{
  				DBLINK_RES_ERROR_AS_NOTICE("sql error");
! 				if (rconn_is_local)
! 				{
! 					PQfinish(rconn->conn);
! 					pfree(rconn);
! 				}
  				SRF_RETURN_DONE(funcctx);
  			}
  		}
***************
*** 736,744 ****
  		/* got results, keep track of them */
  		funcctx->user_fctx = res;
  
! 		/* if needed, close the connection to the database and cleanup */
! 		if (freeconn)
! 			PQfinish(conn);
  
  		/* fast track when no results */
  		if (funcctx->max_calls < 1)
--- 733,743 ----
  		/* got results, keep track of them */
  		funcctx->user_fctx = res;
  
! 		if (rconn_is_local)
! 		{
! 			PQfinish(rconn->conn);
! 			pfree(rconn);
! 		}
  
  		/* fast track when no results */
  		if (funcctx->max_calls < 1)
***************
*** 846,895 ****
  	PGresult   *res = NULL;
  	text	   *sql_cmd_status = NULL;
  	TupleDesc	tupdesc = NULL;
- 	PGconn	   *conn = NULL;
- 	char	   *connstr = NULL;
  	char	   *sql = NULL;
  	char	   *conname = NULL;
! 	remoteConn *rcon = NULL;
! 	bool		freeconn = false;
  	bool		fail = true;	/* default to backward compatible behavior */
  
  	if (PG_NARGS() == 3)
  	{
  		/* must be text,text,bool */
! 		DBLINK_GET_CONN;
  		sql = GET_STR(PG_GETARG_TEXT_P(1));
  		fail = PG_GETARG_BOOL(2);
  	}
  	else if (PG_NARGS() == 2)
  	{
  		/* might be text,text or text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  		{
! 			conn = persistent_conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
  		}
  		else
  		{
! 			DBLINK_GET_CONN;
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  		}
  	}
! 	else if (PG_NARGS() == 1)
  	{
  		/* must be single text argument */
! 		conn = persistent_conn;
  		sql = GET_STR(PG_GETARG_TEXT_P(0));
  	}
- 	else
- 		/* shouldn't happen */
- 		elog(ERROR, "wrong number of arguments");
  
! 	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = PQexec(conn, sql);
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 845,894 ----
  	PGresult   *res = NULL;
  	text	   *sql_cmd_status = NULL;
  	TupleDesc	tupdesc = NULL;
  	char	   *sql = NULL;
  	char	   *conname = NULL;
! 	remoteConn *rconn = NULL;
! 	bool		rconn_is_local = false;
  	bool		fail = true;	/* default to backward compatible behavior */
  
  	if (PG_NARGS() == 3)
  	{
  		/* must be text,text,bool */
! 		conname = GET_STR(PG_GETARG_TEXT_P(0));
  		sql = GET_STR(PG_GETARG_TEXT_P(1));
  		fail = PG_GETARG_BOOL(2);
+ 		DBLINK_GET_CONN;
  	}
  	else if (PG_NARGS() == 2)
  	{
  		/* might be text,text or text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  		{
! 			conname = EMPTY_CONNECTION_NAME;
! 			rconn = getConnectionByName(conname);
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
  		}
  		else
  		{
! 			conname = GET_STR(PG_GETARG_TEXT_P(0));
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
+ 			DBLINK_GET_CONN;
  		}
  	}
! 	else
  	{
  		/* must be single text argument */
! 		Assert(PG_NARGS() == 1);
! 		conname = EMPTY_CONNECTION_NAME;
! 		rconn = getConnectionByName(conname);
  		sql = GET_STR(PG_GETARG_TEXT_P(0));
  	}
  
! 	if (!rconn || !rconn->conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = PQexec(rconn->conn, sql);
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 933,941 ****
  			   errmsg("statement returning results not allowed")));
  	}
  
! 	/* if needed, close the connection to the database and cleanup */
! 	if (freeconn)
! 		PQfinish(conn);
  
  	PG_RETURN_TEXT_P(sql_cmd_status);
  }
--- 932,942 ----
  			   errmsg("statement returning results not allowed")));
  	}
  
! 	if (rconn_is_local)
! 	{
! 		PQfinish(rconn->conn);
! 		pfree(rconn);
! 	}
  
  	PG_RETURN_TEXT_P(sql_cmd_status);
  }
***************
*** 1864,1870 ****
  	char	   *relname;
  	TupleDesc	tupdesc;
  	StringInfo	str = makeStringInfo();
! 	char	   *sql = NULL;
  	int			ret;
  	HeapTuple	tuple;
  	int			i;
--- 1865,1871 ----
  	char	   *relname;
  	TupleDesc	tupdesc;
  	StringInfo	str = makeStringInfo();
! 	char	   *sql;
  	int			ret;
  	HeapTuple	tuple;
  	int			i;
***************
*** 2022,2028 ****
  											   key, HASH_FIND, NULL);
  
  	if (hentry)
! 		return (hentry->rcon);
  
  	return (NULL);
  }
--- 2023,2029 ----
  											   key, HASH_FIND, NULL);
  
  	if (hentry)
! 		return (hentry->rconn);
  
  	return (NULL);
  }
***************
*** 2039,2045 ****
  }
  
  static void
! createNewConnection(const char *name, remoteConn * con)
  {
  	remoteConnHashEnt *hentry;
  	bool		found;
--- 2040,2046 ----
  }
  
  static void
! createNewConnection(const char *name, remoteConn *rconn)
  {
  	remoteConnHashEnt *hentry;
  	bool		found;
***************
*** 2058,2064 ****
  				(errcode(ERRCODE_DUPLICATE_OBJECT),
  				 errmsg("duplicate connection name")));
  
! 	hentry->rcon = con;
  	strncpy(hentry->name, name, NAMEDATALEN - 1);
  }
  
--- 2059,2065 ----
  				(errcode(ERRCODE_DUPLICATE_OBJECT),
  				 errmsg("duplicate connection name")));
  
! 	hentry->rconn = rconn;
  	strncpy(hentry->name, name, NAMEDATALEN - 1);
  }
  
#8David Fetter
david@fetter.org
In reply to: Bruce Momjian (#7)
Re: [HACKERS] Patching dblink.c to avoid warning about open transaction

On Thu, Oct 06, 2005 at 10:38:54PM -0400, Bruce Momjian wrote:
> > I'm not a member of this list (yet), so please CC me on responses
> > and discussion. The patch below seems to be completion of work
> > already started, because the boolean remoteTrFlag was already
> > defined, and all I had to add was its setting and two references.
> > I hope someone will find it useful,
> >
> > Jonathan
>
> I have worked on this issue and have an extensive patch to dblink to
> fix it.
>
> The reported problem is that dblink_open/dblink_close() (for cursor
> reads) do a BEGIN/COMMIT regardless of the transaction state of the
> remote connection. There was code in dblink.c to track the remote
> transaction state (rconn), but it was not being maintained or used.
>
> This patch fixes that by routing all connections through an rconn
> structure and using the transaction status properly. I removed the
> global persistent connection and function-local 'conn' structures in
> favor of using rconn consistently. This cleans up a lot of
> error-prone code that tried to track what type of connection was
> being used.
>
> I don't know if people want this for 8.1 or 8.2.

8.1, IMHO. It's a bug fix. :)

Cheers,
D
--
David Fetter david@fetter.org http://fetter.org/
phone: +1 510 893 6100 mobile: +1 415 235 3778

Remember to vote!

#9Tom Lane
tgl@sss.pgh.pa.us
In reply to: David Fetter (#8)
Re: [HACKERS] Patching dblink.c to avoid warning about open transaction

David Fetter <david@fetter.org> writes:
> On Thu, Oct 06, 2005 at 10:38:54PM -0400, Bruce Momjian wrote:
> > I don't know if people want this for 8.1 or 8.2.
>
> 8.1, IMHO. It's a bug fix. :)

Not unless Joe Conway signs off on it ...

regards, tom lane

#10David Fetter
david@fetter.org
In reply to: Tom Lane (#9)
Re: [HACKERS] Patching dblink.c to avoid warning about open transaction

On Thu, Oct 06, 2005 at 11:31:46PM -0400, Tom Lane wrote:
> David Fetter <david@fetter.org> writes:
> > On Thu, Oct 06, 2005 at 10:38:54PM -0400, Bruce Momjian wrote:
> > > I don't know if people want this for 8.1 or 8.2.
> >
> > 8.1, IMHO. It's a bug fix. :)
>
> Not unless Joe Conway signs off on it ...

Of course it's his to sign or not :)

Cheers,
D

#11Joe Conway
mail@joeconway.com
In reply to: Tom Lane (#9)
Re: [HACKERS] Patching dblink.c to avoid warning about

Tom Lane wrote:
> David Fetter <david@fetter.org> writes:
> > On Thu, Oct 06, 2005 at 10:38:54PM -0400, Bruce Momjian wrote:
> > > I don't know if people want this for 8.1 or 8.2.
> >
> > 8.1, IMHO. It's a bug fix. :)
>
> Not unless Joe Conway signs off on it ...

I had asked on the original simple patch, and I'll ask again now -- why
is this needed? I don't have a clear understanding of the problem that
this (or the earlier) patch is trying to solve. Without that, it is
impossible to say whether it is a bug fix or feature.

Joe

#12Bruce Momjian
pgman@candle.pha.pa.us
In reply to: Joe Conway (#11)
Re: [HACKERS] Patching dblink.c to avoid warning about open

Joe Conway wrote:
> Tom Lane wrote:
> > David Fetter <david@fetter.org> writes:
> > > On Thu, Oct 06, 2005 at 10:38:54PM -0400, Bruce Momjian wrote:
> > > > I don't know if people want this for 8.1 or 8.2.
> > >
> > > 8.1, IMHO. It's a bug fix. :)
> >
> > Not unless Joe Conway signs off on it ...
>
> I had asked on the original simple patch, and I'll ask again now -- why
> is this needed? I don't have a clear understanding of the problem that
> this (or the earlier) patch is trying to solve. Without that, it is
> impossible to say whether it is a bug fix or feature.

Well, as I said in the patch email:

The reported problem is that dblink_open/dblink_close() (for cursor
reads) do a BEGIN/COMMIT regardless of the transaction state of the
remote connection. There was code in dblink.c to track the remote
transaction state (rconn), but it was not being maintained or used.

If a remote transaction has already been started by the user,
dblink_open is going to call BEGIN, which is going to fail because there
is already an open transaction, right?

Here is a crude example:

test=> BEGIN;
BEGIN
test=> begin;
WARNING: there is already a transaction in progress
BEGIN
test=> SELECT 1;
?column?
----------
1
(1 row)

test=> COMMIT;
COMMIT
test=> COMMIT;
WARNING: there is no transaction in progress
COMMIT

The redundant BEGIN and the final COMMIT only draw warnings, but the
early COMMIT issued by dblink_close() is a serious issue, because it
ends the user's transaction before the user asked for that.
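
To make the failure mode concrete, here is a minimal C sketch of the session above. It deliberately does not use libpq: MockSession, mock_exec, naive_cursor_open and naive_cursor_close are hypothetical names invented for this illustration, and the mock only imitates the BEGIN/COMMIT behaviour visible in the psql transcript.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a remote session; this is not libpq. */
typedef struct
{
	bool		in_xact;	/* is a transaction block currently open? */
	int			warnings;	/* number of warnings the mock server raised */
} MockSession;

/* Imitate how the server reacts to BEGIN and COMMIT in the transcript. */
static void
mock_exec(MockSession *s, const char *cmd)
{
	assert(s != NULL && cmd != NULL);

	if (strcmp(cmd, "BEGIN") == 0)
	{
		if (s->in_xact)
			s->warnings++;	/* "there is already a transaction in progress" */
		s->in_xact = true;
	}
	else if (strcmp(cmd, "COMMIT") == 0)
	{
		if (!s->in_xact)
			s->warnings++;	/* "there is no transaction in progress" */
		s->in_xact = false;
	}
}

/* Unconditional BEGIN on open, as described for dblink_open. */
static void
naive_cursor_open(MockSession *s)
{
	mock_exec(s, "BEGIN");
}

/* Unconditional COMMIT on close, as described for dblink_close. */
static void
naive_cursor_close(MockSession *s)
{
	mock_exec(s, "COMMIT");
}
```

If the user has already issued BEGIN, calling naive_cursor_open raises one warning, and naive_cursor_close then leaves in_xact false: the user's own transaction has been committed by the close call, and the user's later COMMIT draws a second warning.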

-- 
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073
#13Tom Lane
tgl@sss.pgh.pa.us
In reply to: Bruce Momjian (#12)
Re: [HACKERS] Patching dblink.c to avoid warning about open transaction

Bruce Momjian <pgman@candle.pha.pa.us> writes:
> Well, as I said in the patch email:
>
> The reported problem is that dblink_open/dblink_close() (for cursor
> reads) do a BEGIN/COMMIT regardless of the transaction state of the
> remote connection. There was code in dblink.c to track the remote
> transaction state (rconn), but it was not being maintained or used.

You should lose the remoteXactOpen flag entirely, in favor of just
testing PQtransactionStatus() on-the-fly when necessary. Simpler,
more reliable, not notably slower.

With that change, the separate remoteConn struct could be dropped
altogether in favor of just using the PGconn pointer. This would
make things notationally simpler, and in fact perhaps allow undoing
the bulk of the edits in your patch. As-is I think the patch is
pretty risky to apply during beta.

regards, tom lane

#14Bruce Momjian
pgman@candle.pha.pa.us
In reply to: Tom Lane (#13)
Re: [HACKERS] Patching dblink.c to avoid warning about open

Tom Lane wrote:
> Bruce Momjian <pgman@candle.pha.pa.us> writes:
> > Well, as I said in the patch email:
> >
> > The reported problem is that dblink_open/dblink_close() (for cursor
> > reads) do a BEGIN/COMMIT regardless of the transaction state of the
> > remote connection. There was code in dblink.c to track the remote
> > transaction state (rconn), but it was not being maintained or used.
>
> You should lose the remoteXactOpen flag entirely, in favor of just
> testing PQtransactionStatus() on-the-fly when necessary. Simpler,
> more reliable, not notably slower.
>
> With that change, the separate remoteConn struct could be dropped
> altogether in favor of just using the PGconn pointer. This would
> make things notationally simpler, and in fact perhaps allow undoing
> the bulk of the edits in your patch. As-is I think the patch is
> pretty risky to apply during beta.

The problem with not using rconn is that we are not saving the
transaction status at the _start_ of the cursor open. If we don't do
that, we have no way of knowing on close if _we_ opened the transaction
or whether it was opened by the user.
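
The point about remembering the state at open time can be sketched as follows. This is an illustration under assumptions, not the patch itself: it uses a mock session instead of libpq, and MockSession, MockRemoteConn, mock_exec, cursor_open, cursor_close and the flag name we_opened_xact are all hypothetical. Real code would presumably derive the flag from PQtransactionStatus(conn) compared against PQTRANS_IDLE rather than from a mock field.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a remote session; this is not libpq. */
typedef struct
{
	bool		in_xact;	/* is a transaction block currently open? */
	int			warnings;	/* number of warnings the mock server raised */
} MockSession;

/* Hypothetical per-connection state, loosely modelled on remoteConn. */
typedef struct
{
	MockSession *session;
	bool		we_opened_xact;	/* did cursor_open issue the BEGIN? */
} MockRemoteConn;

/* Imitate the server: a redundant BEGIN or COMMIT raises a warning. */
static void
mock_exec(MockSession *s, const char *cmd)
{
	assert(s != NULL && cmd != NULL);

	if (strcmp(cmd, "BEGIN") == 0)
	{
		if (s->in_xact)
			s->warnings++;
		s->in_xact = true;
	}
	else if (strcmp(cmd, "COMMIT") == 0)
	{
		if (!s->in_xact)
			s->warnings++;
		s->in_xact = false;
	}
}

/* Record at open time whether a transaction had to be started. */
static void
cursor_open(MockRemoteConn *rconn)
{
	rconn->we_opened_xact = !rconn->session->in_xact;
	if (rconn->we_opened_xact)
		mock_exec(rconn->session, "BEGIN");
}

/* Commit on close only if cursor_open started the transaction. */
static void
cursor_close(MockRemoteConn *rconn)
{
	if (rconn->we_opened_xact)
	{
		mock_exec(rconn->session, "COMMIT");
		rconn->we_opened_xact = false;
	}
}
```

Checking the status only at close time cannot distinguish the two cases, because a transaction is open in both. The saved flag is what lets cursor_close leave a user-started transaction alone while still committing one that cursor_open began.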

#15Tom Lane
tgl@sss.pgh.pa.us
In reply to: Bruce Momjian (#14)
Re: [HACKERS] Patching dblink.c to avoid warning about open transaction

Bruce Momjian <pgman@candle.pha.pa.us> writes:
> The problem with not using rconn is that we are not saving the
> transaction status at the _start_ of the cursor open. If we don't do
> that, we have no way of knowing on close if _we_ opened the transaction
> or whether it was opened by the user.

Oh, I see. In that case the flag is horribly misnamed and miscommented.
Something like "newXactForCursor" would be clearer.

regards, tom lane

#16Bruce Momjian
pgman@candle.pha.pa.us
In reply to: Tom Lane (#15)
Re: [HACKERS] Patching dblink.c to avoid warning about open

Tom Lane wrote:
> Bruce Momjian <pgman@candle.pha.pa.us> writes:
> > The problem with not using rconn is that we are not saving the
> > transaction status at the _start_ of the cursor open. If we don't do
> > that, we have no way of knowing on close if _we_ opened the transaction
> > or whether it was opened by the user.
>
> Oh, I see. In that case the flag is horribly misnamed and miscommented.
> Something like "newXactForCursor" would be clearer.

OK, renamed to isClientXact.
