PATCH: pgbench - option to build using ppoll() for larger connection counts
Started by Rady, Doug over 8 years ago · 1 message
This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.
To preserve the microsecond resolution of the \sleep meta-command, ppoll() is used instead of poll(), whose timeout is only expressed in milliseconds.
The patch has been implemented to introduce a minimum of #ifdef/#ifndef
clutter in the code.
(this is just a re-send w/o the 9.6 patch as commitfest latched onto the 9.6 patch)
doug
--
Doug Rady
Amazon Aurora PostgreSQL
radydoug@amazon.com
Attachments:
pgbench-ppoll-v1.patch (application/octet-stream; name=pgbench-ppoll-v1.patch) [Download]
diff --git a/configure b/configure
index 0d76e5ea42..756371be22 100755
--- a/configure
+++ b/configure
@@ -9511,6 +9511,62 @@ if test "$ac_res" != no; then :
fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing ppoll" >&5
+$as_echo_n "checking for library containing ppoll... " >&6; }
+if ${ac_cv_search_ppoll+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_func_search_save_LIBS=$LIBS
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ppoll ();
+int
+main ()
+{
+return ppoll ();
+ ;
+ return 0;
+}
+_ACEOF
+for ac_lib in '' c; do
+ if test -z "$ac_lib"; then
+ ac_res="none required"
+ else
+ ac_res=-l$ac_lib
+ LIBS="-l$ac_lib $ac_func_search_save_LIBS"
+ fi
+ if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_search_ppoll=$ac_res
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext
+ if ${ac_cv_search_ppoll+:} false; then :
+ break
+fi
+done
+if ${ac_cv_search_ppoll+:} false; then :
+
+else
+ ac_cv_search_ppoll=no
+fi
+rm conftest.$ac_ext
+LIBS=$ac_func_search_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_search_ppoll" >&5
+$as_echo "$ac_cv_search_ppoll" >&6; }
+ac_res=$ac_cv_search_ppoll
+if test "$ac_res" != no; then :
+ test "$ac_res" = "none required" || LIBS="$ac_res $LIBS"
+
+fi
+
# Solaris:
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing fdatasync" >&5
$as_echo_n "checking for library containing fdatasync... " >&6; }
@@ -13024,7 +13080,7 @@ fi
LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
-for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
+for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
do :
as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index bdc41b071f..b73e848732 100644
--- a/configure.in
+++ b/configure.in
@@ -1035,6 +1035,7 @@ AC_SEARCH_LIBS(crypt, crypt)
AC_SEARCH_LIBS(shm_open, rt)
AC_SEARCH_LIBS(shm_unlink, rt)
AC_SEARCH_LIBS(clock_gettime, [rt posix4])
+AC_SEARCH_LIBS(ppoll, c)
# Solaris:
AC_SEARCH_LIBS(fdatasync, [rt posix4])
# Required for thread_test.c on Solaris
@@ -1430,7 +1431,7 @@ PGAC_FUNC_WCSTOMBS_L
LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
AC_REPLACE_FUNCS(fseeko)
case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index e37496c971..7968160069 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -47,6 +47,9 @@
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h> /* for getrlimit */
@@ -56,6 +59,10 @@
#define M_PI 3.14159265358979323846
#endif
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
#include "pgbench.h"
#define ERRCODE_UNDEFINED_TABLE "42P01"
@@ -90,6 +97,27 @@ static int pthread_join(pthread_t th, void **thread_return);
#define MAXCLIENTS 1024
#endif
+#ifdef USE_PPOLL
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) { do { if ((s)->pfdp) { ((s)->pfdp)->revents = 0; } } while(0); }
+#define PFD_ZERO(s) { do { if ((s)->pfdp) { memset((s)->pfdp, 0, sizeof(struct pollfd)); } } while(0); }
+#define PFD_SETFD(s) { do { ((s)->pfdp)->fd = PQsocket((s)->con); } while(0); }
+#define PFD_STRUCT_POLLFD(p) struct pollfd (p);
+#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); }
+#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; } } while (0); }
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
#define DEFAULT_NXACTS 10 /* default nxacts */
@@ -331,6 +359,7 @@ typedef struct
/* per client collected stats */
int64 cnt; /* client transaction count, for -t */
int ecnt; /* error count */
+ PFD_STRUCT_POLLFD(*pfdp)
} CState;
/*
@@ -351,6 +380,7 @@ typedef struct
instr_time conn_time;
StatsData stats;
int64 latency_late; /* executed but late transactions */
+ PFD_STRUCT_POLLFD(*pfds)
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void setalarm(int seconds);
+static void finishCon(CState *st);
/* callback functions for our flex lexer */
@@ -2137,6 +2168,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
+ PFD_ZERO(st);
+ PFD_SETFD(st);
}
/*
@@ -2406,8 +2439,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
if (is_connect)
{
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
INSTR_TIME_SET_ZERO(now);
}
@@ -2444,11 +2476,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
*/
case CSTATE_ABORTED:
case CSTATE_FINISHED:
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
+ finishCon(st);
return;
}
}
@@ -2596,13 +2624,7 @@ disconnect_all(CState *state, int length)
int i;
for (i = 0; i < length; i++)
- {
- if (state[i].con)
- {
- PQfinish(state[i].con);
- state[i].con = NULL;
- }
- }
+ finishCon(&state[i]);
}
/* create tables and setup data */
@@ -3767,7 +3789,11 @@ main(int argc, char **argv)
case 'c':
benchmarking_option_set = true;
nclients = atoi(optarg);
+#ifdef USE_PPOLL
+ if (nclients <= 0)
+#else
if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
{
fprintf(stderr, "invalid number of clients: \"%s\"\n",
optarg);
@@ -4463,6 +4489,8 @@ threadRun(void *arg)
}
}
+ PFD_THREAD_INIT(thread, state, nstate);
+
if (!is_connect)
{
/* make connections to the database */
@@ -4470,6 +4498,7 @@ threadRun(void *arg)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
+ PFD_SETFD(&state[i]);
}
}
@@ -4486,14 +4515,17 @@ threadRun(void *arg)
/* loop till all clients have terminated */
while (remains > 0)
{
- fd_set input_mask;
int maxsock; /* max socket number to be waited for */
int64 min_usec;
int64 now_usec = 0; /* set this only if needed */
+#ifndef USE_PPOLL
+ fd_set input_mask;
/* identify which client sockets should be checked for input */
FD_ZERO(&input_mask);
maxsock = -1;
+#endif
+
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
@@ -4503,8 +4535,7 @@ threadRun(void *arg)
{
/* interrupt client that has not started a transaction */
st->state = CSTATE_FINISHED;
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
remains--;
}
else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
@@ -4542,7 +4573,13 @@ threadRun(void *arg)
goto done;
}
+#ifdef USE_PPOLL
+ maxsock = 0;
+ (st->pfdp)->events = POLL_EVENTS;
+ (st->pfdp)->revents = 0;
+#else
FD_SET(sock, &input_mask);
+#endif
if (maxsock < sock)
maxsock = sock;
}
@@ -4588,14 +4625,26 @@ threadRun(void *arg)
if (min_usec != PG_INT64_MAX)
{
+#ifdef USE_PPOLL
+ /* ppoll() takes a struct timespec: whole seconds plus nanoseconds */
+ struct timespec timeout;
+
+ timeout.tv_sec = min_usec / 1000000;
+ timeout.tv_nsec = (min_usec % 1000000) * 1000; /* leftover usec -> nsec */
+ nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
struct timeval timeout;
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
}
else
+#ifdef USE_PPOLL
+ nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
if (nsocks < 0)
{
if (errno == EINTR)
@@ -4604,15 +4653,17 @@ threadRun(void *arg)
continue;
}
/* must be something wrong */
- fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
goto done;
}
}
+#ifndef USE_PPOLL
else
{
/* If we didn't call select(), don't try to read any data */
FD_ZERO(&input_mask);
}
+#endif
/* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
@@ -4630,8 +4681,18 @@ threadRun(void *arg)
PQerrorMessage(st->con));
goto done;
}
+#ifdef USE_PPOLL
+ if ((st->pfdp)->revents & POLL_FAIL)
+ {
+ fprintf(stderr,
+ "ppoll() fail - errno: %d, i: %d, events: %x\n",
+ errno, i, ((st->pfdp)->revents & POLL_FAIL));
+ }
+ if (!(st->pfdp)->revents)
+#else
if (!FD_ISSET(sock, &input_mask))
+#endif
continue;
}
else if (st->state == CSTATE_FINISHED ||
@@ -4645,7 +4706,10 @@ threadRun(void *arg)
/* If doCustom changed client to finished state, reduce remains */
if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ {
remains--;
+ PFD_ZERO(st);
+ }
}
/* progress report is made by thread 0 for all threads */
@@ -4759,9 +4823,21 @@ done:
fclose(thread->logfile);
thread->logfile = NULL;
}
+ PFD_THREAD_FREE(thread);
return NULL;
}
+/*
+ * Close the client's connection, if it has one, and clear the pollfd
+ * slot associated with the client (a no-op when built for select()).
+ */
+static void
+finishCon(CState *st)
+{
+    if (st->con != NULL)
+    {
+        PQfinish(st->con);
+        st->con = NULL;
+    }
+
+    /* the slot is cleared whether or not a connection was open */
+    PFD_ZERO(st);
+}
+
/*
* Support for duration option: set timer_exceeded after so many seconds.
*/
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 579d195663..2d016c8a1f 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -390,6 +390,9 @@
/* Define to 1 if you have the <poll.h> header file. */
#undef HAVE_POLL_H
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
/* Define to 1 if you have the `posix_fadvise' function. */
#undef HAVE_POSIX_FADVISE