>From cd9b902fa0dfd046fa32f3101b17e692757596a6 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Thu, 8 Jan 2015 15:50:10 +0100
Subject: [PATCH 02/10] Use a nonblocking socket for FE/BE communication and
 block using latches.

This allows introducing more elaborate handling of interrupts while
reading from a socket.  Currently some interrupt handlers have to do
significant work from inside signal handlers, and it's very hard to
correctly write code to do so.  Generic signal handler limitations,
combined with the fact that we can't safely jump out of a signal
handler while reading from the client, have prohibited implementation
of features like timeouts for idle-in-transaction.

This commit probably can't actually safely be used without the next
patch in the series, as WaitLatchOrSocket might not be fully signal
safe. Reviewing them separately is easier though, so I'll just fold
them before the final commit.

Author: Andres Freund
Reviewed-By: Heikki Linnakangas
---
 src/backend/libpq/be-secure-openssl.c | 32 ++++-----------
 src/backend/libpq/be-secure.c         | 76 ++++++++++++++++++++++++++++++++++-
 src/backend/libpq/pqcomm.c            | 41 ++++++++-----------
 3 files changed, 99 insertions(+), 50 deletions(-)

diff --git a/src/backend/libpq/be-secure-openssl.c b/src/backend/libpq/be-secure-openssl.c
index 1dd7770..729b746 100644
--- a/src/backend/libpq/be-secure-openssl.c
+++ b/src/backend/libpq/be-secure-openssl.c
@@ -371,12 +371,8 @@ aloop:
 		{
 			case SSL_ERROR_WANT_READ:
 			case SSL_ERROR_WANT_WRITE:
-#ifdef WIN32
-				pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl),
-											(err == SSL_ERROR_WANT_READ) ?
-						FD_READ | FD_CLOSE | FD_ACCEPT : FD_WRITE | FD_CLOSE,
-											INFINITE);
-#endif
+				/* not allowed during connection establishment */
+				Assert(!port->noblock);
 				goto aloop;
 			case SSL_ERROR_SYSCALL:
 				if (r < 0)
@@ -516,18 +512,9 @@ rloop:
 			break;
 		case SSL_ERROR_WANT_READ:
 		case SSL_ERROR_WANT_WRITE:
+			/* Don't retry if the socket is in nonblocking mode. */
 			if (port->noblock)
-			{
-				errno = EWOULDBLOCK;
-				n = -1;
 				break;
-			}
-#ifdef WIN32
-			pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl),
-										(err == SSL_ERROR_WANT_READ) ?
-									FD_READ | FD_CLOSE : FD_WRITE | FD_CLOSE,
-										INFINITE);
-#endif
 			goto rloop;
 		case SSL_ERROR_SYSCALL:
 			/* leave it to caller to ereport the value of errno */
@@ -630,12 +617,9 @@ wloop:
 			break;
 		case SSL_ERROR_WANT_READ:
 		case SSL_ERROR_WANT_WRITE:
-#ifdef WIN32
-			pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl),
-										(err == SSL_ERROR_WANT_READ) ?
-									FD_READ | FD_CLOSE : FD_WRITE | FD_CLOSE,
-										INFINITE);
-#endif
+			/* Don't retry if the socket is in nonblocking mode. */
+			if (port->noblock)
+				break;
 			goto wloop;
 		case SSL_ERROR_SYSCALL:
 			/* leave it to caller to ereport the value of errno */
@@ -722,7 +706,7 @@ my_sock_read(BIO *h, char *buf, int size)
 		if (res <= 0)
 		{
 			/* If we were interrupted, tell caller to retry */
-			if (errno == EINTR)
+			if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
 			{
 				BIO_set_retry_read(h);
 			}
@@ -741,7 +725,7 @@ my_sock_write(BIO *h, const char *buf, int size)
 	BIO_clear_retry_flags(h);
 	if (res <= 0)
 	{
-		if (errno == EINTR)
+		if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
 		{
 			BIO_set_retry_write(h);
 		}
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index c592f85..709131f 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -18,6 +18,8 @@
 
 #include "postgres.h"
 
+#include "miscadmin.h"
+
 #include <sys/stat.h>
 #include <signal.h>
 #include <fcntl.h>
@@ -34,6 +36,7 @@
 #include "libpq/libpq.h"
 #include "tcop/tcopprot.h"
 #include "utils/memutils.h"
+#include "storage/proc.h"
 
 
 char	   *ssl_cert_file;
@@ -147,7 +150,37 @@ secure_raw_read(Port *port, void *ptr, size_t len)
 
 	prepare_for_client_read();
 
+	/*
+	 * Try to read from the socket without blocking. If it succeeds we're
+	 * done, otherwise we'll wait for the socket using the latch mechanism.
+	 */
+rloop:
+#ifdef WIN32
+	pgwin32_noblock = true;
+#endif
 	n = recv(port->sock, ptr, len, 0);
+#ifdef WIN32
+	pgwin32_noblock = false;
+#endif
+
+	if (!port->noblock && n < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
+	{
+		int		w;
+		int		save_errno = errno;
+
+		w = WaitLatchOrSocket(MyLatch,
+							  WL_SOCKET_READABLE,
+							  port->sock, 0);
+
+		if (w & WL_SOCKET_READABLE)
+			goto rloop;
+
+		/*
+		 * Restore errno, clobbered by WaitLatchOrSocket, so the caller can
+		 * react properly.
+		 */
+		errno = save_errno;
+	}
 
 	client_read_ended();
 
@@ -170,7 +203,9 @@ secure_write(Port *port, void *ptr, size_t len)
 	}
 	else
 #endif
+	{
 		n = secure_raw_write(port, ptr, len);
+	}
 
 	return n;
 }
@@ -178,5 +213,44 @@ secure_write(Port *port, void *ptr, size_t len)
 ssize_t
 secure_raw_write(Port *port, const void *ptr, size_t len)
 {
-	return send(port->sock, ptr, len, 0);
+	ssize_t		n;
+
+wloop:
+
+#ifdef WIN32
+	pgwin32_noblock = true;
+#endif
+	n = send(port->sock, ptr, len, 0);
+#ifdef WIN32
+	pgwin32_noblock = false;
+#endif
+
+	if (!port->noblock && n < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
+	{
+		int		w;
+		int		save_errno = errno;
+
+		/*
+		 * We probably want to check for latches being set at some point
+		 * here. That'd allow us to handle interrupts while blocked on
+		 * writes. If set we'd not retry directly, but return. That way we
+		 * don't do anything while (possibly) inside an SSL library.
+		 */
+		w = WaitLatchOrSocket(MyLatch,
+							  WL_SOCKET_WRITEABLE,
+							  port->sock, 0);
+
+		if (w & WL_SOCKET_WRITEABLE)
+		{
+			goto wloop;
+		}
+
+		/*
+		 * Restore errno, clobbered by WaitLatchOrSocket, so the caller can
+		 * react properly.
+		 */
+		errno = save_errno;
+	}
+
+	return n;
 }
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index e3efac3..6f35508 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -179,6 +179,22 @@ pq_init(void)
 	PqCommBusy = false;
 	DoingCopyOut = false;
 	on_proc_exit(socket_close, 0);
+
+	/*
+	 * In individual backends we operate the underlying socket in nonblocking
+	 * mode and use latches to implement blocking semantics if needed. That
+	 * allows us to provide safely interruptible reads.
+	 *
+	 * Use COMMERROR on failure, because ERROR would try to send the error to
+	 * the client, which might require changing the mode again, leading to
+	 * infinite recursion.
+	 */
+#ifndef WIN32
+	if (!pg_set_noblock(MyProcPort->sock))
+		ereport(COMMERROR,
+				(errmsg("could not set socket to nonblocking mode: %m")));
+#endif
+
 }
 
 /* --------------------------------
@@ -818,31 +834,6 @@ socket_set_nonblocking(bool nonblocking)
 				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
 				 errmsg("there is no client connection")));
 
-	if (MyProcPort->noblock == nonblocking)
-		return;
-
-#ifdef WIN32
-	pgwin32_noblock = nonblocking ? 1 : 0;
-#else
-
-	/*
-	 * Use COMMERROR on failure, because ERROR would try to send the error to
-	 * the client, which might require changing the mode again, leading to
-	 * infinite recursion.
-	 */
-	if (nonblocking)
-	{
-		if (!pg_set_noblock(MyProcPort->sock))
-			ereport(COMMERROR,
-					(errmsg("could not set socket to nonblocking mode: %m")));
-	}
-	else
-	{
-		if (!pg_set_block(MyProcPort->sock))
-			ereport(COMMERROR,
-					(errmsg("could not set socket to blocking mode: %m")));
-	}
-#endif
 	MyProcPort->noblock = nonblocking;
 }
 
-- 
2.0.0.rc2.4.g1dc51c6.dirty

