PATCH: pgbench - option to build using ppoll() for larger connection counts

Started by Rady, Doug over 8 years ago (11 messages)
#1Rady, Doug
radydoug@amazon.com
2 attachment(s)

This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.

The patch has been implemented to introduce a minimum of #ifdef/#ifndef
clutter in the code.

Two patches attached.
One based on REL9_6_STABLE.
One based on 'master' which can also apply to REL_10_STABLE.

doug
--
Doug Rady
Amazon Aurora PostgreSQL
radydoug@amazon.com

Attachments:

pgbench-96-ppoll.patch (application/octet-stream; name=pgbench-96-ppoll.patch) [Download]
diff --git a/configure b/configure
index 2821a8f7e4..c1ba3c1477 100755
--- a/configure
+++ b/configure
@@ -9053,6 +9053,62 @@ if test "$ac_res" != no; then :
 
 fi
 
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing ppoll" >&5
+$as_echo_n "checking for library containing ppoll... " >&6; }
+if ${ac_cv_search_ppoll+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_func_search_save_LIBS=$LIBS
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ppoll ();
+int
+main ()
+{
+return ppoll ();
+  ;
+  return 0;
+}
+_ACEOF
+for ac_lib in '' c; do
+  if test -z "$ac_lib"; then
+    ac_res="none required"
+  else
+    ac_res=-l$ac_lib
+    LIBS="-l$ac_lib  $ac_func_search_save_LIBS"
+  fi
+  if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_search_ppoll=$ac_res
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext
+  if ${ac_cv_search_ppoll+:} false; then :
+  break
+fi
+done
+if ${ac_cv_search_ppoll+:} false; then :
+
+else
+  ac_cv_search_ppoll=no
+fi
+rm conftest.$ac_ext
+LIBS=$ac_func_search_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_search_ppoll" >&5
+$as_echo "$ac_cv_search_ppoll" >&6; }
+ac_res=$ac_cv_search_ppoll
+if test "$ac_res" != no; then :
+  test "$ac_res" = "none required" || LIBS="$ac_res $LIBS"
+
+fi
+
 # Solaris:
 { $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing fdatasync" >&5
 $as_echo_n "checking for library containing fdatasync... " >&6; }
@@ -12517,7 +12573,7 @@ fi
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-for ac_func in cbrt dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
+for ac_func in cbrt dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
 do :
   as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
 ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index a1696db921..b0b24867a6 100644
--- a/configure.in
+++ b/configure.in
@@ -1058,6 +1058,7 @@ AC_SEARCH_LIBS(getopt_long, [getopt gnugetopt])
 AC_SEARCH_LIBS(crypt, crypt)
 AC_SEARCH_LIBS(shm_open, rt)
 AC_SEARCH_LIBS(shm_unlink, rt)
+AC_SEARCH_LIBS(ppoll, c)
 # Solaris:
 AC_SEARCH_LIBS(fdatasync, [rt posix4])
 # Required for thread_test.c on Solaris
@@ -1457,7 +1458,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-AC_CHECK_FUNCS([cbrt dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
 
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 163dcad137..9a6b95d05b 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -46,6 +46,9 @@
 #ifdef HAVE_SYS_SELECT_H
 #include <sys/select.h>
 #endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>		/* for getrlimit */
@@ -55,6 +58,10 @@
 #define M_PI 3.14159265358979323846
 #endif
 
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
 #include "pgbench.h"
 
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
@@ -89,6 +96,27 @@ static int	pthread_join(pthread_t th, void **thread_return);
 #define MAXCLIENTS	1024
 #endif
 
+#ifdef USE_PPOLL
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) { do { if ((s)->pfdp) { ((s)->pfdp)->revents = 0; } } while(0); }
+#define PFD_ZERO(s) { do { if ((s)->pfdp) { memset((s)->pfdp, 0, sizeof(struct pollfd)); } } while(0); }
+#define PFD_SETFD(s) { do { ((s)->pfdp)->fd = PQsocket((s)->con); } while(0); }
+#define	PFD_STRUCT_POLLFD(p)	struct pollfd   (p);
+#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); }
+#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; } } while (0); } 
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
 #define LOG_STEP_SECONDS	5	/* seconds between log messages */
 #define DEFAULT_NXACTS	10		/* default nxacts */
 
@@ -259,6 +287,7 @@ typedef struct
 	/* per client collected stats */
 	int64		cnt;			/* transaction count */
 	int			ecnt;			/* error count */
+	PFD_STRUCT_POLLFD(*pfdp)
 } CState;
 
 /*
@@ -273,6 +302,7 @@ typedef struct
 	unsigned short random_state[3];		/* separate randomness for each thread */
 	int64		throttle_trigger;		/* previous/next throttling (us) */
 	FILE	   *logfile;		/* where to log, or NULL */
+	PFD_STRUCT_POLLFD(*pfds)
 
 	/* per thread collected stats */
 	instr_time	start_time;		/* thread start time */
@@ -386,6 +416,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
 static void addScript(ParsedScript script);
 static void *threadRun(void *arg);
 static void setalarm(int seconds);
+static void finishCon(CState *st);
 
 
 /* callback functions for our flex lexer */
@@ -1734,11 +1765,7 @@ preparedStatementName(char *buffer, int file, int state)
 static bool
 clientDone(CState *st)
 {
-	if (st->con != NULL)
-	{
-		PQfinish(st->con);
-		st->con = NULL;
-	}
+	finishCon(st);
 	return false;				/* always false */
 }
 
@@ -1914,10 +1941,7 @@ top:
 		if (commands[st->state + 1] == NULL)
 		{
 			if (is_connect)
-			{
-				PQfinish(st->con);
-				st->con = NULL;
-			}
+				finishCon(st);
 
 			++st->cnt;
 			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
@@ -1968,6 +1992,7 @@ top:
 		st->sleeping = false;
 		st->throttling = false;
 		memset(st->prepared, 0, sizeof(st->prepared));
+		PFD_SETFD(st);
 	}
 
 	/*
@@ -2315,13 +2340,7 @@ disconnect_all(CState *state, int length)
 	int			i;
 
 	for (i = 0; i < length; i++)
-	{
-		if (state[i].con)
-		{
-			PQfinish(state[i].con);
-			state[i].con = NULL;
-		}
-	}
+		finishCon(&state[i]);
 }
 
 /* create tables and setup data */
@@ -3495,7 +3514,11 @@ main(int argc, char **argv)
 			case 'c':
 				benchmarking_option_set = true;
 				nclients = atoi(optarg);
+#ifdef USE_PPOLL
+				if (nclients <= 0 )
+#else
 				if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
 				{
 					fprintf(stderr, "invalid number of clients: \"%s\"\n",
 							optarg);
@@ -3756,6 +3779,14 @@ main(int argc, char **argv)
 		}
 	}
 
+	/*
+	 * Don't need more threads than there are clients.  (This is not merely an
+	 * optimization; throttle_delay is calculated incorrectly below if some
+	 * threads have no clients assigned to them.)
+	 */
+	if (nthreads > nclients)
+		nthreads = nclients;
+
 	/* set default script if none */
 	if (num_scripts == 0 && !is_init_mode)
 	{
@@ -3779,14 +3810,6 @@ main(int argc, char **argv)
 	if (num_scripts > 1)
 		per_script_stats = true;
 
-	/*
-	 * Don't need more threads than there are clients.  (This is not merely an
-	 * optimization; throttle_delay is calculated incorrectly below if some
-	 * threads have no clients assigned to them.)
-	 */
-	if (nthreads > nclients)
-		nthreads = nclients;
-
 	/* compute a per thread delay */
 	throttle_delay *= nthreads;
 
@@ -4162,6 +4185,8 @@ threadRun(void *arg)
 		}
 	}
 
+	PFD_THREAD_INIT(thread, state, nstate);
+
 	if (!is_connect)
 	{
 		/* make connections to the database */
@@ -4169,6 +4194,7 @@ threadRun(void *arg)
 		{
 			if ((state[i].con = doConnect()) == NULL)
 				goto done;
+			PFD_SETFD(&state[i]);
 		}
 	}
 
@@ -4192,28 +4218,32 @@ threadRun(void *arg)
 			fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
 					sql_script[st->use_file].desc);
 		if (!doCustom(thread, st, &aggs))
+		{
 			remains--;			/* I've aborted */
+			PFD_ZERO(st);
+		}
 
 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
 		{
 			fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
 					i, st->state);
 			remains--;			/* I've aborted */
-			PQfinish(st->con);
-			st->con = NULL;
+			finishCon(st);
 		}
 	}
 
 	while (remains > 0)
 	{
-		fd_set		input_mask;
 		int			maxsock;	/* max socket number to be waited */
 		int64		now_usec = 0;
 		int64		min_usec;
+#ifndef USE_PPOLL
+		fd_set		input_mask;
 
 		FD_ZERO(&input_mask);
-
 		maxsock = -1;
+#endif
+
 		min_usec = PG_INT64_MAX;
 		for (i = 0; i < nstate; i++)
 		{
@@ -4233,8 +4263,7 @@ threadRun(void *arg)
 					remains--;
 					st->sleeping = false;
 					st->throttling = false;
-					PQfinish(st->con);
-					st->con = NULL;
+					finishCon(st);
 					continue;
 				}
 				else	/* just a nap from the script */
@@ -4267,10 +4296,16 @@ threadRun(void *arg)
 				goto done;
 			}
 
+#ifdef USE_PPOLL
+			maxsock = 0;
+			(st->pfdp)->events = POLL_EVENTS;
+			(st->pfdp)->revents = 0;
+#else
 			FD_SET(sock, &input_mask);
 
 			if (maxsock < sock)
 				maxsock = sock;
+#endif
 		}
 
 		/* also wake up to print the next progress report on time */
@@ -4302,20 +4337,32 @@ threadRun(void *arg)
 
 			if (min_usec != PG_INT64_MAX)
 			{
+#ifdef USE_PPOLL
+				struct timespec timeout;
+
+				timeout.tv_sec = min_usec / 1000000;
+				timeout.tv_nsec = min_usec % 1000000000;
+				nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
 				struct timeval timeout;
 
 				timeout.tv_sec = min_usec / 1000000;
 				timeout.tv_usec = min_usec % 1000000;
 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
 			}
 			else
+#ifdef USE_PPOLL
+				nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
 			if (nsocks < 0)
 			{
 				if (errno == EINTR)
 					continue;
 				/* must be something wrong */
-				fprintf(stderr, "select() failed: %s\n", strerror(errno));
+				fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
 				goto done;
 			}
 		}
@@ -4337,18 +4384,36 @@ threadRun(void *arg)
 							PQerrorMessage(st->con));
 					goto done;
 				}
+#ifdef USE_PPOLL
+				if ((st->pfdp)->revents & POLL_FAIL)
+				{
+					fprintf(stderr,
+						"ppoll() fail - errno: %d, i: %d, events: %x\n",
+						errno, i, ((st->pfdp)->revents & POLL_FAIL));
+				}
+
+				if (((st->pfdp)->revents) ||
+#else
 				if (FD_ISSET(sock, &input_mask) ||
+#endif
 					commands[st->state]->type == META_COMMAND)
 				{
 					if (!doCustom(thread, st, &aggs))
+					{
 						remains--;		/* I've aborted */
+						PFD_ZERO(st);
+					}
 				}
+				PFD_CLREV(st);
 			}
 			else if (is_connect && st->sleeping)
 			{
 				/* it is sleeping for throttling, maybe it is done, let us try */
 				if (!doCustom(thread, st, &aggs))
+				{
 					remains--;
+					PFD_ZERO(st);
+				}
 			}
 
 			if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -4356,8 +4421,7 @@ threadRun(void *arg)
 				fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
 						i, st->state);
 				remains--;		/* I've aborted */
-				PQfinish(st->con);
-				st->con = NULL;
+				finishCon(st);
 			}
 		}
 
@@ -4460,9 +4524,21 @@ done:
 		}
 		fclose(thread->logfile);
 	}
+	PFD_THREAD_FREE(thread);
 	return NULL;
 }
 
+static void
+finishCon(CState *st)
+{
+	if (st->con)
+	{
+		PQfinish(st->con);
+		st->con = NULL;
+	}
+	PFD_ZERO(st);
+}
+
 /*
  * Support for duration option: set timer_exceeded after so many seconds.
  */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index a152371e61..83bfbfac47 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -393,6 +393,9 @@
 /* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. */
 #undef HAVE_PPC_LWARX_MUTEX_HINT
 
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
 /* Define to 1 if you have the `pstat' function. */
 #undef HAVE_PSTAT
 
pgbench-ppoll.patch (application/octet-stream; name=pgbench-ppoll.patch) [Download]
diff --git a/configure b/configure
index 0d76e5ea42..756371be22 100755
--- a/configure
+++ b/configure
@@ -9511,6 +9511,62 @@ if test "$ac_res" != no; then :
 
 fi
 
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing ppoll" >&5
+$as_echo_n "checking for library containing ppoll... " >&6; }
+if ${ac_cv_search_ppoll+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_func_search_save_LIBS=$LIBS
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ppoll ();
+int
+main ()
+{
+return ppoll ();
+  ;
+  return 0;
+}
+_ACEOF
+for ac_lib in '' c; do
+  if test -z "$ac_lib"; then
+    ac_res="none required"
+  else
+    ac_res=-l$ac_lib
+    LIBS="-l$ac_lib  $ac_func_search_save_LIBS"
+  fi
+  if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_search_ppoll=$ac_res
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext
+  if ${ac_cv_search_ppoll+:} false; then :
+  break
+fi
+done
+if ${ac_cv_search_ppoll+:} false; then :
+
+else
+  ac_cv_search_ppoll=no
+fi
+rm conftest.$ac_ext
+LIBS=$ac_func_search_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_search_ppoll" >&5
+$as_echo "$ac_cv_search_ppoll" >&6; }
+ac_res=$ac_cv_search_ppoll
+if test "$ac_res" != no; then :
+  test "$ac_res" = "none required" || LIBS="$ac_res $LIBS"
+
+fi
+
 # Solaris:
 { $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing fdatasync" >&5
 $as_echo_n "checking for library containing fdatasync... " >&6; }
@@ -13024,7 +13080,7 @@ fi
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
+for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
 do :
   as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
 ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index bdc41b071f..b73e848732 100644
--- a/configure.in
+++ b/configure.in
@@ -1035,6 +1035,7 @@ AC_SEARCH_LIBS(crypt, crypt)
 AC_SEARCH_LIBS(shm_open, rt)
 AC_SEARCH_LIBS(shm_unlink, rt)
 AC_SEARCH_LIBS(clock_gettime, [rt posix4])
+AC_SEARCH_LIBS(ppoll, c)
 # Solaris:
 AC_SEARCH_LIBS(fdatasync, [rt posix4])
 # Required for thread_test.c on Solaris
@@ -1430,7 +1431,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
 
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index e37496c971..7968160069 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -47,6 +47,9 @@
 #ifdef HAVE_SYS_SELECT_H
 #include <sys/select.h>
 #endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>		/* for getrlimit */
@@ -56,6 +59,10 @@
 #define M_PI 3.14159265358979323846
 #endif
 
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
 #include "pgbench.h"
 
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
@@ -90,6 +97,27 @@ static int	pthread_join(pthread_t th, void **thread_return);
 #define MAXCLIENTS	1024
 #endif
 
+#ifdef USE_PPOLL
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) { do { if ((s)->pfdp) { ((s)->pfdp)->revents = 0; } } while(0); }
+#define PFD_ZERO(s) { do { if ((s)->pfdp) { memset((s)->pfdp, 0, sizeof(struct pollfd)); } } while(0); }
+#define PFD_SETFD(s) { do { ((s)->pfdp)->fd = PQsocket((s)->con); } while(0); }
+#define	PFD_STRUCT_POLLFD(p)	struct pollfd   (p);
+#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); }
+#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; } } while (0); } 
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
 #define LOG_STEP_SECONDS	5	/* seconds between log messages */
 #define DEFAULT_NXACTS	10		/* default nxacts */
 
@@ -331,6 +359,7 @@ typedef struct
 	/* per client collected stats */
 	int64		cnt;			/* client transaction count, for -t */
 	int			ecnt;			/* error count */
+	PFD_STRUCT_POLLFD(*pfdp)
 } CState;
 
 /*
@@ -351,6 +380,7 @@ typedef struct
 	instr_time	conn_time;
 	StatsData	stats;
 	int64		latency_late;	/* executed but late transactions */
+	PFD_STRUCT_POLLFD(*pfds)
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
 static void addScript(ParsedScript script);
 static void *threadRun(void *arg);
 static void setalarm(int seconds);
+static void finishCon(CState *st);
 
 
 /* callback functions for our flex lexer */
@@ -2137,6 +2168,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
 					/* Reset session-local state */
 					memset(st->prepared, 0, sizeof(st->prepared));
+					PFD_ZERO(st);
+					PFD_SETFD(st);
 				}
 
 				/*
@@ -2406,8 +2439,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
 				if (is_connect)
 				{
-					PQfinish(st->con);
-					st->con = NULL;
+					finishCon(st);
 					INSTR_TIME_SET_ZERO(now);
 				}
 
@@ -2444,11 +2476,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 				 */
 			case CSTATE_ABORTED:
 			case CSTATE_FINISHED:
-				if (st->con != NULL)
-				{
-					PQfinish(st->con);
-					st->con = NULL;
-				}
+				finishCon(st);
 				return;
 		}
 	}
@@ -2596,13 +2624,7 @@ disconnect_all(CState *state, int length)
 	int			i;
 
 	for (i = 0; i < length; i++)
-	{
-		if (state[i].con)
-		{
-			PQfinish(state[i].con);
-			state[i].con = NULL;
-		}
-	}
+		finishCon(&state[i]);
 }
 
 /* create tables and setup data */
@@ -3767,7 +3789,11 @@ main(int argc, char **argv)
 			case 'c':
 				benchmarking_option_set = true;
 				nclients = atoi(optarg);
+#ifdef USE_PPOLL
+				if (nclients <= 0)
+#else
 				if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
 				{
 					fprintf(stderr, "invalid number of clients: \"%s\"\n",
 							optarg);
@@ -4463,6 +4489,8 @@ threadRun(void *arg)
 		}
 	}
 
+	PFD_THREAD_INIT(thread, state, nstate);
+
 	if (!is_connect)
 	{
 		/* make connections to the database */
@@ -4470,6 +4498,7 @@ threadRun(void *arg)
 		{
 			if ((state[i].con = doConnect()) == NULL)
 				goto done;
+			PFD_SETFD(&state[i]);
 		}
 	}
 
@@ -4486,14 +4515,17 @@ threadRun(void *arg)
 	/* loop till all clients have terminated */
 	while (remains > 0)
 	{
-		fd_set		input_mask;
 		int			maxsock;	/* max socket number to be waited for */
 		int64		min_usec;
 		int64		now_usec = 0;	/* set this only if needed */
+#ifndef USE_PPOLL
+		fd_set		input_mask;
 
 		/* identify which client sockets should be checked for input */
 		FD_ZERO(&input_mask);
 		maxsock = -1;
+#endif
+
 		min_usec = PG_INT64_MAX;
 		for (i = 0; i < nstate; i++)
 		{
@@ -4503,8 +4535,7 @@ threadRun(void *arg)
 			{
 				/* interrupt client that has not started a transaction */
 				st->state = CSTATE_FINISHED;
-				PQfinish(st->con);
-				st->con = NULL;
+				finishCon(st);
 				remains--;
 			}
 			else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
@@ -4542,7 +4573,13 @@ threadRun(void *arg)
 					goto done;
 				}
 
+#ifdef USE_PPOLL
+				maxsock = 0;
+				(st->pfdp)->events = POLL_EVENTS;
+				(st->pfdp)->revents = 0;
+#else
 				FD_SET(sock, &input_mask);
+#endif
 				if (maxsock < sock)
 					maxsock = sock;
 			}
@@ -4588,14 +4625,26 @@ threadRun(void *arg)
 
 			if (min_usec != PG_INT64_MAX)
 			{
+#ifdef USE_PPOLL
+				struct timespec timeout;
+
+				timeout.tv_sec = min_usec / 1000000;
+				timeout.tv_nsec = min_usec % 1000000000;
+				nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
 				struct timeval timeout;
 
 				timeout.tv_sec = min_usec / 1000000;
 				timeout.tv_usec = min_usec % 1000000;
 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
 			}
 			else
+#ifdef USE_PPOLL
+				nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
 			if (nsocks < 0)
 			{
 				if (errno == EINTR)
@@ -4604,15 +4653,17 @@ threadRun(void *arg)
 					continue;
 				}
 				/* must be something wrong */
-				fprintf(stderr, "select() failed: %s\n", strerror(errno));
+				fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
 				goto done;
 			}
 		}
+#ifndef USE_PPOLL
 		else
 		{
 			/* If we didn't call select(), don't try to read any data */
 			FD_ZERO(&input_mask);
 		}
+#endif
 
 		/* ok, advance the state machine of each connection */
 		for (i = 0; i < nstate; i++)
@@ -4630,8 +4681,18 @@ threadRun(void *arg)
 							PQerrorMessage(st->con));
 					goto done;
 				}
+#ifdef USE_PPOLL
+				if ((st->pfdp)->revents & POLL_FAIL)
+				{
+					fprintf(stderr,
+						"ppoll() fail - errno: %d, i: %d, events: %x\n",
+						errno, i, ((st->pfdp)->revents & POLL_FAIL));
+				}
 
+				if (!(st->pfdp)->revents)
+#else
 				if (!FD_ISSET(sock, &input_mask))
+#endif
 					continue;
 			}
 			else if (st->state == CSTATE_FINISHED ||
@@ -4645,7 +4706,10 @@ threadRun(void *arg)
 
 			/* If doCustom changed client to finished state, reduce remains */
 			if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+			{
 				remains--;
+				PFD_ZERO(st);
+			}
 		}
 
 		/* progress report is made by thread 0 for all threads */
@@ -4759,9 +4823,21 @@ done:
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
+	PFD_THREAD_FREE(thread);
 	return NULL;
 }
 
+static void
+finishCon(CState *st)
+{
+	if (st->con)
+	{
+		PQfinish(st->con);
+		st->con = NULL;
+	}
+	PFD_ZERO(st);
+}
+
 /*
  * Support for duration option: set timer_exceeded after so many seconds.
  */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 579d195663..2d016c8a1f 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -390,6 +390,9 @@
 /* Define to 1 if you have the <poll.h> header file. */
 #undef HAVE_POLL_H
 
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
 /* Define to 1 if you have the `posix_fadvise' function. */
 #undef HAVE_POSIX_FADVISE
 
#2Andres Freund
andres@anarazel.de
In reply to: Rady, Doug (#1)
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

On 2017-09-25 18:01:40 +0000, Rady, Doug wrote:

This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.

Hm, is there any need to use ppoll over poll? IIRC poll is a good bit
more common, and there are, also IIRC, a number of platforms with buggy
ppoll implementations.

Greetings,

Andres Freund

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Fabien COELHO
coelho@cri.ensmp.fr
In reply to: Rady, Doug (#1)
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

Hello Again,

Two patches attached.
One based on REL9_6_STABLE.

I'd be surprised if there were a backport unless there is a bug, so
this one might not be useful.

One based on 'master' which can also apply to REL_10_STABLE.

Could you add your patches to the next CF?

--
Fabien.

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4Rady, Doug
radydoug@amazon.com
In reply to: Andres Freund (#2)
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

On 9/25/17, 11:07, "Andres Freund" <andres@anarazel.de> wrote:

On 2017-09-25 18:01:40 +0000, Rady, Doug wrote:

This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.

Hm, is there any need of using ppoll over poll? IIRC it's a good bit
more common and there's, also iirc, a number of platforms with buggy
ppoll implementations.

Greetings,

Andres Freund

I used ppoll() because its timespec timeout can support the microsecond durations of the \sleep meta-command, whereas poll() only accepts a timeout in whole milliseconds.

thanks!
doug
--
Doug Rady
Amazon Aurora PostgreSQL
radydoug@amazon.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#5Fabien COELHO
coelho@cri.ensmp.fr
In reply to: Rady, Doug (#1)
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.

One based on 'master' which can also apply to REL_10_STABLE.

/home/fabien/pgbench-ppoll.patch:137: trailing whitespace.
#define PFD_THREAD_INIT(t,s,n) { do ...
error: patch failed: configure:13024
error: configure: patch does not apply
error: patch failed: configure.in:1430
error: configure.in: patch does not apply
error: patch failed: src/bin/pgbench/pgbench.c:4588
error: src/bin/pgbench/pgbench.c: patch does not apply

--
Fabien.

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#6Rady, Doug
radydoug@amazon.com
In reply to: Fabien COELHO (#5)
2 attachment(s)
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

Fixed the trailing garbage … sigh.
Fixed the init of unused pollfds.

Re-based for both 10 & master.

--
Thank you!
doug

On 10/3/17, 23:21, "Fabien COELHO" <coelho@cri.ensmp.fr> wrote:

This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.

One based on 'master' which can also apply to REL_10_STABLE.

/home/fabien/pgbench-ppoll.patch:137: trailing whitespace.
#define PFD_THREAD_INIT(t,s,n) { do ...
error: patch failed: configure:13024
error: configure: patch does not apply
error: patch failed: configure.in:1430
error: configure.in: patch does not apply
error: patch failed: src/bin/pgbench/pgbench.c:4588
error: src/bin/pgbench/pgbench.c: patch does not apply

--
Fabien.

Attachments:

pgbench11-ppoll-v2.patch (application/octet-stream; name=pgbench11-ppoll-v2.patch) [Download]
diff --git a/configure.in b/configure.in
index cea7fd0755..1a163a1be5 100644
--- a/configure.in
+++ b/configure.in
@@ -1004,6 +1004,7 @@ AC_SEARCH_LIBS(crypt, crypt)
 AC_SEARCH_LIBS(shm_open, rt)
 AC_SEARCH_LIBS(shm_unlink, rt)
 AC_SEARCH_LIBS(clock_gettime, [rt posix4])
+AC_SEARCH_LIBS(ppoll, c)
 # Solaris:
 AC_SEARCH_LIBS(fdatasync, [rt posix4])
 # Required for thread_test.c on Solaris
@@ -1418,7 +1419,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
 
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 5d8a01c72c..342a55baab 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -47,6 +47,9 @@
 #ifdef HAVE_SYS_SELECT_H
 #include <sys/select.h>
 #endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>		/* for getrlimit */
@@ -56,6 +59,10 @@
 #define M_PI 3.14159265358979323846
 #endif
 
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
 #include "pgbench.h"
 
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
@@ -90,6 +97,27 @@ static int	pthread_join(pthread_t th, void **thread_return);
 #define MAXCLIENTS	1024
 #endif
 
+#ifdef USE_PPOLL
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) { do { if ((s)->pfdp) { (s)->pfdp->revents = 0; } } while(0); }
+#define PFD_ZERO(s) { do { if ((s)->pfdp) { (s)->pfdp->fd = 0; (s)->pfdp->events = (s)->pfdp->revents = 0; } } while(0); }
+#define PFD_SETFD(s) { do { (s)->pfdp->fd = PQsocket((s)->con); } while(0); }
+#define PFD_STRUCT_POLLFD(p)	struct pollfd   (p);
+#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); }
+#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; (s)[_i].pfdp->fd = -1; } } while (0); }
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
 #define LOG_STEP_SECONDS	5	/* seconds between log messages */
 #define DEFAULT_NXACTS	10		/* default nxacts */
 
@@ -331,6 +359,7 @@ typedef struct
 	/* per client collected stats */
 	int64		cnt;			/* client transaction count, for -t */
 	int			ecnt;			/* error count */
+	PFD_STRUCT_POLLFD(*pfdp)
 } CState;
 
 /*
@@ -351,6 +380,7 @@ typedef struct
 	instr_time	conn_time;
 	StatsData	stats;
 	int64		latency_late;	/* executed but late transactions */
+	PFD_STRUCT_POLLFD(*pfds)
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
 static void addScript(ParsedScript script);
 static void *threadRun(void *arg);
 static void setalarm(int seconds);
+static void finishCon(CState *st);
 
 
 /* callback functions for our flex lexer */
@@ -2137,6 +2168,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
 					/* Reset session-local state */
 					memset(st->prepared, 0, sizeof(st->prepared));
+					PFD_ZERO(st);
+					PFD_SETFD(st);
 				}
 
 				/*
@@ -2402,8 +2435,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
 				if (is_connect)
 				{
-					PQfinish(st->con);
-					st->con = NULL;
+					finishCon(st);
 					INSTR_TIME_SET_ZERO(now);
 				}
 
@@ -2441,10 +2473,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 			case CSTATE_ABORTED:
 			case CSTATE_FINISHED:
 				if (st->con != NULL)
-				{
-					PQfinish(st->con);
-					st->con = NULL;
-				}
+					finishCon(st);
 				return;
 		}
 	}
@@ -2592,13 +2621,7 @@ disconnect_all(CState *state, int length)
 	int			i;
 
 	for (i = 0; i < length; i++)
-	{
-		if (state[i].con)
-		{
-			PQfinish(state[i].con);
-			state[i].con = NULL;
-		}
-	}
+		finishCon(&state[i]);
 }
 
 /* create tables and setup data */
@@ -3763,7 +3786,11 @@ main(int argc, char **argv)
 			case 'c':
 				benchmarking_option_set = true;
 				nclients = atoi(optarg);
+#ifdef USE_PPOLL
+				if (nclients <= 0)
+#else
 				if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
 				{
 					fprintf(stderr, "invalid number of clients: \"%s\"\n",
 							optarg);
@@ -4459,6 +4486,8 @@ threadRun(void *arg)
 		}
 	}
 
+	PFD_THREAD_INIT(thread, state, nstate);
+
 	if (!is_connect)
 	{
 		/* make connections to the database */
@@ -4466,6 +4495,7 @@ threadRun(void *arg)
 		{
 			if ((state[i].con = doConnect()) == NULL)
 				goto done;
+			PFD_SETFD(&state[i]);
 		}
 	}
 
@@ -4482,14 +4512,16 @@ threadRun(void *arg)
 	/* loop till all clients have terminated */
 	while (remains > 0)
 	{
-		fd_set		input_mask;
 		int			maxsock;	/* max socket number to be waited for */
 		int64		min_usec;
 		int64		now_usec = 0;	/* set this only if needed */
+#ifndef USE_PPOLL
+		fd_set		input_mask;
 
 		/* identify which client sockets should be checked for input */
 		FD_ZERO(&input_mask);
 		maxsock = -1;
+#endif
 		min_usec = PG_INT64_MAX;
 		for (i = 0; i < nstate; i++)
 		{
@@ -4499,8 +4531,7 @@ threadRun(void *arg)
 			{
 				/* interrupt client that has not started a transaction */
 				st->state = CSTATE_FINISHED;
-				PQfinish(st->con);
-				st->con = NULL;
+				finishCon(st);
 				remains--;
 			}
 			else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
@@ -4538,7 +4569,13 @@ threadRun(void *arg)
 					goto done;
 				}
 
+#ifdef USE_PPOLL
+				maxsock = 0;
+				(st->pfdp)->events = POLL_EVENTS;
+				(st->pfdp)->revents = 0;
+#else
 				FD_SET(sock, &input_mask);
+#endif
 				if (maxsock < sock)
 					maxsock = sock;
 			}
@@ -4586,11 +4623,19 @@ threadRun(void *arg)
 			{
 				if (maxsock != -1)
 				{
+#ifdef USE_PPOLL
+					struct timespec timeout;
+
+					timeout.tv_sec = min_usec / 1000000;
+					timeout.tv_nsec = (min_usec % 1000000) * 1000;	/* sub-second remainder, usec -> nsec */
+					nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
 					struct timeval timeout;
 
 					timeout.tv_sec = min_usec / 1000000;
 					timeout.tv_usec = min_usec % 1000000;
 					nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
 				}
 				else /* nothing active, simple sleep */
 				{
@@ -4599,7 +4644,11 @@ threadRun(void *arg)
 			}
 			else /* no explicit delay, select without timeout */
 			{
+#ifdef USE_PPOLL
+				nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
 			}
 
 			if (nsocks < 0)
@@ -4610,15 +4659,17 @@ threadRun(void *arg)
 					continue;
 				}
 				/* must be something wrong */
-				fprintf(stderr, "select() failed: %s\n", strerror(errno));
+				fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
 				goto done;
 			}
 		}
+#ifndef USE_PPOLL
 		else /* min_usec == 0, i.e. something needs to be executed */
 		{
 			/* If we didn't call select(), don't try to read any data */
 			FD_ZERO(&input_mask);
 		}
+#endif
 
 		/* ok, advance the state machine of each connection */
 		for (i = 0; i < nstate; i++)
@@ -4637,7 +4688,18 @@ threadRun(void *arg)
 					goto done;
 				}
 
+#ifdef USE_PPOLL
+				if ((st->pfdp)->revents & POLL_FAIL)
+				{
+					fprintf(stderr,
+						"ppoll() fail - errno: %d, i: %d, events: %x\n",
+						errno, i, ((st->pfdp)->revents & POLL_FAIL));
+				}
+
+				if (!(st->pfdp)->revents)
+#else
 				if (!FD_ISSET(sock, &input_mask))
+#endif
 					continue;
 			}
 			else if (st->state == CSTATE_FINISHED ||
@@ -4651,7 +4713,10 @@ threadRun(void *arg)
 
 			/* If doCustom changed client to finished state, reduce remains */
 			if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+			{
 				remains--;
+				PFD_ZERO(st);
+			}
 		}
 
 		/* progress report is made by thread 0 for all threads */
@@ -4765,9 +4830,21 @@ done:
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
+	PFD_THREAD_FREE(thread);
 	return NULL;
 }
 
+static void
+finishCon(CState *st)	/* close st's connection if it is open, then clear its pollfd slot */
+{
+	if (st->con)
+	{
+		PQfinish(st->con);
+		st->con = NULL;	/* mark closed so a repeated call skips PQfinish */
+	}
+	PFD_ZERO(st);		/* expands to nothing unless USE_PPOLL is defined */
+}
+
 /*
  * Support for duration option: set timer_exceeded after so many seconds.
  */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index cfdcc5ac62..21d9bc0300 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -394,6 +394,9 @@
 /* Define to 1 if you have the <poll.h> header file. */
 #undef HAVE_POLL_H
 
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
 /* Define to 1 if you have the `posix_fadvise' function. */
 #undef HAVE_POSIX_FADVISE
 
pgbench10-ppoll-v2.patchapplication/octet-stream; name=pgbench10-ppoll-v2.patchDownload
diff --git a/configure.in b/configure.in
index 68a1e0c184..78df83bdd5 100644
--- a/configure.in
+++ b/configure.in
@@ -1035,6 +1035,7 @@ AC_SEARCH_LIBS(crypt, crypt)
 AC_SEARCH_LIBS(shm_open, rt)
 AC_SEARCH_LIBS(shm_unlink, rt)
 AC_SEARCH_LIBS(clock_gettime, [rt posix4])
+AC_SEARCH_LIBS(ppoll, c)
 # Solaris:
 AC_SEARCH_LIBS(fdatasync, [rt posix4])
 # Required for thread_test.c on Solaris
@@ -1430,7 +1431,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
 
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 2bdfc89d2a..79bbe3ec7a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -47,6 +47,9 @@
 #ifdef HAVE_SYS_SELECT_H
 #include <sys/select.h>
 #endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>		/* for getrlimit */
@@ -56,6 +59,10 @@
 #define M_PI 3.14159265358979323846
 #endif
 
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
 #include "pgbench.h"
 
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
@@ -90,6 +97,27 @@ static int	pthread_join(pthread_t th, void **thread_return);
 #define MAXCLIENTS	1024
 #endif
 
+#ifdef USE_PPOLL	/* per-client pollfd bookkeeping; every PFD_* macro expands to nothing in the select() build */
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI|POLLRDHUP)
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL|POLLRDHUP)
+#define PFD_CLREV(s) { do { if ((s)->pfdp) { (s)->pfdp->revents = 0; } } while(0); }
+#define PFD_ZERO(s) { do { if ((s)->pfdp) { (s)->pfdp->fd = -1; (s)->pfdp->events = (s)->pfdp->revents = 0; } } while(0); }	/* fd < 0 makes poll() ignore the slot; fd 0 would keep stdin in the poll set */
+#define PFD_SETFD(s) { do { (s)->pfdp->fd = PQsocket((s)->con); } while(0); }
+#define PFD_STRUCT_POLLFD(p)	struct pollfd   (p);
+#define PFD_THREAD_FREE(t) { do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0); }
+#define PFD_THREAD_INIT(t,s,n) { do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (s)[_i].pfdp = &(t)->pfds[_i]; (s)[_i].pfdp->fd = -1; } } while (0); }
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
 #define LOG_STEP_SECONDS	5	/* seconds between log messages */
 #define DEFAULT_NXACTS	10		/* default nxacts */
 
@@ -331,6 +359,7 @@ typedef struct
 	/* per client collected stats */
 	int64		cnt;			/* transaction count */
 	int			ecnt;			/* error count */
+	PFD_STRUCT_POLLFD(*pfdp)
 } CState;
 
 /*
@@ -351,6 +380,7 @@ typedef struct
 	instr_time	conn_time;
 	StatsData	stats;
 	int64		latency_late;	/* executed but late transactions */
+	PFD_STRUCT_POLLFD(*pfds)
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
 static void addScript(ParsedScript script);
 static void *threadRun(void *arg);
 static void setalarm(int seconds);
+static void finishCon(CState *st);
 
 
 /* callback functions for our flex lexer */
@@ -2100,6 +2131,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
 					/* Reset session-local state */
 					memset(st->prepared, 0, sizeof(st->prepared));
+					PFD_ZERO(st);
+					PFD_SETFD(st);
 				}
 
 				/*
@@ -2372,8 +2405,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
 				if (is_connect)
 				{
-					PQfinish(st->con);
-					st->con = NULL;
+					finishCon(st);
 					INSTR_TIME_SET_ZERO(now);
 				}
 
@@ -2412,10 +2444,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 			case CSTATE_ABORTED:
 			case CSTATE_FINISHED:
 				if (st->con != NULL)
-				{
-					PQfinish(st->con);
-					st->con = NULL;
-				}
+					finishCon(st);
 				return;
 		}
 	}
@@ -2553,13 +2582,7 @@ disconnect_all(CState *state, int length)
 	int			i;
 
 	for (i = 0; i < length; i++)
-	{
-		if (state[i].con)
-		{
-			PQfinish(state[i].con);
-			state[i].con = NULL;
-		}
-	}
+		finishCon(&state[i]);
 }
 
 /* create tables and setup data */
@@ -3730,7 +3753,11 @@ main(int argc, char **argv)
 			case 'c':
 				benchmarking_option_set = true;
 				nclients = atoi(optarg);
+#ifdef USE_PPOLL
+				if (nclients <= 0)
+#else
 				if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
 				{
 					fprintf(stderr, "invalid number of clients: \"%s\"\n",
 							optarg);
@@ -4420,6 +4447,8 @@ threadRun(void *arg)
 		}
 	}
 
+	PFD_THREAD_INIT(thread, state, nstate);
+
 	if (!is_connect)
 	{
 		/* make connections to the database */
@@ -4427,6 +4456,7 @@ threadRun(void *arg)
 		{
 			if ((state[i].con = doConnect()) == NULL)
 				goto done;
+			PFD_SETFD(&state[i]);
 		}
 	}
 
@@ -4443,14 +4473,16 @@ threadRun(void *arg)
 	/* loop till all clients have terminated */
 	while (remains > 0)
 	{
-		fd_set		input_mask;
 		int			maxsock;	/* max socket number to be waited for */
 		int64		min_usec;
 		int64		now_usec = 0;	/* set this only if needed */
+#ifndef USE_PPOLL
+		fd_set		input_mask;
 
 		/* identify which client sockets should be checked for input */
 		FD_ZERO(&input_mask);
 		maxsock = -1;
+#endif
 		min_usec = PG_INT64_MAX;
 		for (i = 0; i < nstate; i++)
 		{
@@ -4460,8 +4492,7 @@ threadRun(void *arg)
 			{
 				/* interrupt client that has not started a transaction */
 				st->state = CSTATE_FINISHED;
-				PQfinish(st->con);
-				st->con = NULL;
+				finishCon(st);
 				remains--;
 			}
 			else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
@@ -4499,7 +4530,13 @@ threadRun(void *arg)
 					goto done;
 				}
 
+#ifdef USE_PPOLL
+				maxsock = 0;
+				(st->pfdp)->events = POLL_EVENTS;
+				(st->pfdp)->revents = 0;
+#else
 				FD_SET(sock, &input_mask);
+#endif
 				if (maxsock < sock)
 					maxsock = sock;
 			}
@@ -4547,11 +4584,19 @@ threadRun(void *arg)
 			{
 				if (maxsock != -1)
 				{
+#ifdef USE_PPOLL
+					struct timespec timeout;
+
+					timeout.tv_sec = min_usec / 1000000;
+					timeout.tv_nsec = (min_usec % 1000000) * 1000;	/* sub-second remainder, usec -> nsec */
+					nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
 					struct timeval timeout;
 
 					timeout.tv_sec = min_usec / 1000000;
 					timeout.tv_usec = min_usec % 1000000;
 					nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
 				}
 				else /* nothing active, simple sleep */
 				{
@@ -4560,7 +4605,11 @@ threadRun(void *arg)
 			}
 			else /* no explicit delay, select without timeout */
 			{
+#ifdef USE_PPOLL
+				nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
 			}
 
 			if (nsocks < 0)
@@ -4571,15 +4620,17 @@ threadRun(void *arg)
 					continue;
 				}
 				/* must be something wrong */
-				fprintf(stderr, "select() failed: %s\n", strerror(errno));
+				fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
 				goto done;
 			}
 		}
+#ifndef USE_PPOLL
 		else /* min_usec == 0, i.e. something needs to be executed */
 		{
 			/* If we didn't call select(), don't try to read any data */
 			FD_ZERO(&input_mask);
 		}
+#endif
 
 		/* ok, advance the state machine of each connection */
 		for (i = 0; i < nstate; i++)
@@ -4598,7 +4649,18 @@ threadRun(void *arg)
 					goto done;
 				}
 
+#ifdef USE_PPOLL
+				if ((st->pfdp)->revents & POLL_FAIL)
+				{
+					fprintf(stderr,
+						"ppoll() fail - errno: %d, i: %d, events: %x\n",
+						errno, i, ((st->pfdp)->revents & POLL_FAIL));
+				}
+
+				if (!(st->pfdp)->revents)
+#else
 				if (!FD_ISSET(sock, &input_mask))
+#endif
 					continue;
 			}
 			else if (st->state == CSTATE_FINISHED ||
@@ -4612,7 +4674,10 @@ threadRun(void *arg)
 
 			/* If doCustom changed client to finished state, reduce remains */
 			if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+			{
 				remains--;
+				PFD_ZERO(st);
+			}
 		}
 
 		/* progress report is made by thread 0 for all threads */
@@ -4726,9 +4791,21 @@ done:
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
+	PFD_THREAD_FREE(thread);
 	return NULL;
 }
 
+static void
+finishCon(CState *st)	/* close st's connection if it is open, then clear its pollfd slot */
+{
+	if (st->con)
+	{
+		PQfinish(st->con);
+		st->con = NULL;	/* mark closed so a repeated call skips PQfinish */
+	}
+	PFD_ZERO(st);		/* expands to nothing unless USE_PPOLL is defined */
+}
+
 /*
  * Support for duration option: set timer_exceeded after so many seconds.
  */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index c65dd7db21..63c0e74ff7 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -390,6 +390,9 @@
 /* Define to 1 if you have the <poll.h> header file. */
 #undef HAVE_POLL_H
 
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
 /* Define to 1 if you have the `posix_fadvise' function. */
 #undef HAVE_POSIX_FADVISE
 
#7Robert Haas
robertmhaas@gmail.com
In reply to: Rady, Doug (#1)
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

On Mon, Sep 25, 2017 at 8:01 PM, Rady, Doug <radydoug@amazon.com> wrote:

This patch enables building pgbench to use ppoll() instead of select()

to allow for more than (FD_SETSIZE - 10) connections. As implemented,

when using ppoll(), the only connection limitation is system resources.

So what's an example of something that fails without this patch but
works with the patch?

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#8Rady, Doug
radydoug@amazon.com
In reply to: Robert Haas (#7)
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

Without this patch, one is limited to (FD_SETSIZE - 10) connections.
Example of something that fails without this patch but works with the patch:

Without the patch:

$ pgbench -j 3000 -c 1500
invalid number of clients: "1500"

With the patch:

$ pgbench -j 3000 -c 1500
starting vacuum...end.
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 2000
query mode: simple
number of clients: 1500
number of threads: 1500
number of transactions per client: 10
number of transactions actually processed: 15000/15000
latency average = 631.730 ms
tps = 2374.430587 (including connections establishing)
tps = 4206.524986 (excluding connections establishing)

--
doug

On 10/26/17, 04:46, "Robert Haas" <robertmhaas@gmail.com> wrote:

On Mon, Sep 25, 2017 at 8:01 PM, Rady, Doug <radydoug@amazon.com> wrote:

This patch enables building pgbench to use ppoll() instead of select()

to allow for more than (FD_SETSIZE - 10) connections. As implemented,

when using ppoll(), the only connection limitation is system resources.

So what's an example of something that fails without this patch but
works with the patch?

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#9Fabien COELHO
coelho@cri.ensmp.fr
In reply to: Rady, Doug (#6)
Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

Hello,

Could you rebase the v11 patch?

--
Fabien.

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#10Michael Paquier
michael.paquier@gmail.com
In reply to: Fabien COELHO (#9)
Re: [HACKERS] PATCH: pgbench - option to build using ppoll() for larger connection counts

On Fri, Nov 3, 2017 at 2:29 PM, Fabien COELHO <coelho@cri.ensmp.fr> wrote:

Could you rebase the v11 patch?

This patch has been waiting for a rebase for more than three weeks as
of today, I am marking it as returned with feedback. It would be a
good idea to reply to Robert's input in
/messages/by-id/CA+TgmoYybNv-DdhVPMxLB2nx2SqeNJirtWHmdEAZUCGoTB2VBg@mail.gmail.com.
--
Michael

#11Fabien COELHO
coelho@cri.ensmp.fr
In reply to: Michael Paquier (#10)
Re: [HACKERS] PATCH: pgbench - option to build using ppoll() for larger connection counts

Hello Michaël,

Could you rebase the v11 patch?

This patch has been waiting for a rebase for more than three weeks as
of today, I am marking it as returned with feedback. It would be a
good idea to reply to Robert's input in
/messages/by-id/CA+TgmoYybNv-DdhVPMxLB2nx2SqeNJirtWHmdEAZUCGoTB2VBg@mail.gmail.com.

ISTM that this was done: If -c is high enough, pgbench fails without the
patch.

--
Fabien.