walreceiver is uninterruptible on win32

Started by Fujii Masao, almost 16 years ago (33 messages)
#1 Fujii Masao
masao.fujii@gmail.com
1 attachment(s)

Hi,

http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
On win32, the blocking libpq functions such as PQconnectdb() and
PQexec() are uninterruptible because they use the vanilla select()
instead of the select() compatible with our signal emulation layer.
Nevertheless, walreceiver currently uses them to establish a
connection, send a handshake message, and wait for the reply, so
walreceiver also becomes uninterruptible for a while. This is a
must-fix problem for 9.0.

To make walreceiver interruptible, I replaced the blocking libpq
functions it currently uses with their asynchronous counterparts
and used the emulated version of select() to wait. Here is the patch.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

win32_interruptible_walreceiver_v1.patch (text/x-patch; charset=US-ASCII)
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 18,23 ****
--- 18,24 ----
  
  #include <unistd.h>
  #include <sys/time.h>
+ #include <time.h>
  
  #include "libpq-fe.h"
  #include "access/xlog.h"
***************
*** 53,59 **** static bool libpqrcv_receive(int timeout, unsigned char *type,
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
! static bool libpq_select(int timeout_ms);
  
  /*
   * Module load callback
--- 54,61 ----
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
! static int libpq_select(bool forRead, bool forWrite, int timeout_ms);
! static PGresult *libpqrcv_PQexec(const char *query);
  
  /*
   * Module load callback
***************
*** 83,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	TimeLineID	standby_tli;
  	PGresult   *res;
  	char		cmd[64];
  
  	/* Connect */
  	snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
  
! 	streamConn = PQconnectdb(conninfo_repl);
! 	if (PQstatus(streamConn) != CONNECTION_OK)
  		ereport(ERROR,
  				(errmsg("could not connect to the primary server : %s",
  						PQerrorMessage(streamConn))));
  
  	/*
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
--- 85,198 ----
  	TimeLineID	standby_tli;
  	PGresult   *res;
  	char		cmd[64];
+ 	PQconninfoOption   *options;
+ 	time_t		finish_time = ((time_t) -1);
+ 
+ 	/*
+ 	 * Extract timeout from the connection string
+ 	 */
+ 	options = PQconninfoParse(conninfo, NULL);
+ 	if (options)
+ 	{
+ 		PQconninfoOption *option;
+ 		for (option = options; option->keyword != NULL; option++)
+ 		{
+ 			if (strcmp(option->keyword, "connect_timeout") == 0)
+ 			{
+ 				if (option->val != NULL && option->val[0] != '\0')
+ 				{
+ 					int		timeout = atoi(option->val);
+ 
+ 					if (timeout > 0)
+ 					{
+ 						/*
+ 						 * Rounding could cause connection to fail;
+ 						 * need at least 2 secs
+ 						 */
+ 						if (timeout < 2)
+ 							timeout = 2;
+ 						/* calculate the finish time based on start + timeout */
+ 						finish_time = time(NULL) + timeout;
+ 					}
+ 				}
+ 			}
+ 		}
+ 		PQconninfoFree(options);
+ 	}
  
  	/* Connect */
  	snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
  
! 	streamConn = PQconnectStart(conninfo_repl);
! 	if (PQstatus(streamConn) == CONNECTION_BAD)
  		ereport(ERROR,
  				(errmsg("could not connect to the primary server : %s",
  						PQerrorMessage(streamConn))));
  
  	/*
+ 	 * Wait for connection to be established
+ 	 */
+ 	for (;;)
+ 	{
+ 		PostgresPollingStatusType	status;
+ 		bool		established = false;
+ 		bool		forRead = false;
+ 		bool		forWrite = false;
+ 		int		timeout_ms;
+ 		int		ret;
+ 
+ 		status = PQconnectPoll(streamConn);
+ 		switch (status)
+ 		{
+ 			case PGRES_POLLING_READING:
+ 				forRead = true;
+ 				break;
+ 			case PGRES_POLLING_WRITING:
+ 				forWrite = true;
+ 				break;
+ 			case PGRES_POLLING_OK:
+ 				established = true;
+ 				break;
+ 			case PGRES_POLLING_FAILED:
+ 			default:
+ 				ereport(ERROR,
+ 						(errmsg("could not connect to the primary server : %s",
+ 								PQerrorMessage(streamConn))));
+ 		}
+ 
+ 		if (established)
+ 			break;
+ 
+ 	retry:
+ 		/* Compute appropriate timeout interval */
+ 		if (finish_time == ((time_t) -1))
+ 			timeout_ms = -1;
+ 		else
+ 		{
+ 			time_t		now = time(NULL);
+ 
+ 			if (finish_time > now)
+ 				timeout_ms = (finish_time - now) * 1000;
+ 			else
+ 				timeout_ms = 0;
+ 		}
+ 
+ 		/*
+ 		 * Wait until we can read or write the connection socket
+ 		 */
+ 		ret = libpq_select(forRead, forWrite, timeout_ms);
+ 		if (ret == 0)	/* timeout */
+ 			ereport(ERROR,
+ 					(errmsg("could not connect to the primary server : timeout expired")));
+ 		if (ret < 0) /* interrupted */
+ 			goto retry;
+ 	}
+ 
+ 	/*
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
***************
*** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = PQexec(streamConn, cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
  		ereport(ERROR,
  				(errmsg("could not start XLOG streaming: %s",
  						PQerrorMessage(streamConn))));
  	PQclear(res);
  
  	justconnected = true;
--- 244,257 ----
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = libpqrcv_PQexec(cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
+ 	{
+ 		PQclear(res);
  		ereport(ERROR,
  				(errmsg("could not start XLOG streaming: %s",
  						PQerrorMessage(streamConn))));
+ 	}
  	PQclear(res);
  
  	justconnected = true;
***************
*** 162,176 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  }
  
  /*
!  * Wait until we can read WAL stream, or timeout.
   *
!  * Returns true if data has become available for reading, false if timed out
!  * or interrupted by signal.
   *
   * This is based on pqSocketCheck.
   */
! static bool
! libpq_select(int timeout_ms)
  {
  	int			ret;
  
--- 260,275 ----
  }
  
  /*
!  * Wait until we can read or write the connection socket
   *
!  * Returns >0 if data has been ready to be read, written,
!  * or both, 0 if timed out, -1 if interrupted by signal.
!  * Throws an error if an error occurred.
   *
   * This is based on pqSocketCheck.
   */
! static int
! libpq_select(bool forRead, bool forWrite, int timeout_ms)
  {
  	int			ret;
  
***************
*** 180,203 **** libpq_select(int timeout_ms)
  				(errcode_for_socket_access(),
  				 errmsg("socket not open")));
  
  	/* We use poll(2) if available, otherwise select(2) */
  	{
  #ifdef HAVE_POLL
  		struct pollfd input_fd;
  
  		input_fd.fd = PQsocket(streamConn);
! 		input_fd.events = POLLIN | POLLERR;
  		input_fd.revents = 0;
  
  		ret = poll(&input_fd, 1, timeout_ms);
  #else							/* !HAVE_POLL */
  
  		fd_set		input_mask;
  		struct timeval timeout;
  		struct timeval *ptr_timeout;
  
  		FD_ZERO(&input_mask);
! 		FD_SET		(PQsocket(streamConn), &input_mask);
  
  		if (timeout_ms < 0)
  			ptr_timeout = NULL;
--- 279,314 ----
  				(errcode_for_socket_access(),
  				 errmsg("socket not open")));
  
+ 	Assert(forRead || forWrite);
+ 
  	/* We use poll(2) if available, otherwise select(2) */
  	{
  #ifdef HAVE_POLL
  		struct pollfd input_fd;
  
  		input_fd.fd = PQsocket(streamConn);
! 		input_fd.events = POLLERR;
  		input_fd.revents = 0;
  
+ 		if (forRead)
+ 			input_fd.events |= POLLIN;
+ 		if (forWrite)
+ 			input_fd.events |= POLLOUT;
+ 
  		ret = poll(&input_fd, 1, timeout_ms);
  #else							/* !HAVE_POLL */
  
  		fd_set		input_mask;
+ 		fd_set		output_mask;
  		struct timeval timeout;
  		struct timeval *ptr_timeout;
  
  		FD_ZERO(&input_mask);
! 		FD_ZERO(&output_mask);
! 		if (forRead)
! 			FD_SET		(PQsocket(streamConn), &input_mask);
! 		if (forWrite)
! 			FD_SET		(PQsocket(streamConn), &output_mask);
  
  		if (timeout_ms < 0)
  			ptr_timeout = NULL;
***************
*** 209,225 **** libpq_select(int timeout_ms)
  		}
  
  		ret = select(PQsocket(streamConn) + 1, &input_mask,
! 					 NULL, NULL, ptr_timeout);
  #endif   /* HAVE_POLL */
  	}
  
! 	if (ret == 0 || (ret < 0 && errno == EINTR))
! 		return false;
  	if (ret < 0)
  		ereport(ERROR,
  				(errcode_for_socket_access(),
  				 errmsg("select() failed: %m")));
! 	return true;
  }
  
  /*
--- 320,397 ----
  		}
  
  		ret = select(PQsocket(streamConn) + 1, &input_mask,
! 					 &output_mask, NULL, ptr_timeout);
  #endif   /* HAVE_POLL */
  	}
  
! 	if (ret == 0)	/* timeout */
! 		return 0;
! 	if (ret < 0 && errno == EINTR)	/* interrupted */
! 		return -1;
  	if (ret < 0)
  		ereport(ERROR,
  				(errcode_for_socket_access(),
  				 errmsg("select() failed: %m")));
! 	return ret;
! }
! 
! /*
!  * Send a query and wait for the result by using the asynchronous libpq
!  * functions and the backend version of select().
!  *
!  * On Windows, walreceiver must use this function instead of the blocking
!  * libpq functions like PQexec() since they use the vanilla select() and
!  * are uninterruptible by our emulated signals. On the other hand, this
!  * function is interruptible since it uses the signal emulation layer
!  * compatible select().
!  */
! static PGresult *
! libpqrcv_PQexec(const char *query)
! {
! 	PGresult   *res = NULL;
! 
! 	/*
! 	 * Submit a query. Since we don't use non-blocking mode, this also
! 	 * can block. But its risk is relatively small, so we ignore that
! 	 * for now.
! 	 */
! 	if (!PQsendQuery(streamConn, query))
! 		return NULL;
! 
! 	for (;;)
! 	{
! 		PGresult   *next;
! 
! 		/*
! 		 * Receive data until PQgetResult has been ready to get the
! 		 * result without blocking.
! 		 */
! 		while (PQisBusy(streamConn))
! 		{
! 			if (libpq_select(true, false, -1) < 0)
! 				continue;			/* interrupted */
! 			if (PQconsumeInput(streamConn) == 0)
! 				return NULL;		/* trouble */
! 		}
! 
! 		/*
! 		 * Don't emulate the PQexec()'s behavior of returning the last
! 		 * result, if there's many, since walreceiver never sends a query
! 		 * returning multiple results.
! 		 */
! 		if ((next = PQgetResult(streamConn)) == NULL)
! 			break;				/* query is complete */
! 		if (PQresultStatus(next) == PGRES_FATAL_ERROR)
! 			return next;
! 		PQclear(res);
! 		res = next;
! 		if (PQresultStatus(res) == PGRES_COPY_IN ||
! 			PQresultStatus(res) == PGRES_COPY_OUT ||
! 			PQstatus(streamConn) == CONNECTION_BAD)
! 			break;
! 	}
! 
! 	return res;
  }
  
  /*
***************
*** 268,274 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
  	 */
  	if (timeout > 0 && !justconnected)
  	{
! 		if (!libpq_select(timeout))
  			return false;
  
  		if (PQconsumeInput(streamConn) == 0)
--- 440,446 ----
  	 */
  	if (timeout > 0 && !justconnected)
  	{
! 		if (libpq_select(true, false, timeout) <= 0)
  			return false;
  
  		if (PQconsumeInput(streamConn) == 0)
#2 Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#1)
Re: walreceiver is uninterruptible on win32

On Wed, Mar 10, 2010 at 10:09, Fujii Masao <masao.fujii@gmail.com> wrote:
> Hi,
>
> http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
> On win32, the blocking libpq functions such as PQconnectdb() and
> PQexec() are uninterruptible because they use the vanilla select()
> instead of the select() compatible with our signal emulation layer.
> Nevertheless, walreceiver currently uses them to establish a
> connection, send a handshake message, and wait for the reply, so
> walreceiver also becomes uninterruptible for a while. This is a
> must-fix problem for 9.0.
>
> To make walreceiver interruptible, I replaced the blocking libpq
> functions it currently uses with their asynchronous counterparts
> and used the emulated version of select() to wait. Here is the patch.

These are issues that affect other things running libpq in the backend
as well, right? Such as dblink? Perhaps we can factor out most of this
into functions in backend/port/win32 so that we can re-use it from
there?

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#3 Fujii Masao
masao.fujii@gmail.com
In reply to: Magnus Hagander (#2)
Re: walreceiver is uninterruptible on win32

On Fri, Mar 12, 2010 at 8:13 PM, Magnus Hagander <magnus@hagander.net> wrote:
> On Wed, Mar 10, 2010 at 10:09, Fujii Masao <masao.fujii@gmail.com> wrote:
>> Hi,
>>
>> http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
>> On win32, the blocking libpq functions such as PQconnectdb() and
>> PQexec() are uninterruptible because they use the vanilla select()
>> instead of the select() compatible with our signal emulation layer.
>> Nevertheless, walreceiver currently uses them to establish a
>> connection, send a handshake message, and wait for the reply, so
>> walreceiver also becomes uninterruptible for a while. This is a
>> must-fix problem for 9.0.
>>
>> To make walreceiver interruptible, I replaced the blocking libpq
>> functions it currently uses with their asynchronous counterparts
>> and used the emulated version of select() to wait. Here is the patch.
>
> These are issues that affect other things running libpq in the backend
> as well, right? Such as dblink?

Yes. So Heikki wrote the patch for dblink.
http://archives.postgresql.org/pgsql-hackers/2010-01/msg02072.php

> Perhaps we can factor out most of this
> into functions in backend/port/win32 so that we can re-use it from
> there?

Sorry. I couldn't get your point. Could you explain it in detail?

Regards,

--
Fujii Masao

#4 Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#3)
Re: walreceiver is uninterruptible on win32

On Mon, Mar 15, 2010 at 10:14, Fujii Masao <masao.fujii@gmail.com> wrote:
> On Fri, Mar 12, 2010 at 8:13 PM, Magnus Hagander <magnus@hagander.net> wrote:
>> On Wed, Mar 10, 2010 at 10:09, Fujii Masao <masao.fujii@gmail.com> wrote:
>>> Hi,
>>>
>>> http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
>>> On win32, the blocking libpq functions such as PQconnectdb() and
>>> PQexec() are uninterruptible because they use the vanilla select()
>>> instead of the select() compatible with our signal emulation layer.
>>> Nevertheless, walreceiver currently uses them to establish a
>>> connection, send a handshake message, and wait for the reply, so
>>> walreceiver also becomes uninterruptible for a while. This is a
>>> must-fix problem for 9.0.
>>>
>>> To make walreceiver interruptible, I replaced the blocking libpq
>>> functions it currently uses with their asynchronous counterparts
>>> and used the emulated version of select() to wait. Here is the patch.
>>
>> These are issues that affect other things running libpq in the backend
>> as well, right? Such as dblink?
>
> Yes. So Heikki wrote the patch for dblink.
> http://archives.postgresql.org/pgsql-hackers/2010-01/msg02072.php

IIRC that was never applied.

>> Perhaps we can factor out most of this
>> into functions in backend/port/win32 so that we can re-use it from
>> there?
>
> Sorry. I couldn't get your point. Could you explain it in detail?

What I'm referring to is the part where Heikki writes "The
implementation should be shared between the two, but I'm not sure
how". I think we should try to factor out things that can be shared
into separate functions and stick those in port/win32 (assuming
they're win32-specific; otherwise, in another suitable location), and
then call them from both. There seem to be a lot of things that
should be doable that way.

I notice, for example, that the dblink patch doesn't have the code for
timeout handling; shouldn't it?

I think we need to look at this as a single problem needing to be
solved, and then have the same solution applied to dblink and
walreceiver.

--
Magnus Hagander

#5 Fujii Masao
masao.fujii@gmail.com
In reply to: Magnus Hagander (#4)
Re: walreceiver is uninterruptible on win32

On Mon, Mar 15, 2010 at 6:42 PM, Magnus Hagander <magnus@hagander.net> wrote:
>>> Perhaps we can factor out most of this
>>> into functions in backend/port/win32 so that we can re-use it from
>>> there?
>>
>> Sorry. I couldn't get your point. Could you explain it in detail?
>
> What I'm referring to is the part where Heikki writes "The
> implementation should be shared between the two, but I'm not sure
> how". I think we should try to factor out things that can be shared
> into separate functions and stick those in port/win32 (assuming
> they're win32-specific; otherwise, in another suitable location), and
> then call them from both. There seem to be a lot of things that
> should be doable that way.
>
> I notice, for example, that the dblink patch doesn't have the code for
> timeout handling; shouldn't it?
>
> I think we need to look at this as a single problem needing to be
> solved, and then have the same solution applied to dblink and
> walreceiver.

Thanks for the explanation. I agree that the code should be shared,
but, like Heikki, I'm not sure how.

Both walreceiver and dblink would need something like libpq_select(),
which waits for the socket to become ready. But walreceiver needs it
not only on win32 but also on the other platforms, so some functions
might need to be placed somewhere other than port/win32.
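A libpq_select()-style helper of the kind described here can be sketched without libpq, since it only needs the socket descriptor (PQsocket() in real code). The sketch below is POSIX-only, uses poll(), and keeps the first patch's tri-state return convention (>0 ready, 0 timeout, -1 interrupted). The name wait_for_socket() and the extra -2 error code are hypothetical (the patch raises an ERROR instead), and on Windows the backend would still need the signal-emulation-aware select() rather than this call.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>     /* pipe(), write(), close(): handy for trying it out */

/*
 * Hypothetical helper: wait until 'sock' is ready for reading and/or
 * writing.  Returns >0 if ready, 0 on timeout, -1 if interrupted by a
 * signal (EINTR), and -2 on any other poll() failure.  A negative
 * timeout_ms waits indefinitely, as with poll().
 */
static int
wait_for_socket(int sock, bool forRead, bool forWrite, int timeout_ms)
{
    struct pollfd pfd;
    int         ret;

    assert(forRead || forWrite);    /* same precondition as the patch */

    pfd.fd = sock;
    pfd.events = 0;
    pfd.revents = 0;
    if (forRead)
        pfd.events |= POLLIN;
    if (forWrite)
        pfd.events |= POLLOUT;

    ret = poll(&pfd, 1, timeout_ms);
    if (ret == 0)
        return 0;               /* timed out */
    if (ret < 0 && errno == EINTR)
        return -1;              /* interrupted: the caller decides what to do */
    if (ret < 0)
        return -2;              /* genuine failure */
    return ret;                 /* number of ready descriptors (here, 1) */
}
```

For example, on a freshly created pipe, waiting 10 ms for the read end to become readable returns 0 (timeout); after one byte is written to the other end, the same call returns 1. Keeping "interrupted" separate from "timed out" lets each caller choose whether to retry or to check for pending interrupts first.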

I'll think about this issue for a while.

Regards,

--
Fujii Masao

#6 Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Fujii Masao (#5)
Re: walreceiver is uninterruptible on win32

Fujii Masao wrote:
> On Mon, Mar 15, 2010 at 6:42 PM, Magnus Hagander <magnus@hagander.net> wrote:
>> I think we need to look at this as a single problem needing to be
>> solved, and then have the same solution applied to dblink and
>> walreceiver.

Agreed.

> Both walreceiver and dblink would need something like libpq_select(),
> which waits for the socket to become ready. But walreceiver needs it
> not only on win32 but also on the other platforms, ...

Really, why? I thought this was a purely Windows-specific problem.

Just replacing PQexec() with PQsendQuery() is pretty straightforward;
we could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle the
connection timeout. I suggest that we only add the replacement for
PQexec() and live with the situation for PQconnectdb(); that covers
99% of the scenarios anyway.
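The connection-timeout handling mentioned here boils down to turning connect_timeout into an absolute deadline and recomputing the remaining wait before every poll()/select() call, as the first patch does around PQconnectPoll(). A minimal sketch of just that arithmetic follows, assuming the connect_timeout string has already been extracted (the patch uses PQconninfoParse() for that); the helper names and the NO_DEADLINE macro are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <time.h>

#define NO_DEADLINE ((time_t) -1)   /* hypothetical "wait forever" marker */

/*
 * Turn a connect_timeout value (seconds, as a string) into an absolute
 * deadline, following the first patch: an absent, empty, or non-positive
 * value means no deadline, and anything below 2 seconds is raised to 2
 * because time() only has one-second resolution.
 */
static time_t
deadline_from_connect_timeout(const char *val, time_t now)
{
    int         timeout;

    assert(now != NO_DEADLINE);     /* time() failed in the caller */

    if (val == NULL || val[0] == '\0')
        return NO_DEADLINE;
    timeout = atoi(val);
    if (timeout <= 0)
        return NO_DEADLINE;
    if (timeout < 2)
        timeout = 2;
    return now + timeout;
}

/*
 * Milliseconds left before 'deadline', usable as a poll()/select()
 * timeout: -1 means wait forever, 0 means the deadline has passed.
 */
static int
remaining_timeout_ms(time_t deadline, time_t now)
{
    if (deadline == NO_DEADLINE)
        return -1;
    if (deadline > now)
        return (int) ((deadline - now) * 1000);
    return 0;
}
```

With now = 1000, a connect_timeout of "1" is rounded up to a deadline of 1002, and remaining_timeout_ms(1010, 1000) yields 10000, i.e. ten seconds of wait budget. A zero result is what the patch treats as "timeout expired" once select() also reports nothing.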

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#7 Joe Conway
mail@joeconway.com
In reply to: Magnus Hagander (#4)
Re: walreceiver is uninterruptible on win32

On 03/15/2010 02:42 AM, Magnus Hagander wrote:
> I think we need to look at this as a single problem needing to be
> solved, and then have the same solution applied to dblink and
> walreceiver.

+1

Joe

#8 Fujii Masao
masao.fujii@gmail.com
In reply to: Heikki Linnakangas (#6)
Re: walreceiver is uninterruptible on win32

On Tue, Mar 16, 2010 at 12:32 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
>> Both walreceiver and dblink would need something like libpq_select(),
>> which waits for the socket to become ready. But walreceiver needs it
>> not only on win32 but also on the other platforms, ...
>
> Really, why? I thought this was a purely Windows-specific problem.

Because libpqrcv_receive() needs to call libpq_select() on all
platforms. Or do you mean that we should leave the existing
libpq_select() as it is, and create a new win32-specific function
that waits for the socket to become ready, for this issue?

> Just replacing PQexec() with PQsendQuery() is pretty straightforward;
> we could put that replacement in a file in port/win32. Replacing
> PQconnectdb() is more complicated because you need to handle the
> connection timeout. I suggest that we only add the replacement for
> PQexec() and live with the situation for PQconnectdb(); that covers
> 99% of the scenarios anyway.

I'll try to replace PQexec() first, and PQconnectdb() second if I have
enough time.

Regards,

--
Fujii Masao

#9 Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#8)
1 attachment(s)
Re: walreceiver is uninterruptible on win32

On Tue, Mar 16, 2010 at 10:35 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
>> Just replacing PQexec() with PQsendQuery() is pretty straightforward;
>> we could put that replacement in a file in port/win32. Replacing
>> PQconnectdb() is more complicated because you need to handle the
>> connection timeout. I suggest that we only add the replacement for
>> PQexec() and live with the situation for PQconnectdb(); that covers
>> 99% of the scenarios anyway.
>
> I'll try to replace PQexec() first, and PQconnectdb() second if I have
> enough time.

Sorry for the delay. The attached patch replaces the PQexec() calls in
dblink and libpqwalreceiver with pgwin32_PQexec(), a win32 version of
PQexec().
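The subtle part of pgwin32_PQexec() is deciding which PGresult to return when PQgetResult() yields several before returning NULL. The sketch below models only that decision, using hypothetical status names instead of real PGresult objects and ignoring the CONNECTION_BAD check, so it illustrates the patch's rules rather than replacing them: keep the last result, keep an earlier error when a second error follows it, and stop at the first COPY result.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the PGresult statuses the loop cares about. */
typedef enum
{
    RES_COMMAND_OK,
    RES_TUPLES_OK,
    RES_COPY_IN,
    RES_COPY_OUT,
    RES_FATAL_ERROR
} ResStatus;

/*
 * Given the statuses PQgetResult() would return in order (the final NULL
 * is implied by 'n'), return the index of the result that the patch's
 * loop hands back, or -1 if there is none.
 */
static int
chosen_result(const ResStatus *results, size_t n)
{
    int         last = -1;
    size_t      i;

    assert(results != NULL || n == 0);

    for (i = 0; i < n; i++)
    {
        if (last >= 0 &&
            results[last] == RES_FATAL_ERROR &&
            results[i] == RES_FATAL_ERROR)
        {
            /* second error in a row: drop it and keep the earlier one */
        }
        else
            last = (int) i;     /* otherwise the newest result wins */

        /* a COPY result ends the loop, as in the patch */
        if (results[last] == RES_COPY_IN || results[last] == RES_COPY_OUT)
            break;
    }
    return last;
}
```

For the sequence TUPLES_OK, COPY_OUT, TUPLES_OK this returns index 1, because the loop stops at the COPY_OUT result; for two consecutive FATAL_ERROR results it returns index 0. As the patch's own comment notes, it does not merge the error messages the way PQexec() does.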

pgwin32_PQexec() is provided by a new library, 'libpqbe.dll', which is
built only on win32. dblink.dll and libpqwalreceiver.dll link against
libpqbe.dll, which in turn links against libpq.dll.

I'm not sure whether my patch takes the right approach. If you notice
anything, please feel free to comment!

Regards,

--
Fujii Masao

Attachments:

pgwin32_PQexec_v1.patch (application/octet-stream)
*** a/contrib/dblink/Makefile
--- b/contrib/dblink/Makefile
***************
*** 20,22 **** top_builddir = ../..
--- 20,27 ----
  include $(top_builddir)/src/Makefile.global
  include $(top_srcdir)/contrib/contrib-global.mk
  endif
+ 
+ ifeq ($(PORTNAME), win32)
+ override PG_CPPFLAGS := -I$(libpqbe_srcdir) $(PG_CPPFLAGS)
+ override SHLIB_LINK := $(libpqbe) $(SHLIB_LINK)
+ endif
*** a/contrib/dblink/dblink.c
--- b/contrib/dblink/dblink.c
***************
*** 67,72 ****
--- 67,76 ----
  
  #include "dblink.h"
  
+ #ifdef WIN32
+ #include "port/win32/libpqbe.h"
+ #endif
+ 
  PG_MODULE_MAGIC;
  
  typedef struct remoteConn
*** a/src/Makefile
--- b/src/Makefile
***************
*** 21,26 **** all install installdirs uninstall distprep:
--- 21,27 ----
  	$(MAKE) -C backend/snowball $@
  	$(MAKE) -C include $@
  	$(MAKE) -C interfaces $@
+ 	$(MAKE) -C backend/port/win32/libpqbe $@
  	$(MAKE) -C backend/replication/libpqwalreceiver $@
  	$(MAKE) -C bin $@
  	$(MAKE) -C pl $@
***************
*** 52,57 **** clean:
--- 53,59 ----
  	$(MAKE) -C backend/snowball $@
  	$(MAKE) -C include $@
  	$(MAKE) -C interfaces $@
+ 	$(MAKE) -C backend/port/win32/libpqbe $@
  	$(MAKE) -C backend/replication/libpqwalreceiver $@
  	$(MAKE) -C bin $@
  	$(MAKE) -C pl $@
***************
*** 67,72 **** distclean maintainer-clean:
--- 69,75 ----
  	$(MAKE) -C backend/snowball $@
  	$(MAKE) -C include $@
  	$(MAKE) -C interfaces $@
+ 	$(MAKE) -C backend/port/win32/libpqbe $@
  	$(MAKE) -C backend/replication/libpqwalreceiver $@
  	$(MAKE) -C bin $@
  	$(MAKE) -C pl $@
***************
*** 82,87 **** coverage:
--- 85,91 ----
  	$(MAKE) -C backend/utils/mb/conversion_procs $@
  	$(MAKE) -C backend/snowball $@
  	$(MAKE) -C interfaces $@
+ 	$(MAKE) -C backend/port/win32/libpqbe $@
  	$(MAKE) -C backend/replication/libpqwalreceiver $@
  	$(MAKE) -C bin $@
  	$(MAKE) -C pl $@
*** a/src/Makefile.global.in
--- b/src/Makefile.global.in
***************
*** 395,400 **** else
--- 395,411 ----
  libpq_pgport = -L$(top_builddir)/src/port -lpgport $(libpq)
  endif
  
+ # This macro is for use by backend libraries that use libpq.
+ ifdef PGXS
+ libpqbe_srcdir = $(includedir)
+ libpqbe_builddir = $(libdir)
+ libpqbe = -L$(libdir) -lpqbe $(libpq)
+ else
+ libpqbe_srcdir = $(top_srcdir)/src/include/port/win32
+ libpqbe_builddir = $(top_builddir)/src/backend/port/win32/libpqbe
+ libpqbe = -L$(libpqbe_builddir) -lpqbe $(libpq)
+ endif
+ 
  
  submake-libpq:
  	$(MAKE) -C $(libpq_builddir) all
***************
*** 402,408 **** submake-libpq:
  submake-libpgport:
  	$(MAKE) -C $(top_builddir)/src/port all
  
! .PHONY: submake-libpq submake-libpgport
  
  
  ##########################################################################
--- 413,422 ----
  submake-libpgport:
  	$(MAKE) -C $(top_builddir)/src/port all
  
! submake-libpqbe: submake-libpq
! 	$(MAKE) -C $(libpqbe_builddir) all
! 
! .PHONY: submake-libpq submake-libpgport submake-libpqbe
  
  
  ##########################################################################
*** /dev/null
--- b/src/backend/port/win32/libpqbe/Makefile
***************
*** 0 ****
--- 1,36 ----
+ #-------------------------------------------------------------------------
+ #
+ # Makefile--
+ #    Makefile for src/backend/port/win32/libpqbe
+ #
+ # IDENTIFICATION
+ #    $PostgreSQL$
+ #
+ #-------------------------------------------------------------------------
+ 
+ subdir = src/backend/port/win32/libpqbe
+ top_builddir = ../../../../..
+ include $(top_builddir)/src/Makefile.global
+ 
+ override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+ 
+ OBJS = libpqbe.o
+ SHLIB_LINK = $(libpq)
+ NAME = libpqbe
+ 
+ SHLIB_EXPORTS = exports.txt
+ 
+ ifeq ($(PORTNAME),win32)
+ all: submake-libpq all-shared-lib
+ 
+ include $(top_srcdir)/src/Makefile.shlib
+ 
+ install: all installdirs install-lib
+ 
+ installdirs: installdirs-lib
+ 
+ uninstall: uninstall-lib
+ 
+ clean distclean maintainer-clean: clean-lib
+ 	rm -f $(OBJS)
+ endif
*** /dev/null
--- b/src/backend/port/win32/libpqbe/exports.txt
***************
*** 0 ****
--- 1,3 ----
+ # $PostgreSQL$
+ # Functions to be exported by libpqbe DLL
+ pgwin32_PQexec   1
*** /dev/null
--- b/src/backend/port/win32/libpqbe/libpqbe.c
***************
*** 0 ****
--- 1,102 ----
+ /*-------------------------------------------------------------------------
+  *
+  * libpqbe.c
+  *	  functions related to sending a query from the backend
+  *
+  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+  *
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "libpq-fe.h"
+ #include "port/win32/libpqbe.h"
+ 
+ /*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * On Windows, dblink and walreceiver must use this function instead of
+ * the blocking libpq functions like PQexec() since they use the vanilla
+ * select() and are uninterruptible by our emulated signals. On the other
+ * hand, this function is interruptible since it uses the signal emulation
+ * layer compatible select().
+  */
+ PGresult *
+ pgwin32_PQexec(PGconn *conn, const char *query)
+ {
+ 	PGresult   *result = NULL;
+ 	PGresult   *lastResult = NULL;
+ 
+ 	/*
+ 	 * Submit a query. Since we don't use non-blocking mode, this also
+ 	 * can block. But its risk is relatively small, so we ignore that
+ 	 * for now.
+ 	 */
+ 	if (!PQsendQuery(conn, query))
+ 		return NULL;
+ 
+ 	for (;;)
+ 	{
+ 		/*
+ 		 * Receive data until PQgetResult has been ready to get the
+ 		 * result without blocking.
+ 		 */
+ 		while (PQisBusy(conn))
+ 		{
+ 			int	ret;
+ 			int	sock = PQsocket(conn);
+ 			fd_set		input_mask;
+ 
+ 			if (sock < 0)
+ 				ereport(ERROR,
+ 						(errcode_for_socket_access(),
+ 						 errmsg("socket not open")));
+ 
+ 			FD_ZERO(&input_mask);
+ 			FD_SET		(sock, &input_mask);
+ 
+ 			ret = select(sock + 1, &input_mask, NULL, NULL, NULL);
+ 			if (ret == 0 || (ret < 0 && errno == EINTR))
+ 				continue;			/* interrupted */
+ 			if (ret < 0)
+ 				ereport(ERROR,
+ 						(errcode_for_socket_access(),
+ 						 errmsg("select() failed: %m")));
+ 			if (PQconsumeInput(conn) == 0)
+ 				return NULL;
+ 		}
+ 
+ 		/*
+ 		 * Emulate the PQexec()'s behavior of returning the last result,
+ 		 * if there's many.
+ 		 *
+ 		 * We don't try to concatenate error messages like PQexec() does.
+ 		 * Doesn't seem worth the effort.
+ 		 */
+ 		if ((result = PQgetResult(conn)) == NULL)
+ 			break;				/* query is complete */
+ 		if (lastResult)
+ 		{
+ 			if (PQresultStatus(lastResult) == PGRES_FATAL_ERROR &&
+ 				PQresultStatus(result) == PGRES_FATAL_ERROR)
+ 			{
+ 				PQclear(result);
+ 				result = lastResult;
+ 			}
+ 			else
+ 				PQclear(lastResult);
+ 		}
+ 		lastResult = result;
+ 		if (PQresultStatus(result) == PGRES_COPY_IN ||
+ 			PQresultStatus(result) == PGRES_COPY_OUT ||
+ 			PQstatus(conn) == CONNECTION_BAD)
+ 			break;
+ 	}
+ 
+ 	return lastResult;
+ }
*** a/src/backend/replication/libpqwalreceiver/Makefile
--- b/src/backend/replication/libpqwalreceiver/Makefile
***************
*** 15,24 **** include $(top_builddir)/src/Makefile.global
  override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
  
  OBJS = libpqwalreceiver.o
  SHLIB_LINK = $(libpq)
  NAME = libpqwalreceiver
  
! all: submake-libpq all-shared-lib
  
  include $(top_srcdir)/src/Makefile.shlib
  
--- 15,33 ----
  override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
  
  OBJS = libpqwalreceiver.o
+ ifeq ($(PORTNAME), win32)
+ override CPPFLAGS := -I$(libpqbe_srcdir) $(CPPFLAGS)
+ SHLIB_LINK = $(libpqbe)
+ else
  SHLIB_LINK = $(libpq)
+ endif
  NAME = libpqwalreceiver
  
! ifeq ($(PORTNAME), win32)
! all:	submake-libpqbe	all-shared-lib
! else
! all:	submake-libpq	all-shared-lib
! endif
  
  include $(top_srcdir)/src/Makefile.shlib
  
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 25,30 ****
--- 25,34 ----
  #include "replication/walreceiver.h"
  #include "utils/builtins.h"
  
+ #ifdef WIN32
+ #include "port/win32/libpqbe.h"
+ #endif
+ 
  #ifdef HAVE_POLL_H
  #include <poll.h>
  #endif
*** /dev/null
--- b/src/include/port/win32/libpqbe.h
***************
*** 0 ****
--- 1,23 ----
+ /*-------------------------------------------------------------------------
+  *
+  * libpqbe.h
+  *	  functions related to sending a query from the backend
+  *
+  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+  *
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #ifndef LIBPQBE_H
+ #define LIBPQBE_H
+ 
+ #include "libpq-fe.h"
+ 
+ #define PQexec(conn, query) pgwin32_PQexec(conn, query)
+ PGresult *pgwin32_PQexec(PGconn *conn, const char *query);
+ 
+ #endif   /* LIBPQBE_H */
#10 Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#9)
Re: walreceiver is uninterruptible on win32

On Thu, Mar 25, 2010 at 15:33, Fujii Masao <masao.fujii@gmail.com> wrote:
> On Tue, Mar 16, 2010 at 10:35 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
>>> Just replacing PQexec() with PQsendQuery() is pretty straightforward;
>>> we could put that replacement in a file in port/win32. Replacing
>>> PQconnectdb() is more complicated because you need to handle the
>>> connection timeout. I suggest that we only add the replacement for
>>> PQexec() and live with the situation for PQconnectdb(); that covers
>>> 99% of the scenarios anyway.
>>
>> I'll try to replace PQexec() first, and PQconnectdb() second if I have
>> enough time.
>
> Sorry for the delay. The attached patch replaces the PQexec() calls in
> dblink and libpqwalreceiver with pgwin32_PQexec(), a win32 version of
> PQexec().
>
> pgwin32_PQexec() is provided by a new library, 'libpqbe.dll', which is
> built only on win32. dblink.dll and libpqwalreceiver.dll link against
> libpqbe.dll, which in turn links against libpq.dll.
>
> I'm not sure whether my patch takes the right approach. If you notice
> anything, please feel free to comment!

Well, first of all, it would require the addition of the new target to
the msvc build system, but that's easy - I can do that for you.

More to the point, I'm not sure I like the creation of yet another DLL
to deal with this. The reason this isn't just exported from the main
backend is, I'm sure, the same reason we created the libpqwalreceiver
library, but that means we already have one.

How about we just use this same source file, but compile and link it
directly into both dblink and libpqwalreceiver? That'd leave us with
one DLL less, making life easier.

The downside would be that other third-party modules that need to call
libpq wouldn't be able to access this without copying the function.
But is this really something that's useful for third party modules?

As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#11Tom Lane
tgl@sss.pgh.pa.us
In reply to: Magnus Hagander (#10)
Re: walreceiver is uninterruptible on win32

Magnus Hagander <magnus@hagander.net> writes:

On Thu, Mar 25, 2010 at 15:33, Fujii Masao <masao.fujii@gmail.com> wrote:

Sorry for the delay. The attached patch replaces PQexec() used by dblink
and libpqwalreceiver with pgwin32_PQexec() which is the win32 version of
PQexec().

pgwin32_PQexec() is provided as the library 'libpqbe.dll', which is created
only on win32. dblink.dll and libpqwalreceiver.dll refer to libpqbe.dll.
Also libpqbe.dll refers to libpq.dll.

[ assorted objections ]

I disapprove of the whole approach, actually. The right way to fix this
is to not touch or replace libpq at all, but to change walreceiver to
use libpq's async-query facilities directly. Instead of PQexec, use
PQsendQuery and then a loop involving PQisBusy, PQgetResult, etc.
You've more or less done that loop, but you've put it in the wrong
place.

The larger point is that I don't believe this issue exists only on
Windows. I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.

The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.

regards, tom lane
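The loop Tom describes (send with PQsendQuery(), then alternate between waiting on the socket and PQconsumeInput() while PQisBusy() is true, then collect with PQgetResult()) can be illustrated without a server. The block below is a toy analogue, not libpq: ToyConn, toy_is_busy(), toy_consume_input(), toy_get_result() and toy_exec() are hypothetical stand-ins that treat one newline-terminated line on a non-blocking socket as a "result". The poll() call marks the spot where an interruptible, signal-aware wait would go.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical stand-in for a PGconn: a non-blocking socket plus an input buffer. */
typedef struct
{
    int     sock;
    char    buf[256];
    size_t  len;
} ToyConn;

static bool
toy_conn_init(ToyConn *conn, int sock)
{
    int flags = fcntl(sock, F_GETFL, 0);

    if (flags < 0 || fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0)
        return false;
    conn->sock = sock;
    conn->len = 0;
    return true;
}

/* Analogue of PQisBusy(): true until a complete line is buffered. */
static bool
toy_is_busy(const ToyConn *conn)
{
    return memchr(conn->buf, '\n', conn->len) == NULL;
}

/* Analogue of PQconsumeInput(): read what is available, never block.
 * Returns 0 on EOF, error or a full buffer ("trouble"), 1 otherwise. */
static int
toy_consume_input(ToyConn *conn)
{
    ssize_t n = read(conn->sock, conn->buf + conn->len,
                     sizeof(conn->buf) - conn->len);

    if (n > 0)
    {
        conn->len += (size_t) n;
        return 1;
    }
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
        return 1;
    return 0;
}

/* Analogue of PQgetResult(): hand back the first buffered line. */
static bool
toy_get_result(ToyConn *conn, char *out, size_t outlen)
{
    char   *nl = memchr(conn->buf, '\n', conn->len);
    size_t  linelen;

    assert(conn->len <= sizeof(conn->buf));
    if (nl == NULL)
        return false;
    linelen = (size_t) (nl - conn->buf);
    if (linelen >= outlen)
        return false;
    memcpy(out, conn->buf, linelen);
    out[linelen] = '\0';
    conn->len -= linelen + 1;           /* drop the line and its newline */
    memmove(conn->buf, nl + 1, conn->len);
    return true;
}

/* The PQexec() replacement shape: send, then wait/consume until not busy. */
static bool
toy_exec(ToyConn *conn, const char *query, char *out, size_t outlen)
{
    char    msg[256];
    int     n = snprintf(msg, sizeof(msg), "%s\n", query);

    if (n < 0 || (size_t) n >= sizeof(msg) ||
        write(conn->sock, msg, (size_t) n) != n)
        return false;

    while (toy_is_busy(conn))
    {
        struct pollfd pfd = {.fd = conn->sock, .events = POLLIN};

        if (poll(&pfd, 1, -1) < 0)
        {
            if (errno == EINTR)
                continue;   /* a real backend would check its interrupt flags here */
            return false;
        }
        if (!toy_consume_input(conn))
            return false;
    }
    return toy_get_result(conn, out, outlen);
}
```

With a canned reply already queued on the other end of a socketpair(), toy_exec(&conn, "IDENTIFY_SYSTEM", out, sizeof(out)) writes the query line and returns the reply line. Like the walreceiver-only shortcut discussed later in the thread, it returns the first result rather than emulating PQexec()'s "last result wins" behavior.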

#12Magnus Hagander
magnus@hagander.net
In reply to: Tom Lane (#11)
Re: walreceiver is uninterruptible on win32

On Fri, Apr 2, 2010 at 17:26, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Magnus Hagander <magnus@hagander.net> writes:

On Thu, Mar 25, 2010 at 15:33, Fujii Masao <masao.fujii@gmail.com> wrote:

Sorry for the delay. The attached patch replaces PQexec() used by dblink
and libpqwalreceiver with pgwin32_PQexec() which is the win32 version of
PQexec().

pgwin32_PQexec() is provided as the library 'libpqbe.dll', which is created
only on win32. dblink.dll and libpqwalreceiver.dll refer to libpqbe.dll.
Also libpqbe.dll refers to libpq.dll.

[ assorted objections ]

I disapprove of the whole approach, actually.  The right way to fix this
is to not touch or replace libpq at all, but to change walreceiver to
use libpq's async-query facilities directly.  Instead of PQexec, use
PQsendQuery and then a loop involving PQisBusy, PQgetResult, etc.
You've more or less done that loop, but you've put it in the wrong
place.

Any particular reason not to wrap that in a function? Not called
pgwin32_PQexec() then, but something more generic? And not using any
#defines to redirect PQexec, but calling that wrapper directly?

The larger point is that I don't believe this issue exists only on
Windows.  I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.

The most important part of the issue is Windows-only (elsewhere, PQexec
will be interrupted by a signal), but there may certainly be other parts that aren't.

The patch also seems confused about whether it's intending to be a
general-purpose solution or not.  You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.

Yeah, good point.

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#13Tom Lane
tgl@sss.pgh.pa.us
In reply to: Magnus Hagander (#12)
Re: walreceiver is uninterruptible on win32

Magnus Hagander <magnus@hagander.net> writes:

On Fri, Apr 2, 2010 at 17:26, Tom Lane <tgl@sss.pgh.pa.us> wrote:

I disapprove of the whole approach, actually.  The right way to fix this
is to not touch or replace libpq at all, but to change walreceiver to
use libpq's async-query facilities directly.  Instead of PQexec, use
PQsendQuery and then a loop involving PQisBusy, PQgetResult, etc.
You've more or less done that loop, but you've put it in the wrong
place.

Any particular reason not to wrap that in a function? Not called
pgwin32_PQexec() then, but something more generic? And not using any
#defines to redirect PQexec, but calling that wrapper directly?

Yeah, that's fine. I just think it's easier to deal with this as a
local issue in walreceiver and dblink than to try to pretend we're
changing libpq's API.

The larger point is that I don't believe this issue exists only on
Windows.  I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.

The most important part of the issue is Windows-only (elsewhere, PQexec
will be interrupted by a signal), but there may certainly be other parts that aren't.

Really? As you pointed out yourself, if control doesn't reach a
CHECK_FOR_INTERRUPTS then we have a problem. We also know that select
isn't interrupted by signals on all platforms.

regards, tom lane

#14Fujii Masao
masao.fujii@gmail.com
In reply to: Magnus Hagander (#10)
Re: walreceiver is uninterruptible on win32

On Fri, Apr 2, 2010 at 11:11 PM, Magnus Hagander <magnus@hagander.net> wrote:

More to the point, I'm not sure I like the creation of yet another DLL
to deal with this. The reason this isn't just exported from the main
backend is, I'm sure, the same reason we created the libpqwalreceiver
library, but that means we already have one.

How about we just use this same source file, but compile and link it
directly into both dblink and libpqwalreceiver? That'd leave us with
one DLL less, making life easier.

ISTM that we cannot compile dblink using USE_PGXS=1, if that DLL doesn't
exist in the installation directory. No?

As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?

Yes. I'll add a CHECK_FOR_INTERRUPTS in the loop.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#15Fujii Masao
masao.fujii@gmail.com
In reply to: Tom Lane (#11)
Re: walreceiver is uninterruptible on win32

On Sat, Apr 3, 2010 at 12:26 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

I disapprove of the whole approach, actually.  The right way to fix this
is to not touch or replace libpq at all, but to change walreceiver to
use libpq's async-query facilities directly.  Instead of PQexec, use
PQsendQuery and then a loop involving PQisBusy, PQgetResult, etc.

Yes.

You've more or less done that loop, but you've put it in the wrong
place.

OK. I'll reconsider how to use those asynchronous libpq functions.
But, if possible, could you point out where "the right place" is?

The larger point is that I don't believe this issue exists only on
Windows.  I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.

OK. I'll change that part so that poll() is used if HAVE_POLL is defined,
and select() otherwise.

The patch also seems confused about whether it's intending to be a
general-purpose solution or not.  You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.

OK. I'll address this problem. Since PGconn->errorMessage cannot be
updated from outside libpq, I'm thinking of having the caller pass a
StringInfo variable as a parameter to pgwin32_PQexec(), which would
put error messages into it. The caller would then use the StringInfo,
instead of PQerrorMessage(PGconn), to get the error messages.

ISTM that dblink would then need to keep the StringInfo in its hash
table, as it does the PGconn.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
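The poll()-when-available, select()-otherwise wait proposed above might look roughly like the following sketch. The name wait_for_socket() and its return convention are assumptions for illustration, not the patch's libpq_select(). HAVE_POLL is the configure symbol mentioned in the thread; when it is not defined, the select() branch compiles. On Windows the thread's point still applies: the backend's signal-aware select() replacement must be used rather than plain POSIX calls like these.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/time.h>
#include <unistd.h>
#ifdef HAVE_POLL
#include <poll.h>
#else
#include <sys/select.h>
#endif

/*
 * Wait until sock is readable (for_read) and/or writable (for_write), or
 * until timeout_ms elapses; -1 means wait forever.  Returns >0 when ready,
 * 0 on timeout, and -1 on error with errno set (EINTR if a signal arrived,
 * in which case the caller should check its interrupt flags and retry).
 */
static int
wait_for_socket(int sock, bool for_read, bool for_write, int timeout_ms)
{
    assert(sock >= 0 && (for_read || for_write));
#ifdef HAVE_POLL
    struct pollfd pfd;

    pfd.fd = sock;
    pfd.events = (for_read ? POLLIN : 0) | (for_write ? POLLOUT : 0);
    pfd.revents = 0;
    return poll(&pfd, 1, timeout_ms);
#else
    fd_set rmask;
    fd_set wmask;
    struct timeval tv;

    FD_ZERO(&rmask);
    FD_ZERO(&wmask);
    if (for_read)
        FD_SET(sock, &rmask);
    if (for_write)
        FD_SET(sock, &wmask);
    if (timeout_ms < 0)
        return select(sock + 1, &rmask, &wmask, NULL, NULL);
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;
    return select(sock + 1, &rmask, &wmask, NULL, &tv);
#endif
}
```

One reason to prefer poll() where it exists is that select() cannot watch descriptors numbered FD_SETSIZE or higher, while poll() has no such ceiling.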

#16Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#14)
Re: walreceiver is uninterruptible on win32

On Mon, Apr 5, 2010 at 3:18 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Fri, Apr 2, 2010 at 11:11 PM, Magnus Hagander <magnus@hagander.net> wrote:

More to the point, I'm not sure I like the creation of yet another DLL
to deal with this. The reason this isn't just exported from the main
backend is, I'm sure, the same reason we created the libpqwalreceiver
library, but that means we already have one.

How about we just use this same source file, but compile and link it
directly into both dblink and libpqwalreceiver? That'd leave us with
one DLL less, making life easier.

ISTM that we cannot compile dblink using USE_PGXS=1, if that DLL doesn't
exist in the installation directory. No?

I might have misinterpreted your point. Do you mean that the same source
file defining something like pgwin32_PQexec should be placed in both
contrib/dblink and src/backend/replication/libpqwalreceiver? If so,
we can compile dblink using USE_PGXS without the DLL.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#17Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#16)
Re: walreceiver is uninterruptible on win32

On Tue, Apr 6, 2010 at 2:25 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Mon, Apr 5, 2010 at 3:18 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Fri, Apr 2, 2010 at 11:11 PM, Magnus Hagander <magnus@hagander.net> wrote:

More to the point, I'm not sure I like the creation of yet another DLL
to deal with this. The reason this isn't just exported from the main
backend is, I'm sure, the same reason we created the libpqwalreceiver
library, but that means we already have one.

How about we just use this same source file, but compile and link it
directly into both dblink and libpqwalreceiver? That'd leave us with
one DLL less, making life easier.

ISTM that we cannot compile dblink using USE_PGXS=1, if that DLL doesn't
exist in the installation directory. No?

I might have misinterpreted your point. Do you mean that the same source
file defining something like pgwin32_PQexec should be placed in both
contrib/dblink and src/backend/replication/libpqwalreceiver? If so,
we can compile dblink using USE_PGXS without the DLL.

No, I don't mean that. I mean store it in one place, and copy/link it
into where it's used. Look at for example how crypt.c and
getaddrinfo.c are handled in libpq.

Not sure how that will play with PGXS, though, but I'm not entirely
sure we care if it can be built that way? If it does, there should be
some way to get PGXS to execute that rule as well, I'm sure.

Also note that per Tom's comments this is not a win32 only fix, so it
shouldn't be called pgwin32_*().

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#18Fujii Masao
masao.fujii@gmail.com
In reply to: Magnus Hagander (#17)
Re: walreceiver is uninterruptible on win32

On Wed, Apr 7, 2010 at 1:45 AM, Magnus Hagander <magnus@hagander.net> wrote:

No, I don't mean that. I mean store it in one place, and copy/link it
into where it's used. Look at for example how crypt.c and
getaddrinfo.c are handled in libpq.

Thanks for the advice!

Not sure how that will play with PGXS, though, but I'm not entirely
sure we care if it can be built that way?

Probably yes.

If it does, there should be
some way to get PGXS to execute that rule as well, I'm sure.

If we can copy/link the source file defining the "new PQexec" when
we compile dblink, a DLL doesn't seem to be required. So I'll
stop creating a new DLL for PGXS.

Also note that per Tom's comments this is not a win32 only fix, so it
shouldn't be called pgwin32_*().

Yep.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#19Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#18)
Re: walreceiver is uninterruptible on win32

On Thu, Apr 8, 2010 at 5:01 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

If it does, there should be
some way to get PGXS to execute that rule as well, I'm sure.

If we can copy/link the source file defining the "new PQexec" when
we compile dblink, a DLL doesn't seem to be required. So I'll
stop creating a new DLL for PGXS.

On second thought, ISTM that we cannot use any source files that exist
in places other than contrib/dblink and the installation directory when
we compile dblink under USE_PGXS=1. But we can't put the file implementing
the new PQexec in either of those places. So I'm thinking again that it
should be provided as a shared library and be linked from walreceiver and
dblink. Is this right?

If adding a new shared library is too big a change at this point, I think
that we should postpone just the dblink fix to 9.1 or later. Since
no one has complained about this long-standing problem in dblink, I'm not
sure it really needs to be fixed right now. Thoughts?

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#20Joseph Conway
mail@joeconway.com
In reply to: Fujii Masao (#19)
Re: walreceiver is uninterruptible on win32

Fujii Masao wrote:

If adding a new shared library is too big a change at this point, I think
that we should postpone just the dblink fix to 9.1 or later. Since
no one has complained about this long-standing problem in dblink, I'm not
sure it really needs to be fixed right now. Thoughts?

I would agree with this. No one has ever complained that I am aware of.

Joe

#21Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#19)
Re: walreceiver is uninterruptible on win32

On Mon, Apr 12, 2010 at 13:54, Fujii Masao <masao.fujii@gmail.com> wrote:

On Thu, Apr 8, 2010 at 5:01 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

If it does, there should be
some way to get PGXS to execute that rule as well, I'm sure.

If we can copy/link the source file defining the "new PQexec" when
we compile dblink, a DLL doesn't seem to be required. So I'll
stop creating a new DLL for PGXS.

On second thought, ISTM that we cannot use any source files that exist
in places other than contrib/dblink and the installation directory when
we compile dblink under USE_PGXS=1. But we can't put the file implementing
the new PQexec in either of those places. So I'm thinking again that it
should be provided as a shared library and be linked from walreceiver and
dblink. Is this right?

If adding a new shared library is too big a change at this point, I think
that we should postpone just the dblink fix to 9.1 or later. Since
no one has complained about this long-standing problem in dblink, I'm not
sure it really needs to be fixed right now. Thoughts?

+1. Let's fix walreceiver for now, and we can revisit dblink later.
Since we haven't had any complaints so far...

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#22Fujii Masao
masao.fujii@gmail.com
In reply to: Magnus Hagander (#21)
Re: walreceiver is uninterruptible on win32

On Tue, Apr 13, 2010 at 1:56 AM, Magnus Hagander <magnus@hagander.net> wrote:

If adding a new shared library is too big a change at this point, I think
that we should postpone just the dblink fix to 9.1 or later. Since
no one has complained about this long-standing problem in dblink, I'm not
sure it really needs to be fixed right now. Thoughts?

+1. Let's fix walreceiver for now, and we can revisit dblink later.
Since we haven't had any complaints so far...

OK. I'll focus on walreceiver now.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#23Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#22)
1 attachment(s)
Re: walreceiver is uninterruptible on win32

On Tue, Apr 13, 2010 at 9:21 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Tue, Apr 13, 2010 at 1:56 AM, Magnus Hagander <magnus@hagander.net> wrote:

If adding a new shared library is too big a change at this point, I think
that we should postpone just the dblink fix to 9.1 or later. Since
no one has complained about this long-standing problem in dblink, I'm not
sure it really needs to be fixed right now. Thoughts?

+1. Let's fix walreceiver for now, and we can revisit dblink later.
Since we haven't had any complaints so far...

OK. I'll focus on walreceiver now.

The attached patch changes walreceiver so that it uses a new function,
libpqrcv_PQexec(), instead of PQexec() for sending handshake messages.
libpqrcv_PQexec() sends a query and waits for the results by using
the asynchronous libpq functions and the backend version of select().
It's interruptible by signals, so you should now be able to shut the
standby server down in the middle of the replication connection handshake.

http://archives.postgresql.org/pgsql-hackers/2010-03/msg00625.php

Just replacing PQexec() with PQsendQuery() is pretty straightforward; we
could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle connection
timeout. I suggest that we only add the replacement for PQexec(), and
live with the situation for PQconnectdb(), that covers 99% of the
scenarios anyway.

Following that suggestion, I replaced only PQexec() and have not
added a replacement for PQconnectdb() for now.

http://archives.postgresql.org/pgsql-hackers/2010-04/msg00077.php

As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?

Since the backend version of select() receives any incoming signals
on Win32, CHECK_FOR_INTERRUPTS seems not to be needed in the loop,
at least in walreceiver. No? The patch doesn't put it in there, and
I was able to interrupt walreceiver executing libpqrcv_PQexec() when
I tested the patch on Win32.

http://archives.postgresql.org/pgsql-hackers/2010-04/msg00084.php

The larger point is that I don't believe this issue exists only on
Windows. I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.

The patch uses libpq_select() to wait on the socket. It supports
both poll() and select().

The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.

The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

libpqrcv_PQexec_v1.patch (application/octet-stream)
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 54,59 **** static void libpqrcv_disconnect(void);
--- 54,60 ----
  
  /* Prototypes for private functions */
  static bool libpq_select(int timeout_ms);
+ static PGresult *libpqrcv_PQexec(const char *query);
  
  /*
   * Module load callback
***************
*** 97,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
--- 98,104 ----
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
***************
*** 149,155 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = PQexec(streamConn, cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
  		ereport(ERROR,
  				(errmsg("could not start WAL streaming: %s",
--- 150,156 ----
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = libpqrcv_PQexec(cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
  		ereport(ERROR,
  				(errmsg("could not start WAL streaming: %s",
***************
*** 225,230 **** libpq_select(int timeout_ms)
--- 226,302 ----
  }
  
  /*
+  * Send a query and wait for the results by using the asynchronous libpq
+  * functions and the backend version of select().
+  *
+  * We must not use the blocking libpq functions like PQexec() for that
+  * purpose because they are uninterruptible by signals on some platforms.
+  * Similarly, we must not use the vanilla select() here because it cannot
+  * handle the signals emulated for Windows. The signal emulation layer
+  * compatible select() must be called instead.
+  */
+ static PGresult *
+ libpqrcv_PQexec(const char *query)
+ {
+ 	PGresult   *res = NULL;
+ 
+ 	/*
+ 	 * PQexec() silently discards any prior query results at first.
+ 	 * But this preparation is not required for walreceiver because
+ 	 * it's expected that walsender doesn't generate such junk results.
+ 	 */
+ 
+ 	/*
+ 	 * Submit a query. Since we don't use non-blocking mode, this also
+ 	 * can block. But its risk is relatively small, so we ignore that
+ 	 * for now.
+ 	 */
+ 	if (!PQsendQuery(streamConn, query))
+ 		return NULL;
+ 
+ 	for (;;)
+ 	{
+ 		PGresult   *next;
+ 
+ 		/*
+ 		 * Receive data until PQgetResult has been ready to get the
+ 		 * result without blocking.
+ 		 */
+ 		while (PQisBusy(streamConn))
+ 		{
+ 			/*
+ 			 * We don't need to break down the sleep into smaller increments,
+ 			 * and check for interrupts after each nap. Because we can just
+ 			 * elog(FATAL) within SIGTERM signal handler when the signal
+ 			 * arrives in the middle of establishment of replication connection.
+ 			 */
+ 			if (!libpq_select(-1))
+ 				continue;		/* interrupted */
+ 			if (PQconsumeInput(streamConn) == 0)
+ 				return NULL;	/* trouble */
+ 		}
+ 
+ 		/*
+ 		 * Don't emulate the PQexec()'s behavior of returning the last
+ 		 * result when there are many, since walreceiver never sends a
+ 		 * query returning multiple results.
+ 		 */
+ 		if ((next = PQgetResult(streamConn)) == NULL)
+ 			break;				/* query is complete */
+ 		if (PQresultStatus(next) == PGRES_FATAL_ERROR)
+ 			return next;
+ 		PQclear(res);
+ 		res = next;
+ 		if (PQresultStatus(res) == PGRES_COPY_IN ||
+ 			PQresultStatus(res) == PGRES_COPY_OUT ||
+ 			PQstatus(streamConn) == CONNECTION_BAD)
+ 			break;
+ 	}
+ 
+ 	return res;
+ }
+ 
+ /*
   * Disconnect connection to primary, if any.
   */
  static void
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 86,93 **** static void DisableWalRcvImmediateExit(void);
   * We can't just exit(1) within SIGTERM signal handler, because the signal
   * might arrive in the middle of some critical operation, like while we're
   * holding a spinlock. We also can't just set a flag in signal handler and
!  * check it in the main loop, because we perform some blocking libpq
!  * operations like PQexec(), which can take a long time to finish.
   *
   * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
   * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
--- 86,93 ----
   * We can't just exit(1) within SIGTERM signal handler, because the signal
   * might arrive in the middle of some critical operation, like while we're
   * holding a spinlock. We also can't just set a flag in signal handler and
!  * check it in the main loop, because we perform some blocking operations
!  * like libpqrcv_PQexec(), which can take a long time to finish.
   *
   * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
   * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
#24Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#23)
1 attachment(s)
Re: walreceiver is uninterruptible on win32

On Wed, Apr 14, 2010 at 10:02 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Tue, Apr 13, 2010 at 9:21 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Tue, Apr 13, 2010 at 1:56 AM, Magnus Hagander <magnus@hagander.net> wrote:

If adding a new shared library is too big a change at this point, I think
that we should postpone just the dblink fix to 9.1 or later. Since
no one has complained about this long-standing problem in dblink, I'm not
sure it really needs to be fixed right now. Thoughts?

+1. Let's fix walreceiver for now, and we can revisit dblink later.
Since we haven't had any complaints so far...

OK. I'll focus on walreceiver now.

The attached patch changes walreceiver so that it uses a new function,
libpqrcv_PQexec(), instead of PQexec() for sending handshake messages.
libpqrcv_PQexec() sends a query and waits for the results by using
the asynchronous libpq functions and the backend version of select().
It's interruptible by signals, so you should now be able to shut the
standby server down in the middle of the replication connection handshake.

http://archives.postgresql.org/pgsql-hackers/2010-03/msg00625.php

Just replacing PQexec() with PQsendQuery() is pretty straightforward; we
could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle connection
timeout. I suggest that we only add the replacement for PQexec(), and
live with the situation for PQconnectdb(), that covers 99% of the
scenarios anyway.

Following that suggestion, I replaced only PQexec() and have not
added a replacement for PQconnectdb() for now.

http://archives.postgresql.org/pgsql-hackers/2010-04/msg00077.php

As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?

Since the backend version of select() receives any incoming signals
on Win32, CHECK_FOR_INTERRUPTS seems not to be needed in the loop,
at least in walreceiver. No? The patch doesn't put it in there, and
I was able to interrupt walreceiver executing libpqrcv_PQexec() when
I tested the patch on Win32.

It will call the signal handler, yes. Normally, the signal handler
just sets a flag somewhere, which needs to be checked with
CHECK_FOR_INTERRUPTS.

From how I read the walreceiver.c code (which I'm not that familiar
with), the signal handlers call ProcessWalRcvInterrupts() which in
turn has CHECK_FOR_INTERRUPTS in it, and this is where it ends up
being called.

The patch also seems confused about whether it's intending to be a
general-purpose solution or not.  You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.

The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later.

Given those shortcuts, can't we simplify it even further, as in the
attached patch? (If nothing else, your code did PQclear() on an
uninitialized pointer - must've been pure luck that it worked)

Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.

Finally, I've updated some of the comments.

Note: I've only tested that this patch compiles. I don't
have an SR environment around right now, so I'll have to finish testing
later. But if you have a chance to test that it fixes your test case,
please do!

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

Attachments:

libpqrcv_PQexec_v2.patch (application/octet-stream)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e451c55..5e759b6 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
 static bool libpq_select(int timeout_ms);
+static PGresult *libpqrcv_PQexec(const char *query);
 
 /*
  * Module load callback
@@ -97,10 +98,11 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
-	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
+	if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		PQclear(res);
+		if (res)
+			PQclear(res);
 		ereport(ERROR,
 				(errmsg("could not receive database system identifier and timeline ID from "
 						"the primary server: %s",
@@ -149,11 +151,15 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
 	/* Start streaming from the point requested by startup process */
 	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
 			 startpoint.xlogid, startpoint.xrecoff);
-	res = PQexec(streamConn, cmd);
-	if (PQresultStatus(res) != PGRES_COPY_OUT)
+	res = libpqrcv_PQexec(cmd);
+	if (res == NULL || PQresultStatus(res) != PGRES_COPY_OUT)
+	{
+		if (res)
+			PQclear(res);
 		ereport(ERROR,
 				(errmsg("could not start WAL streaming: %s",
 						PQerrorMessage(streamConn))));
+	}
 	PQclear(res);
 
 	justconnected = true;
@@ -225,6 +231,66 @@ libpq_select(int timeout_ms)
 }
 
 /*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * We must not use the regular blocking libpq functions like PQexec()
+ * since they are uninterruptible by signals on some platforms, such as
+ * Windows.
+ *
+ * We must also not use vanilla select() here since it cannot handle the
+ * signal emulation layer on Windows.
+ *
+ * The function is modeled on PQexec() in libpq, but only implements
+ * those parts that are in use in the walreceiver.
+ *
+ * Queries are always executed on the connection in streamConn.
+ */
+static PGresult *
+libpqrcv_PQexec(const char *query)
+{
+	/*
+	 * PQexec() silently discards any prior query results on the
+	 * connection. This is not required for walreceiver since it's
+	 * expected that walsender won't generate any such junk results.
+	 */
+
+	/*
+	 * Submit a query. Since we don't use non-blocking mode, this also
+	 * can block. But the risk is relatively small, so we ignore that
+	 * for now.
+	 */
+	if (!PQsendQuery(streamConn, query))
+		return NULL;
+
+	/*
+	 * Receive data until PQgetResult is ready to get the result
+	 * without blocking.
+	 */
+	while (PQisBusy(streamConn))
+	{
+		/*
+		 * We don't need to break down the sleep into smaller increments,
+		 * and check for interrupts after each nap, since we can just
+		 * elog(FATAL) within SIGTERM signal handler if the signal
+		 * arrives in the middle of establishment of replication connection.
+		 */
+		if (!libpq_select(-1))
+			continue;		/* interrupted */
+		if (PQconsumeInput(streamConn) == 0)
+			return NULL;	/* trouble */
+	}
+
+	/*
+	 * Don't emulate the PQexec()'s behavior of returning the last
+	 * result when there are many, since walreceiver never sends a
+	 * query returning multiple results. Just return the first result
+	 * available on the connection.
+	 */
+	return PQgetResult(streamConn);
+}
+
+/*
  * Disconnect connection to primary, if any.
  */
 static void
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ff5f293..5ab8f68 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -86,8 +86,8 @@ static void DisableWalRcvImmediateExit(void);
  * We can't just exit(1) within SIGTERM signal handler, because the signal
  * might arrive in the middle of some critical operation, like while we're
  * holding a spinlock. We also can't just set a flag in signal handler and
- * check it in the main loop, because we perform some blocking libpq
- * operations like PQexec(), which can take a long time to finish.
+ * check it in the main loop, because we perform some blocking operations
+ * like libpqrcv_PQexec(), which can take a long time to finish.
  *
  * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
  * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
#25Tom Lane
tgl@sss.pgh.pa.us
In reply to: Magnus Hagander (#24)
Re: walreceiver is uninterruptible on win32

Magnus Hagander <magnus@hagander.net> writes:

Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.

I think you're just adding useless complexity there. PQresultStatus
defends itself just fine against a NULL input, and most other libpq
functions likewise.
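Tom's point can be shown with a small, self-contained sketch. The `Result` type and the `result_status()`, `result_clear()` and `check_tuples()` functions are hypothetical stand-ins, not libpq. They only mimic the behavior relied on here: PQresultStatus() reports PGRES_FATAL_ERROR for a NULL result, and PQclear(NULL) does nothing. As a result, a call site needs no separate NULL check.

```c
#include <stdlib.h>

/* Hypothetical stand-ins for ExecStatusType and PGresult; not libpq. */
typedef enum
{
    STATUS_TUPLES_OK,
    STATUS_COPY_OUT,
    STATUS_FATAL_ERROR
} Status;

typedef struct Result
{
    Status      status;
} Result;

/* Mimics PQresultStatus(): a NULL result reads as a fatal error. */
static Status
result_status(const Result *res)
{
    if (res == NULL)
        return STATUS_FATAL_ERROR;
    return res->status;
}

/* Mimics PQclear(): clearing NULL is a harmless no-op. */
static void
result_clear(Result *res)
{
    free(res);                  /* free(NULL) does nothing */
}

/*
 * A call site in the style of libpqrcv_connect(): a NULL result simply
 * fails the status test, so no separate NULL check is required.
 */
static int
check_tuples(Result *res)
{
    int         ok = (result_status(res) == STATUS_TUPLES_OK);

    result_clear(res);
    return ok;
}
```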

regards, tom lane

#26Fujii Masao
masao.fujii@gmail.com
In reply to: Magnus Hagander (#24)
1 attachment(s)
Re: walreceiver is uninterruptible on win32

On Wed, Apr 14, 2010 at 11:15 PM, Magnus Hagander <magnus@hagander.net> wrote:

http://archives.postgresql.org/pgsql-hackers/2010-04/msg00077.php

As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?

Since the backend version of select() receives any incoming signals
on Win32, CHECK_FOR_INTERRUPTS seems not to be needed in the loop,
at least in walreceiver. No? The patch doesn't put it in there, and
I was able to interrupt walreceiver executing libpqrcv_PQexec() when
I tested the patch on Win32.

It will call the signal handler, yes. Normally, the signal handler
just sets a flag somewhere, which needs to be checked with
CHECK_FOR_INTERRUPTS.

From how I read the walreceiver.c code (which I'm not that familiar
with), the signal handlers call ProcessWalRcvInterrupts() which in
turn has CHECK_FOR_INTERRUPTS in it, and this is where it ends up
being called.

Yes. While establishing replication connection (i.e., executing
walrcv_connect function), the SIGTERM signal handler directly calls
ProcessWalRcvInterrupts() which does CHECK_FOR_INTERRUPTS() and
elog(FATAL).
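The walreceiver.c comment describes a combined approach. The signal handler normally only sets a flag, and it exits immediately only inside a window known to be safe, such as while connecting. That approach can be sketched with POSIX signals. All names here (`got_sigterm`, `immediate_interrupt_ok`, `process_interrupts()`, `install_sigterm_handler()`) are hypothetical. `_exit(1)` stands in for the elog(FATAL) that the real handler reaches through ProcessWalRcvInterrupts().

```c
#define _POSIX_C_SOURCE 200809L

#include <signal.h>
#include <unistd.h>

/* Hypothetical names, loosely modeled on walreceiver.c. */
static volatile sig_atomic_t got_sigterm = 0;
static volatile sig_atomic_t immediate_interrupt_ok = 0;

/* Stand-in for ProcessWalRcvInterrupts(): exit if shutdown was requested. */
static void
process_interrupts(void)
{
    if (got_sigterm)
        _exit(1);               /* stand-in for elog(FATAL) */
}

static void
sigterm_handler(int signo)
{
    (void) signo;
    got_sigterm = 1;

    /*
     * Act at once only inside a window known to be safe, such as a wait
     * while connecting; otherwise leave the flag for the main loop.
     */
    if (immediate_interrupt_ok)
        process_interrupts();
}

static int
install_sigterm_handler(void)
{
    struct sigaction sa;

    sa.sa_handler = sigterm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(SIGTERM, &sa, NULL);
}
```

Outside the safe window, the main loop would call process_interrupts() itself, at points where no spinlock or other critical state is held.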

The patch also seems confused about whether it's intending to be a
general-purpose solution or not.  You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.

The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later.

Given those shortcuts, can't we simplify it even further like
attached?

No, we need to call PQgetResult() at least twice. The first call
of it reads the RowDescription, DataRow and CommandComplete messages
and switches the state to PGASYNC_READY. The second one reads the
ReadyForQuery message and switches the state to PGASYNC_IDLE. So if we
don't repeat it, libpqrcv_PQexec() would end in a half-finished state.
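Fujii's explanation can be modeled with a toy state machine. This is a deliberately simplified, hypothetical model of the behavior he describes, and libpq's real state handling has more steps. `MockConn`, `mock_get_result()` and `drain_results()` are illustrative names, not libpq. A caller that stops after the first non-NULL result leaves the connection READY. A caller that keeps calling until NULL reaches IDLE.

```c
#include <stddef.h>

/* Hypothetical, simplified model of the states Fujii describes. */
typedef enum
{
    ASYNC_BUSY,                 /* query sent, result not yet returned */
    ASYNC_READY,                /* result returned, ReadyForQuery pending */
    ASYNC_IDLE                  /* ReadyForQuery consumed: query complete */
} AsyncState;

typedef struct
{
    AsyncState  state;
    int         result;         /* stand-in for a PGresult */
} MockConn;

/*
 * Toy PQgetResult(): the first call returns the result (RowDescription,
 * DataRow and CommandComplete have been read) and moves to READY; the next
 * call reads ReadyForQuery, moves to IDLE and returns NULL.
 */
static int *
mock_get_result(MockConn *conn)
{
    switch (conn->state)
    {
        case ASYNC_BUSY:
            conn->state = ASYNC_READY;
            return &conn->result;
        case ASYNC_READY:
            conn->state = ASYNC_IDLE;
            return NULL;
        default:
            return NULL;
    }
}

/* Keep calling until NULL, so the connection ends up IDLE. */
static int
drain_results(MockConn *conn)
{
    int         n = 0;

    while (mock_get_result(conn) != NULL)
        n++;
    return n;
}
```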

(If nothing else, your code did PQclear() on an
uninitialized pointer - must've been pure luck that it worked)

PQclear(NULL) might be called in my patch, but this is not a problem
since PQclear() does nothing if the specified PGresult argument is NULL.

Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.

As Tom pointed out in another post, we don't need to treat the
"result is NULL" case as special. OTOH, as you pointed out, I
forgot to call PQclear() when the second call of libpqrcv_PQexec()
in libpqrcv_connect() fails. I added it to the patch. Thanks!
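ereport(ERROR) does not return to its caller, so a PQclear() placed after it never runs. The result has to be freed first, as the revised START_REPLICATION call site does. A minimal sketch under that assumption follows. It models the non-returning error with longjmp. `make_result()`, `clear_result()`, `report_error()` and `start_streaming()` are hypothetical helpers standing in for the libpq and elog calls.

```c
#include <setjmp.h>
#include <stdlib.h>

/* Hypothetical stand-ins for PGRES_* codes, PGresult and ereport(ERROR). */
enum
{
    TUPLES_OK,
    COPY_OUT
};

static jmp_buf error_jmp;       /* where the "error handler" resumes */
static int  live_results = 0;   /* results allocated but not yet freed */

static int *
make_result(int status)
{
    int        *res = malloc(sizeof(int));

    if (res != NULL)
    {
        *res = status;
        live_results++;
    }
    return res;
}

/* Like PQclear(): NULL is accepted and ignored. */
static void
clear_result(int *res)
{
    if (res != NULL)
    {
        free(res);
        live_results--;
    }
}

/* Like ereport(ERROR): control never comes back to the caller. */
static void
report_error(void)
{
    longjmp(error_jmp, 1);
}

/* The revised call-site pattern: free the result before raising the error. */
static void
start_streaming(int status)
{
    int        *res = make_result(status);

    if (res == NULL || *res != COPY_OUT)
    {
        clear_result(res);      /* must come first: report_error() won't return */
        report_error();
    }
    clear_result(res);
}
```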

Finally, I've updated some of the comments.

Thanks a lot! I applied those improvements to the patch.

I attached the revised patch.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

libpqrcv_PQexec_v3.patch (application/octet-stream)
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 54,59 **** static void libpqrcv_disconnect(void);
--- 54,60 ----
  
  /* Prototypes for private functions */
  static bool libpq_select(int timeout_ms);
+ static PGresult *libpqrcv_PQexec(const char *query);
  
  /*
   * Module load callback
***************
*** 97,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
--- 98,104 ----
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
***************
*** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = PQexec(streamConn, cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
  		ereport(ERROR,
  				(errmsg("could not start WAL streaming: %s",
  						PQerrorMessage(streamConn))));
  	PQclear(res);
  
  	justconnected = true;
--- 150,163 ----
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = libpqrcv_PQexec(cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
+ 	{
+ 		PQclear(res);
  		ereport(ERROR,
  				(errmsg("could not start WAL streaming: %s",
  						PQerrorMessage(streamConn))));
+ 	}
  	PQclear(res);
  
  	justconnected = true;
***************
*** 225,230 **** libpq_select(int timeout_ms)
--- 229,311 ----
  }
  
  /*
+  * Send a query and wait for the results by using the asynchronous libpq
+  * functions and the backend version of select().
+  *
+  * We must not use the regular blocking libpq functions like PQexec()
+  * since they are uninterruptible by signals on some platforms, such as
+  * Windows.
+  *
+  * We must also not use vanilla select() here since it cannot handle the
+  * signal emulation layer on Windows.
+  *
+  * The function is modeled on PQexec() in libpq, but only implements
+  * those parts that are in use in the walreceiver.
+  *
+  * Queries are always executed on the connection in streamConn.
+  */
+ static PGresult *
+ libpqrcv_PQexec(const char *query)
+ {
+ 	PGresult   *res = NULL;
+ 
+ 	/*
+ 	 * PQexec() silently discards any prior query results on the
+ 	 * connection. This is not required for walreceiver since it's
+ 	 * expected that walsender won't generate any such junk results.
+ 	 */
+ 
+ 	/*
+ 	 * Submit a query. Since we don't use non-blocking mode, this also
+ 	 * can block. But its risk is relatively small, so we ignore that
+ 	 * for now.
+ 	 */
+ 	if (!PQsendQuery(streamConn, query))
+ 		return NULL;
+ 
+ 	for (;;)
+ 	{
+ 		PGresult   *next;
+ 
+ 		/*
+ 		 * Receive data until PQgetResult is ready to get the result
+ 		 * without blocking.
+ 		 */
+ 		while (PQisBusy(streamConn))
+ 		{
+ 			/*
+ 			 * We don't need to break down the sleep into smaller increments,
+ 			 * and check for interrupts after each nap, since we can just
+ 			 * elog(FATAL) within SIGTERM signal handler if the signal
+ 			 * arrives in the middle of establishment of replication connection.
+ 			 */
+ 			if (!libpq_select(-1))
+ 				continue;		/* interrupted */
+ 			if (PQconsumeInput(streamConn) == 0)
+ 				return NULL;	/* trouble */
+ 		}
+ 
+ 		/*
+ 		 * Don't emulate the PQexec()'s behavior of returning the last
+ 		 * result when there are many, since walreceiver never sends a
+ 		 * query returning multiple results.
+ 		 */
+ 		if ((next = PQgetResult(streamConn)) == NULL)
+ 			break;				/* query is complete */
+ 		if (PQresultStatus(next) == PGRES_FATAL_ERROR)
+ 			return next;
+ 		PQclear(res);
+ 		res = next;
+ 		if (PQresultStatus(res) == PGRES_COPY_IN ||
+ 			PQresultStatus(res) == PGRES_COPY_OUT ||
+ 			PQstatus(streamConn) == CONNECTION_BAD)
+ 			break;
+ 	}
+ 
+ 	return res;
+ }
+ 
+ /*
   * Disconnect connection to primary, if any.
   */
  static void
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 86,93 **** static void DisableWalRcvImmediateExit(void);
   * We can't just exit(1) within SIGTERM signal handler, because the signal
   * might arrive in the middle of some critical operation, like while we're
   * holding a spinlock. We also can't just set a flag in signal handler and
!  * check it in the main loop, because we perform some blocking libpq
!  * operations like PQexec(), which can take a long time to finish.
   *
   * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
   * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
--- 86,93 ----
   * We can't just exit(1) within SIGTERM signal handler, because the signal
   * might arrive in the middle of some critical operation, like while we're
   * holding a spinlock. We also can't just set a flag in signal handler and
!  * check it in the main loop, because we perform some blocking operations
!  * like libpqrcv_PQexec(), which can take a long time to finish.
   *
   * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
   * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
#27Magnus Hagander
magnus@hagander.net
In reply to: Tom Lane (#25)
Re: walreceiver is uninterruptible on win32

On Thu, Apr 15, 2010 at 4:17 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Magnus Hagander <magnus@hagander.net> writes:

Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.

I think you're just adding useless complexity there.  PQresultStatus
defends itself just fine against a NULL input, and most other libpq
functions likewise.

Yeah, I realized that after posting it. I was still stuck in ancient
times when at least some of those functions couldn't be called with
NULL pointers, so I just put that in there by default :-)

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#28Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#26)
Re: walreceiver is uninterruptible on win32

On Thu, Apr 15, 2010 at 5:13 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Wed, Apr 14, 2010 at 11:15 PM, Magnus Hagander <magnus@hagander.net> wrote:

The patch also seems confused about whether it's intending to be a
general-purpose solution or not.  You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.

The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later.

Given those shortcuts, can't we simplify it even further like
attached?

No, we need to call PQgetResult() at least twice. The first call
of it reads the RowDescription, DataRow and CommandComplete messages
and switches the state to PGASYNC_READY. The second one reads the
ReadyForQuery message and switches the state to PGASYNC_IDLE. So if we
don't repeat it, libpqrcv_PQexec() would end in a half-finished state.

Ah, ok. That's what I get for not testing it :-)

I still think that could be implemented in a much clearer way though.
Just calling PQgetResult() twice, and checking the return values
sequentially would be much easier to read, imho. Looking through that
set of "break" statements at the end of the loop is just confusing. If
nothing else, it needs more comments.

But maybe I'm just bikeshedding ;)
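Magnus suggests calling PQgetResult() exactly twice and checking each return value in turn. That alternative might look like the sketch below. It runs against a hypothetical queue-based mock in which returning NULL stands for the ReadyForQuery message having been consumed. `MockConn`, `mock_get_result()` and `exec_two_calls()` are illustrative names, not libpq. The trade-off shows in the second check: an unexpected extra result is treated as a failure rather than handled.

```c
#include <stddef.h>

/* Hypothetical queue-based mock; NULL means ReadyForQuery was consumed. */
typedef struct
{
    const int  *results;        /* stand-ins for queued PGresults */
    int         nresults;
    int         next;
} MockConn;

static const int *
mock_get_result(MockConn *conn)
{
    if (conn->next < conn->nresults)
        return &conn->results[conn->next++];
    return NULL;
}

/*
 * Sequential style: the first call must return a result and the second
 * must return NULL.  Anything else is reported as failure (NULL).
 */
static const int *
exec_two_calls(MockConn *conn)
{
    const int  *res = mock_get_result(conn);

    if (res == NULL)
        return NULL;            /* no result at all */
    if (mock_get_result(conn) != NULL)
        return NULL;            /* unexpected extra result */
    return res;
}
```

A production version would also need to drain and free any extra results before reporting failure, which is the bookkeeping a loop-based version avoids.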

(If nothing else, your code did PQclear() on an
uninitialized pointer - must've been pure luck that it worked)

PQclear(NULL) might be called in my patch, but this is not a problem
since PQclear() does nothing if the specified PGresult argument is NULL.

Ah, I missed that you initialized it to NULL.

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#29Robert Haas
robertmhaas@gmail.com
In reply to: Fujii Masao (#26)
Re: walreceiver is uninterruptible on win32

On Wed, Apr 14, 2010 at 11:13 PM, Fujii Masao <masao.fujii@gmail.com> wrote:


I have to admit to finding this confusing. According to the comments:

+ 		/*
+ 		 * Don't emulate the PQexec()'s behavior of returning the last
+ 		 * result when there are many, since walreceiver never sends a
+ 		 * query returning multiple results.
+ 		 */

...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.

...Robert

#30Fujii Masao
masao.fujii@gmail.com
In reply to: Robert Haas (#29)
1 attachment(s)
Re: walreceiver is uninterruptible on win32

On Thu, Apr 15, 2010 at 11:26 PM, Robert Haas <robertmhaas@gmail.com> wrote:

I have to admit to finding this confusing.  According to the comments:

+               /*
+                * Don't emulate the PQexec()'s behavior of returning the last
+                * result when there are many, since walreceiver never sends a
+                * query returning multiple results.
+                */

...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.

Yeah, it's not a very accurate description. And I found another problem:
libpqrcv_PQexec() returns as soon as an error result arrives, even if
the connection state has not reached PGASYNC_IDLE yet.

So I changed libpqrcv_PQexec() so that it emulates PQexec()'s behavior,
except the concatenation of error messages. How about the attached patch?
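The v4 loop's result handling can be exercised in isolation. The sketch below mirrors its structure. It keeps only the last result, frees the previous one, and stops early on COPY or a broken connection. It runs against a hypothetical mock: `MockConn`, `mock_get_result()` and `exec_last_result()` are illustrative names, free() stands in for PQclear(), and the PQisBusy()/select() wait is omitted. The second test case shows the fix described above: an error result no longer ends the loop before the query completes.

```c
#include <stdlib.h>

/* Hypothetical stand-ins for ExecStatusType, PGresult and PGconn. */
typedef enum
{
    ST_COMMAND_OK,
    ST_TUPLES_OK,
    ST_COPY_IN,
    ST_COPY_OUT,
    ST_FATAL_ERROR
} Status;

typedef struct
{
    Status      status;
} Result;

typedef struct
{
    const Status *queue;        /* results the "server" will send */
    int         n;
    int         next;
    int         bad;            /* stand-in for CONNECTION_BAD */
    int         done;           /* set once ReadyForQuery is consumed */
} MockConn;

/* Toy PQgetResult(): hand out queued results, then NULL when complete. */
static Result *
mock_get_result(MockConn *conn)
{
    Result     *res;

    if (conn->next >= conn->n)
    {
        conn->done = 1;
        return NULL;
    }
    res = malloc(sizeof(Result));
    if (res == NULL)
        abort();
    res->status = conn->queue[conn->next++];
    return res;
}

/* Same shape as the v4 loop, minus the PQisBusy()/select() wait. */
static Result *
exec_last_result(MockConn *conn)
{
    Result     *result;
    Result     *last = NULL;

    for (;;)
    {
        if ((result = mock_get_result(conn)) == NULL)
            break;              /* query is complete */
        free(last);             /* PQclear() analogue; free(NULL) is fine */
        last = result;
        if (result->status == ST_COPY_IN ||
            result->status == ST_COPY_OUT ||
            conn->bad)
            break;              /* COPY started or connection lost */
    }
    return last;
}
```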

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

libpqrcv_PQexec_v4.patch (application/octet-stream)
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 54,59 **** static void libpqrcv_disconnect(void);
--- 54,60 ----
  
  /* Prototypes for private functions */
  static bool libpq_select(int timeout_ms);
+ static PGresult *libpqrcv_PQexec(const char *query);
  
  /*
   * Module load callback
***************
*** 97,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
--- 98,104 ----
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
***************
*** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = PQexec(streamConn, cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
  		ereport(ERROR,
  				(errmsg("could not start WAL streaming: %s",
  						PQerrorMessage(streamConn))));
  	PQclear(res);
  
  	justconnected = true;
--- 150,163 ----
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = libpqrcv_PQexec(cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
+ 	{
+ 		PQclear(res);
  		ereport(ERROR,
  				(errmsg("could not start WAL streaming: %s",
  						PQerrorMessage(streamConn))));
+ 	}
  	PQclear(res);
  
  	justconnected = true;
***************
*** 225,230 **** libpq_select(int timeout_ms)
--- 229,309 ----
  }
  
  /*
+  * Send a query and wait for the results by using the asynchronous libpq
+  * functions and the backend version of select().
+  *
+  * We must not use the regular blocking libpq functions like PQexec()
+  * since they are uninterruptible by signals on some platforms, such as
+  * Windows.
+  *
+  * We must also not use vanilla select() here since it cannot handle the
+  * signal emulation layer on Windows.
+  *
+  * The function is modeled on PQexec() in libpq, but only implements
+  * those parts that are in use in the walreceiver.
+  *
+  * Queries are always executed on the connection in streamConn.
+  */
+ static PGresult *
+ libpqrcv_PQexec(const char *query)
+ {
+ 	PGresult   *result		= NULL;
+ 	PGresult   *lastResult	= NULL;
+ 
+ 	/*
+ 	 * PQexec() silently discards any prior query results on the
+ 	 * connection. This is not required for walreceiver since it's
+ 	 * expected that walsender won't generate any such junk results.
+ 	 */
+ 
+ 	/*
+ 	 * Submit a query. Since we don't use non-blocking mode, this also
+ 	 * can block. But its risk is relatively small, so we ignore that
+ 	 * for now.
+ 	 */
+ 	if (!PQsendQuery(streamConn, query))
+ 		return NULL;
+ 
+ 	for (;;)
+ 	{
+ 		/*
+ 		 * Receive data until PQgetResult is ready to get the result
+ 		 * without blocking.
+ 		 */
+ 		while (PQisBusy(streamConn))
+ 		{
+ 			/*
+ 			 * We don't need to break down the sleep into smaller increments,
+ 			 * and check for interrupts after each nap, since we can just
+ 			 * elog(FATAL) within SIGTERM signal handler if the signal
+ 			 * arrives in the middle of establishment of replication connection.
+ 			 */
+ 			if (!libpq_select(-1))
+ 				continue;		/* interrupted */
+ 			if (PQconsumeInput(streamConn) == 0)
+ 				return NULL;	/* trouble */
+ 		}
+ 
+ 		/*
+ 		 * Emulate the PQexec()'s behavior of returning the last result
+ 		 * when there are many. But we don't try to concatenate error
+ 		 * results like PQexec() does since it's expected that walsender
+ 		 * won't return multiple error results.
+ 		 */
+ 		if ((result = PQgetResult(streamConn)) == NULL)
+ 			break;				/* query is complete */
+ 		PQclear(lastResult);
+ 		lastResult = result;
+ 		if (PQresultStatus(result) == PGRES_COPY_IN ||
+ 			PQresultStatus(result) == PGRES_COPY_OUT ||
+ 			PQstatus(streamConn) == CONNECTION_BAD)
+ 			break;
+ 	}
+ 
+ 	return lastResult;
+ }
+ 
+ /*
   * Disconnect connection to primary, if any.
   */
  static void
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 86,93 **** static void DisableWalRcvImmediateExit(void);
   * We can't just exit(1) within SIGTERM signal handler, because the signal
   * might arrive in the middle of some critical operation, like while we're
   * holding a spinlock. We also can't just set a flag in signal handler and
!  * check it in the main loop, because we perform some blocking libpq
!  * operations like PQexec(), which can take a long time to finish.
   *
   * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
   * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
--- 86,93 ----
   * We can't just exit(1) within SIGTERM signal handler, because the signal
   * might arrive in the middle of some critical operation, like while we're
   * holding a spinlock. We also can't just set a flag in signal handler and
!  * check it in the main loop, because we perform some blocking operations
!  * like libpqrcv_PQexec(), which can take a long time to finish.
   *
   * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
   * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
#31Robert Haas
robertmhaas@gmail.com
In reply to: Fujii Masao (#30)
Re: walreceiver is uninterruptible on win32

On Fri, Apr 16, 2010 at 3:03 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Thu, Apr 15, 2010 at 11:26 PM, Robert Haas <robertmhaas@gmail.com> wrote:

I have to admit to finding this confusing.  According to the comments:

+               /*
+                * Don't emulate the PQexec()'s behavior of returning the last
+                * result when there are many, since walreceiver never sends a
+                * query returning multiple results.
+                */

...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.

Yeah, it's not a very accurate description. And I found another problem:
libpqrcv_PQexec() returns as soon as an error result arrives, even if
the connection state has not reached PGASYNC_IDLE yet.

So I changed libpqrcv_PQexec() so that it emulates PQexec()'s behavior,
except the concatenation of error messages. How about the attached patch?

Well, the comment definitely makes more sense in this version. I
can't speak to the behavior.

...Robert

#32Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#30)
Re: walreceiver is uninterruptible on win32

On Fri, Apr 16, 2010 at 4:03 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Thu, Apr 15, 2010 at 11:26 PM, Robert Haas <robertmhaas@gmail.com> wrote:

I have to admit to finding this confusing.  According to the comments:

+               /*
+                * Don't emulate the PQexec()'s behavior of returning the last
+                * result when there are many, since walreceiver never sends a
+                * query returning multiple results.
+                */

...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.

Yeah, it's not a very accurate description. And I found another problem:
libpqrcv_PQexec() returns as soon as an error result arrives, even if
the connection state has not reached PGASYNC_IDLE yet.

So I changed libpqrcv_PQexec() so that it emulates PQexec()'s behavior,
except the concatenation of error messages. How about the attached patch?

BTW, even with the patch applied, you will not be able to interrupt the
walreceiver with a smart shutdown, because of the bug reported in another
thread. Please keep that in mind when you test the patch.
http://archives.postgresql.org/pgsql-hackers/2010-04/msg00592.php

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#33Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#30)
Re: walreceiver is uninterruptible on win32

On Fri, Apr 16, 2010 at 09:03, Fujii Masao <masao.fujii@gmail.com> wrote:

On Thu, Apr 15, 2010 at 11:26 PM, Robert Haas <robertmhaas@gmail.com> wrote:

I have to admit to finding this confusing.  According to the comments:

+               /*
+                * Don't emulate the PQexec()'s behavior of returning the last
+                * result when there are many, since walreceiver never sends a
+                * query returning multiple results.
+                */

...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.

Yeah, it's not a very accurate description. And I found another problem:
libpqrcv_PQexec() returns as soon as an error result arrives, even if
the connection state has not reached PGASYNC_IDLE yet.

So I changed libpqrcv_PQexec() so that it emulates PQexec()'s behavior,
except the concatenation of error messages. How about the attached patch?

Applied with only minor stylistic changes. Thanks!

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/