diff -rcN base/src/backend/libpq/be-secure.c new/src/backend/libpq/be-secure.c
*** base/src/backend/libpq/be-secure.c	2009-07-22 14:41:58.000000000 +0900
--- new/src/backend/libpq/be-secure.c	2009-07-22 14:42:05.000000000 +0900
***************
*** 71,76 ****
--- 71,86 ----
  #endif
  #endif /* USE_SSL */
  
+ #ifdef HAVE_POLL_H
+ #include <poll.h>
+ #endif
+ #ifdef HAVE_SYS_POLL_H
+ #include <sys/poll.h>
+ #endif
+ #ifdef HAVE_SYS_SELECT_H
+ #include <sys/select.h>
+ #endif
+ 
  #include "libpq/libpq.h"
  #include "tcop/tcopprot.h"
  
***************
*** 388,393 ****
--- 398,478 ----
  	return n;
  }
  
+ /*
+  * Checks a socket, using poll or select, for data to be read, written,
+  * or both.  Returns >0 if one or more conditions are met, 0 if it timed
+  * out, -1 if an error occurred.
+  *
+  * Timeout is specified in milliseconds.  Timeout is infinite if timeout_ms
+  * is negative.  Timeout is immediate (no blocking) if timeout_ms is 0.
+  *
+  * If SSL is in use, the SSL buffer is checked prior to checking the socket
+  * for read data directly.
+  *
+  * This function is based on pqSocketCheck and pqSocketPoll.
+  */
+ int
+ secure_poll(Port *port, bool forRead, bool forWrite, int timeout_ms)
+ {
+ #ifdef USE_SSL
+ 	/* Bytes already buffered inside the SSL library are readable right now */
+ 	if (forRead && port->ssl && SSL_pending(port->ssl) > 0)
+ 	{
+ 		/* short-circuit the poll/select */
+ 		return 1;
+ 	}
+ #endif
+ 
+ 	if (!forRead && !forWrite)
+ 		return 0;
+ 	else
+ 	{
+ 		/* We use poll(2) if available, otherwise select(2) */
+ #ifdef HAVE_POLL
+ 		struct pollfd input_fd;
+ 
+ 		input_fd.fd = port->sock;
+ 		input_fd.events = POLLERR;
+ 		input_fd.revents = 0;
+ 
+ 		if (forRead)
+ 			input_fd.events |= POLLIN;
+ 		if (forWrite)
+ 			input_fd.events |= POLLOUT;
+ 
+ 		return poll(&input_fd, 1, timeout_ms);
+ #else /* !HAVE_POLL */
+ 
+ 		fd_set input_mask;
+ 		fd_set output_mask;
+ 		fd_set except_mask;
+ 		struct timeval timeout;
+ 		struct timeval *ptr_timeout;
+ 
+ 		FD_ZERO(&input_mask);
+ 		FD_ZERO(&output_mask);
+ 		FD_ZERO(&except_mask);
+ 		if (forRead)
+ 			FD_SET(port->sock, &input_mask);
+ 		if (forWrite)
+ 			FD_SET(port->sock, &output_mask);
+ 		FD_SET(port->sock, &except_mask);
+ 
+ 		if (timeout_ms < 0)
+ 			ptr_timeout = NULL;
+ 		else
+ 		{
+ 			timeout.tv_sec = timeout_ms / 1000;
+ 			timeout.tv_usec = (timeout_ms % 1000) * 1000;
+ 			ptr_timeout = &timeout;
+ 		}
+ 
+ 		return select(port->sock + 1, &input_mask, &output_mask,
+ 					  &except_mask, ptr_timeout);
+ #endif /* HAVE_POLL */
+ 	}
+ }
+ 
  /* ------------------------------------------------------------ */
  /* SSL specific code */
  /* ------------------------------------------------------------ */
diff -rcN base/src/backend/libpq/pqcomm.c new/src/backend/libpq/pqcomm.c
*** base/src/backend/libpq/pqcomm.c	2009-07-22 14:41:58.000000000 +0900
--- new/src/backend/libpq/pqcomm.c	2009-07-22 14:43:12.000000000 +0900
***************
*** 49,60 ****
--- 49,64 ----
   *
   * low-level I/O:
   *		pq_getbytes - get a known number of bytes from connection
+  *		pq_getbufbytes - get a known number of bytes from buffer
   *		pq_getstring - get a null terminated string from connection
   *		pq_getmessage - get a message with length word from connection
   *		pq_getbyte - get next byte from connection
+  *		pq_getbufbyte - get next byte from buffer
   *		pq_peekbyte - peek at next byte from connection
+  *		pq_peekbufbyte - peek at next byte from buffer
   *		pq_putbytes - send bytes to connection (not flushed until pq_flush)
   *		pq_flush - flush pending output
+  *		pq_wait - wait until we can read or write the connection
   *
   * message-level I/O (and old-style-COPY-OUT cruft):
   *		pq_putmessage - send a normal message (suppressed in COPY OUT mode)
***************
*** 729,735 ****
   *		returns 0 if OK, EOF if trouble
   * --------------------------------
   */
! static int
  pq_recvbuf(void)
  {
  	if (PqRecvPointer > 0)
--- 733,739 ----
   *		returns 0 if OK, EOF if trouble
   * --------------------------------
   */
! int
  pq_recvbuf(void)
  {
  	if (PqRecvPointer > 0)
***************
*** 799,804 ****
--- 803,821 ----
  }
  
  /* --------------------------------
+  *		pq_getbufbyte - get a single byte from buffer, or return EOF
+  * --------------------------------
+  */
+ int
+ pq_getbufbyte(void)
+ {
+ 	if (PqRecvPointer >= PqRecvLength)
+ 		return EOF;		/* Failed to read buffered data */
+ 
+ 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
+ }
+ 
+ /* --------------------------------
   *		pq_peekbyte - peek at next byte from connection
   *
   *	Same as pq_getbyte() except we don't advance the pointer.
***************
*** 816,821 ****
--- 833,853 ----
  }
  
  /* --------------------------------
+  *		pq_peekbufbyte - peek at next byte from buffer
+  *
+  *	Same as pq_getbufbyte() except we don't advance the pointer.
+  * --------------------------------
+  */
+ int
+ pq_peekbufbyte(void)
+ {
+ 	if (PqRecvPointer >= PqRecvLength)
+ 		return EOF;		/* Failed to read buffered data */
+ 
+ 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
+ }
+ 
+ /* --------------------------------
   *		pq_getbytes - get a known number of bytes from connection
   *
   *		returns 0 if OK, EOF if trouble
***************
*** 845,850 ****
--- 877,900 ----
  }
  
  /* --------------------------------
+  *		pq_getbufbytes - get a known number of bytes from buffer
+  *
+  *		returns 0 if OK, EOF otherwise
+  * --------------------------------
+  */
+ int
+ pq_getbufbytes(char *s, size_t len)
+ {
+ 	if (len > (size_t) (PqRecvLength - PqRecvPointer))
+ 		return EOF;		/* Not enough buffered data to satisfy request */
+ 
+ 	memcpy(s, PqRecvBuffer + PqRecvPointer, len);
+ 	PqRecvPointer += len;
+ 
+ 	return 0;
+ }
+ 
+ /* --------------------------------
   *		pq_discardbytes - throw away a known number of bytes
   *
   *		same as pq_getbytes except we do not copy the data to anyplace.
***************
*** 1124,1129 ****
--- 1174,1218 ----
  	return 0;
  }
  
+ /* --------------------------------
+  *		pq_wait - wait until we can read or write the connection socket.
+  *
+  *		returns >0 if one or more conditions are met, 0 if it timed out or
+  *		was interrupted, -1 if an error occurred.
+  *
+  *		this function is based on pqSocketCheck.
+  * --------------------------------
+  */
+ int
+ pq_wait(bool forRead, bool forWrite, int timeout_ms)
+ {
+ 	int result;
+ 
+ 	if (!MyProcPort)
+ 		return -1;
+ 	if (MyProcPort->sock < 0)
+ 	{
+ 		ereport(COMMERROR,
+ 				(errcode_for_socket_access(),
+ 				 errmsg("socket not open")));
+ 		return -1;
+ 	}
+ 
+ 	result = secure_poll(MyProcPort, forRead, forWrite, timeout_ms);
+ 
+ 	if (result < 0)
+ 	{
+ 		if (errno == EINTR)
+ 			return 0;		/* Ok if we were interrupted */
+ 
+ 		ereport(COMMERROR,
+ 				(errcode_for_socket_access(),
+ 				 errmsg("select() failed: %m")));
+ 	}
+ 
+ 	return result;
+ }
+ 
  
  /* --------------------------------
   * Message-level I/O routines begin here.
diff -rcN base/src/include/libpq/libpq.h new/src/include/libpq/libpq.h *** base/src/include/libpq/libpq.h 2009-07-22 14:41:58.000000000 +0900 --- new/src/include/libpq/libpq.h 2009-07-22 14:42:05.000000000 +0900 *************** *** 52,64 **** --- 52,69 ---- extern void TouchSocketFile(void); extern void pq_init(void); extern void pq_comm_reset(void); + extern int pq_recvbuf(void); extern int pq_getbytes(char *s, size_t len); + extern int pq_getbufbytes(char *s, size_t len); extern int pq_getstring(StringInfo s); extern int pq_getmessage(StringInfo s, int maxlen); extern int pq_getbyte(void); + extern int pq_getbufbyte(void); extern int pq_peekbyte(void); + extern int pq_peekbufbyte(void); extern int pq_putbytes(const char *s, size_t len); extern int pq_flush(void); + extern int pq_wait(bool forRead, bool forWrite, int timeout_ms); extern int pq_putmessage(char msgtype, const char *s, size_t len); extern void pq_startcopyout(void); extern void pq_endcopyout(bool errorAbort); *************** *** 73,77 **** --- 78,83 ---- extern void secure_close(Port *port); extern ssize_t secure_read(Port *port, void *ptr, size_t len); extern ssize_t secure_write(Port *port, void *ptr, size_t len); + extern int secure_poll(Port *port, bool forRead, bool forWrite, int timeout_ms); #endif /* LIBPQ_H */