PATCH: pgbench - option to build using ppoll() for larger connection counts
This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.
The patch has been implemented to introduce a minimal amount of #ifdef/#ifndef
clutter in the code.
Two patches attached.
One based on REL9_6_STABLE.
One based on 'master' which can also apply to REL_10_STABLE.
doug
--
Doug Rady
Amazon Aurora PostgreSQL
radydoug@amazon.com
Attachments:
pgbench-96-ppoll.patch (application/octet-stream; name=pgbench-96-ppoll.patch) [Download]
diff --git a/configure b/configure
index 2821a8f7e4..c1ba3c1477 100755
--- a/configure
+++ b/configure
@@ -9053,6 +9053,62 @@ if test "$ac_res" != no; then :
fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing ppoll" >&5
+$as_echo_n "checking for library containing ppoll... " >&6; }
+if ${ac_cv_search_ppoll+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_func_search_save_LIBS=$LIBS
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ppoll ();
+int
+main ()
+{
+return ppoll ();
+ ;
+ return 0;
+}
+_ACEOF
+for ac_lib in '' c; do
+ if test -z "$ac_lib"; then
+ ac_res="none required"
+ else
+ ac_res=-l$ac_lib
+ LIBS="-l$ac_lib $ac_func_search_save_LIBS"
+ fi
+ if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_search_ppoll=$ac_res
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext
+ if ${ac_cv_search_ppoll+:} false; then :
+ break
+fi
+done
+if ${ac_cv_search_ppoll+:} false; then :
+
+else
+ ac_cv_search_ppoll=no
+fi
+rm conftest.$ac_ext
+LIBS=$ac_func_search_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_search_ppoll" >&5
+$as_echo "$ac_cv_search_ppoll" >&6; }
+ac_res=$ac_cv_search_ppoll
+if test "$ac_res" != no; then :
+ test "$ac_res" = "none required" || LIBS="$ac_res $LIBS"
+
+fi
+
# Solaris:
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing fdatasync" >&5
$as_echo_n "checking for library containing fdatasync... " >&6; }
@@ -12517,7 +12573,7 @@ fi
LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
-for ac_func in cbrt dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
+for ac_func in cbrt dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
do :
as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index a1696db921..b0b24867a6 100644
--- a/configure.in
+++ b/configure.in
@@ -1058,6 +1058,7 @@ AC_SEARCH_LIBS(getopt_long, [getopt gnugetopt])
AC_SEARCH_LIBS(crypt, crypt)
AC_SEARCH_LIBS(shm_open, rt)
AC_SEARCH_LIBS(shm_unlink, rt)
+AC_SEARCH_LIBS(ppoll, c)
# Solaris:
AC_SEARCH_LIBS(fdatasync, [rt posix4])
# Required for thread_test.c on Solaris
@@ -1457,7 +1458,7 @@ PGAC_FUNC_WCSTOMBS_L
LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
-AC_CHECK_FUNCS([cbrt dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
AC_REPLACE_FUNCS(fseeko)
case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 163dcad137..9a6b95d05b 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -46,6 +46,9 @@
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h> /* for getrlimit */
@@ -55,6 +58,10 @@
#define M_PI 3.14159265358979323846
#endif
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
#include "pgbench.h"
#define ERRCODE_UNDEFINED_TABLE "42P01"
@@ -89,6 +96,27 @@ static int pthread_join(pthread_t th, void **thread_return);
#define MAXCLIENTS 1024
#endif
+#ifdef USE_PPOLL
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) { do { if ((s)->pfdp) { ((s)->pfdp)->revents = 0; } } while(0); }
+#define PFD_ZERO(s) { do { if ((s)->pfdp) { memset((s)->pfdp, 0, sizeof(struct pollfd)); } } while(0); }
+#define PFD_SETFD(s) { do { ((s)->pfdp)->fd = PQsocket((s)->con); } while(0); }
+#define PFD_STRUCT_POLLFD(p) struct pollfd (p);
+#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); }
+#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; } } while (0); }
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
#define DEFAULT_NXACTS 10 /* default nxacts */
@@ -259,6 +287,7 @@ typedef struct
/* per client collected stats */
int64 cnt; /* transaction count */
int ecnt; /* error count */
+ PFD_STRUCT_POLLFD(*pfdp)
} CState;
/*
@@ -273,6 +302,7 @@ typedef struct
unsigned short random_state[3]; /* separate randomness for each thread */
int64 throttle_trigger; /* previous/next throttling (us) */
FILE *logfile; /* where to log, or NULL */
+ PFD_STRUCT_POLLFD(*pfds)
/* per thread collected stats */
instr_time start_time; /* thread start time */
@@ -386,6 +416,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void setalarm(int seconds);
+static void finishCon(CState *st);
/* callback functions for our flex lexer */
@@ -1734,11 +1765,7 @@ preparedStatementName(char *buffer, int file, int state)
static bool
clientDone(CState *st)
{
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
+ finishCon(st);
return false; /* always false */
}
@@ -1914,10 +1941,7 @@ top:
if (commands[st->state + 1] == NULL)
{
if (is_connect)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
+ finishCon(st);
++st->cnt;
if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
@@ -1968,6 +1992,7 @@ top:
st->sleeping = false;
st->throttling = false;
memset(st->prepared, 0, sizeof(st->prepared));
+ PFD_SETFD(st);
}
/*
@@ -2315,13 +2340,7 @@ disconnect_all(CState *state, int length)
int i;
for (i = 0; i < length; i++)
- {
- if (state[i].con)
- {
- PQfinish(state[i].con);
- state[i].con = NULL;
- }
- }
+ finishCon(&state[i]);
}
/* create tables and setup data */
@@ -3495,7 +3514,11 @@ main(int argc, char **argv)
case 'c':
benchmarking_option_set = true;
nclients = atoi(optarg);
+#ifdef USE_PPOLL
+ if (nclients <= 0 )
+#else
if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
{
fprintf(stderr, "invalid number of clients: \"%s\"\n",
optarg);
@@ -3756,6 +3779,14 @@ main(int argc, char **argv)
}
}
+ /*
+ * Don't need more threads than there are clients. (This is not merely an
+ * optimization; throttle_delay is calculated incorrectly below if some
+ * threads have no clients assigned to them.)
+ */
+ if (nthreads > nclients)
+ nthreads = nclients;
+
/* set default script if none */
if (num_scripts == 0 && !is_init_mode)
{
@@ -3779,14 +3810,6 @@ main(int argc, char **argv)
if (num_scripts > 1)
per_script_stats = true;
- /*
- * Don't need more threads than there are clients. (This is not merely an
- * optimization; throttle_delay is calculated incorrectly below if some
- * threads have no clients assigned to them.)
- */
- if (nthreads > nclients)
- nthreads = nclients;
-
/* compute a per thread delay */
throttle_delay *= nthreads;
@@ -4162,6 +4185,8 @@ threadRun(void *arg)
}
}
+ PFD_THREAD_INIT(thread, state, nstate);
+
if (!is_connect)
{
/* make connections to the database */
@@ -4169,6 +4194,7 @@ threadRun(void *arg)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
+ PFD_SETFD(&state[i]);
}
}
@@ -4192,28 +4218,32 @@ threadRun(void *arg)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
sql_script[st->use_file].desc);
if (!doCustom(thread, st, &aggs))
+ {
remains--; /* I've aborted */
+ PFD_ZERO(st);
+ }
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
i, st->state);
remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
}
}
while (remains > 0)
{
- fd_set input_mask;
int maxsock; /* max socket number to be waited */
int64 now_usec = 0;
int64 min_usec;
+#ifndef USE_PPOLL
+ fd_set input_mask;
FD_ZERO(&input_mask);
-
maxsock = -1;
+#endif
+
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
@@ -4233,8 +4263,7 @@ threadRun(void *arg)
remains--;
st->sleeping = false;
st->throttling = false;
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
continue;
}
else /* just a nap from the script */
@@ -4267,10 +4296,16 @@ threadRun(void *arg)
goto done;
}
+#ifdef USE_PPOLL
+ maxsock = 0;
+ (st->pfdp)->events = POLL_EVENTS;
+ (st->pfdp)->revents = 0;
+#else
FD_SET(sock, &input_mask);
if (maxsock < sock)
maxsock = sock;
+#endif
}
/* also wake up to print the next progress report on time */
@@ -4302,20 +4337,32 @@ threadRun(void *arg)
if (min_usec != PG_INT64_MAX)
{
+#ifdef USE_PPOLL
+ struct timespec timeout;
+
+ timeout.tv_sec = min_usec / 1000000;
+ timeout.tv_nsec = min_usec % 1000000000;
+ nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
struct timeval timeout;
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
}
else
+#ifdef USE_PPOLL
+ nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
if (nsocks < 0)
{
if (errno == EINTR)
continue;
/* must be something wrong */
- fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
goto done;
}
}
@@ -4337,18 +4384,36 @@ threadRun(void *arg)
PQerrorMessage(st->con));
goto done;
}
+#ifdef USE_PPOLL
+ if ((st->pfdp)->revents & POLL_FAIL)
+ {
+ fprintf(stderr,
+ "ppoll() fail - errno: %d, i: %d, events: %x\n",
+ errno, i, ((st->pfdp)->revents & POLL_FAIL));
+ }
+
+ if (((st->pfdp)->revents) ||
+#else
if (FD_ISSET(sock, &input_mask) ||
+#endif
commands[st->state]->type == META_COMMAND)
{
if (!doCustom(thread, st, &aggs))
+ {
remains--; /* I've aborted */
+ PFD_ZERO(st);
+ }
}
+ PFD_CLREV(st);
}
else if (is_connect && st->sleeping)
{
/* it is sleeping for throttling, maybe it is done, let us try */
if (!doCustom(thread, st, &aggs))
+ {
remains--;
+ PFD_ZERO(st);
+ }
}
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -4356,8 +4421,7 @@ threadRun(void *arg)
fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
i, st->state);
remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
}
}
@@ -4460,9 +4524,21 @@ done:
}
fclose(thread->logfile);
}
+ PFD_THREAD_FREE(thread);
return NULL;
}
+static void
+finishCon(CState *st)
+{
+ if (st->con)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ }
+ PFD_ZERO(st);
+}
+
/*
* Support for duration option: set timer_exceeded after so many seconds.
*/
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index a152371e61..83bfbfac47 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -393,6 +393,9 @@
/* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. */
#undef HAVE_PPC_LWARX_MUTEX_HINT
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
/* Define to 1 if you have the `pstat' function. */
#undef HAVE_PSTAT
pgbench-ppoll.patch (application/octet-stream; name=pgbench-ppoll.patch) [Download]
diff --git a/configure b/configure
index 0d76e5ea42..756371be22 100755
--- a/configure
+++ b/configure
@@ -9511,6 +9511,62 @@ if test "$ac_res" != no; then :
fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing ppoll" >&5
+$as_echo_n "checking for library containing ppoll... " >&6; }
+if ${ac_cv_search_ppoll+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_func_search_save_LIBS=$LIBS
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ppoll ();
+int
+main ()
+{
+return ppoll ();
+ ;
+ return 0;
+}
+_ACEOF
+for ac_lib in '' c; do
+ if test -z "$ac_lib"; then
+ ac_res="none required"
+ else
+ ac_res=-l$ac_lib
+ LIBS="-l$ac_lib $ac_func_search_save_LIBS"
+ fi
+ if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_search_ppoll=$ac_res
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext
+ if ${ac_cv_search_ppoll+:} false; then :
+ break
+fi
+done
+if ${ac_cv_search_ppoll+:} false; then :
+
+else
+ ac_cv_search_ppoll=no
+fi
+rm conftest.$ac_ext
+LIBS=$ac_func_search_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_search_ppoll" >&5
+$as_echo "$ac_cv_search_ppoll" >&6; }
+ac_res=$ac_cv_search_ppoll
+if test "$ac_res" != no; then :
+ test "$ac_res" = "none required" || LIBS="$ac_res $LIBS"
+
+fi
+
# Solaris:
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing fdatasync" >&5
$as_echo_n "checking for library containing fdatasync... " >&6; }
@@ -13024,7 +13080,7 @@ fi
LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
-for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
+for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
do :
as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index bdc41b071f..b73e848732 100644
--- a/configure.in
+++ b/configure.in
@@ -1035,6 +1035,7 @@ AC_SEARCH_LIBS(crypt, crypt)
AC_SEARCH_LIBS(shm_open, rt)
AC_SEARCH_LIBS(shm_unlink, rt)
AC_SEARCH_LIBS(clock_gettime, [rt posix4])
+AC_SEARCH_LIBS(ppoll, c)
# Solaris:
AC_SEARCH_LIBS(fdatasync, [rt posix4])
# Required for thread_test.c on Solaris
@@ -1430,7 +1431,7 @@ PGAC_FUNC_WCSTOMBS_L
LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
AC_REPLACE_FUNCS(fseeko)
case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index e37496c971..7968160069 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -47,6 +47,9 @@
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h> /* for getrlimit */
@@ -56,6 +59,10 @@
#define M_PI 3.14159265358979323846
#endif
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
#include "pgbench.h"
#define ERRCODE_UNDEFINED_TABLE "42P01"
@@ -90,6 +97,27 @@ static int pthread_join(pthread_t th, void **thread_return);
#define MAXCLIENTS 1024
#endif
+#ifdef USE_PPOLL
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) { do { if ((s)->pfdp) { ((s)->pfdp)->revents = 0; } } while(0); }
+#define PFD_ZERO(s) { do { if ((s)->pfdp) { memset((s)->pfdp, 0, sizeof(struct pollfd)); } } while(0); }
+#define PFD_SETFD(s) { do { ((s)->pfdp)->fd = PQsocket((s)->con); } while(0); }
+#define PFD_STRUCT_POLLFD(p) struct pollfd (p);
+#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); }
+#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; } } while (0); }
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
#define DEFAULT_NXACTS 10 /* default nxacts */
@@ -331,6 +359,7 @@ typedef struct
/* per client collected stats */
int64 cnt; /* client transaction count, for -t */
int ecnt; /* error count */
+ PFD_STRUCT_POLLFD(*pfdp)
} CState;
/*
@@ -351,6 +380,7 @@ typedef struct
instr_time conn_time;
StatsData stats;
int64 latency_late; /* executed but late transactions */
+ PFD_STRUCT_POLLFD(*pfds)
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void setalarm(int seconds);
+static void finishCon(CState *st);
/* callback functions for our flex lexer */
@@ -2137,6 +2168,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
+ PFD_ZERO(st);
+ PFD_SETFD(st);
}
/*
@@ -2406,8 +2439,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
if (is_connect)
{
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
INSTR_TIME_SET_ZERO(now);
}
@@ -2444,11 +2476,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
*/
case CSTATE_ABORTED:
case CSTATE_FINISHED:
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
+ finishCon(st);
return;
}
}
@@ -2596,13 +2624,7 @@ disconnect_all(CState *state, int length)
int i;
for (i = 0; i < length; i++)
- {
- if (state[i].con)
- {
- PQfinish(state[i].con);
- state[i].con = NULL;
- }
- }
+ finishCon(&state[i]);
}
/* create tables and setup data */
@@ -3767,7 +3789,11 @@ main(int argc, char **argv)
case 'c':
benchmarking_option_set = true;
nclients = atoi(optarg);
+#ifdef USE_PPOLL
+ if (nclients <= 0)
+#else
if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
{
fprintf(stderr, "invalid number of clients: \"%s\"\n",
optarg);
@@ -4463,6 +4489,8 @@ threadRun(void *arg)
}
}
+ PFD_THREAD_INIT(thread, state, nstate);
+
if (!is_connect)
{
/* make connections to the database */
@@ -4470,6 +4498,7 @@ threadRun(void *arg)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
+ PFD_SETFD(&state[i]);
}
}
@@ -4486,14 +4515,17 @@ threadRun(void *arg)
/* loop till all clients have terminated */
while (remains > 0)
{
- fd_set input_mask;
int maxsock; /* max socket number to be waited for */
int64 min_usec;
int64 now_usec = 0; /* set this only if needed */
+#ifndef USE_PPOLL
+ fd_set input_mask;
/* identify which client sockets should be checked for input */
FD_ZERO(&input_mask);
maxsock = -1;
+#endif
+
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
@@ -4503,8 +4535,7 @@ threadRun(void *arg)
{
/* interrupt client that has not started a transaction */
st->state = CSTATE_FINISHED;
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
remains--;
}
else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
@@ -4542,7 +4573,13 @@ threadRun(void *arg)
goto done;
}
+#ifdef USE_PPOLL
+ maxsock = 0;
+ (st->pfdp)->events = POLL_EVENTS;
+ (st->pfdp)->revents = 0;
+#else
FD_SET(sock, &input_mask);
+#endif
if (maxsock < sock)
maxsock = sock;
}
@@ -4588,14 +4625,26 @@ threadRun(void *arg)
if (min_usec != PG_INT64_MAX)
{
+#ifdef USE_PPOLL
+ struct timespec timeout;
+
+ timeout.tv_sec = min_usec / 1000000;
+ timeout.tv_nsec = min_usec % 1000000000;
+ nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
struct timeval timeout;
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
}
else
+#ifdef USE_PPOLL
+ nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
if (nsocks < 0)
{
if (errno == EINTR)
@@ -4604,15 +4653,17 @@ threadRun(void *arg)
continue;
}
/* must be something wrong */
- fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
goto done;
}
}
+#ifndef USE_PPOLL
else
{
/* If we didn't call select(), don't try to read any data */
FD_ZERO(&input_mask);
}
+#endif
/* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
@@ -4630,8 +4681,18 @@ threadRun(void *arg)
PQerrorMessage(st->con));
goto done;
}
+#ifdef USE_PPOLL
+ if ((st->pfdp)->revents & POLL_FAIL)
+ {
+ fprintf(stderr,
+ "ppoll() fail - errno: %d, i: %d, events: %x\n",
+ errno, i, ((st->pfdp)->revents & POLL_FAIL));
+ }
+ if (!(st->pfdp)->revents)
+#else
if (!FD_ISSET(sock, &input_mask))
+#endif
continue;
}
else if (st->state == CSTATE_FINISHED ||
@@ -4645,7 +4706,10 @@ threadRun(void *arg)
/* If doCustom changed client to finished state, reduce remains */
if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ {
remains--;
+ PFD_ZERO(st);
+ }
}
/* progress report is made by thread 0 for all threads */
@@ -4759,9 +4823,21 @@ done:
fclose(thread->logfile);
thread->logfile = NULL;
}
+ PFD_THREAD_FREE(thread);
return NULL;
}
+static void
+finishCon(CState *st)
+{
+ if (st->con)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ }
+ PFD_ZERO(st);
+}
+
/*
* Support for duration option: set timer_exceeded after so many seconds.
*/
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 579d195663..2d016c8a1f 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -390,6 +390,9 @@
/* Define to 1 if you have the <poll.h> header file. */
#undef HAVE_POLL_H
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
/* Define to 1 if you have the `posix_fadvise' function. */
#undef HAVE_POSIX_FADVISE
On 2017-09-25 18:01:40 +0000, Rady, Doug wrote:
This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.
Hm, is there any need to use ppoll over poll? IIRC poll is a good bit
more common, and there are, also IIRC, a number of platforms with buggy
ppoll implementations.
Greetings,
Andres Freund
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hello Again,
Two patches attached.
One based on REL9_6_STABLE.
I'd be surprised if there were a backport unless there is a bug, so
this one might not be useful.
One based on 'master' which can also apply to REL_10_STABLE.
Could you add your patches to the next CF?
--
Fabien.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 9/25/17, 11:07, "Andres Freund" <andres@anarazel.de> wrote:
On 2017-09-25 18:01:40 +0000, Rady, Doug wrote:
This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.
Hm, is there any need to use ppoll over poll? IIRC poll is a good bit
more common, and there are, also IIRC, a number of platforms with buggy
ppoll implementations.
Greetings,
Andres Freund
I used ppoll() because its timespec timeout can express the microsecond durations of the \SLEEP meta command (poll() only accepts a timeout in whole milliseconds).
thanks!
doug
--
Doug Rady
Amazon Aurora PostgreSQL
radydoug@amazon.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.
One based on 'master' which can also apply to REL_10_STABLE.
/home/fabien/pgbench-ppoll.patch:137: trailing whitespace.
#define PFD_THREAD_INIT(t,s,n) { do ...
error: patch failed: configure:13024
error: configure: patch does not apply
error: patch failed: configure.in:1430
error: configure.in: patch does not apply
error: patch failed: src/bin/pgbench/pgbench.c:4588
error: src/bin/pgbench/pgbench.c: patch does not apply
--
Fabien.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Fixed the trailing garbage … sigh.
Fixed the init of unused pollfds.
Re-based for both 10 & master.
--
Thank you!
doug
On 10/3/17, 23:21, "Fabien COELHO" <coelho@cri.ensmp.fr> wrote:
This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.
One based on 'master' which can also apply to REL_10_STABLE.
/home/fabien/pgbench-ppoll.patch:137: trailing whitespace.
#define PFD_THREAD_INIT(t,s,n) { do ...
error: patch failed: configure:13024
error: configure: patch does not apply
error: patch failed: configure.in:1430
error: configure.in: patch does not apply
error: patch failed: src/bin/pgbench/pgbench.c:4588
error: src/bin/pgbench/pgbench.c: patch does not apply
--
Fabien.
Attachments:
pgbench11-ppoll-v2.patch (application/octet-stream; name=pgbench11-ppoll-v2.patch) [Download]
diff --git a/configure.in b/configure.in
index cea7fd0755..1a163a1be5 100644
--- a/configure.in
+++ b/configure.in
@@ -1004,6 +1004,7 @@ AC_SEARCH_LIBS(crypt, crypt)
AC_SEARCH_LIBS(shm_open, rt)
AC_SEARCH_LIBS(shm_unlink, rt)
AC_SEARCH_LIBS(clock_gettime, [rt posix4])
+AC_SEARCH_LIBS(ppoll, c)
# Solaris:
AC_SEARCH_LIBS(fdatasync, [rt posix4])
# Required for thread_test.c on Solaris
@@ -1418,7 +1419,7 @@ PGAC_FUNC_WCSTOMBS_L
LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
AC_REPLACE_FUNCS(fseeko)
case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 5d8a01c72c..342a55baab 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -47,6 +47,9 @@
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h> /* for getrlimit */
@@ -56,6 +59,10 @@
#define M_PI 3.14159265358979323846
#endif
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
#include "pgbench.h"
#define ERRCODE_UNDEFINED_TABLE "42P01"
@@ -90,6 +97,27 @@ static int pthread_join(pthread_t th, void **thread_return);
#define MAXCLIENTS 1024
#endif
+#ifdef USE_PPOLL
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) { do { if ((s)->pfdp) { (s)->pfdp->revents = 0; } } while(0); }
+#define PFD_ZERO(s) { do { if ((s)->pfdp) { (s)->pfdp->fd = 0; (s)->pfdp->events = (s)->pfdp->revents = 0; } } while(0); }
+#define PFD_SETFD(s) { do { (s)->pfdp->fd = PQsocket((s)->con); } while(0); }
+#define PFD_STRUCT_POLLFD(p) struct pollfd (p);
+#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); }
+#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; (s)[_i].pfdp->fd = -1; } } while (0); }
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
#define DEFAULT_NXACTS 10 /* default nxacts */
@@ -331,6 +359,7 @@ typedef struct
/* per client collected stats */
int64 cnt; /* client transaction count, for -t */
int ecnt; /* error count */
+ PFD_STRUCT_POLLFD(*pfdp)
} CState;
/*
@@ -351,6 +380,7 @@ typedef struct
instr_time conn_time;
StatsData stats;
int64 latency_late; /* executed but late transactions */
+ PFD_STRUCT_POLLFD(*pfds)
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void setalarm(int seconds);
+static void finishCon(CState *st);
/* callback functions for our flex lexer */
@@ -2137,6 +2168,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
+ PFD_ZERO(st);
+ PFD_SETFD(st);
}
/*
@@ -2402,8 +2435,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
if (is_connect)
{
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
INSTR_TIME_SET_ZERO(now);
}
@@ -2441,10 +2473,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
case CSTATE_ABORTED:
case CSTATE_FINISHED:
if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
+ finishCon(st);
return;
}
}
@@ -2592,13 +2621,7 @@ disconnect_all(CState *state, int length)
int i;
for (i = 0; i < length; i++)
- {
- if (state[i].con)
- {
- PQfinish(state[i].con);
- state[i].con = NULL;
- }
- }
+ finishCon(&state[i]);
}
/* create tables and setup data */
@@ -3763,7 +3786,11 @@ main(int argc, char **argv)
case 'c':
benchmarking_option_set = true;
nclients = atoi(optarg);
+#ifdef USE_PPOLL
+ if (nclients <= 0)
+#else
if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
{
fprintf(stderr, "invalid number of clients: \"%s\"\n",
optarg);
@@ -4459,6 +4486,8 @@ threadRun(void *arg)
}
}
+ PFD_THREAD_INIT(thread, state, nstate);
+
if (!is_connect)
{
/* make connections to the database */
@@ -4466,6 +4495,7 @@ threadRun(void *arg)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
+ PFD_SETFD(&state[i]);
}
}
@@ -4482,14 +4512,16 @@ threadRun(void *arg)
/* loop till all clients have terminated */
while (remains > 0)
{
- fd_set input_mask;
int maxsock; /* max socket number to be waited for */
int64 min_usec;
int64 now_usec = 0; /* set this only if needed */
+#ifndef USE_PPOLL
+ fd_set input_mask;
/* identify which client sockets should be checked for input */
FD_ZERO(&input_mask);
maxsock = -1;
+#endif
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
@@ -4499,8 +4531,7 @@ threadRun(void *arg)
{
/* interrupt client that has not started a transaction */
st->state = CSTATE_FINISHED;
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
remains--;
}
else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
@@ -4538,7 +4569,13 @@ threadRun(void *arg)
goto done;
}
+#ifdef USE_PPOLL
+ maxsock = 0;
+ (st->pfdp)->events = POLL_EVENTS;
+ (st->pfdp)->revents = 0;
+#else
FD_SET(sock, &input_mask);
+#endif
if (maxsock < sock)
maxsock = sock;
}
@@ -4586,11 +4623,19 @@ threadRun(void *arg)
{
if (maxsock != -1)
{
+#ifdef USE_PPOLL
+ struct timespec timeout;
+
+ /*
+ * min_usec is in microseconds: whole seconds go in tv_sec, and the
+ * sub-second remainder must be converted to nanoseconds for tv_nsec.
+ */
+ timeout.tv_sec = min_usec / 1000000;
+ timeout.tv_nsec = (min_usec % 1000000) * 1000;
+ nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
struct timeval timeout;
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
}
else /* nothing active, simple sleep */
{
@@ -4599,7 +4644,11 @@ threadRun(void *arg)
}
else /* no explicit delay, select without timeout */
{
+#ifdef USE_PPOLL
+ nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
}
if (nsocks < 0)
@@ -4610,15 +4659,17 @@ threadRun(void *arg)
continue;
}
/* must be something wrong */
- fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
goto done;
}
}
+#ifndef USE_PPOLL
else /* min_usec == 0, i.e. something needs to be executed */
{
/* If we didn't call select(), don't try to read any data */
FD_ZERO(&input_mask);
}
+#endif
/* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
@@ -4637,7 +4688,18 @@ threadRun(void *arg)
goto done;
}
+#ifdef USE_PPOLL
+ if ((st->pfdp)->revents & POLL_FAIL)
+ {
+ fprintf(stderr,
+ "ppoll() fail - errno: %d, i: %d, events: %x\n",
+ errno, i, ((st->pfdp)->revents & POLL_FAIL));
+ }
+
+ if (!(st->pfdp)->revents)
+#else
if (!FD_ISSET(sock, &input_mask))
+#endif
continue;
}
else if (st->state == CSTATE_FINISHED ||
@@ -4651,7 +4713,10 @@ threadRun(void *arg)
/* If doCustom changed client to finished state, reduce remains */
if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ {
remains--;
+ PFD_ZERO(st);
+ }
}
/* progress report is made by thread 0 for all threads */
@@ -4765,9 +4830,21 @@ done:
fclose(thread->logfile);
thread->logfile = NULL;
}
+ PFD_THREAD_FREE(thread);
return NULL;
}
+static void
+finishCon(CState *st)
+{
+ /* Drop the client's connection first, if it has one ... */
+ if (st->con != NULL)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ }
+
+ /* ... then apply PFD_ZERO to the client whether or not it was connected. */
+ PFD_ZERO(st);
+}
+
/*
* Support for duration option: set timer_exceeded after so many seconds.
*/
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index cfdcc5ac62..21d9bc0300 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -394,6 +394,9 @@
/* Define to 1 if you have the <poll.h> header file. */
#undef HAVE_POLL_H
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
/* Define to 1 if you have the `posix_fadvise' function. */
#undef HAVE_POSIX_FADVISE
pgbench10-ppoll-v2.patchapplication/octet-stream; name=pgbench10-ppoll-v2.patchDownload
diff --git a/configure.in b/configure.in
index 68a1e0c184..78df83bdd5 100644
--- a/configure.in
+++ b/configure.in
@@ -1035,6 +1035,7 @@ AC_SEARCH_LIBS(crypt, crypt)
AC_SEARCH_LIBS(shm_open, rt)
AC_SEARCH_LIBS(shm_unlink, rt)
AC_SEARCH_LIBS(clock_gettime, [rt posix4])
+AC_SEARCH_LIBS(ppoll, c)
# Solaris:
AC_SEARCH_LIBS(fdatasync, [rt posix4])
# Required for thread_test.c on Solaris
@@ -1430,7 +1431,7 @@ PGAC_FUNC_WCSTOMBS_L
LIBS_including_readline="$LIBS"
LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
AC_REPLACE_FUNCS(fseeko)
case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 2bdfc89d2a..79bbe3ec7a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -47,6 +47,9 @@
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h> /* for getrlimit */
@@ -56,6 +59,10 @@
#define M_PI 3.14159265358979323846
#endif
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
#include "pgbench.h"
#define ERRCODE_UNDEFINED_TABLE "42P01"
@@ -90,6 +97,27 @@ static int pthread_join(pthread_t th, void **thread_return);
#define MAXCLIENTS 1024
#endif
+#ifdef USE_PPOLL
+/*
+ * ppoll() build: PFD_THREAD_INIT allocates one struct pollfd per client in
+ * the thread's pfds array and points each client's pfdp at its own slot.
+ * A slot whose fd is -1 is unused; poll()/ppoll() ignore negative fds, so
+ * PFD_ZERO resets fd to -1 (not 0, which would be stdin).
+ *
+ * NOTE(review): PFD_THREAD_FREE releases the array but does not reset the
+ * clients' pfdp pointers; confirm that nothing applies PFD_ZERO (e.g. via
+ * finishCon) to those clients after the thread has freed its array.
+ */
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) do { if ((s)->pfdp) { (s)->pfdp->revents = 0; } } while (0)
+#define PFD_ZERO(s) do { if ((s)->pfdp) { (s)->pfdp->fd = -1; (s)->pfdp->events = (s)->pfdp->revents = 0; } } while (0)
+#define PFD_SETFD(s) do { (s)->pfdp->fd = PQsocket((s)->con); } while (0)
+#define PFD_STRUCT_POLLFD(p) struct pollfd (p);
+#define PFD_THREAD_FREE(t) do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0)
+#define PFD_THREAD_INIT(t,s,n) do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; (s)[_i].pfdp->fd = -1; } } while (0)
+#else
+/* select() build: the pollfd bookkeeping macros expand to nothing. */
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
#define DEFAULT_NXACTS 10 /* default nxacts */
@@ -331,6 +359,7 @@ typedef struct
/* per client collected stats */
int64 cnt; /* transaction count */
int ecnt; /* error count */
+ PFD_STRUCT_POLLFD(*pfdp)
} CState;
/*
@@ -351,6 +380,7 @@ typedef struct
instr_time conn_time;
StatsData stats;
int64 latency_late; /* executed but late transactions */
+ PFD_STRUCT_POLLFD(*pfds)
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void setalarm(int seconds);
+static void finishCon(CState *st);
/* callback functions for our flex lexer */
@@ -2100,6 +2131,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
+ PFD_ZERO(st);
+ PFD_SETFD(st);
}
/*
@@ -2372,8 +2405,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
if (is_connect)
{
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
INSTR_TIME_SET_ZERO(now);
}
@@ -2412,10 +2444,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
case CSTATE_ABORTED:
case CSTATE_FINISHED:
if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
+ finishCon(st);
return;
}
}
@@ -2553,13 +2582,7 @@ disconnect_all(CState *state, int length)
int i;
for (i = 0; i < length; i++)
- {
- if (state[i].con)
- {
- PQfinish(state[i].con);
- state[i].con = NULL;
- }
- }
+ finishCon(&state[i]);
}
/* create tables and setup data */
@@ -3730,7 +3753,11 @@ main(int argc, char **argv)
case 'c':
benchmarking_option_set = true;
nclients = atoi(optarg);
+#ifdef USE_PPOLL
+ if (nclients <= 0)
+#else
if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
{
fprintf(stderr, "invalid number of clients: \"%s\"\n",
optarg);
@@ -4420,6 +4447,8 @@ threadRun(void *arg)
}
}
+ PFD_THREAD_INIT(thread, state, nstate);
+
if (!is_connect)
{
/* make connections to the database */
@@ -4427,6 +4456,7 @@ threadRun(void *arg)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
+ PFD_SETFD(&state[i]);
}
}
@@ -4443,14 +4473,16 @@ threadRun(void *arg)
/* loop till all clients have terminated */
while (remains > 0)
{
- fd_set input_mask;
int maxsock; /* max socket number to be waited for */
int64 min_usec;
int64 now_usec = 0; /* set this only if needed */
+#ifndef USE_PPOLL
+ fd_set input_mask;
/* identify which client sockets should be checked for input */
FD_ZERO(&input_mask);
maxsock = -1;
+#endif
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
@@ -4460,8 +4492,7 @@ threadRun(void *arg)
{
/* interrupt client that has not started a transaction */
st->state = CSTATE_FINISHED;
- PQfinish(st->con);
- st->con = NULL;
+ finishCon(st);
remains--;
}
else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
@@ -4499,7 +4530,13 @@ threadRun(void *arg)
goto done;
}
+#ifdef USE_PPOLL
+ maxsock = 0;
+ (st->pfdp)->events = POLL_EVENTS;
+ (st->pfdp)->revents = 0;
+#else
FD_SET(sock, &input_mask);
+#endif
if (maxsock < sock)
maxsock = sock;
}
@@ -4547,11 +4584,19 @@ threadRun(void *arg)
{
if (maxsock != -1)
{
+#ifdef USE_PPOLL
+ struct timespec timeout;
+
+ /*
+ * min_usec is in microseconds: whole seconds go in tv_sec, and the
+ * sub-second remainder must be converted to nanoseconds for tv_nsec.
+ */
+ timeout.tv_sec = min_usec / 1000000;
+ timeout.tv_nsec = (min_usec % 1000000) * 1000;
+ nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
struct timeval timeout;
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
}
else /* nothing active, simple sleep */
{
@@ -4560,7 +4605,11 @@ threadRun(void *arg)
}
else /* no explicit delay, select without timeout */
{
+#ifdef USE_PPOLL
+ nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
}
if (nsocks < 0)
@@ -4571,15 +4620,17 @@ threadRun(void *arg)
continue;
}
/* must be something wrong */
- fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
goto done;
}
}
+#ifndef USE_PPOLL
else /* min_usec == 0, i.e. something needs to be executed */
{
/* If we didn't call select(), don't try to read any data */
FD_ZERO(&input_mask);
}
+#endif
/* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
@@ -4598,7 +4649,18 @@ threadRun(void *arg)
goto done;
}
+#ifdef USE_PPOLL
+ if ((st->pfdp)->revents & POLL_FAIL)
+ {
+ fprintf(stderr,
+ "ppoll() fail - errno: %d, i: %d, events: %x\n",
+ errno, i, ((st->pfdp)->revents & POLL_FAIL));
+ }
+
+ if (!(st->pfdp)->revents)
+#else
if (!FD_ISSET(sock, &input_mask))
+#endif
continue;
}
else if (st->state == CSTATE_FINISHED ||
@@ -4612,7 +4674,10 @@ threadRun(void *arg)
/* If doCustom changed client to finished state, reduce remains */
if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ {
remains--;
+ PFD_ZERO(st);
+ }
}
/* progress report is made by thread 0 for all threads */
@@ -4726,9 +4791,21 @@ done:
fclose(thread->logfile);
thread->logfile = NULL;
}
+ PFD_THREAD_FREE(thread);
return NULL;
}
+static void
+finishCon(CState *st)
+{
+ /* Drop the client's connection first, if it has one ... */
+ if (st->con != NULL)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ }
+
+ /* ... then apply PFD_ZERO to the client whether or not it was connected. */
+ PFD_ZERO(st);
+}
+
/*
* Support for duration option: set timer_exceeded after so many seconds.
*/
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index c65dd7db21..63c0e74ff7 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -390,6 +390,9 @@
/* Define to 1 if you have the <poll.h> header file. */
#undef HAVE_POLL_H
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
/* Define to 1 if you have the `posix_fadvise' function. */
#undef HAVE_POSIX_FADVISE
On Mon, Sep 25, 2017 at 8:01 PM, Rady, Doug <radydoug@amazon.com> wrote:
This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.
So what's an example of something that fails without this patch but
works with the patch?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Without this patch, one is limited to (FD_SETSIZE - 10) connections.
Example of something that fails without this patch but works with the patch:
Without the patch:
$ pgbench -j 3000 -c 1500
invalid number of clients: "1500"
With the patch:
$ pgbench -j 3000 -c 1500
starting vacuum...end.
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 2000
query mode: simple
number of clients: 1500
number of threads: 1500
number of transactions per client: 10
number of transactions actually processed: 15000/15000
latency average = 631.730 ms
tps = 2374.430587 (including connections establishing)
tps = 4206.524986 (excluding connections establishing)
--
doug
On 10/26/17, 04:46, "Robert Haas" <robertmhaas@gmail.com> wrote:
On Mon, Sep 25, 2017 at 8:01 PM, Rady, Doug <radydoug@amazon.com> wrote:
This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.
So what's an example of something that fails without this patch but
works with the patch?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hello,
Could you rebase the v11 patch?
--
Fabien.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Nov 3, 2017 at 2:29 PM, Fabien COELHO <coelho@cri.ensmp.fr> wrote:
Could you rebase the v11 patch?
This patch has been waiting for a rebase for more than three weeks as
of today, I am marking it as returned with feedback. It would be a
good idea to reply to Robert's input in
/messages/by-id/CA+TgmoYybNv-DdhVPMxLB2nx2SqeNJirtWHmdEAZUCGoTB2VBg@mail.gmail.com.
--
Michael
Hello Michaël,
Could you rebase the v11 patch?
This patch has been waiting for a rebase for more than three weeks as
of today, I am marking it as returned with feedback. It would be a
good idea to reply to Robert's input in
/messages/by-id/CA+TgmoYybNv-DdhVPMxLB2nx2SqeNJirtWHmdEAZUCGoTB2VBg@mail.gmail.com.
ISTM that this was done: If -c is high enough, pgbench fails without the
patch.
--
Fabien.