diff --git a/configure.in b/configure.in index 4d26034579..7b2b531675 100644 --- a/configure.in +++ b/configure.in @@ -1425,7 +1425,7 @@ PGAC_FUNC_WCSTOMBS_L LIBS_including_readline="$LIBS" LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` -AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range utime utimes wcstombs_l]) +AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range utime utimes wcstombs_l]) AC_REPLACE_FUNCS(fseeko) case $host_os in diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 31ea6ca06e..05773d8bb1 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -47,6 +47,9 @@ #ifdef HAVE_SYS_SELECT_H #include <sys/select.h> #endif +#ifdef HAVE_POLL_H +#include <poll.h> +#endif #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> /* for getrlimit */ @@ -83,12 +86,28 @@ static int pthread_join(pthread_t th, void **thread_return); /******************************************************************** * some configurable parameters */ -/* max number of clients allowed */ +#if defined(HAVE_PPOLL) +#define USE_PPOLL 1 +#endif + +#if defined(USE_PPOLL) +/* using ppoll(2) */ +#define SOCKET_WAIT_METHOD "ppoll" +typedef struct pollfd socket_array; +/* unlimited number of clients */ +#define MAXCLIENTS (-1) +#else +/* using select(2) */ +#define SOCKET_WAIT_METHOD "select" +typedef fd_set socket_array; #ifdef FD_SETSIZE +/* system limited max number of clients allowed */ #define MAXCLIENTS (FD_SETSIZE - 10) #else +/* max number of clients allowed */ #define MAXCLIENTS 1024 #endif +#endif #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */ @@ -502,6 +521,15 @@ static void pgbench_error(const char *fmt,...) 
pg_attribute_printf(1, 2); static void addScript(ParsedScript script); static void *threadRun(void *arg); static void setalarm(int seconds); +static void finishCon(CState *st); +static socket_array *alloc_socket_array(int count); +static void clear_socket(socket_array *sa, int fd, int idx); +static int error_on_socket(socket_array *sa, int idx, PGconn *con); +static void free_socket_array(socket_array *sa); +static int ignore_socket(socket_array *sa, int idx, PGconn *con); +static void init_socket_array(socket_array *sa, int count); +static void set_socket(socket_array *sa, int fd, int idx); +static int wait_on_socket_array(socket_array *sa, int nstate, int maxsock, int64 usec); /* callback functions for our flex lexer */ @@ -2704,6 +2732,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT(now); start = now; + if ((st->con = doConnect()) == NULL) { fprintf(stderr, "client %d aborted while establishing connection\n", @@ -2981,8 +3010,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) if (is_connect) { - PQfinish(st->con); - st->con = NULL; + finishCon(st); INSTR_TIME_SET_ZERO(now); } @@ -3019,11 +3047,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) */ case CSTATE_ABORTED: case CSTATE_FINISHED: - if (st->con != NULL) - { - PQfinish(st->con); - st->con = NULL; - } + finishCon(st); return; } } @@ -3171,13 +3195,7 @@ disconnect_all(CState *state, int length) int i; for (i = 0; i < length; i++) - { - if (state[i].con) - { - PQfinish(state[i].con); - state[i].con = NULL; - } - } + finishCon(&state[i]); } /* @@ -4507,7 +4525,7 @@ main(int argc, char **argv) case 'c': benchmarking_option_set = true; nclients = atoi(optarg); - if (nclients <= 0 || nclients > MAXCLIENTS) + if (nclients <= 0 || (MAXCLIENTS != -1 && nclients > MAXCLIENTS)) { fprintf(stderr, "invalid number of clients: \"%s\"\n", optarg); @@ -5189,6 +5207,7 @@ threadRun(void *arg) int64 next_report = last_report + (int64) progress * 
1000000; StatsData last, aggs; + socket_array *sockets = alloc_socket_array(nstate); /* * Initialize throttling rate target for all of the thread's clients. It @@ -5232,6 +5251,7 @@ threadRun(void *arg) { if ((state[i].con = doConnect()) == NULL) goto done; + set_socket(sockets, PQsocket(state[i].con), i); } } @@ -5248,13 +5268,12 @@ threadRun(void *arg) /* loop till all clients have terminated */ while (remains > 0) { - fd_set input_mask; int maxsock; /* max socket number to be waited for */ int64 min_usec; int64 now_usec = 0; /* set this only if needed */ - /* identify which client sockets should be checked for input */ - FD_ZERO(&input_mask); + init_socket_array(sockets, nstate); + maxsock = -1; min_usec = PG_INT64_MAX; for (i = 0; i < nstate; i++) @@ -5265,8 +5284,7 @@ threadRun(void *arg) { /* interrupt client that has not started a transaction */ st->state = CSTATE_FINISHED; - PQfinish(st->con); - st->con = NULL; + finishCon(st); remains--; } else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE) @@ -5304,7 +5322,7 @@ threadRun(void *arg) goto done; } - FD_SET(sock, &input_mask); + set_socket(sockets, sock, i); if (maxsock < sock) maxsock = sock; } @@ -5341,7 +5359,7 @@ threadRun(void *arg) /* * If no clients are ready to execute actions, sleep until we receive * data from the server, or a nap-time specified in the script ends, - * or it's time to print a progress report. Update input_mask to show + * or it's time to print a progress report. Update sockets to show * which client(s) received data. 
*/ if (min_usec > 0) @@ -5352,11 +5370,7 @@ threadRun(void *arg) { if (maxsock != -1) { - struct timeval timeout; - - timeout.tv_sec = min_usec / 1000000; - timeout.tv_usec = min_usec % 1000000; - nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); + nsocks = wait_on_socket_array(sockets, nstate, maxsock, min_usec); } else /* nothing active, simple sleep */ { @@ -5365,7 +5379,7 @@ threadRun(void *arg) } else /* no explicit delay, select without timeout */ { - nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); + nsocks = wait_on_socket_array(sockets, nstate, maxsock, 0); } if (nsocks < 0) @@ -5376,7 +5390,7 @@ threadRun(void *arg) continue; } /* must be something wrong */ - fprintf(stderr, "select() failed: %s\n", strerror(errno)); + fprintf(stderr, "%s() failed: %s\n", SOCKET_WAIT_METHOD, strerror(errno)); goto done; } } @@ -5385,7 +5399,7 @@ threadRun(void *arg) /* min_usec == 0, i.e. something needs to be executed */ /* If we didn't call select(), don't try to read any data */ - FD_ZERO(&input_mask); + init_socket_array(sockets, nstate); } /* ok, advance the state machine of each connection */ @@ -5396,16 +5410,11 @@ threadRun(void *arg) if (st->state == CSTATE_WAIT_RESULT) { /* don't call doCustom unless data is available */ - int sock = PQsocket(st->con); - if (sock < 0) - { - fprintf(stderr, "invalid socket: %s", - PQerrorMessage(st->con)); + if (error_on_socket(sockets, i, st->con)) goto done; - } - if (!FD_ISSET(sock, &input_mask)) + if (ignore_socket(sockets, i, st->con)) continue; } else if (st->state == CSTATE_FINISHED || @@ -5543,9 +5552,140 @@ done: fclose(thread->logfile); thread->logfile = NULL; } + free_socket_array(sockets); + sockets = NULL; return NULL; } +static void +finishCon(CState *st) +{ + if (st->con) + { + PQfinish(st->con); + st->con = NULL; + } +} + +static socket_array * +alloc_socket_array(int count) +{ + return (socket_array *) pg_malloc0(sizeof(socket_array) * ((MAXCLIENTS > 0) ? 
1 : count)); +} + +static void +free_socket_array(socket_array *sa) +{ + pg_free(sa); +} + +#if defined(USE_PPOLL) +#ifdef POLLRDHUP +#define POLL_EVENTS (POLLRDHUP|POLLIN|POLLPRI) +#define POLL_ISSUES (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL) +#else +#define POLL_EVENTS (POLLIN|POLLPRI) +#define POLL_ISSUES (POLLERR|POLLHUP|POLLNVAL) +#endif + +static void +clear_socket(socket_array *sa, int fd, int idx) +{ + set_socket(sa, -1, idx); +} + +static int +error_on_socket(socket_array *sa, int idx, PGconn *con) +{ + if (sa[idx].fd == -1 || (PQsocket(con) >= 0 && !(sa[idx].revents & POLL_ISSUES))) + return 0; + fprintf(stderr, "ppoll() fail - errno: %d, idx: %d, events: %x, %s\n", + errno, idx, (sa[idx].revents & POLL_ISSUES), + PQerrorMessage(con)); + return 1; +} + +static int +ignore_socket(socket_array *sa, int idx, PGconn *con) +{ + /* fd == -1: slot was cleared and not polled this round; skip it, as FD_ISSET does on a cleared fd_set */ + return (sa[idx].fd == -1 || !sa[idx].revents) ? 1 : 0; +} + +static void +init_socket_array(socket_array *sa, int count) +{ + int i = 0; + for (i = 0; i < count; i++) + clear_socket(sa, -1, i); +} + +static void +set_socket(socket_array *sa, int fd, int idx) +{ + sa[idx].fd = fd; + sa[idx].events = POLL_EVENTS; + sa[idx].revents = 0; +} + +static int +wait_on_socket_array(socket_array *sa, int nstate, int maxsock, int64 usec) +{ + struct timespec timeout; + + if (usec) + { + /* split microseconds into whole seconds plus a nanosecond remainder (< 1e9) */ + timeout.tv_sec = usec / 1000000; + timeout.tv_nsec = (usec % 1000000) * 1000; + } + return ppoll(sa, nstate, usec ? &timeout : NULL, NULL); +} +#else +static void +clear_socket(socket_array *sa, int fd, int idx) +{ + FD_CLR(fd, sa); +} + +static int +error_on_socket(socket_array *sa, int idx, PGconn *con) +{ + if (PQsocket(con) >= 0) return 0; + fprintf(stderr, "invalid socket: %s", PQerrorMessage(con)); + /* non-zero makes the caller abort the thread (goto done), as the pre-patch code did */ + return 1; +} + +static int +ignore_socket(socket_array *sa, int idx, PGconn *con) +{ + return (!FD_ISSET(PQsocket(con), sa)) ? 
1 : 0; +} + +static void +init_socket_array(socket_array *sa, int count) +{ + FD_ZERO(sa); +} + +static void +set_socket(socket_array *sa, int fd, int idx) +{ + FD_SET(fd, sa); +} + +static int +wait_on_socket_array(socket_array *sa, int nstate, int maxsock, int64 usec) +{ + struct timeval timeout; + + timeout.tv_sec = usec / 1000000; + timeout.tv_usec = usec % 1000000; + return select(maxsock + 1, sa, NULL, NULL, usec ? &timeout : NULL); +} +#endif + /* * Support for duration option: set timer_exceeded after so many seconds. */ diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index f98f773ff0..63207e13fd 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -409,6 +409,9 @@ /* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. */ #undef HAVE_PPC_LWARX_MUTEX_HINT +/* Define to 1 if you have the `ppoll' function. */ +#undef HAVE_PPOLL + /* Define to 1 if you have the `pstat' function. */ #undef HAVE_PSTAT