Non-blocking communication between a frontend and a backend (pqcomm)

Started by Fujii Masao over 16 years ago (13 messages)
#1 Fujii Masao
masao.fujii@gmail.com
1 attachment(s)

Hi,

http://archives.postgresql.org/pgsql-hackers/2009-07/msg00191.php

In line with Robert's suggestion, I am submitting the non-blocking pqcomm
patch as a self-contained one.

This patch provides support for non-blocking communication between
a frontend and a backend. The upcoming synchronous replication patch
needs this so that walsender can send XLOG records and receive replies
from the standby server concurrently. Specifically, this patch provides
a function that checks the socket for readiness using select() / poll(),
and functions that read data from the local buffer instead of the connection.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

nonblocking_pqcomm_0703.patch (application/octet-stream)
diff -rc base/src/backend/libpq/be-secure.c new/src/backend/libpq/be-secure.c
*** base/src/backend/libpq/be-secure.c	2008-12-20 10:28:26.000000000 +0900
--- new/src/backend/libpq/be-secure.c	2008-12-20 11:29:46.000000000 +0900
***************
*** 71,76 ****
--- 71,86 ----
  #endif
  #endif   /* USE_SSL */
  
+ #ifdef HAVE_POLL_H
+ #include <poll.h>
+ #endif
+ #ifdef HAVE_SYS_POLL_H
+ #include <sys/poll.h>
+ #endif
+ #ifdef HAVE_SYS_SELECT_H
+ #include <sys/select.h>
+ #endif
+ 
  #include "libpq/libpq.h"
  #include "tcop/tcopprot.h"
  
***************
*** 388,393 ****
--- 398,471 ----
  	return n;
  }
  
+ /*
+  * Checks a socket, using poll or select, for data to be read, written,
+  * or both.  Returns >0 if one or more conditions are met, 0 if it timed
+  * out, -1 if an error occurred.
+  *
+  * Timeout is specified in millisec. Timeout is infinite if timeout_ms
+  * is negative.  Timeout is immediate (no blocking) if timeout_ms is 0.
+  *
+  * If SSL is in use, the SSL buffer is checked prior to checking the socket
+  * for read data directly.
+  *
+  * This function is based on pqSocketCheck and pqSocketPoll.
+  */
+ int
+ secure_poll(Port *port, bool forRead, bool forWrite, int timeout_ms)
+ {
+ #ifdef USE_SSL
+ 	/* Check for SSL library buffering read bytes */
+ 	if (forRead && port->ssl && SSL_pending(port->ssl) > 0)
+ 	{
+ 		/* short-circuit the select */
+ 		return 1;
+ 	}
+ #endif
+ 
+ 	if (!forRead && !forWrite)
+ 		return 0;
+ 	else
+ 	{
+ 		/* We use poll(2) if available, otherwise select(2) */
+ #ifdef HAVE_POLL
+ 		struct pollfd input_fd;
+ 		
+ 		input_fd.fd = port->sock;
+ 		input_fd.events = POLLERR;
+ 		input_fd.revents = 0;
+ 		
+ 		if (forRead)
+ 			input_fd.events |= POLLIN;
+ 		if (forWrite)
+ 			input_fd.events |= POLLOUT;
+ 		
+ 		return poll(&input_fd, 1, timeout_ms);
+ #else							/* !HAVE_POLL */
+ 
+ 		fd_set		input_mask;
+ 		fd_set		output_mask;
+ 		fd_set		except_mask;
+ 		struct timeval timeout;
+ 		
+ 		FD_ZERO(&input_mask);
+ 		FD_ZERO(&output_mask);
+ 		FD_ZERO(&except_mask);
+ 		if (forRead)
+ 			FD_SET(port->sock, &input_mask);
+ 		if (forWrite)
+ 			FD_SET(port->sock, &output_mask);
+ 		FD_SET(port->sock, &except_mask);
+ 		
+ 		timeout.tv_sec	= timeout_ms / 1000;
+ 		timeout.tv_usec	= (timeout_ms % 1000) * 1000;
+ 
+ 		return select(port->sock + 1, &input_mask, &output_mask,
+ 					  &except_mask, &timeout);
+ #endif   /* HAVE_POLL */
+ 	}
+ }
+ 
  /* ------------------------------------------------------------ */
  /*						  SSL specific code						*/
  /* ------------------------------------------------------------ */
diff -rc base/src/backend/libpq/pqcomm.c new/src/backend/libpq/pqcomm.c
*** base/src/backend/libpq/pqcomm.c	2008-12-20 10:28:26.000000000 +0900
--- new/src/backend/libpq/pqcomm.c	2008-12-20 11:29:46.000000000 +0900
***************
*** 49,60 ****
--- 49,64 ----
   *
   * low-level I/O:
   *		pq_getbytes		- get a known number of bytes from connection
+  *		pq_getbufbytes	- get a known number of bytes from buffer
   *		pq_getstring	- get a null terminated string from connection
   *		pq_getmessage	- get a message with length word from connection
   *		pq_getbyte		- get next byte from connection
+  *		pq_getbufbyte	- get next byte from buffer
   *		pq_peekbyte		- peek at next byte from connection
+  *		pq_peekbufbyte	- peek at next byte from buffer
   *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
   *		pq_flush		- flush pending output
+  *		pq_wait			- wait until we can read or write the connection
   *
   * message-level I/O (and old-style-COPY-OUT cruft):
   *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
***************
*** 729,735 ****
   *		returns 0 if OK, EOF if trouble
   * --------------------------------
   */
! static int
  pq_recvbuf(void)
  {
  	if (PqRecvPointer > 0)
--- 733,739 ----
   *		returns 0 if OK, EOF if trouble
   * --------------------------------
   */
! int
  pq_recvbuf(void)
  {
  	if (PqRecvPointer > 0)
***************
*** 799,804 ****
--- 803,821 ----
  }
  
  /* --------------------------------
+  *		pq_getbufbyte	- get a single byte from buffer, or return EOF
+  * --------------------------------
+  */
+ int
+ pq_getbufbyte(void)
+ {
+ 	if (PqRecvPointer >= PqRecvLength)
+ 		return EOF;			/* Failed to read buffered data */
+ 
+ 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
+ }
+ 
+ /* --------------------------------
   *		pq_peekbyte		- peek at next byte from connection
   *
   *	 Same as pq_getbyte() except we don't advance the pointer.
***************
*** 816,821 ****
--- 833,853 ----
  }
  
  /* --------------------------------
+  *		pq_peekbufbyte		- peek at next byte from buffer
+  *
+  *	 Same as pq_getbufbyte() except we don't advance the pointer.
+  * --------------------------------
+  */
+ int
+ pq_peekbufbyte(void)
+ {
+ 	if (PqRecvPointer >= PqRecvLength)
+ 		return EOF;			/* Failed to read buffered data */
+ 
+ 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
+ }
+ 
+ /* --------------------------------
   *		pq_getbytes		- get a known number of bytes from connection
   *
   *		returns 0 if OK, EOF if trouble
***************
*** 845,850 ****
--- 877,900 ----
  }
  
  /* --------------------------------
+  *		pq_getbufbytes		- get a known number of bytes from buffer
+  *
+  *		returns 0 if OK, EOF otherwise
+  * --------------------------------
+  */
+ int
+ pq_getbufbytes(char *s, size_t len)
+ {
+ 	if (len > (size_t) (PqRecvLength - PqRecvPointer))
+ 		return EOF;			/* Failed to read buffered data */
+ 	
+ 	memcpy(s, PqRecvBuffer + PqRecvPointer, len);
+ 	PqRecvPointer += len;
+ 	
+ 	return 0;
+ }
+ 
+ /* --------------------------------
   *		pq_discardbytes		- throw away a known number of bytes
   *
   *		same as pq_getbytes except we do not copy the data to anyplace.
***************
*** 1124,1129 ****
--- 1174,1218 ----
  	return 0;
  }
  
+ /* --------------------------------
+  *		pq_wait		- wait until we can read or write the connection socket.
+  *
+  *		returns >0 if one or more conditions are met, 0 if it timed out or
+  *		interrupted, -1	if an error directly.
+  *
+  *		this function is based on pqSocketCheck.
+  * --------------------------------
+  */
+ int
+ pq_wait(bool forRead, bool forWrite, int timeout_ms)
+ {
+ 	int	result;
+ 	
+ 	if (!MyProcPort)
+ 		return -1;
+ 	if (MyProcPort->sock < 0)
+ 	{
+ 		ereport(COMMERROR,
+ 				(errcode_for_socket_access(),
+ 				 errmsg("socket not open")));
+ 		return -1;
+ 	}
+ 	
+ 	result = secure_poll(MyProcPort, forRead, forWrite, timeout_ms);
+ 	
+ 	if (result < 0)
+ 	{
+ 		if (errno == EINTR)
+ 			return 0;		/* Ok if we were interrupted */
+ 
+ 		ereport(COMMERROR,
+ 				(errcode_for_socket_access(),
+ 				 errmsg("select() failed: %m")));
+ 	}
+ 	
+ 	return result;
+ }
+ 
  
  /* --------------------------------
   * Message-level I/O routines begin here.
diff -rc base/src/include/libpq/libpq.h new/src/include/libpq/libpq.h
*** base/src/include/libpq/libpq.h	2008-12-20 10:28:45.000000000 +0900
--- new/src/include/libpq/libpq.h	2008-12-20 11:29:46.000000000 +0900
***************
*** 52,64 ****
--- 52,69 ----
  extern void TouchSocketFile(void);
  extern void pq_init(void);
  extern void pq_comm_reset(void);
+ extern int	pq_recvbuf(void);
  extern int	pq_getbytes(char *s, size_t len);
+ extern int	pq_getbufbytes(char *s, size_t len);
  extern int	pq_getstring(StringInfo s);
  extern int	pq_getmessage(StringInfo s, int maxlen);
  extern int	pq_getbyte(void);
+ extern int	pq_getbufbyte(void);
  extern int	pq_peekbyte(void);
+ extern int	pq_peekbufbyte(void);
  extern int	pq_putbytes(const char *s, size_t len);
  extern int	pq_flush(void);
+ extern int	pq_wait(bool forRead, bool forWrite, int timeout_ms);
  extern int	pq_putmessage(char msgtype, const char *s, size_t len);
  extern void pq_startcopyout(void);
  extern void pq_endcopyout(bool errorAbort);
***************
*** 73,77 ****
--- 78,83 ----
  extern void secure_close(Port *port);
  extern ssize_t secure_read(Port *port, void *ptr, size_t len);
  extern ssize_t secure_write(Port *port, void *ptr, size_t len);
+ extern int	secure_poll(Port *port, bool forRead, bool forWrite, int timeout_ms);
  
  #endif   /* LIBPQ_H */
#2 Martin Pihlak
martin.pihlak@gmail.com
In reply to: Fujii Masao (#1)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

Fujii Masao wrote:

> http://archives.postgresql.org/pgsql-hackers/2009-07/msg00191.php
>
> In line with Robert's suggestion, I submit non-blocking pqcomm patch
> as a self-contained one.

Here's my initial review of the non-blocking pqcomm patch. The patch applies
cleanly and passes regression. Generally looks nice and clean. A couple of
remarks from the department of nitpicking:

* In secure_poll() the handling of timeouts differs depending on whether
  poll(), select() or SSL_pending() is used. The latter doesn't use the
  timeout value at all, and for select() it is impossible to specify an
  indefinite timeout.
* Occasional "blank" lines consisting of a single tab character -- maybe
  a left-over from editor auto-indent. Not sure how much of a problem this
  is, given that the blanks will be removed by pgindent.
* Comment on pq_wait() seems to have a typo: "-1 if an error directly."

I have done limited testing on Linux i686 (HAVE_POLL only) -- the non-blocking
functions behave as expected.

regards,
Martin

#3 Robert Haas
robertmhaas@gmail.com
In reply to: Martin Pihlak (#2)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

On Fri, Jul 17, 2009 at 5:26 PM, Martin Pihlak <martin.pihlak@gmail.com> wrote:

> Fujii Masao wrote:
>
>> http://archives.postgresql.org/pgsql-hackers/2009-07/msg00191.php
>>
>> In line with Robert's suggestion, I submit non-blocking pqcomm patch
>> as a self-contained one.
>
> Here's my initial review of the non-blocking pqcomm patch. The patch applies
> cleanly and passes regression. Generally looks nice and clean. Couple of remarks
> from the department of nitpicking:
>
> * In secure_poll() the handling of timeouts is different depending whether
>   poll(), select() or SSL_pending() is used. The latter doesn't use the
>   timeout value at all, and for select() it is impossible to specify indefinite
>   timeout.
> * occasional "blank" lines consisting of a single tab character -- maybe
>   a left-over from editor auto-indent. Not sure of how much a problem this
>   is, given that the blanks will be removed by pg_indent.
> * Comment on pq_wait() seems to have a typo: "-1 if an error directly."
>
> I have done limited testing on Linux i686 (HAVE_POLL only) -- the non-blocking
> functions behave as expected.

Fujii Masao,

Are you planning to update this patch based on Martin's review?

...Robert

#4 Fujii Masao
masao.fujii@gmail.com
In reply to: Robert Haas (#3)
1 attachment(s)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

Hi,

On Wed, Jul 22, 2009 at 2:20 AM, Robert Haas <robertmhaas@gmail.com> wrote:

> Fujii Masao,
>
> Are you planning to update this patch based on Martin's review?

Sure. Attached is an updated patch.

On Fri, Jul 17, 2009 at 5:26 PM, Martin Pihlak <martin.pihlak@gmail.com> wrote:

> Here's my initial review of the non-blocking pqcomm patch. The patch applies
> cleanly and passes regression. Generally looks nice and clean. Couple of remarks
> from the department of nitpicking:

Thanks for reviewing the patch!

> * In secure_poll() the handling of timeouts is different depending whether
>   poll(), select() or SSL_pending() is used. The latter doesn't use the
>   timeout value at all, and for select() it is impossible to specify indefinite
>   timeout.

Fixed. I changed how the fifth argument of select(), 'timeout', is handled:
when a negative timeout_ms is passed to secure_poll(), 'timeout' is set to
NULL, which makes select() block indefinitely.

Since SSL_pending() doesn't wait for data to arrive (i.e., doesn't use timeout),
I didn't change the code related to that function.
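
As an illustration of the select() timeout handling described above, the
conversion can be isolated in a small helper. This is only a sketch: the name
timeout_ms_to_timeval and its signature are hypothetical and do not appear in
the patch, which performs the same steps inline in secure_poll().

```c
#include <assert.h>
#include <stddef.h>
#include <sys/time.h>

/*
 * Hypothetical helper (not part of the patch): convert a millisecond
 * timeout into the pointer form expected as select()'s fifth argument.
 *
 *   timeout_ms <  0  -> NULL, so select() blocks indefinitely
 *   timeout_ms == 0  -> zeroed timeval, so select() returns immediately
 *   timeout_ms >  0  -> seconds and microseconds split out of timeout_ms
 */
struct timeval *
timeout_ms_to_timeval(int timeout_ms, struct timeval *storage)
{
    assert(storage != NULL);

    if (timeout_ms < 0)
        return NULL;

    storage->tv_sec = timeout_ms / 1000;
    storage->tv_usec = (timeout_ms % 1000) * 1000;
    return storage;
}
```

A caller would pass the returned pointer straight through to select(), which
gives the select() branch the same meaning for negative, zero and positive
timeouts as the poll() branch.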

> * occasional "blank" lines consisting of a single tab character -- maybe
>   a left-over from editor auto-indent. Not sure of how much a problem this
>   is, given that the blanks will be removed by pg_indent.

Fixed.

> * Comment on pq_wait() seems to have a typo: "-1 if an error directly."

Fixed.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

nonblocking_pqcomm_0722.patch (application/octet-stream)
diff -rcN base/src/backend/libpq/be-secure.c new/src/backend/libpq/be-secure.c
*** base/src/backend/libpq/be-secure.c	2009-07-22 14:41:58.000000000 +0900
--- new/src/backend/libpq/be-secure.c	2009-07-22 14:42:05.000000000 +0900
***************
*** 71,76 ****
--- 71,86 ----
  #endif
  #endif   /* USE_SSL */
  
+ #ifdef HAVE_POLL_H
+ #include <poll.h>
+ #endif
+ #ifdef HAVE_SYS_POLL_H
+ #include <sys/poll.h>
+ #endif
+ #ifdef HAVE_SYS_SELECT_H
+ #include <sys/select.h>
+ #endif
+ 
  #include "libpq/libpq.h"
  #include "tcop/tcopprot.h"
  
***************
*** 388,393 ****
--- 398,478 ----
  	return n;
  }
  
+ /*
+  * Checks a socket, using poll or select, for data to be read, written,
+  * or both.  Returns >0 if one or more conditions are met, 0 if it timed
+  * out, -1 if an error occurred.
+  *
+  * Timeout is specified in millisec. Timeout is infinite if timeout_ms
+  * is negative.  Timeout is immediate (no blocking) if timeout_ms is 0.
+  *
+  * If SSL is in use, the SSL buffer is checked prior to checking the socket
+  * for read data directly.
+  *
+  * This function is based on pqSocketCheck and pqSocketPoll.
+  */
+ int
+ secure_poll(Port *port, bool forRead, bool forWrite, int timeout_ms)
+ {
+ #ifdef USE_SSL
+ 	/* Check for SSL library buffering read bytes */
+ 	if (forRead && port->ssl && SSL_pending(port->ssl) > 0)
+ 	{
+ 		/* short-circuit the select */
+ 		return 1;
+ 	}
+ #endif
+ 
+ 	if (!forRead && !forWrite)
+ 		return 0;
+ 	else
+ 	{
+ 		/* We use poll(2) if available, otherwise select(2) */
+ #ifdef HAVE_POLL
+ 		struct pollfd input_fd;
+ 
+ 		input_fd.fd = port->sock;
+ 		input_fd.events = POLLERR;
+ 		input_fd.revents = 0;
+ 
+ 		if (forRead)
+ 			input_fd.events |= POLLIN;
+ 		if (forWrite)
+ 			input_fd.events |= POLLOUT;
+ 
+ 		return poll(&input_fd, 1, timeout_ms);
+ #else							/* !HAVE_POLL */
+ 
+ 		fd_set		input_mask;
+ 		fd_set		output_mask;
+ 		fd_set		except_mask;
+ 		struct timeval timeout;
+ 		struct timeval *ptr_timeout;
+ 
+ 		FD_ZERO(&input_mask);
+ 		FD_ZERO(&output_mask);
+ 		FD_ZERO(&except_mask);
+ 		if (forRead)
+ 			FD_SET(port->sock, &input_mask);
+ 		if (forWrite)
+ 			FD_SET(port->sock, &output_mask);
+ 		FD_SET(port->sock, &except_mask);
+ 
+ 		if (timeout_ms < 0)
+ 			ptr_timeout = NULL;
+ 		else
+ 		{
+ 			timeout.tv_sec	= timeout_ms / 1000;
+ 			timeout.tv_usec	= (timeout_ms % 1000) * 1000;
+ 			ptr_timeout		= &timeout;
+ 		}
+ 
+ 		return select(port->sock + 1, &input_mask, &output_mask,
+ 					  &except_mask, ptr_timeout);
+ #endif   /* HAVE_POLL */
+ 	}
+ }
+ 
  /* ------------------------------------------------------------ */
  /*						  SSL specific code						*/
  /* ------------------------------------------------------------ */
diff -rcN base/src/backend/libpq/pqcomm.c new/src/backend/libpq/pqcomm.c
*** base/src/backend/libpq/pqcomm.c	2009-07-22 14:41:58.000000000 +0900
--- new/src/backend/libpq/pqcomm.c	2009-07-22 14:43:12.000000000 +0900
***************
*** 49,60 ****
--- 49,64 ----
   *
   * low-level I/O:
   *		pq_getbytes		- get a known number of bytes from connection
+  *		pq_getbufbytes	- get a known number of bytes from buffer
   *		pq_getstring	- get a null terminated string from connection
   *		pq_getmessage	- get a message with length word from connection
   *		pq_getbyte		- get next byte from connection
+  *		pq_getbufbyte	- get next byte from buffer
   *		pq_peekbyte		- peek at next byte from connection
+  *		pq_peekbufbyte	- peek at next byte from buffer
   *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
   *		pq_flush		- flush pending output
+  *		pq_wait			- wait until we can read or write the connection
   *
   * message-level I/O (and old-style-COPY-OUT cruft):
   *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
***************
*** 729,735 ****
   *		returns 0 if OK, EOF if trouble
   * --------------------------------
   */
! static int
  pq_recvbuf(void)
  {
  	if (PqRecvPointer > 0)
--- 733,739 ----
   *		returns 0 if OK, EOF if trouble
   * --------------------------------
   */
! int
  pq_recvbuf(void)
  {
  	if (PqRecvPointer > 0)
***************
*** 799,804 ****
--- 803,821 ----
  }
  
  /* --------------------------------
+  *		pq_getbufbyte	- get a single byte from buffer, or return EOF
+  * --------------------------------
+  */
+ int
+ pq_getbufbyte(void)
+ {
+ 	if (PqRecvPointer >= PqRecvLength)
+ 		return EOF;			/* Failed to read buffered data */
+ 
+ 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
+ }
+ 
+ /* --------------------------------
   *		pq_peekbyte		- peek at next byte from connection
   *
   *	 Same as pq_getbyte() except we don't advance the pointer.
***************
*** 816,821 ****
--- 833,853 ----
  }
  
  /* --------------------------------
+  *		pq_peekbufbyte		- peek at next byte from buffer
+  *
+  *	 Same as pq_getbufbyte() except we don't advance the pointer.
+  * --------------------------------
+  */
+ int
+ pq_peekbufbyte(void)
+ {
+ 	if (PqRecvPointer >= PqRecvLength)
+ 		return EOF;			/* Failed to read buffered data */
+ 
+ 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
+ }
+ 
+ /* --------------------------------
   *		pq_getbytes		- get a known number of bytes from connection
   *
   *		returns 0 if OK, EOF if trouble
***************
*** 845,850 ****
--- 877,900 ----
  }
  
  /* --------------------------------
+  *		pq_getbufbytes		- get a known number of bytes from buffer
+  *
+  *		returns 0 if OK, EOF otherwise
+  * --------------------------------
+  */
+ int
+ pq_getbufbytes(char *s, size_t len)
+ {
+ 	if (len > (size_t) (PqRecvLength - PqRecvPointer))
+ 		return EOF;			/* Failed to read buffered data */
+ 
+ 	memcpy(s, PqRecvBuffer + PqRecvPointer, len);
+ 	PqRecvPointer += len;
+ 
+ 	return 0;
+ }
+ 
+ /* --------------------------------
   *		pq_discardbytes		- throw away a known number of bytes
   *
   *		same as pq_getbytes except we do not copy the data to anyplace.
***************
*** 1124,1129 ****
--- 1174,1218 ----
  	return 0;
  }
  
+ /* --------------------------------
+  *		pq_wait		- wait until we can read or write the connection socket.
+  *
+  *		returns >0 if one or more conditions are met, 0 if it timed out or
+  *		interrupted, -1	if an error occurred.
+  *
+  *		this function is based on pqSocketCheck.
+  * --------------------------------
+  */
+ int
+ pq_wait(bool forRead, bool forWrite, int timeout_ms)
+ {
+ 	int	result;
+ 
+ 	if (!MyProcPort)
+ 		return -1;
+ 	if (MyProcPort->sock < 0)
+ 	{
+ 		ereport(COMMERROR,
+ 				(errcode_for_socket_access(),
+ 				 errmsg("socket not open")));
+ 		return -1;
+ 	}
+ 
+ 	result = secure_poll(MyProcPort, forRead, forWrite, timeout_ms);
+ 
+ 	if (result < 0)
+ 	{
+ 		if (errno == EINTR)
+ 			return 0;		/* Ok if we were interrupted */
+ 
+ 		ereport(COMMERROR,
+ 				(errcode_for_socket_access(),
+ 				 errmsg("select() failed: %m")));
+ 	}
+ 
+ 	return result;
+ }
+ 
  
  /* --------------------------------
   * Message-level I/O routines begin here.
diff -rcN base/src/include/libpq/libpq.h new/src/include/libpq/libpq.h
*** base/src/include/libpq/libpq.h	2009-07-22 14:41:58.000000000 +0900
--- new/src/include/libpq/libpq.h	2009-07-22 14:42:05.000000000 +0900
***************
*** 52,64 ****
--- 52,69 ----
  extern void TouchSocketFile(void);
  extern void pq_init(void);
  extern void pq_comm_reset(void);
+ extern int	pq_recvbuf(void);
  extern int	pq_getbytes(char *s, size_t len);
+ extern int	pq_getbufbytes(char *s, size_t len);
  extern int	pq_getstring(StringInfo s);
  extern int	pq_getmessage(StringInfo s, int maxlen);
  extern int	pq_getbyte(void);
+ extern int	pq_getbufbyte(void);
  extern int	pq_peekbyte(void);
+ extern int	pq_peekbufbyte(void);
  extern int	pq_putbytes(const char *s, size_t len);
  extern int	pq_flush(void);
+ extern int	pq_wait(bool forRead, bool forWrite, int timeout_ms);
  extern int	pq_putmessage(char msgtype, const char *s, size_t len);
  extern void pq_startcopyout(void);
  extern void pq_endcopyout(bool errorAbort);
***************
*** 73,77 ****
--- 78,83 ----
  extern void secure_close(Port *port);
  extern ssize_t secure_read(Port *port, void *ptr, size_t len);
  extern ssize_t secure_write(Port *port, void *ptr, size_t len);
+ extern int	secure_poll(Port *port, bool forRead, bool forWrite, int timeout_ms);
  
  #endif   /* LIBPQ_H */
#5 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Fujii Masao (#4)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

Fujii Masao <masao.fujii@gmail.com> writes:

> On Wed, Jul 22, 2009 at 2:20 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>
>> Are you planning to update this patch based on Martin's review?
>
> Sure. Attached is an updated patch.

I looked at this patch. I don't see how we can consider accepting it
by itself. It adds a bunch of code that is not used anywhere and hence
can't be tested, in service of goals explained nowhere, but presumably
part of some other patch that hasn't been reviewed and might or might
not get accepted when it is presented. The only thing that's really
clear is that it pokes holes in the abstraction (such as it is)
presented by pqcomm.c.

The reason I want to see the calling code is that I doubt this is a very
useful API extension as-is. I can see the point of probing to see if
any more bytes are available, but it's not clear that there is a reason
to collect only part of a message once the client has sent one. I am
also thinking that if you do need the ability to get control back
without blocking on the socket, you probably will need that for writes
as well as reads; and this patch doesn't cover the write case.

I think you should just submit this with the code that uses it, so we
can evaluate whether the overall concept is a good one or not.

regards, tom lane

#6 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Tom Lane (#5)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

I wrote:

> I am also thinking that if you do need the ability to get control back
> without blocking on the socket, you probably will need that for writes
> as well as reads; and this patch doesn't cover the write case.

Oh, another gripe: I'll bet a nickel that this doesn't work very nicely
under SSL. Bytes available on the socket doesn't necessarily equate to
decrypted payload bytes being available. Depending on how you're using
secure_poll, that might be okay, but it seems like a hazard waiting to
trap unwary maintainers.

regards, tom lane

#7 Robert Haas
robertmhaas@gmail.com
In reply to: Tom Lane (#5)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

On Fri, Jul 24, 2009 at 7:21 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

> Fujii Masao <masao.fujii@gmail.com> writes:
>
>> On Wed, Jul 22, 2009 at 2:20 AM, Robert Haas <robertmhaas@gmail.com> wrote:
>>
>>> Are you planning to update this patch based on Martin's review?
>>
>> Sure. Attached is an updated patch.
>
> I looked at this patch.  I don't see how we can consider accepting it
> by itself.  It adds a bunch of code that is not used anywhere and hence
> can't be tested, in service of goals explained nowhere, but presumably
> part of some other patch that hasn't been reviewed and might or might
> not get accepted when it is presented.  The only thing that's really
> clear is that it pokes holes in the abstraction (such as it is)
> presented by pqcomm.c.
>
> The reason I want to see the calling code is that I doubt this is a very
> useful API extension as-is.  I can see the point of probing to see if
> any more bytes are available, but it's not clear that there is a reason
> to collect only part of a message once the client has sent one.  I am
> also thinking that if you do need the ability to get control back
> without blocking on the socket, you probably will need that for writes
> as well as reads; and this patch doesn't cover the write case.
>
> I think you should just submit this with the code that uses it, so we
> can evaluate whether the overall concept is a good one or not.

This was split out from Synch Rep based on my suggestion to submit
separately any parts that are separately committable, but that doesn't
seem to be the case given your comments here. I guess the question is
whether it's necessary and/or desirable to put in the effort to create
a general-purpose facility, or whether we should be satisfied with the
minimum level of infrastructure necessary to support Synch Rep and
just incorporate it into that patch.

Thoughts?

...Robert

#8 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Robert Haas (#7)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

Robert Haas <robertmhaas@gmail.com> writes:

> On Fri, Jul 24, 2009 at 7:21 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>
>> I think you should just submit this with the code that uses it, so we
>> can evaluate whether the overall concept is a good one or not.
>
> This was split out from Synch Rep based on my suggestion to submit
> separately any parts that are separately committable, but that doesn't
> seem to be the case given your comments here. I guess the question is
> whether it's necessary and/or desirable to put in the effort to create
> a general-purpose facility, or whether we should be satisfied with the
> minimum level of infrastructure necessary to support Synch Rep and
> just incorporate it into that patch.

General-purpose facility *for what*? It's impossible to evaluate the
code without a definition of the purpose behind it.

What I actually think should come first is a spec for the client
protocol this is intended to support. It's not apparent to me at
the moment why the backend should need non-blocking read at all.

regards, tom lane

#9 Robert Haas
robertmhaas@gmail.com
In reply to: Tom Lane (#8)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

On Sat, Jul 25, 2009 at 11:41 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

> Robert Haas <robertmhaas@gmail.com> writes:
>
>> On Fri, Jul 24, 2009 at 7:21 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>>
>>> I think you should just submit this with the code that uses it, so we
>>> can evaluate whether the overall concept is a good one or not.
>>
>> This was split out from Synch Rep based on my suggestion to submit
>> separately any parts that are separately committable, but that doesn't
>> seem to be the case given your comments here.  I guess the question is
>> whether it's necessary and/or desirable to put in the effort to create
>> a general-purpose facility, or whether we should be satisfied with the
>> minimum level of infrastructure necessary to support Synch Rep and
>> just incorporate it into that patch.
>
> General-purpose facility *for what*?  It's impossible to evaluate the
> code without a definition of the purpose behind it.
>
> What I actually think should come first is a spec for the client
> protocol this is intended to support.  It's not apparent to me at
> the moment why the backend should need non-blocking read at all.

[ reads the patch ]

OK, I agree, I can't see what this is for either from the code that is
here. I think I read a little more meaning into the title of the
patch than was actually there. It seems like the appropriate thing to
do is mark this returned with feedback, so I'm going to go do that.

...Robert

#10 Fujii Masao
masao.fujii@gmail.com
In reply to: Robert Haas (#9)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

Hi,

On Sun, Jul 26, 2009 at 6:42 AM, Robert Haas <robertmhaas@gmail.com> wrote:

> OK, I agree, I can't see what this is for either from the code that is
> here.  I think I read a little more meaning into the title of the
> patch than was actually there.  It seems like the appropriate thing to
> do is mark this returned with feedback, so I'm going to go do that.

OK. I'll revise the patch and resubmit it together with Synch Rep, which
uses it, in the next CommitFest.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#11 Fujii Masao
masao.fujii@gmail.com
In reply to: Tom Lane (#6)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

Hi,

On Sat, Jul 25, 2009 at 8:39 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

> Oh, another gripe: I'll bet a nickel that this doesn't work very nicely
> under SSL.  Bytes available on the socket doesn't necessarily equate to
> decrypted payload bytes being available.  Depending on how you're using
> secure_poll, that might be okay, but it seems like a hazard waiting to
> trap unwary maintainers.

Would it be enough to add a comment explaining how to use secure_poll?

The assumption is that secure_poll is always used together with
secure_read/secure_write (e.g., in the read case, pq_recvbuf rather than a
native recv should be called once pq_wait returns). Decryption is therefore
handled inside those read/write functions, and only decrypted data ends up
in the buffer.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#12 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Fujii Masao (#11)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

Fujii Masao <masao.fujii@gmail.com> writes:

> On Sat, Jul 25, 2009 at 8:39 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>
>> Oh, another gripe: I'll bet a nickel that this doesn't work very nicely
>> under SSL.  Bytes available on the socket doesn't necessarily equate to
>> decrypted payload bytes being available.  Depending on how you're using
>> secure_poll, that might be okay, but it seems like a hazard waiting to
>> trap unwary maintainers.
>
> Is it only necessary to add the comment about how to use secure_poll?
>
> There is the assumption that secure_poll must be used with secure_write/read
> (e.g., in read case, pq_recvbuf instead of native recv should be called after
> passing pq_wait). So, it's assumed that encrypted data are resolved in those
> R/W functions and only decrypted data are located in buffer.

Well, actually, this description perfectly illustrates my basic
complaint: the patch breaks the API abstraction provided by pqcomm.c.
Callers are encouraged/forced to deal with the next layer down, and to
the extent that pqcomm.c does anything useful, callers have to allow for
that too.

As far as the read side of pq_wait goes, it should probably be more
like
  * return true if any bytes are in the buffer
  * else try to read some bytes into the buffer, without blocking
  * now return true or false depending on whether bytes are in
    the buffer.
(Or perhaps instead of true/false, return the count of available bytes.)
Also, I suspect the API needs to make a distinction between "no more
bytes available yet" and EOF (channel closed, so no more bytes ever
will be available).
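
The read-side behaviour outlined above might look roughly like the following.
This is a sketch under assumptions, not code from the patch or from pqcomm.c:
RecvBuf, recvbuf_available, RB_EOF and RB_ERR are hypothetical names, a plain
descriptor read with read() stands in for secure_read(), and poll() is assumed
to be available.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

#define RB_SIZE 8192
#define RB_EOF  (-1)            /* channel closed: no more bytes ever */
#define RB_ERR  (-2)            /* poll() or read() failed */

/* Hypothetical stand-in for pqcomm's private receive buffer. */
typedef struct RecvBuf
{
    int     fd;
    size_t  pointer;            /* next unread byte */
    size_t  length;             /* end of valid data */
    char    data[RB_SIZE];
} RecvBuf;

/*
 * Returns the number of buffered bytes (> 0), 0 if no bytes are available
 * yet, RB_EOF if the peer closed the channel, or RB_ERR on failure.
 * Never blocks.
 */
int
recvbuf_available(RecvBuf *rb)
{
    struct pollfd pfd;
    ssize_t     n;
    int         rc;

    assert(rb != NULL);

    /* Step 1: bytes already in the buffer win. */
    if (rb->pointer < rb->length)
        return (int) (rb->length - rb->pointer);

    /* Buffer is empty, so reuse it from the start. */
    rb->pointer = rb->length = 0;

    /* Step 2: probe the descriptor with a zero timeout. */
    pfd.fd = rb->fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    do
    {
        rc = poll(&pfd, 1, 0);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0)
        return RB_ERR;
    if (rc == 0)
        return 0;               /* nothing has arrived yet */

    /* Something is pending (data, hangup or error): read() tells us which. */
    n = read(rb->fd, rb->data, RB_SIZE);
    if (n < 0)
        return (errno == EINTR || errno == EAGAIN) ? 0 : RB_ERR;
    if (n == 0)
        return RB_EOF;

    /* Step 3: report what is now buffered. */
    rb->length = (size_t) n;
    return (int) n;
}
```

Returning a count rather than a boolean lets the caller tell "nothing yet" (0)
apart from end of stream (RB_EOF) without a second call.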

I'm not sure about the write side. The patch isn't really addressing
blocking-on-write, except in the offhand way of having a forWrite option
in pq_wait, but that doesn't seem too well thought out to me. Again,
the buffering pqcomm does makes a direct query of the underlying state
seem pretty suspicious.

regards, tom lane

#13 Fujii Masao
masao.fujii@gmail.com
In reply to: Tom Lane (#12)
Re: Non-blocking communication between a frontend and a backend (pqcomm)

Hi,

Thanks for the comment!

On Tue, Jul 28, 2009 at 12:27 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

> Well, actually, this description perfectly illustrates my basic
> complaint: the patch breaks the API abstraction provided by pqcomm.c.
> Callers are encouraged/forced to deal with the next layer down, and to
> the extent that pqcomm.c does anything useful, callers have to allow for
> that too.
>
> As far as the read side of pq_wait goes, it should probably be more
> like
>   * return true if any bytes are in the buffer
>   * else try to read some bytes into the buffer, without blocking
>   * now return true or false depending on whether bytes are in
>     the buffer.
> (Or perhaps instead of true/false, return the count of available bytes.)

Seems good. But when we wait for 4 bytes and there is 1 byte in the buffer,
the above pq_wait always returns immediately, and we cannot get 4 bytes.
So, I think that we should make pq_wait monitor only the socket. And, we
should make the newly-introduced low-level I/O functions (such as
pq_getbufbytes) load some bytes into the buffer without blocking (i.e.,
pq_wait with no timeout is called by those functions), and return true/false
according to whether the specified bytes are in there. What is your opinion?
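
The variant proposed here, in which the buffer-reading function itself pulls in
whatever has arrived and reports whether the requested number of bytes is
present, could be sketched as below. NbBuf and nb_getbytes are hypothetical
names, and a plain read() on a descriptor stands in for the real receive path.
EOF and errors are both reported as -1, distinct from the "not yet" result 0.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <string.h>
#include <unistd.h>

#define NB_BUFSIZE 8192

/* Hypothetical receive buffer, standing in for pqcomm's private state. */
typedef struct NbBuf
{
    int     fd;
    size_t  pointer;            /* next unread byte */
    size_t  length;             /* end of valid data */
    char    data[NB_BUFSIZE];
} NbBuf;

/*
 * Try to deliver exactly len bytes without blocking.
 * Returns 1 and copies the bytes if they are all available, 0 if fewer
 * than len bytes have arrived so far (nothing is consumed), or -1 on EOF,
 * error, or a request larger than the buffer.
 */
int
nb_getbytes(NbBuf *nb, char *dst, size_t len)
{
    assert(nb != NULL && dst != NULL);

    if (len > NB_BUFSIZE)
        return -1;

    /* Move unread bytes to the front so the free space is contiguous. */
    if (nb->pointer > 0)
    {
        memmove(nb->data, nb->data + nb->pointer, nb->length - nb->pointer);
        nb->length -= nb->pointer;
        nb->pointer = 0;
    }

    while (nb->length < len)
    {
        struct pollfd pfd;
        ssize_t     n;
        int         rc;

        pfd.fd = nb->fd;
        pfd.events = POLLIN;
        pfd.revents = 0;

        rc = poll(&pfd, 1, 0);  /* zero timeout: never wait */
        if (rc < 0)
        {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (rc == 0)
            return 0;           /* not enough bytes yet */

        n = read(nb->fd, nb->data + nb->length, NB_BUFSIZE - nb->length);
        if (n < 0)
        {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (n == 0)
            return -1;          /* peer closed the channel */

        nb->length += (size_t) n;
    }

    memcpy(dst, nb->data, len);
    nb->pointer = len;
    return 1;
}
```

With 1 byte buffered and 4 requested, this returns 0 and keeps the byte, so a
caller can wait on the socket and retry until the full 4 bytes are present.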

> Also, I suspect the API needs to make a distinction between "no more
> bytes available yet" and EOF (channel closed, so no more bytes ever
> will be available).

That's right.

> I'm not sure about the write side. The patch isn't really addressing
> blocking-on-write, except in the offhand way of having a forWrite option
> in pq_wait, but that doesn't seem too well thought out to me.

I'm inclined to use the write side in Synch Rep. So, I'll consider it carefully.

> Again,
> the buffering pqcomm does makes a direct query of the underlying state
> seem pretty suspicious.

Sorry, I didn't get your point. Is your concern only about receiving a query
from the frontend?

I'm not planning to change the existing exchange of queries and results
between the backend and frontend. Non-blocking communication is used for
log-shipping between walsender and walreceiver. To speed up log-shipping,
the walsender should send XLOG records and receive replies from walreceiver
as concurrently as possible. In other words, I'd like to avoid the stall in
which the walsender cannot send outstanding records because it is blocked in
a receive. That stall could be long on a slow network.
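
The stall described above can be avoided by waiting for readability and
writability in a single call and servicing whichever is ready. The following
is a minimal sketch of that idea, not walsender code: Pump,
pump_set_nonblocking and pump_once are hypothetical names, and a non-blocking
stream socket is assumed.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical per-connection state (illustration only). */
typedef struct Pump
{
    int         fd;             /* connected stream socket */
    const char *out;            /* records waiting to be sent */
    size_t      out_len;
    size_t      out_sent;
    char        reply[256];     /* bytes received from the peer so far */
    size_t      reply_len;
} Pump;

/* Put the socket into non-blocking mode so send()/recv() never stall. */
int
pump_set_nonblocking(int fd)
{
    int         flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Wait up to timeout_ms, then receive and/or send whatever is possible.
 * Returns 0 on progress, timeout or interrupt; -1 on error or peer close.
 */
int
pump_once(Pump *p, int timeout_ms)
{
    struct pollfd pfd;
    ssize_t     n;
    int         rc;

    assert(p != NULL);

    pfd.fd = p->fd;
    pfd.events = 0;
    pfd.revents = 0;
    if (p->reply_len < sizeof(p->reply))
        pfd.events |= POLLIN;   /* room left for a reply */
    if (p->out_sent < p->out_len)
        pfd.events |= POLLOUT;  /* data still outstanding */

    rc = poll(&pfd, 1, timeout_ms);
    if (rc < 0)
        return (errno == EINTR) ? 0 : -1;
    if (rc == 0)
        return 0;               /* timed out, nothing ready */
    if (pfd.revents & (POLLERR | POLLNVAL))
        return -1;

    if (pfd.revents & (POLLIN | POLLHUP))
    {
        size_t      room = sizeof(p->reply) - p->reply_len;

        if (room == 0)
            return -1;          /* hangup reported with no room to drain */
        n = recv(p->fd, p->reply + p->reply_len, room, 0);
        if (n == 0)
            return -1;          /* peer closed the connection */
        if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
            return -1;
        if (n > 0)
            p->reply_len += (size_t) n;
    }

    if (pfd.revents & POLLOUT)
    {
        n = send(p->fd, p->out + p->out_sent, p->out_len - p->out_sent, 0);
        if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
            return -1;
        if (n > 0)
            p->out_sent += (size_t) n;
    }

    return 0;
}
```

Because both directions are handled in one pass, a reply that is slow to
arrive never prevents outstanding records from being sent, and vice versa.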

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center