PATCH: pgbench - option to build using ppoll() for larger connection counts

Started by Rady, Doug over 8 years ago · 1 message
#1 Rady, Doug
radydoug@amazon.com
1 attachment(s)

This patch enables building pgbench to use ppoll() instead of select()
to allow for more than (FD_SETSIZE - 10) connections. As implemented,
when using ppoll(), the only connection limitation is system resources.

To preserve the microsecond resolution of the \sleep meta-command, ppoll() (which takes a struct timespec timeout) is used instead of poll() (which only takes milliseconds).

The patch has been written to introduce a minimal amount of #ifdef/#ifndef
clutter in the code.

(This is just a re-send without the 9.6 patch, because the commitfest latched onto the 9.6 patch.)

doug
--
Doug Rady
Amazon Aurora PostgreSQL
radydoug@amazon.com

Attachments:

pgbench-ppoll-v1.patch (application/octet-stream; name=pgbench-ppoll-v1.patch) [Download]
diff --git a/configure b/configure
index 0d76e5ea42..756371be22 100755
--- a/configure
+++ b/configure
@@ -13024,7 +13024,7 @@ fi
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
+for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l
 do :
   as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
 ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index bdc41b071f..b73e848732 100644
--- a/configure.in
+++ b/configure.in
@@ -1430,7 +1430,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll ppoll pstat pthread_is_threaded_np readlink setproctitle setsid shm_open symlink sync_file_range towlower utime utimes wcstombs wcstombs_l])
 
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index e37496c971..7968160069 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -47,6 +47,9 @@
 #ifdef HAVE_SYS_SELECT_H
 #include <sys/select.h>
 #endif
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
 
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>		/* for getrlimit */
@@ -56,6 +59,10 @@
 #define M_PI 3.14159265358979323846
 #endif
 
+#ifdef HAVE_PPOLL
+#define USE_PPOLL 1
+#endif
+
 #include "pgbench.h"
 
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
@@ -90,6 +97,27 @@ static int	pthread_join(pthread_t th, void **thread_return);
 #define MAXCLIENTS	1024
 #endif
 
+#ifdef USE_PPOLL
+#define SCKTWTMTHD "ppoll"
+#undef MAXCLIENTS
+#define POLL_EVENTS (POLLIN|POLLPRI)	/* no POLLRDHUP: it is Linux-only; POLLIN already reports peer EOF */
+#define POLL_FAIL (POLLERR|POLLHUP|POLLNVAL)
+#define PFD_CLREV(s) do { if ((s)->pfdp) { ((s)->pfdp)->revents = 0; } } while (0)
+#define PFD_ZERO(s) do { if ((s)->pfdp) { memset((s)->pfdp, 0, sizeof(struct pollfd)); ((s)->pfdp)->fd = -1; } } while (0)	/* fd -1: poll() ignores the slot */
+#define PFD_SETFD(s) do { ((s)->pfdp)->fd = PQsocket((s)->con); } while (0)
+#define	PFD_STRUCT_POLLFD(p)	struct pollfd   (p);
+#define PFD_THREAD_FREE(t) do { if ((t)->pfds) { pg_free((t)->pfds); (t)->pfds = NULL; } } while (0)
+#define PFD_THREAD_INIT(t,s,n) do { int _i; (t)->pfds = (struct pollfd *) pg_malloc0(sizeof(struct pollfd) * (n)); for (_i = 0; _i < (n); _i++) { (t)->pfds[_i].fd = -1; (s)[_i].pfdp = &(t)->pfds[_i]; } } while (0)
+#else
+#define SCKTWTMTHD "select"
+#define PFD_CLREV(s)
+#define PFD_ZERO(s)
+#define PFD_SETFD(s)
+#define PFD_STRUCT_POLLFD(p)
+#define PFD_THREAD_FREE(t)
+#define PFD_THREAD_INIT(t,s,n)
+#endif
+
 #define LOG_STEP_SECONDS	5	/* seconds between log messages */
 #define DEFAULT_NXACTS	10		/* default nxacts */
 
@@ -331,6 +359,7 @@ typedef struct
 	/* per client collected stats */
 	int64		cnt;			/* client transaction count, for -t */
 	int			ecnt;			/* error count */
+	PFD_STRUCT_POLLFD(*pfdp)
 } CState;
 
 /*
@@ -351,6 +380,7 @@ typedef struct
 	instr_time	conn_time;
 	StatsData	stats;
 	int64		latency_late;	/* executed but late transactions */
+	PFD_STRUCT_POLLFD(*pfds)
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -458,6 +488,7 @@ static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
 static void addScript(ParsedScript script);
 static void *threadRun(void *arg);
 static void setalarm(int seconds);
+static void finishCon(CState *st);
 
 
 /* callback functions for our flex lexer */
@@ -2137,6 +2168,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
 					/* Reset session-local state */
 					memset(st->prepared, 0, sizeof(st->prepared));
+					PFD_ZERO(st);
+					PFD_SETFD(st);
 				}
 
 				/*
@@ -2406,8 +2439,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
 				if (is_connect)
 				{
-					PQfinish(st->con);
-					st->con = NULL;
+					finishCon(st);
 					INSTR_TIME_SET_ZERO(now);
 				}
 
@@ -2444,11 +2476,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 				 */
 			case CSTATE_ABORTED:
 			case CSTATE_FINISHED:
-				if (st->con != NULL)
-				{
-					PQfinish(st->con);
-					st->con = NULL;
-				}
+				finishCon(st);
 				return;
 		}
 	}
@@ -2596,13 +2624,7 @@ disconnect_all(CState *state, int length)
 	int			i;
 
 	for (i = 0; i < length; i++)
-	{
-		if (state[i].con)
-		{
-			PQfinish(state[i].con);
-			state[i].con = NULL;
-		}
-	}
+		finishCon(&state[i]);
 }
 
 /* create tables and setup data */
@@ -3767,7 +3789,11 @@ main(int argc, char **argv)
 			case 'c':
 				benchmarking_option_set = true;
 				nclients = atoi(optarg);
+#ifdef USE_PPOLL
+				if (nclients <= 0)
+#else
 				if (nclients <= 0 || nclients > MAXCLIENTS)
+#endif
 				{
 					fprintf(stderr, "invalid number of clients: \"%s\"\n",
 							optarg);
@@ -4463,6 +4489,8 @@ threadRun(void *arg)
 		}
 	}
 
+	PFD_THREAD_INIT(thread, state, nstate);
+
 	if (!is_connect)
 	{
 		/* make connections to the database */
@@ -4470,6 +4498,7 @@ threadRun(void *arg)
 		{
 			if ((state[i].con = doConnect()) == NULL)
 				goto done;
+			PFD_SETFD(&state[i]);
 		}
 	}
 
@@ -4486,14 +4515,17 @@ threadRun(void *arg)
 	/* loop till all clients have terminated */
 	while (remains > 0)
 	{
-		fd_set		input_mask;
 		int			maxsock;	/* max socket number to be waited for */
 		int64		min_usec;
 		int64		now_usec = 0;	/* set this only if needed */
+#ifndef USE_PPOLL
+		fd_set		input_mask;
 
 		/* identify which client sockets should be checked for input */
 		FD_ZERO(&input_mask);
+#endif
 		maxsock = -1;
+
 		min_usec = PG_INT64_MAX;
 		for (i = 0; i < nstate; i++)
 		{
@@ -4503,8 +4535,7 @@ threadRun(void *arg)
 			{
 				/* interrupt client that has not started a transaction */
 				st->state = CSTATE_FINISHED;
-				PQfinish(st->con);
-				st->con = NULL;
+				finishCon(st);
 				remains--;
 			}
 			else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
@@ -4542,7 +4573,13 @@ threadRun(void *arg)
 					goto done;
 				}
 
+#ifdef USE_PPOLL
+				/* ask the next ppoll() to watch this client's socket */
+				(st->pfdp)->events = POLL_EVENTS;
+				(st->pfdp)->revents = 0;
+#else
 				FD_SET(sock, &input_mask);
+#endif
 				if (maxsock < sock)
 					maxsock = sock;
 			}
@@ -4588,14 +4625,26 @@ threadRun(void *arg)
 
 			if (min_usec != PG_INT64_MAX)
 			{
+#ifdef USE_PPOLL
+				struct timespec timeout;
+
+				timeout.tv_sec = min_usec / 1000000;
+				timeout.tv_nsec = (min_usec % 1000000) * 1000;
+				nsocks = ppoll(thread->pfds, nstate, &timeout, NULL);
+#else
 				struct timeval timeout;
 
 				timeout.tv_sec = min_usec / 1000000;
 				timeout.tv_usec = min_usec % 1000000;
 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+#endif
 			}
 			else
+#ifdef USE_PPOLL
+				nsocks = ppoll(thread->pfds, nstate, NULL, NULL);
+#else
 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+#endif
 			if (nsocks < 0)
 			{
 				if (errno == EINTR)
@@ -4604,15 +4653,17 @@ threadRun(void *arg)
 					continue;
 				}
 				/* must be something wrong */
-				fprintf(stderr, "select() failed: %s\n", strerror(errno));
+				fprintf(stderr, "%s() failed: %s\n", SCKTWTMTHD, strerror(errno));
 				goto done;
 			}
 		}
+#ifndef USE_PPOLL
 		else
 		{
 			/* If we didn't call select(), don't try to read any data */
 			FD_ZERO(&input_mask);
 		}
+#endif
 
 		/* ok, advance the state machine of each connection */
 		for (i = 0; i < nstate; i++)
@@ -4630,8 +4681,18 @@ threadRun(void *arg)
 							PQerrorMessage(st->con));
 					goto done;
 				}
+#ifdef USE_PPOLL
+				if ((st->pfdp)->revents & POLL_FAIL)
+				{
+					fprintf(stderr,
+						"ppoll() reported error condition on connection %d: revents 0x%x\n",
+						i, ((st->pfdp)->revents & POLL_FAIL));
+				}
 
+				if (!(st->pfdp)->revents)
+#else
 				if (!FD_ISSET(sock, &input_mask))
+#endif
 					continue;
 			}
 			else if (st->state == CSTATE_FINISHED ||
@@ -4645,7 +4706,10 @@ threadRun(void *arg)
 
 			/* If doCustom changed client to finished state, reduce remains */
 			if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+			{
 				remains--;
+				PFD_ZERO(st);
+			}
 		}
 
 		/* progress report is made by thread 0 for all threads */
@@ -4759,9 +4823,25 @@ done:
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
+	PFD_THREAD_FREE(thread);
 	return NULL;
 }
 
+/*
+ * Close the client's connection, if it has one, and reset its pollfd
+ * entry (the reset is a no-op unless built with USE_PPOLL).
+ */
+static void
+finishCon(CState *st)
+{
+	if (st->con)
+	{
+		PQfinish(st->con);
+		st->con = NULL;
+	}
+	PFD_ZERO(st);
+}
+
 /*
  * Support for duration option: set timer_exceeded after so many seconds.
  */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 579d195663..2d016c8a1f 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -390,6 +390,9 @@
 /* Define to 1 if you have the <poll.h> header file. */
 #undef HAVE_POLL_H
 
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
 /* Define to 1 if you have the `posix_fadvise' function. */
 #undef HAVE_POSIX_FADVISE