*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 18,23 ****
--- 18,24 ----
  
  #include <unistd.h>
  #include <sys/time.h>
+ #include <time.h>
  
  #include "libpq-fe.h"
  #include "access/xlog.h"
***************
*** 53,59 **** static bool libpqrcv_receive(int timeout, unsigned char *type,
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
! static bool libpq_select(int timeout_ms);
  
  /*
   * Module load callback
--- 54,61 ----
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
! static int libpq_select(bool forRead, bool forWrite, int timeout_ms);
! static PGresult *libpqrcv_PQexec(const char *query);
  
  /*
   * Module load callback
***************
*** 83,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	TimeLineID	standby_tli;
  	PGresult   *res;
  	char		cmd[64];
  
  	/* Connect */
  	snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
  
! 	streamConn = PQconnectdb(conninfo_repl);
! 	if (PQstatus(streamConn) != CONNECTION_OK)
  		ereport(ERROR,
  				(errmsg("could not connect to the primary server : %s",
  						PQerrorMessage(streamConn))));
  
  	/*
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
--- 85,198 ----
  	TimeLineID	standby_tli;
  	PGresult   *res;
  	char		cmd[64];
+ 	PQconninfoOption   *options;
+ 	time_t		finish_time = ((time_t) -1);
+ 
+ 	/*
+ 	 * Extract timeout from the connection string
+ 	 */
+ 	options = PQconninfoParse(conninfo, NULL);
+ 	if (options)
+ 	{
+ 		PQconninfoOption *option;
+ 		for (option = options; option->keyword != NULL; option++)
+ 		{
+ 			if (strcmp(option->keyword, "connect_timeout") == 0)
+ 			{
+ 				if (option->val != NULL && option->val[0] != '\0')
+ 				{
+ 					int		timeout = atoi(option->val);
+ 
+ 					if (timeout > 0)
+ 					{
+ 						/*
+ 						 * Rounding could cause connection to fail;
+ 						 * need at least 2 secs
+ 						 */
+ 						if (timeout < 2)
+ 							timeout = 2;
+ 						/* calculate the finish time based on start + timeout */
+ 						finish_time = time(NULL) + timeout;
+ 					}
+ 				}
+ 			}
+ 		}
+ 		PQconninfoFree(options);
+ 	}
  
  	/* Connect */
  	snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
  
! 	streamConn = PQconnectStart(conninfo_repl);
! 	if (PQstatus(streamConn) == CONNECTION_BAD)
  		ereport(ERROR,
  				(errmsg("could not connect to the primary server : %s",
  						PQerrorMessage(streamConn))));
  
  	/*
+ 	 * Wait for connection to be established
+ 	 */
+ 	for (;;)
+ 	{
+ 		PostgresPollingStatusType	status;
+ 		bool		established = false;
+ 		bool		forRead = false;
+ 		bool		forWrite = false;
+ 		int		timeout_ms;
+ 		int		ret;
+ 
+ 		status = PQconnectPoll(streamConn);
+ 		switch (status)
+ 		{
+ 			case PGRES_POLLING_READING:
+ 				forRead = true;
+ 				break;
+ 			case PGRES_POLLING_WRITING:
+ 				forWrite = true;
+ 				break;
+ 			case PGRES_POLLING_OK:
+ 				established = true;
+ 				break;
+ 			case PGRES_POLLING_FAILED:
+ 			default:
+ 				ereport(ERROR,
+ 						(errmsg("could not connect to the primary server : %s",
+ 								PQerrorMessage(streamConn))));
+ 		}
+ 
+ 		if (established)
+ 			break;
+ 
+ 	retry:
+ 		/* Compute appropriate timeout interval */
+ 		if (finish_time == ((time_t) -1))
+ 			timeout_ms = -1;
+ 		else
+ 		{
+ 			time_t		now = time(NULL);
+ 
+ 			if (finish_time > now)
+ 				timeout_ms = (finish_time - now) * 1000;
+ 			else
+ 				timeout_ms = 0;
+ 		}
+ 
+ 		/*
+ 		 * Wait until we can read or write the connection socket
+ 		 */
+ 		ret = libpq_select(forRead, forWrite, timeout_ms);
+ 		if (ret == 0)	/* timeout */
+ 			ereport(ERROR,
+ 					(errmsg("could not connect to the primary server : timeout expired")));
+ 		if (ret < 0) /* interrupted */
+ 			goto retry;
+ 	}
+ 
+ 	/*
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
***************
*** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = PQexec(streamConn, cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
  		ereport(ERROR,
  				(errmsg("could not start XLOG streaming: %s",
  						PQerrorMessage(streamConn))));
  	PQclear(res);
  
  	justconnected = true;
--- 244,257 ----
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = libpqrcv_PQexec(cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
+ 	{
+ 		PQclear(res);
  		ereport(ERROR,
  				(errmsg("could not start XLOG streaming: %s",
  						PQerrorMessage(streamConn))));
+ 	}
  	PQclear(res);
  
  	justconnected = true;
***************
*** 162,176 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  }
  
  /*
!  * Wait until we can read WAL stream, or timeout.
   *
!  * Returns true if data has become available for reading, false if timed out
!  * or interrupted by signal.
   *
   * This is based on pqSocketCheck.
   */
! static bool
! libpq_select(int timeout_ms)
  {
  	int			ret;
  
--- 260,275 ----
  }
  
  /*
!  * Wait until we can read or write the connection socket
   *
!  * Returns >0 if the socket is ready to be read, written,
!  * or both, 0 if timed out, -1 if interrupted by a signal.
!  * Throws an error if any other failure occurs.
   *
   * This is based on pqSocketCheck.
   */
! static int
! libpq_select(bool forRead, bool forWrite, int timeout_ms)
  {
  	int			ret;
  
***************
*** 180,203 **** libpq_select(int timeout_ms)
  				(errcode_for_socket_access(),
  				 errmsg("socket not open")));
  
  	/* We use poll(2) if available, otherwise select(2) */
  	{
  #ifdef HAVE_POLL
  		struct pollfd input_fd;
  
  		input_fd.fd = PQsocket(streamConn);
! 		input_fd.events = POLLIN | POLLERR;
  		input_fd.revents = 0;
  
  		ret = poll(&input_fd, 1, timeout_ms);
  #else							/* !HAVE_POLL */
  
  		fd_set		input_mask;
  		struct timeval timeout;
  		struct timeval *ptr_timeout;
  
  		FD_ZERO(&input_mask);
! 		FD_SET		(PQsocket(streamConn), &input_mask);
  
  		if (timeout_ms < 0)
  			ptr_timeout = NULL;
--- 279,314 ----
  				(errcode_for_socket_access(),
  				 errmsg("socket not open")));
  
+ 	Assert(forRead || forWrite);
+ 
  	/* We use poll(2) if available, otherwise select(2) */
  	{
  #ifdef HAVE_POLL
  		struct pollfd input_fd;
  
  		input_fd.fd = PQsocket(streamConn);
! 		input_fd.events = POLLERR;
  		input_fd.revents = 0;
  
+ 		if (forRead)
+ 			input_fd.events |= POLLIN;
+ 		if (forWrite)
+ 			input_fd.events |= POLLOUT;
+ 
  		ret = poll(&input_fd, 1, timeout_ms);
  #else							/* !HAVE_POLL */
  
  		fd_set		input_mask;
+ 		fd_set		output_mask;
  		struct timeval timeout;
  		struct timeval *ptr_timeout;
  
  		FD_ZERO(&input_mask);
! 		FD_ZERO(&output_mask);
! 		if (forRead)
! 			FD_SET		(PQsocket(streamConn), &input_mask);
! 		if (forWrite)
! 			FD_SET		(PQsocket(streamConn), &output_mask);
  
  		if (timeout_ms < 0)
  			ptr_timeout = NULL;
***************
*** 209,225 **** libpq_select(int timeout_ms)
  		}
  
  		ret = select(PQsocket(streamConn) + 1, &input_mask,
! 					 NULL, NULL, ptr_timeout);
  #endif   /* HAVE_POLL */
  	}
  
! 	if (ret == 0 || (ret < 0 && errno == EINTR))
! 		return false;
  	if (ret < 0)
  		ereport(ERROR,
  				(errcode_for_socket_access(),
  				 errmsg("select() failed: %m")));
! 	return true;
  }
  
  /*
--- 320,397 ----
  		}
  
  		ret = select(PQsocket(streamConn) + 1, &input_mask,
! 					 &output_mask, NULL, ptr_timeout);
  #endif   /* HAVE_POLL */
  	}
  
! 	if (ret == 0)	/* timeout */
! 		return 0;
! 	if (ret < 0 && errno == EINTR)	/* interrupted */
! 		return -1;
  	if (ret < 0)
  		ereport(ERROR,
  				(errcode_for_socket_access(),
  				 errmsg("select() failed: %m")));
! 	return ret;
! }
! 
! /*
!  * Send a query and wait for the result by using the asynchronous libpq
!  * functions and the backend version of select().
!  *
!  * On Windows, walreceiver must use this function instead of the blocking
!  * libpq functions like PQexec() since they use the vanilla select() and
!  * are uninterruptible by our emulated signals. On the other hand, this
!  * function is interruptible since it uses the signal emulation layer
!  * compatible select().
!  */
! static PGresult *
! libpqrcv_PQexec(const char *query)
! {
! 	PGresult   *res = NULL;
! 
! 	/*
! 	 * Submit a query. Since we don't use non-blocking mode, this can
! 	 * also block. But the risk is relatively small, so we ignore that
! 	 * for now.
! 	 */
! 	if (!PQsendQuery(streamConn, query))
! 		return NULL;
! 
! 	for (;;)
! 	{
! 		PGresult   *next;
! 
! 		/*
! 		 * Receive data until PQgetResult is ready to return the
! 		 * result without blocking.
! 		 */
! 		while (PQisBusy(streamConn))
! 		{
! 			if (libpq_select(true, false, -1) < 0)
! 				continue;			/* interrupted */
! 			if (PQconsumeInput(streamConn) == 0)
! 				return NULL;		/* trouble */
! 		}
! 
! 		/*
! 		 * Don't emulate PQexec()'s behavior of returning the last
! 		 * result when there are many, since walreceiver never sends a
! 		 * query returning multiple results.
! 		 */
! 		if ((next = PQgetResult(streamConn)) == NULL)
! 			break;				/* query is complete */
! 		if (PQresultStatus(next) == PGRES_FATAL_ERROR)
! 			return next;
! 		PQclear(res);
! 		res = next;
! 		if (PQresultStatus(res) == PGRES_COPY_IN ||
! 			PQresultStatus(res) == PGRES_COPY_OUT ||
! 			PQstatus(streamConn) == CONNECTION_BAD)
! 			break;
! 	}
! 
! 	return res;
  }
  
  /*
***************
*** 268,274 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
  	 */
  	if (timeout > 0 && !justconnected)
  	{
! 		if (!libpq_select(timeout))
  			return false;
  
  		if (PQconsumeInput(streamConn) == 0)
--- 440,446 ----
  	 */
  	if (timeout > 0 && !justconnected)
  	{
! 		if (libpq_select(true, false, timeout) <= 0)
  			return false;
  
  		if (PQconsumeInput(streamConn) == 0)
