multi-threaded pgbench

Started by Itagaki Takahiro, over 16 years ago (26 messages)
#1Itagaki Takahiro
itagaki.takahiro@oss.ntt.co.jp
1 attachment(s)

Pgbench is a well-known tool for measuring postgres performance, but nowadays
it does not work well because it cannot use multiple CPUs. On the other
hand, the postgres server can use multiple CPUs very well, so the bottleneck
of the workload is *in pgbench*.

Multi-threading would be a solution. The attached patch adds a -j
(number of jobs) option to pgbench. If the value N is greater than 1,
pgbench runs with N threads. Connections are divided equally among
them (e.g. -c64 -j4 => 4 threads with 16 connections each). It can
run on POSIX platforms with pthreads and on Windows with win32 threads.

Here are results of multi-threaded pgbench runs on Fedora 11 with an Intel
Core i7 (8 logical cores = 4 physical cores * HT). -j8 (8 threads) was
the best; its tps is 4.5 times that of -j1, which is the traditional
(single-threaded) result.

$ pgbench -i -s10
$ pgbench -n -S -c64 -j1 => tps = 11600.158593
$ pgbench -n -S -c64 -j2 => tps = 17947.100954
$ pgbench -n -S -c64 -j4 => tps = 26571.124001
$ pgbench -n -S -c64 -j8 => tps = 52725.470403
$ pgbench -n -S -c64 -j16 => tps = 38976.675319
$ pgbench -n -S -c64 -j32 => tps = 28998.499601
$ pgbench -n -S -c64 -j64 => tps = 26701.877815

Is it acceptable to use pthread in contrib module?
If ok, I will add the patch to the next commitfest.

Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center

Attachments:

pgbench-mt.patch (application/octet-stream; name=pgbench-mt.patch) [Download]
*** pgbench.c	Wed Jul  8 13:56:57 2009
--- pgbench-mt.c	Wed Jul  8 14:18:25 2009
***************
*** 55,64 ****
  #include <sys/resource.h>		/* for getrlimit */
  #endif
  
  extern char *optarg;
  extern int	optind;
  
- 
  /********************************************************************
   * some configurable parameters */
  
--- 55,74 ----
  #include <sys/resource.h>		/* for getrlimit */
  #endif
  
+ #ifdef WIN32
+ typedef struct win32_pthread   *pthread_t;
+ typedef int						pthread_attr_t;
+ static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void * arg);
+ static int pthread_join(pthread_t th, void **thread_return);
+ #elif defined(ENABLE_THREAD_SAFETY)
+ #include <pthread.h>
+ #else
+ #define pthread_t	int
+ #endif
+ 
  extern char *optarg;
  extern int	optind;
  
  /********************************************************************
   * some configurable parameters */
  
***************
*** 71,77 ****
  
  #define DEFAULT_NXACTS	10		/* default nxacts */
  
- int			nclients = 1;		/* default number of simulated clients */
  int			nxacts = 0;			/* number of transactions per client */
  int			duration = 0;		/* duration in seconds */
  
--- 81,86 ----
***************
*** 99,106 ****
  
  bool		use_log;			/* log transaction latencies to a file */
  
- int			remains;			/* number of remaining clients */
- 
  int			is_connect;			/* establish connection  for each transaction */
  
  char	   *pghost = "";
--- 108,113 ----
***************
*** 144,149 ****
--- 151,168 ----
  } CState;
  
  /*
+  * Thread state
+  */
+ typedef struct
+ {
+ 	pthread_t		thread;		/* thread handle */
+ 	CState		   *state;		/* array of CState */
+ 	int				nstate;		/* length of state */
+ } TState;
+ 
+ #define INVALID_THREAD		((pthread_t) 0)
+ 
+ /*
   * queries read from files
   */
  #define SQL_COMMAND		1
***************
*** 168,175 ****
  	char	   *argv[MAX_ARGS]; /* command list */
  } Command;
  
! Command   **sql_files[MAX_FILES];		/* SQL script files */
! int			num_files;			/* number of script files */
  
  /* default scenario */
  static char *tpc_b = {
--- 187,195 ----
  	char	   *argv[MAX_ARGS]; /* command list */
  } Command;
  
! static Command	  **sql_files[MAX_FILES];	/* SQL script files */
! static int			num_files;				/* number of script files */
! static int			debug = 0;				/* debug flag */
  
  /* default scenario */
  static char *tpc_b = {
***************
*** 217,223 ****
  
  /* Function prototypes */
  static void setalarm(int seconds);
! 
  
  /* Calculate total time */
  static void
--- 237,243 ----
  
  /* Function prototypes */
  static void setalarm(int seconds);
! static void* threadRun(void *arg);
  
  /* Calculate total time */
  static void
***************
*** 376,404 ****
  	} while (res);
  }
  
- /* check to see if the SQL result was good */
- static int
- check(CState *state, PGresult *res, int n)
- {
- 	CState	   *st = &state[n];
- 
- 	switch (PQresultStatus(res))
- 	{
- 		case PGRES_COMMAND_OK:
- 		case PGRES_TUPLES_OK:
- 			/* OK */
- 			break;
- 		default:
- 			fprintf(stderr, "Client %d aborted in state %d: %s",
- 					n, st->state, PQerrorMessage(st->con));
- 			remains--;			/* I've aborted */
- 			PQfinish(st->con);
- 			st->con = NULL;
- 			return (-1);
- 	}
- 	return (0);					/* OK */
- }
- 
  static int
  compareVariables(const void *v1, const void *v2)
  {
--- 396,401 ----
***************
*** 595,605 ****
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static void
! doCustom(CState *state, int n, int debug)
  {
  	PGresult   *res;
- 	CState	   *st = &state[n];
  	Command   **commands;
  
  top:
--- 592,615 ----
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static bool
! clientDone(CState *st, bool ok)
! {
! 	(void) ok;	/* unused */
! 
! 	if (st->con != NULL)
! 	{
! 		PQfinish(st->con);
! 		st->con = NULL;
! 	}
! 	return false;	/* always false */
! }
! 
! /* return false iff client should be disconnected */
! static bool
! doCustom(CState *st)
  {
  	PGresult   *res;
  	Command   **commands;
  
  top:
***************
*** 616,622 ****
  		if (usec <= 0)
  			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
  		else
! 			return;				/* Still sleeping, nothing to do here */
  	}
  
  	if (st->listen)
--- 626,632 ----
  		if (usec <= 0)
  			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
  		else
! 			return true;		/* Still sleeping, nothing to do here */
  	}
  
  	if (st->listen)
***************
*** 624,640 ****
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d receiving\n", n);
  			if (!PQconsumeInput(st->con))
  			{					/* there's something wrong */
! 				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
! 				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
! 				return;
  			}
  			if (PQisBusy(st->con))
! 				return;			/* don't have the whole result yet */
  		}
  
  		/*
--- 634,647 ----
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d receiving\n", st->id);
  			if (!PQconsumeInput(st->con))
  			{					/* there's something wrong */
! 				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
! 				return clientDone(st, false);
  			}
  			if (PQisBusy(st->con))
! 				return true;	/* don't have the whole result yet */
  		}
  
  		/*
***************
*** 657,666 ****
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			res = PQgetResult(st->con);
! 			if (check(state, res, n))
  			{
! 				PQclear(res);
! 				return;
  			}
  			PQclear(res);
  			discard_response(st);
--- 664,679 ----
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			res = PQgetResult(st->con);
! 			switch (PQresultStatus(res))
  			{
! 				case PGRES_COMMAND_OK:
! 				case PGRES_TUPLES_OK:
! 					break;	/* OK */
! 				default:
! 					fprintf(stderr, "Client %d aborted in state %d: %s",
! 						st->id, st->state, PQerrorMessage(st->con));
! 					PQclear(res);
! 					return clientDone(st, false);
  			}
  			PQclear(res);
  			discard_response(st);
***************
*** 676,690 ****
  
  			++st->cnt;
  			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 			{
! 				remains--;		/* I've done */
! 				if (st->con != NULL)
! 				{
! 					PQfinish(st->con);
! 					st->con = NULL;
! 				}
! 				return;
! 			}
  		}
  
  		/* increment state counter */
--- 689,695 ----
  
  			++st->cnt;
  			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 				return clientDone(st, true);	/* exit success */
  		}
  
  		/* increment state counter */
***************
*** 706,717 ****
  		gettimeofday(&t1, NULL);
  		if ((st->con = doConnect()) == NULL)
  		{
! 			fprintf(stderr, "Client %d aborted in establishing connection.\n",
! 					n);
! 			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
! 			return;
  		}
  		gettimeofday(&t2, NULL);
  		diffTime(&t2, &t1, &t3);
--- 711,718 ----
  		gettimeofday(&t1, NULL);
  		if ((st->con = doConnect()) == NULL)
  		{
! 			fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
! 			return clientDone(st, false);
  		}
  		gettimeofday(&t2, NULL);
  		diffTime(&t2, &t1, &t3);
***************
*** 735,745 ****
  			{
  				fprintf(stderr, "out of memory\n");
  				st->ecnt++;
! 				return;
  			}
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, sql);
  			r = PQsendQuery(st->con, sql);
  			free(sql);
  		}
--- 736,746 ----
  			{
  				fprintf(stderr, "out of memory\n");
  				st->ecnt++;
! 				return true;
  			}
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
  			r = PQsendQuery(st->con, sql);
  			free(sql);
  		}
***************
*** 751,757 ****
  			getQueryParams(st, command, params);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, sql);
  			r = PQsendQueryParams(st->con, sql, command->argc - 1,
  								  NULL, params, NULL, NULL, 0);
  		}
--- 752,758 ----
  			getQueryParams(st, command, params);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
  			r = PQsendQueryParams(st->con, sql, command->argc - 1,
  								  NULL, params, NULL, NULL, 0);
  		}
***************
*** 785,791 ****
  			preparedStatementName(name, st->use_file, st->state);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, name);
  			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
  									params, NULL, NULL, 0);
  		}
--- 786,792 ----
  			preparedStatementName(name, st->use_file, st->state);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, name);
  			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
  									params, NULL, NULL, 0);
  		}
***************
*** 795,801 ****
  		if (r == 0)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]);
  			st->ecnt++;
  		}
  		else
--- 796,802 ----
  		if (r == 0)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
  			st->ecnt++;
  		}
  		else
***************
*** 809,815 ****
  
  		if (debug)
  		{
! 			fprintf(stderr, "client %d executing \\%s", n, argv[0]);
  			for (i = 1; i < argc; i++)
  				fprintf(stderr, " %s", argv[i]);
  			fprintf(stderr, "\n");
--- 810,816 ----
  
  		if (debug)
  		{
! 			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
  			for (i = 1; i < argc; i++)
  				fprintf(stderr, " %s", argv[i]);
  			fprintf(stderr, "\n");
***************
*** 828,834 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return;
  				}
  				min = atoi(var);
  			}
--- 829,835 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return true;
  				}
  				min = atoi(var);
  			}
***************
*** 850,856 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return;
  				}
  				max = atoi(var);
  			}
--- 851,857 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return true;
  				}
  				max = atoi(var);
  			}
***************
*** 861,867 ****
  			{
  				fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
  				st->ecnt++;
! 				return;
  			}
  
  #ifdef DEBUG
--- 862,868 ----
  			{
  				fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
  				st->ecnt++;
! 				return true;
  			}
  
  #ifdef DEBUG
***************
*** 873,879 ****
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return;
  			}
  
  			st->listen = 1;
--- 874,880 ----
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return true;
  			}
  
  			st->listen = 1;
***************
*** 891,897 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return;
  				}
  				ope1 = atoi(var);
  			}
--- 892,898 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return true;
  				}
  				ope1 = atoi(var);
  			}
***************
*** 908,914 ****
  					{
  						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
  						st->ecnt++;
! 						return;
  					}
  					ope2 = atoi(var);
  				}
--- 909,915 ----
  					{
  						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
  						st->ecnt++;
! 						return true;
  					}
  					ope2 = atoi(var);
  				}
***************
*** 927,933 ****
  					{
  						fprintf(stderr, "%s: division by zero\n", argv[0]);
  						st->ecnt++;
! 						return;
  					}
  					snprintf(res, sizeof(res), "%d", ope1 / ope2);
  				}
--- 928,934 ----
  					{
  						fprintf(stderr, "%s: division by zero\n", argv[0]);
  						st->ecnt++;
! 						return true;
  					}
  					snprintf(res, sizeof(res), "%d", ope1 / ope2);
  				}
***************
*** 935,941 ****
  				{
  					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return;
  				}
  			}
  
--- 936,942 ----
  				{
  					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return true;
  				}
  			}
  
***************
*** 943,949 ****
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return;
  			}
  
  			st->listen = 1;
--- 944,950 ----
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return true;
  			}
  
  			st->listen = 1;
***************
*** 960,966 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
  					st->ecnt++;
! 					return;
  				}
  				usec = atoi(var);
  			}
--- 961,967 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
  					st->ecnt++;
! 					return true;
  				}
  				usec = atoi(var);
  			}
***************
*** 987,1004 ****
  
  		goto top;
  	}
  }
  
  /* discard connections */
  static void
! disconnect_all(CState *state)
  {
  	int			i;
  
! 	for (i = 0; i < nclients; i++)
  	{
  		if (state[i].con)
  			PQfinish(state[i].con);
  	}
  }
  
--- 988,1010 ----
  
  		goto top;
  	}
+ 
+ 	return true;
  }
  
  /* discard connections */
  static void
! disconnect_all(CState *state, int length)
  {
  	int			i;
  
! 	for (i = 0; i < length; i++)
  	{
  		if (state[i].con)
+ 		{
  			PQfinish(state[i].con);
+ 			state[i].con = NULL;
+ 		}
  	}
  }
  
***************
*** 1450,1457 ****
  
  /* print out results */
  static void
! printResults(
! 			 int ttype, CState *state,
  			 struct timeval * start_time, struct timeval * end_time)
  {
  	double		t1,
--- 1456,1462 ----
  
  /* print out results */
  static void
! printResults(int ttype, const CState *state, int nclients, int nthreads,
  			 struct timeval * start_time, struct timeval * end_time)
  {
  	double		t1,
***************
*** 1483,1488 ****
--- 1488,1494 ----
  	printf("scaling factor: %d\n", scale);
  	printf("query mode: %s\n", QUERYMODE[querymode]);
  	printf("number of clients: %d\n", nclients);
+ 	printf("number of threads: %d\n", nthreads);
  	if (duration <= 0)
  	{
  		printf("number of transactions per client: %d\n", nxacts);
***************
*** 1504,1532 ****
  main(int argc, char **argv)
  {
  	int			c;
  	int			is_init_mode = 0;		/* initialize mode? */
  	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
  	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
- 	int			debug = 0;		/* debug flag */
  	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
  								 * 2: skip update of branches and tellers */
  	char	   *filename = NULL;
  	bool		scale_given = false;
  
  	CState	   *state;			/* status of clients */
  
  	struct timeval start_time;	/* start up time */
  	struct timeval end_time;	/* end time */
  
  	int			i;
  
- 	fd_set		input_mask;
- 	int			nsocks;			/* return from select(2) */
- 	int			maxsock;		/* max socket number to be waited */
- 	struct timeval now;
- 	struct timeval timeout;
- 	int			min_usec;
- 
  #ifdef HAVE_GETRLIMIT
  	struct rlimit rlim;
  #endif
--- 1510,1533 ----
  main(int argc, char **argv)
  {
  	int			c;
+ 	int			nclients = 1;		/* default number of simulated clients */
+ 	int			nthreads = 1;		/* default number of threads */
  	int			is_init_mode = 0;		/* initialize mode? */
  	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
  	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
  	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
  								 * 2: skip update of branches and tellers */
  	char	   *filename = NULL;
  	bool		scale_given = false;
  
  	CState	   *state;			/* status of clients */
+ 	TState	   *threads;		/* array of thread */
  
  	struct timeval start_time;	/* start up time */
  	struct timeval end_time;	/* end time */
  
  	int			i;
  
  #ifdef HAVE_GETRLIMIT
  	struct rlimit rlim;
  #endif
***************
*** 1576,1582 ****
  
  	memset(state, 0, sizeof(*state));
  
! 	while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1)
  	{
  		switch (c)
  		{
--- 1577,1583 ----
  
  	memset(state, 0, sizeof(*state));
  
! 	while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
  	{
  		switch (c)
  		{
***************
*** 1629,1634 ****
--- 1630,1650 ----
  				}
  #endif   /* HAVE_GETRLIMIT */
  				break;
+ 			case 'j':	/* jobs */
+ 				nthreads = atoi(optarg);
+ 				if (nthreads <= 0)
+ 				{
+ 					fprintf(stderr, "invalid number of threads: %d\n", nthreads);
+ 					exit(1);
+ 				}
+ #ifndef ENABLE_THREAD_SAFETY
+ 				if (nthreads > 1)
+ 				{
+ 					fprintf(stderr, "multi-threading is not supported\n");
+ 					exit(1);
+ 				}
+ #endif
+ 				break;
  			case 'C':
  				is_connect = 1;
  				break;
***************
*** 1749,1755 ****
  	if (nxacts <= 0 && duration <= 0)
  		nxacts = DEFAULT_NXACTS;
  
! 	remains = nclients;
  
  	if (nclients > 1)
  	{
--- 1765,1775 ----
  	if (nxacts <= 0 && duration <= 0)
  		nxacts = DEFAULT_NXACTS;
  
! 	if (nclients % nthreads != 0)
! 	{
! 		fprintf(stderr, "number of client (%d) must be multiple number of threads (%d)\n", nclients, nthreads);
! 		exit(1);
! 	}
  
  	if (nclients > 1)
  	{
***************
*** 1926,1974 ****
  			break;
  	}
  
  	/* send start up queries in async manner */
! 	for (i = 0; i < nclients; i++)
  	{
! 		Command   **commands = sql_files[state[i].use_file];
! 		int			prev_ecnt = state[i].ecnt;
  
! 		state[i].use_file = getrand(0, num_files - 1);
! 		doCustom(state, i, debug);
  
! 		if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
  		{
! 			fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
  			remains--;			/* I've aborted */
! 			PQfinish(state[i].con);
! 			state[i].con = NULL;
  		}
  	}
  
! 	for (;;)
  	{
! 		if (remains <= 0)
! 		{						/* all done ? */
! 			disconnect_all(state);
! 			/* get end time */
! 			gettimeofday(&end_time, NULL);
! 			printResults(ttype, state, &start_time, &end_time);
! 			if (LOGFILE)
! 				fclose(LOGFILE);
! 			exit(0);
! 		}
  
  		FD_ZERO(&input_mask);
  
  		maxsock = -1;
  		min_usec = -1;
! 		for (i = 0; i < nclients; i++)
  		{
! 			Command   **commands = sql_files[state[i].use_file];
  
! 			if (state[i].sleeping)
  			{
  				int			this_usec;
! 				int			sock = PQsocket(state[i].con);
  
  				if (min_usec < 0)
  				{
--- 1946,2048 ----
  			break;
  	}
  
+ 	/* start threads */
+ 	threads = (TState *) malloc(sizeof(TState) * nthreads);
+ 	for (i = 0; i < nthreads; i++)
+ 	{
+ #ifdef ENABLE_THREAD_SAFETY
+ 		/* the first thread (i = 0) is executed by main thread */
+ 		if (i > 0)
+ 		{
+ 			int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+ 			if (err != 0)
+ 			{
+ 				fprintf(stderr, "cannot create thread: %s\n", strerror(err));
+ 				exit(1);
+ 			}
+ 		}
+ 		else
+ #endif
+ 		{
+ 			threads[i].thread = INVALID_THREAD;
+ 		}
+ 
+ 		threads[i].state = &state[nclients / nthreads * i];
+ 		threads[i].nstate = nclients / nthreads;
+ 	}
+ 
+ 	/* wait for threads */
+ 	for (i = 0; i < nthreads; i++)
+ 	{
+ 		if (threads[i].thread == INVALID_THREAD)
+ 			threadRun(&threads[i]);
+ #ifdef ENABLE_THREAD_SAFETY
+ 		else
+ 			pthread_join(threads[i].thread, NULL);
+ #endif
+ 	}
+ 	disconnect_all(state, nclients);
+ 
+ 	/* get end time */
+ 	gettimeofday(&end_time, NULL);
+ 	printResults(ttype, state, nclients, nthreads, &start_time, &end_time);
+ 	if (LOGFILE)
+ 		fclose(LOGFILE);
+ 
+ 	return 0;
+ }
+ 
+ static void*
+ threadRun(void *arg)
+ {
+ 	TState *thread = (TState *) arg;
+ 	CState *state = thread->state;
+ 	int		nstate = thread->nstate;
+ 	int		remains = nstate;	/* number of remaining clients */
+ 	int		i;
+ 
  	/* send start up queries in async manner */
! 	for (i = 0; i < nstate; i++)
  	{
! 		CState	   *st = &state[i];
! 		Command   **commands = sql_files[st->use_file];
! 		int			prev_ecnt = st->ecnt;
  
! 		st->use_file = getrand(0, num_files - 1);
! 		if (!doCustom(st))
! 			remains--;		/* I've aborted */
  
! 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  		{
! 			fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
  			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
  		}
  	}
  
! 	while (remains > 0)
  	{
! 		fd_set			input_mask;
! 		int				nsocks;			/* return from select(2) */
! 		int				maxsock;		/* max socket number to be waited */
! 		struct timeval	now;
! 		struct timeval	timeout;
! 		int				min_usec;
  
  		FD_ZERO(&input_mask);
  
  		maxsock = -1;
  		min_usec = -1;
! 		for (i = 0; i < nstate; i++)
  		{
! 			CState	   *st = &state[i];
! 			Command   **commands = sql_files[st->use_file];
  
! 			if (st->sleeping)
  			{
  				int			this_usec;
! 				int			sock = PQsocket(st->con);
  
  				if (min_usec < 0)
  				{
***************
*** 1976,2002 ****
  					min_usec = 0;
  				}
  
! 				this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 +
! 					state[i].until.tv_usec - now.tv_usec;
  
  				if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
  					min_usec = this_usec;
  
! 				FD_SET		(sock, &input_mask);
  
  				if (maxsock < sock)
  					maxsock = sock;
  			}
! 			else if (state[i].con && commands[state[i].state]->type != META_COMMAND)
  			{
! 				int			sock = PQsocket(state[i].con);
  
  				if (sock < 0)
  				{
! 					disconnect_all(state);
! 					exit(1);
  				}
! 				FD_SET		(sock, &input_mask);
  
  				if (maxsock < sock)
  					maxsock = sock;
--- 2050,2076 ----
  					min_usec = 0;
  				}
  
! 				this_usec = (st->until.tv_sec - now.tv_sec) * 1000000 +
! 					st->until.tv_usec - now.tv_usec;
  
  				if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
  					min_usec = this_usec;
  
! 				FD_SET(sock, &input_mask);
  
  				if (maxsock < sock)
  					maxsock = sock;
  			}
! 			else if (st->con && commands[st->state]->type != META_COMMAND)
  			{
! 				int			sock = PQsocket(st->con);
  
  				if (sock < 0)
  				{
! 					disconnect_all(state, nstate);
! 					return NULL;
  				}
! 				FD_SET(sock, &input_mask);
  
  				if (maxsock < sock)
  					maxsock = sock;
***************
*** 2021,2065 ****
  				if (errno == EINTR)
  					continue;
  				/* must be something wrong */
! 				disconnect_all(state);
  				fprintf(stderr, "select failed: %s\n", strerror(errno));
! 				exit(1);
! 			}
! #ifdef NOT_USED
! 			else if (nsocks == 0)
! 			{					/* timeout */
! 				fprintf(stderr, "select timeout\n");
! 				for (i = 0; i < nclients; i++)
! 				{
! 					fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
! 							i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
! 				}
! 				exit(0);
  			}
- #endif
  		}
  
  		/* ok, backend returns reply */
! 		for (i = 0; i < nclients; i++)
  		{
! 			Command   **commands = sql_files[state[i].use_file];
! 			int			prev_ecnt = state[i].ecnt;
  
! 			if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
! 						  || commands[state[i].state]->type == META_COMMAND))
  			{
! 				doCustom(state, i, debug);
  			}
  
! 			if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state);
  				remains--;		/* I've aborted */
! 				PQfinish(state[i].con);
! 				state[i].con = NULL;
  			}
  		}
  	}
  }
  
  
--- 2095,2131 ----
  				if (errno == EINTR)
  					continue;
  				/* must be something wrong */
! 				disconnect_all(state, nstate);
  				fprintf(stderr, "select failed: %s\n", strerror(errno));
! 				return NULL;
  			}
  		}
  
  		/* ok, backend returns reply */
! 		for (i = 0; i < nstate; i++)
  		{
! 			CState	   *st = &state[i];
! 			Command   **commands = sql_files[st->use_file];
! 			int			prev_ecnt = st->ecnt;
  
! 			if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
! 						  || commands[st->state]->type == META_COMMAND))
  			{
! 				if (!doCustom(st))
! 					remains--;		/* I've aborted */
  			}
  
! 			if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
  				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
  			}
  		}
  	}
+ 
+ 	return NULL;
  }
  
  
***************
*** 2105,2110 ****
--- 2171,2242 ----
  		fprintf(stderr, "Failed to set timer\n");
  		exit(1);
  	}
+ }
+ 
+ /* partial pthread implementation for Windows */
+ 
+ typedef struct win32_pthread
+ {
+ 	HANDLE		handle;
+ 	void	   *(*routine)(void *);
+ 	void	   *arg;
+ 	void	   *result;
+ } win32_pthread;
+ 
+ static unsigned __stdcall
+ win32_pthread_run(void *arg)
+ {
+ 	win32_pthread *th = (win32_pthread *) arg;
+ 
+ 	th->result = th->routine(th->arg);
+ 
+ 	return 0;
+ }
+ 
+ static int
+ pthread_create(pthread_t *thread,
+ 			   pthread_attr_t *attr,
+ 			   void * (*start_routine)(void *),
+ 			   void * arg)
+ {
+ 	int				save_errno;
+ 	win32_pthread   *th;
+ 
+ 	th = (win32_pthread *) malloc(sizeof(win32_pthread));
+ 	th->routine = start_routine;
+ 	th->arg = arg;
+ 	th->result = NULL;
+ 
+ 	th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
+ 	if (th->handle == NULL)
+ 	{
+ 		save_errno = errno;
+ 		free(th);
+ 		return save_errno;
+ 	}
+ 
+ 	*thread = th;
+ 	return 0;
+ }
+ 
+ static int
+ pthread_join(pthread_t th, void **thread_return)
+ {
+ 	if (th == NULL || th->handle == NULL)
+ 		return errno = EINVAL;
+ 
+ 	if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
+ 	{
+ 		_dosmaperr(GetLastError());
+ 		return errno;
+ 	}
+ 
+ 	if (thread_return)
+ 		*thread_return = th->result;
+ 
+ 	CloseHandle(th->handle);
+ 	free(th);
+ 	return 0;
  }
  
  #endif   /* WIN32 */
#2Alvaro Herrera
alvherre@commandprompt.com
In reply to: Itagaki Takahiro (#1)
Re: multi-threaded pgbench

Itagaki Takahiro wrote:

Is it acceptable to use pthread in contrib module?

We don't have a precedent it seems. I think the requirement would be
that it should compile if pthread support is not present.

If ok, I will add the patch to the next commitfest.

Add it anyway -- discussion should happen during commitfest if it
doesn't spark right away.

--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

#3Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Alvaro Herrera (#2)
Re: multi-threaded pgbench

Alvaro Herrera wrote:

Itagaki Takahiro wrote:

Is it acceptable to use pthread in contrib module?

We don't have a precedent it seems. I think the requirement would be
that it should compile if pthread support is not present.

My thoughts as well. But I wonder, would it be harder or easier to use
fork() instead?

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#4Tom Lane
tgl@sss.pgh.pa.us
In reply to: Alvaro Herrera (#2)
Re: multi-threaded pgbench

Alvaro Herrera <alvherre@commandprompt.com> writes:

Itagaki Takahiro wrote:

Is it acceptable to use pthread in contrib module?

We don't have a precedent it seems. I think the requirement would be
that it should compile if pthread support is not present.

Right. Breaking it for non-pthread environments is not acceptable.

The real question here is whether it will be a problem if pgbench
delivers significantly different results when built with or without
threading support. I can see arguments either way on that ...

regards, tom lane

#5Andrew Dunstan
andrew@dunslane.net
In reply to: Heikki Linnakangas (#3)
Re: multi-threaded pgbench

Heikki Linnakangas wrote:

Alvaro Herrera wrote:

Itagaki Takahiro wrote:

Is it acceptable to use pthread in contrib module?

We don't have a precedent it seems. I think the requirement would be
that it should compile if pthread support is not present.

My thoughts as well. But I wonder, would it be harder or easier to use
fork() instead?

I have just been down this road to some extent with parallel pg_restore,
which uses threads on Windows. That might be useful as a bit of a
template. Extending it to use pthreads would probably be fairly trivial.
The thread/fork specific stuff ended up being fairly isolated for
pg_restore. see src/bin/pg_dump/pg_backup_archiver.c:spawn_restore()

I think you should have it use pthreads if available, or Windows threads
there, or fork() elsewhere.

cheers

andrew

#6Stefan Kaltenbrunner
stefan@kaltenbrunner.cc
In reply to: Tom Lane (#4)
Re: multi-threaded pgbench

Tom Lane wrote:

Alvaro Herrera <alvherre@commandprompt.com> writes:

Itagaki Takahiro wrote:

Is it acceptable to use pthread in contrib module?

We don't have a precedent it seems. I think the requirement would be
that it should compile if pthread support is not present.

Right. Breaking it for non-pthread environments is not acceptable.

The real question here is whether it will be a problem if pgbench
delivers significantly different results when built with or without
threading support. I can see arguments either way on that ...

well, pgbench as it is now is more or less unusable on modern
hardware for SELECT type queries (way too slow to scale to what the
backend can do these days and the number of cores in a recent box).
It is only somewhat usable on the default update heavy test as well
because even there it is hitting scalability limits (ie I can easily
improve on its numbers with a perl script that forks and issues the same
queries).
I would even go as far as issuing a WARNING if pgbench is invoked and
not compiled with threads if we accept this patch...

Stefan

#7Greg Smith
gsmith@gregsmith.com
In reply to: Itagaki Takahiro (#1)
Re: multi-threaded pgbench

On Wed, 8 Jul 2009, Itagaki Takahiro wrote:

Multi-threading would be a solution. The attached patch adds -j
(number of jobs) option to pgbench.

Should probably name this -w "number of workers" to stay consistent with
terminology used on the server side.

Is it acceptable to use pthread in contrib module?
If ok, I will add the patch to the next commitfest.

pgbench is basically broken right now, as demonstrated by the lack of
scaling shown in your results and similar ones I've collected. This looks
like it fixes the primary problem there. While it would be nice if a
multi-process based solution were written instead, unless someone is
willing to step up and volunteer to write one I'd much rather see your
patch go in than doing nothing at all. It shouldn't even impact old
results if you don't toggle the option on.

I have 3 new server systems I was going to run pgbench on anyway in the
next month as part of my standard performance testing on new hardware.
I'll be happy to mix in results using the multi-threaded pgbench to check
the patch's performance, along with the rest of the initial review here.

--
* Greg Smith gsmith@gregsmith.com http://www.gregsmith.com Baltimore, MD

#8Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andrew Dunstan (#5)
Re: multi-threaded pgbench

Andrew Dunstan <andrew@dunslane.net> writes:

I think you should have it use pthreads if available, or Windows threads
there, or fork() elsewhere.

Hmm, but how will you communicate stats back from the sub-processes?
pg_restore doesn't need anything more than a success/failure result
from its child processes, but I think pgbench will want more.

regards, tom lane

#9Andrew Dunstan
andrew@dunslane.net
In reply to: Tom Lane (#8)
Re: multi-threaded pgbench

Tom Lane wrote:

Andrew Dunstan <andrew@dunslane.net> writes:

I think you should have it use pthreads if available, or Windows threads
there, or fork() elsewhere.

Hmm, but how will you communicate stats back from the sub-processes?
pg_restore doesn't need anything more than a success/failure result
from its child processes, but I think pgbench will want more.

My first reaction is to say "use a pipe."

cheers

andrew

#10Itagaki Takahiro
itagaki.takahiro@oss.ntt.co.jp
In reply to: Andrew Dunstan (#5)
Re: multi-threaded pgbench

Andrew Dunstan <andrew@dunslane.net> wrote:

I think you should have it use pthreads if available, or Windows threads
there, or fork() elsewhere.

Just a question - which platform does not support any threading?
I think threading is very common in modern applications. If there
are such OSes, they seem to be just abandoned and not maintained...

Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center

#11Greg Smith
gsmith@gregsmith.com
In reply to: Tom Lane (#8)
Re: multi-threaded pgbench

On Wed, 8 Jul 2009, Tom Lane wrote:

pg_restore doesn't need anything more than a success/failure result
from its child processes, but I think pgbench will want more.

The biggest chunk of returned state to consider is how each client
transaction generates a line of latency information that goes into the log
file.

--
* Greg Smith gsmith@gregsmith.com http://www.gregsmith.com Baltimore, MD

#12Itagaki Takahiro
itagaki.takahiro@oss.ntt.co.jp
In reply to: Andrew Dunstan (#9)
1 attachment(s)
Re: multi-threaded pgbench

Here is an updated version of multi-threaded pgbench patch.

Andrew Dunstan <andrew@dunslane.net> wrote:

Hmm, but how will you communicate stats back from the sub-processes?

My first reaction is to say "use a pipe."

I added a partial implementation of pthread using fork and pipe for platforms
without ENABLE_THREAD_SAFETY. The pthread version is not strictly needed
if we have the fork version, but I left it as-is.

The name of the new option is still -j, which is borrowed from pg_restore
and gmake; both use -j for multi-worker processing.

-j NUM number of threads (default: 1)

I needed to modify the meaning of tps (excluding connections establishing)
a little because connections are now established in parallel. I subtract the
average per-thread connection time from the total execution time.

total_time := last_thread_finish_time - first_thread_start_time
tps (including connection) := num_transaction / total_time
tps (excluding connection) := num_transaction /
(total_time - (total_connection_time / num_threads))

Note that I also fixed a few other parts of pgbench:
* Use instr_time instead of struct timeval.
Macros in portability/instr_time.h make the code cleaner.
* Accept "\sleep 1ms" format (no space between "1" and "ms") for the sleep
meta command. The old version of pgbench interprets "1ms" as just "1",
which means "1 s". That was confusing.

I'll add the patch to the commitfest page.

Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center

Attachments:

pgbench-mt_20090709.patchapplication/octet-stream; name=pgbench-mt_20090709.patchDownload
diff -cpr HEAD/contrib/pgbench/pgbench.c pgbench-mt_20090709/contrib/pgbench/pgbench.c
*** HEAD/contrib/pgbench/pgbench.c	Fri Jun 12 09:52:43 2009
--- pgbench-mt_20090709/contrib/pgbench/pgbench.c	Thu Jul  9 17:22:03 2009
***************
*** 30,35 ****
--- 30,36 ----
  
  #include "libpq-fe.h"
  #include "pqsignal.h"
+ #include "portability/instr_time.h"
  
  #include <ctype.h>
  
***************
*** 55,60 ****
--- 56,80 ----
  #include <sys/resource.h>		/* for getrlimit */
  #endif
  
+ #ifndef INT64_MAX
+ #define INT64_MAX	INT64CONST(0x7FFFFFFFFFFFFFFF)
+ #endif
+ 
+ #ifdef WIN32
+ typedef struct win32_pthread   *pthread_t;
+ typedef int						pthread_attr_t;
+ static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void * arg);
+ static int pthread_join(pthread_t th, void **thread_return);
+ #elif defined(ENABLE_THREAD_SAFETY)
+ #include <pthread.h>
+ #else
+ #include <sys/wait.h>
+ typedef struct fork_pthread	   *pthread_t;
+ typedef int						pthread_attr_t;
+ static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void * arg);
+ static int pthread_join(pthread_t th, void **thread_return);
+ #endif
+ 
  extern char *optarg;
  extern int	optind;
  
*************** extern int	optind;
*** 71,77 ****
  
  #define DEFAULT_NXACTS	10		/* default nxacts */
  
- int			nclients = 1;		/* default number of simulated clients */
  int			nxacts = 0;			/* number of transactions per client */
  int			duration = 0;		/* duration in seconds */
  
--- 91,96 ----
*************** FILE	   *LOGFILE = NULL;
*** 99,106 ****
  
  bool		use_log;			/* log transaction latencies to a file */
  
- int			remains;			/* number of remaining clients */
- 
  int			is_connect;			/* establish connection  for each transaction */
  
  char	   *pghost = "";
--- 118,123 ----
*************** typedef struct
*** 135,149 ****
  	int			listen;			/* 0 indicates that an async query has been
  								 * sent */
  	int			sleeping;		/* 1 indicates that the client is napping */
! 	struct timeval until;		/* napping until */
  	Variable   *variables;		/* array of variable definitions */
  	int			nvariables;
! 	struct timeval txn_begin;	/* used for measuring latencies */
  	int			use_file;		/* index in sql_files for this client */
  	bool		prepared[MAX_FILES];
  } CState;
  
  /*
   * queries read from files
   */
  #define SQL_COMMAND		1
--- 152,185 ----
  	int			listen;			/* 0 indicates that an async query has been
  								 * sent */
  	int			sleeping;		/* 1 indicates that the client is napping */
! 	int64		until;			/* napping until (usec) */
  	Variable   *variables;		/* array of variable definitions */
  	int			nvariables;
! 	instr_time	txn_begin;		/* used for measuring latencies */
  	int			use_file;		/* index in sql_files for this client */
  	bool		prepared[MAX_FILES];
  } CState;
  
  /*
+  * Thread state and result
+  */
+ typedef struct
+ {
+ 	pthread_t		thread;		/* thread handle */
+ 	CState		   *state;		/* array of CState */
+ 	int				nstate;		/* length of state */
+ 	instr_time		start_time;	/* thread start time */
+ } TState;
+ 
+ #define INVALID_THREAD		((pthread_t) 0)
+ 
+ typedef struct
+ {
+ 	instr_time		conn_time;
+ 	int				xacts;
+ } TResult;
+ 
+ /*
   * queries read from files
   */
  #define SQL_COMMAND		1
*************** typedef struct
*** 168,175 ****
  	char	   *argv[MAX_ARGS]; /* command list */
  } Command;
  
! Command   **sql_files[MAX_FILES];		/* SQL script files */
! int			num_files;			/* number of script files */
  
  /* default scenario */
  static char *tpc_b = {
--- 204,212 ----
  	char	   *argv[MAX_ARGS]; /* command list */
  } Command;
  
! static Command	  **sql_files[MAX_FILES];	/* SQL script files */
! static int			num_files;				/* number of script files */
! static int			debug = 0;				/* debug flag */
  
  /* default scenario */
  static char *tpc_b = {
*************** static char *select_only = {
*** 212,255 ****
  	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
  };
  
- /* Connection overhead time */
- static struct timeval conn_total_time = {0, 0};
- 
  /* Function prototypes */
  static void setalarm(int seconds);
! 
! 
! /* Calculate total time */
! static void
! addTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
! {
! 	int			sec = t1->tv_sec + t2->tv_sec;
! 	int			usec = t1->tv_usec + t2->tv_usec;
! 
! 	if (usec >= 1000000)
! 	{
! 		usec -= 1000000;
! 		sec++;
! 	}
! 	result->tv_sec = sec;
! 	result->tv_usec = usec;
! }
! 
! /* Calculate time difference */
! static void
! diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
! {
! 	int			sec = t1->tv_sec - t2->tv_sec;
! 	int			usec = t1->tv_usec - t2->tv_usec;
! 
! 	if (usec < 0)
! 	{
! 		usec += 1000000;
! 		sec--;
! 	}
! 	result->tv_sec = sec;
! 	result->tv_usec = usec;
! }
  
  static void
  usage(const char *progname)
--- 249,257 ----
  	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
  };
  
  /* Function prototypes */
  static void setalarm(int seconds);
! static void* threadRun(void *arg);
  
  static void
  usage(const char *progname)
*************** usage(const char *progname)
*** 267,272 ****
--- 269,275 ----
  		   "  -D VARNAME=VALUE\n"
  		   "               define variable for use by custom script\n"
  		   "  -f FILENAME  read transaction script from FILENAME\n"
+ 		   "  -j NUM       number of threads (default: 1)\n"
  		   "  -l           write transaction times to log file\n"
  		   "  -M {simple|extended|prepared}\n"
  		   "               protocol for submitting queries to server (default: simple)\n"
*************** discard_response(CState *state)
*** 376,404 ****
  	} while (res);
  }
  
- /* check to see if the SQL result was good */
- static int
- check(CState *state, PGresult *res, int n)
- {
- 	CState	   *st = &state[n];
- 
- 	switch (PQresultStatus(res))
- 	{
- 		case PGRES_COMMAND_OK:
- 		case PGRES_TUPLES_OK:
- 			/* OK */
- 			break;
- 		default:
- 			fprintf(stderr, "Client %d aborted in state %d: %s",
- 					n, st->state, PQerrorMessage(st->con));
- 			remains--;			/* I've aborted */
- 			PQfinish(st->con);
- 			st->con = NULL;
- 			return (-1);
- 	}
- 	return (0);					/* OK */
- }
- 
  static int
  compareVariables(const void *v1, const void *v2)
  {
--- 379,384 ----
*************** preparedStatementName(char *buffer, int 
*** 595,605 ****
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static void
! doCustom(CState *state, int n, int debug)
  {
  	PGresult   *res;
- 	CState	   *st = &state[n];
  	Command   **commands;
  
  top:
--- 575,598 ----
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static bool
! clientDone(CState *st, bool ok)
! {
! 	(void) ok;	/* unused */
! 
! 	if (st->con != NULL)
! 	{
! 		PQfinish(st->con);
! 		st->con = NULL;
! 	}
! 	return false;	/* always false */
! }
! 
! /* return false iff client should be disconnected */
! static bool
! doCustom(CState *st, instr_time *conn_time)
  {
  	PGresult   *res;
  	Command   **commands;
  
  top:
*************** top:
*** 607,622 ****
  
  	if (st->sleeping)
  	{							/* are we sleeping? */
! 		int			usec;
! 		struct timeval now;
  
! 		gettimeofday(&now, NULL);
! 		usec = (st->until.tv_sec - now.tv_sec) * 1000000 +
! 			st->until.tv_usec - now.tv_usec;
! 		if (usec <= 0)
  			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
  		else
! 			return;				/* Still sleeping, nothing to do here */
  	}
  
  	if (st->listen)
--- 600,612 ----
  
  	if (st->sleeping)
  	{							/* are we sleeping? */
! 		instr_time	now;
  
! 		INSTR_TIME_SET_CURRENT(now);
! 		if (st->until <= INSTR_TIME_GET_MICROSEC(now))
  			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
  		else
! 			return true;		/* Still sleeping, nothing to do here */
  	}
  
  	if (st->listen)
*************** top:
*** 624,640 ****
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d receiving\n", n);
  			if (!PQconsumeInput(st->con))
  			{					/* there's something wrong */
! 				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
! 				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
! 				return;
  			}
  			if (PQisBusy(st->con))
! 				return;			/* don't have the whole result yet */
  		}
  
  		/*
--- 614,627 ----
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d receiving\n", st->id);
  			if (!PQconsumeInput(st->con))
  			{					/* there's something wrong */
! 				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
! 				return clientDone(st, false);
  			}
  			if (PQisBusy(st->con))
! 				return true;	/* don't have the whole result yet */
  		}
  
  		/*
*************** top:
*** 642,666 ****
  		 */
  		if (use_log && commands[st->state + 1] == NULL)
  		{
! 			double		diff;
! 			struct timeval now;
! 
! 			gettimeofday(&now, NULL);
! 			diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
! 				(int) (now.tv_usec - st->txn_begin.tv_usec);
! 
! 			fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n",
! 					st->id, st->cnt, diff, st->use_file,
! 					(long) now.tv_sec, (long) now.tv_usec);
  		}
  
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			res = PQgetResult(st->con);
! 			if (check(state, res, n))
  			{
! 				PQclear(res);
! 				return;
  			}
  			PQclear(res);
  			discard_response(st);
--- 629,663 ----
  		 */
  		if (use_log && commands[st->state + 1] == NULL)
  		{
! 			instr_time	diff;
! 			double		sec;
! 			double		msec;
! 			double		usec;
! 
! 			INSTR_TIME_SET_CURRENT(diff);
! 			INSTR_TIME_SUBTRACT(diff, st->txn_begin);
! 			sec = INSTR_TIME_GET_DOUBLE(diff);
! 			msec = INSTR_TIME_GET_MILLISEC(diff);
! 			usec = (double) INSTR_TIME_GET_MICROSEC(diff);
! 
! 			fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n",
! 					st->id, st->cnt, usec, st->use_file,
! 					sec, usec - sec * 1000.0);
  		}
  
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			res = PQgetResult(st->con);
! 			switch (PQresultStatus(res))
  			{
! 				case PGRES_COMMAND_OK:
! 				case PGRES_TUPLES_OK:
! 					break;	/* OK */
! 				default:
! 					fprintf(stderr, "Client %d aborted in state %d: %s",
! 						st->id, st->state, PQerrorMessage(st->con));
! 					PQclear(res);
! 					return clientDone(st, false);
  			}
  			PQclear(res);
  			discard_response(st);
*************** top:
*** 676,690 ****
  
  			++st->cnt;
  			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 			{
! 				remains--;		/* I've done */
! 				if (st->con != NULL)
! 				{
! 					PQfinish(st->con);
! 					st->con = NULL;
! 				}
! 				return;
! 			}
  		}
  
  		/* increment state counter */
--- 673,679 ----
  
  			++st->cnt;
  			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 				return clientDone(st, true);	/* exit success */
  		}
  
  		/* increment state counter */
*************** top:
*** 699,725 ****
  
  	if (st->con == NULL)
  	{
! 		struct timeval t1,
! 					t2,
! 					t3;
  
! 		gettimeofday(&t1, NULL);
  		if ((st->con = doConnect()) == NULL)
  		{
! 			fprintf(stderr, "Client %d aborted in establishing connection.\n",
! 					n);
! 			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
! 			return;
  		}
! 		gettimeofday(&t2, NULL);
! 		diffTime(&t2, &t1, &t3);
! 		addTime(&conn_total_time, &t3, &conn_total_time);
  	}
  
  	if (use_log && st->state == 0)
! 		gettimeofday(&(st->txn_begin), NULL);
  
  	if (commands[st->state]->type == SQL_COMMAND)
  	{
--- 688,707 ----
  
  	if (st->con == NULL)
  	{
! 		instr_time	start, end;
  
! 		INSTR_TIME_SET_CURRENT(start);
  		if ((st->con = doConnect()) == NULL)
  		{
! 			fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
! 			return clientDone(st, false);
  		}
! 		INSTR_TIME_SET_CURRENT(end);
! 		INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
  	}
  
  	if (use_log && st->state == 0)
! 		INSTR_TIME_SET_CURRENT(st->txn_begin);
  
  	if (commands[st->state]->type == SQL_COMMAND)
  	{
*************** top:
*** 735,745 ****
  			{
  				fprintf(stderr, "out of memory\n");
  				st->ecnt++;
! 				return;
  			}
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, sql);
  			r = PQsendQuery(st->con, sql);
  			free(sql);
  		}
--- 717,727 ----
  			{
  				fprintf(stderr, "out of memory\n");
  				st->ecnt++;
! 				return true;
  			}
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
  			r = PQsendQuery(st->con, sql);
  			free(sql);
  		}
*************** top:
*** 751,757 ****
  			getQueryParams(st, command, params);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, sql);
  			r = PQsendQueryParams(st->con, sql, command->argc - 1,
  								  NULL, params, NULL, NULL, 0);
  		}
--- 733,739 ----
  			getQueryParams(st, command, params);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
  			r = PQsendQueryParams(st->con, sql, command->argc - 1,
  								  NULL, params, NULL, NULL, 0);
  		}
*************** top:
*** 785,791 ****
  			preparedStatementName(name, st->use_file, st->state);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, name);
  			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
  									params, NULL, NULL, 0);
  		}
--- 767,773 ----
  			preparedStatementName(name, st->use_file, st->state);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, name);
  			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
  									params, NULL, NULL, 0);
  		}
*************** top:
*** 795,801 ****
  		if (r == 0)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]);
  			st->ecnt++;
  		}
  		else
--- 777,783 ----
  		if (r == 0)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
  			st->ecnt++;
  		}
  		else
*************** top:
*** 809,815 ****
  
  		if (debug)
  		{
! 			fprintf(stderr, "client %d executing \\%s", n, argv[0]);
  			for (i = 1; i < argc; i++)
  				fprintf(stderr, " %s", argv[i]);
  			fprintf(stderr, "\n");
--- 791,797 ----
  
  		if (debug)
  		{
! 			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
  			for (i = 1; i < argc; i++)
  				fprintf(stderr, " %s", argv[i]);
  			fprintf(stderr, "\n");
*************** top:
*** 828,834 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return;
  				}
  				min = atoi(var);
  			}
--- 810,816 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return true;
  				}
  				min = atoi(var);
  			}
*************** top:
*** 850,856 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return;
  				}
  				max = atoi(var);
  			}
--- 832,838 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return true;
  				}
  				max = atoi(var);
  			}
*************** top:
*** 861,867 ****
  			{
  				fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
  				st->ecnt++;
! 				return;
  			}
  
  #ifdef DEBUG
--- 843,849 ----
  			{
  				fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
  				st->ecnt++;
! 				return true;
  			}
  
  #ifdef DEBUG
*************** top:
*** 873,879 ****
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return;
  			}
  
  			st->listen = 1;
--- 855,861 ----
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return true;
  			}
  
  			st->listen = 1;
*************** top:
*** 891,897 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return;
  				}
  				ope1 = atoi(var);
  			}
--- 873,879 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return true;
  				}
  				ope1 = atoi(var);
  			}
*************** top:
*** 908,914 ****
  					{
  						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
  						st->ecnt++;
! 						return;
  					}
  					ope2 = atoi(var);
  				}
--- 890,896 ----
  					{
  						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
  						st->ecnt++;
! 						return true;
  					}
  					ope2 = atoi(var);
  				}
*************** top:
*** 927,933 ****
  					{
  						fprintf(stderr, "%s: division by zero\n", argv[0]);
  						st->ecnt++;
! 						return;
  					}
  					snprintf(res, sizeof(res), "%d", ope1 / ope2);
  				}
--- 909,915 ----
  					{
  						fprintf(stderr, "%s: division by zero\n", argv[0]);
  						st->ecnt++;
! 						return true;
  					}
  					snprintf(res, sizeof(res), "%d", ope1 / ope2);
  				}
*************** top:
*** 935,941 ****
  				{
  					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return;
  				}
  			}
  
--- 917,923 ----
  				{
  					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return true;
  				}
  			}
  
*************** top:
*** 943,949 ****
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return;
  			}
  
  			st->listen = 1;
--- 925,931 ----
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return true;
  			}
  
  			st->listen = 1;
*************** top:
*** 952,958 ****
  		{
  			char	   *var;
  			int			usec;
! 			struct timeval now;
  
  			if (*argv[1] == ':')
  			{
--- 934,940 ----
  		{
  			char	   *var;
  			int			usec;
! 			instr_time now;
  
  			if (*argv[1] == ':')
  			{
*************** top:
*** 960,966 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
  					st->ecnt++;
! 					return;
  				}
  				usec = atoi(var);
  			}
--- 942,948 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
  					st->ecnt++;
! 					return true;
  				}
  				usec = atoi(var);
  			}
*************** top:
*** 977,985 ****
  			else
  				usec *= 1000000;
  
! 			gettimeofday(&now, NULL);
! 			st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000;
! 			st->until.tv_usec = (now.tv_usec + usec) % 1000000;
  			st->sleeping = 1;
  
  			st->listen = 1;
--- 959,966 ----
  			else
  				usec *= 1000000;
  
! 			INSTR_TIME_SET_CURRENT(now);
! 			st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
  			st->sleeping = 1;
  
  			st->listen = 1;
*************** top:
*** 987,1004 ****
  
  		goto top;
  	}
  }
  
  /* discard connections */
  static void
! disconnect_all(CState *state)
  {
  	int			i;
  
! 	for (i = 0; i < nclients; i++)
  	{
  		if (state[i].con)
  			PQfinish(state[i].con);
  	}
  }
  
--- 968,990 ----
  
  		goto top;
  	}
+ 
+ 	return true;
  }
  
  /* discard connections */
  static void
! disconnect_all(CState *state, int length)
  {
  	int			i;
  
! 	for (i = 0; i < length; i++)
  	{
  		if (state[i].con)
+ 		{
  			PQfinish(state[i].con);
+ 			state[i].con = NULL;
+ 		}
  	}
  }
  
*************** process_commands(char *buf)
*** 1264,1269 ****
--- 1250,1273 ----
  				return NULL;
  			}
  
+ 			/*
+ 			 * Split argument into number and unit for "sleep 1ms" or so.
+ 			 * We don't have to terminate the number argument with null
+ 			 * because it will parsed with atoi, that ignores trailing
+ 			 * non-digit characters.
+ 			 */
+ 			if (my_commands->argv[1][0] != ':')
+ 			{
+ 				char	*c = my_commands->argv[1];
+ 				while (isdigit(*c)) { c++; }
+ 				if (*c)
+ 				{
+ 					my_commands->argv[2] = c;
+ 					if (my_commands->argc < 3)
+ 						my_commands->argc = 3;
+ 				}
+ 			}
+ 
  			if (my_commands->argc >= 3)
  			{
  				if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
*************** process_builtin(char *tb)
*** 1450,1474 ****
  
  /* print out results */
  static void
! printResults(
! 			 int ttype, CState *state,
! 			 struct timeval * start_time, struct timeval * end_time)
  {
! 	double		t1,
! 				t2;
! 	int			i;
! 	int			normal_xacts = 0;
  	char	   *s;
  
! 	for (i = 0; i < nclients; i++)
! 		normal_xacts += state[i].cnt;
! 
! 	t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec);
! 	t1 = normal_xacts * 1000000.0 / t1;
! 
! 	t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 +
! 		(end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec);
! 	t2 = normal_xacts * 1000000.0 / t2;
  
  	if (ttype == 0)
  		s = "TPC-B (sort of)";
--- 1454,1471 ----
  
  /* print out results */
  static void
! printResults(int ttype, int normal_xacts, int nclients, int nthreads,
! 			 instr_time total_time, instr_time conn_total_time)
  {
! 	double		time_include,
! 				tps_include,
! 				tps_exclude;
  	char	   *s;
  
! 	time_include = INSTR_TIME_GET_DOUBLE(total_time);
! 	tps_include = normal_xacts / time_include;
! 	tps_exclude = normal_xacts / (time_include -
! 		(INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
  
  	if (ttype == 0)
  		s = "TPC-B (sort of)";
*************** printResults(
*** 1483,1488 ****
--- 1480,1486 ----
  	printf("scaling factor: %d\n", scale);
  	printf("query mode: %s\n", QUERYMODE[querymode]);
  	printf("number of clients: %d\n", nclients);
+ 	printf("number of threads: %d\n", nthreads);
  	if (duration <= 0)
  	{
  		printf("number of transactions per client: %d\n", nxacts);
*************** printResults(
*** 1495,1502 ****
  		printf("number of transactions actually processed: %d\n",
  			   normal_xacts);
  	}
! 	printf("tps = %f (including connections establishing)\n", t1);
! 	printf("tps = %f (excluding connections establishing)\n", t2);
  }
  
  
--- 1493,1500 ----
  		printf("number of transactions actually processed: %d\n",
  			   normal_xacts);
  	}
! 	printf("tps = %f (including connections establishing)\n", tps_include);
! 	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
  }
  
  
*************** int
*** 1504,1532 ****
  main(int argc, char **argv)
  {
  	int			c;
  	int			is_init_mode = 0;		/* initialize mode? */
  	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
  	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
- 	int			debug = 0;		/* debug flag */
  	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
  								 * 2: skip update of branches and tellers */
  	char	   *filename = NULL;
  	bool		scale_given = false;
  
  	CState	   *state;			/* status of clients */
  
! 	struct timeval start_time;	/* start up time */
! 	struct timeval end_time;	/* end time */
  
  	int			i;
  
- 	fd_set		input_mask;
- 	int			nsocks;			/* return from select(2) */
- 	int			maxsock;		/* max socket number to be waited */
- 	struct timeval now;
- 	struct timeval timeout;
- 	int			min_usec;
- 
  #ifdef HAVE_GETRLIMIT
  	struct rlimit rlim;
  #endif
--- 1502,1527 ----
  main(int argc, char **argv)
  {
  	int			c;
+ 	int			nclients = 1;		/* default number of simulated clients */
+ 	int			nthreads = 1;		/* default number of threads */
  	int			is_init_mode = 0;		/* initialize mode? */
  	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
  	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
  	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
  								 * 2: skip update of branches and tellers */
  	char	   *filename = NULL;
  	bool		scale_given = false;
  
  	CState	   *state;			/* status of clients */
+ 	TState	   *threads;		/* array of thread */
  
! 	instr_time	start_time;		/* start up time */
! 	instr_time	total_time;
! 	instr_time	conn_total_time;
! 	int			total_xacts;
  
  	int			i;
  
  #ifdef HAVE_GETRLIMIT
  	struct rlimit rlim;
  #endif
*************** main(int argc, char **argv)
*** 1576,1582 ****
  
  	memset(state, 0, sizeof(*state));
  
! 	while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1)
  	{
  		switch (c)
  		{
--- 1571,1577 ----
  
  	memset(state, 0, sizeof(*state));
  
! 	while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
  	{
  		switch (c)
  		{
*************** main(int argc, char **argv)
*** 1629,1634 ****
--- 1624,1637 ----
  				}
  #endif   /* HAVE_GETRLIMIT */
  				break;
+ 			case 'j':	/* jobs */
+ 				nthreads = atoi(optarg);
+ 				if (nthreads <= 0)
+ 				{
+ 					fprintf(stderr, "invalid number of threads: %d\n", nthreads);
+ 					exit(1);
+ 				}
+ 				break;
  			case 'C':
  				is_connect = 1;
  				break;
*************** main(int argc, char **argv)
*** 1749,1755 ****
  	if (nxacts <= 0 && duration <= 0)
  		nxacts = DEFAULT_NXACTS;
  
! 	remains = nclients;
  
  	if (nclients > 1)
  	{
--- 1752,1762 ----
  	if (nxacts <= 0 && duration <= 0)
  		nxacts = DEFAULT_NXACTS;
  
! 	if (nclients % nthreads != 0)
! 	{
! 		fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads);
! 		exit(1);
! 	}
  
  	if (nclients > 1)
  	{
*************** main(int argc, char **argv)
*** 1767,1772 ****
--- 1774,1780 ----
  		{
  			int			j;
  
+ 			state[i].id = i;
  			for (j = 0; j < state[0].nvariables; j++)
  			{
  				if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
*************** main(int argc, char **argv)
*** 1876,1908 ****
  	PQfinish(con);
  
  	/* set random seed */
! 	gettimeofday(&start_time, NULL);
! 	srandom((unsigned int) start_time.tv_usec);
! 
! 	/* get start up time */
! 	gettimeofday(&start_time, NULL);
! 
! 	/* set alarm if duration is specified. */
! 	if (duration > 0)
! 		setalarm(duration);
! 
! 	if (is_connect == 0)
! 	{
! 		struct timeval t,
! 					now;
! 
! 		/* make connections to the database */
! 		for (i = 0; i < nclients; i++)
! 		{
! 			state[i].id = i;
! 			if ((state[i].con = doConnect()) == NULL)
! 				exit(1);
! 		}
! 		/* time after connections set up */
! 		gettimeofday(&now, NULL);
! 		diffTime(&now, &start_time, &t);
! 		addTime(&conn_total_time, &t, &conn_total_time);
! 	}
  
  	/* process bultin SQL scripts */
  	switch (ttype)
--- 1884,1891 ----
  	PQfinish(con);
  
  	/* set random seed */
! 	INSTR_TIME_SET_CURRENT(start_time);
! 	srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
  
  	/* process bultin SQL scripts */
  	switch (ttype)
*************** main(int argc, char **argv)
*** 1926,2065 ****
  			break;
  	}
  
  	/* send start up queries in async manner */
! 	for (i = 0; i < nclients; i++)
  	{
! 		Command   **commands = sql_files[state[i].use_file];
! 		int			prev_ecnt = state[i].ecnt;
  
! 		state[i].use_file = getrand(0, num_files - 1);
! 		doCustom(state, i, debug);
  
! 		if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
  		{
! 			fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
  			remains--;			/* I've aborted */
! 			PQfinish(state[i].con);
! 			state[i].con = NULL;
  		}
  	}
  
! 	for (;;)
  	{
! 		if (remains <= 0)
! 		{						/* all done ? */
! 			disconnect_all(state);
! 			/* get end time */
! 			gettimeofday(&end_time, NULL);
! 			printResults(ttype, state, &start_time, &end_time);
! 			if (LOGFILE)
! 				fclose(LOGFILE);
! 			exit(0);
! 		}
  
  		FD_ZERO(&input_mask);
  
  		maxsock = -1;
! 		min_usec = -1;
! 		for (i = 0; i < nclients; i++)
  		{
! 			Command   **commands = sql_files[state[i].use_file];
  
! 			if (state[i].sleeping)
  			{
  				int			this_usec;
- 				int			sock = PQsocket(state[i].con);
  
! 				if (min_usec < 0)
  				{
! 					gettimeofday(&now, NULL);
! 					min_usec = 0;
  				}
  
! 				this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 +
! 					state[i].until.tv_usec - now.tv_usec;
! 
! 				if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
  					min_usec = this_usec;
- 
- 				FD_SET		(sock, &input_mask);
- 
- 				if (maxsock < sock)
- 					maxsock = sock;
  			}
! 			else if (state[i].con && commands[state[i].state]->type != META_COMMAND)
  			{
! 				int			sock = PQsocket(state[i].con);
! 
! 				if (sock < 0)
! 				{
! 					disconnect_all(state);
! 					exit(1);
! 				}
! 				FD_SET		(sock, &input_mask);
  
! 				if (maxsock < sock)
! 					maxsock = sock;
  			}
  		}
  
! 		if (maxsock != -1)
  		{
! 			if (min_usec >= 0)
  			{
  				timeout.tv_sec = min_usec / 1000000;
  				timeout.tv_usec = min_usec % 1000000;
! 
! 				nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
! 								(fd_set *) NULL, &timeout);
  			}
  			else
! 				nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
! 								(fd_set *) NULL, (struct timeval *) NULL);
  			if (nsocks < 0)
  			{
  				if (errno == EINTR)
  					continue;
  				/* must be something wrong */
- 				disconnect_all(state);
  				fprintf(stderr, "select failed: %s\n", strerror(errno));
! 				exit(1);
  			}
- #ifdef NOT_USED
- 			else if (nsocks == 0)
- 			{					/* timeout */
- 				fprintf(stderr, "select timeout\n");
- 				for (i = 0; i < nclients; i++)
- 				{
- 					fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
- 							i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
- 				}
- 				exit(0);
- 			}
- #endif
  		}
  
  		/* ok, backend returns reply */
! 		for (i = 0; i < nclients; i++)
  		{
! 			Command   **commands = sql_files[state[i].use_file];
! 			int			prev_ecnt = state[i].ecnt;
  
! 			if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
! 						  || commands[state[i].state]->type == META_COMMAND))
  			{
! 				doCustom(state, i, debug);
  			}
  
! 			if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state);
  				remains--;		/* I've aborted */
! 				PQfinish(state[i].con);
! 				state[i].con = NULL;
  			}
  		}
  	}
  }
  
  
--- 1909,2135 ----
  			break;
  	}
  
+ 	/* get start up time */
+ 	INSTR_TIME_SET_CURRENT(start_time);
+ 
+ 	/* set alarm if duration is specified. */
+ 	if (duration > 0)
+ 		setalarm(duration);
+ 
+ 	/* start threads */
+ 	threads = (TState *) malloc(sizeof(TState) * nthreads);
+ 	for (i = 0; i < nthreads; i++)
+ 	{
+ 		threads[i].state = &state[nclients / nthreads * i];
+ 		threads[i].nstate = nclients / nthreads;
+ 		INSTR_TIME_SET_CURRENT(threads[i].start_time);
+ 
+ 		/* the first thread (i = 0) is executed by main thread */
+ 		if (i > 0)
+ 		{
+ 			int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+ 			if (err != 0 || threads[i].thread == INVALID_THREAD)
+ 			{
+ 				fprintf(stderr, "cannot create thread: %s\n", strerror(err));
+ 				exit(1);
+ 			}
+ 		}
+ 		else
+ 		{
+ 			threads[i].thread = INVALID_THREAD;
+ 		}
+ 	}
+ 
+ 	/* wait for threads and accumulate results */
+ 	total_xacts = 0;
+ 	INSTR_TIME_SET_ZERO(conn_total_time);
+ 	for (i = 0; i < nthreads; i++)
+ 	{
+ 		void *ret = NULL;
+ 
+ 		if (threads[i].thread == INVALID_THREAD)
+ 			ret = threadRun(&threads[i]);
+ 		else
+ 			pthread_join(threads[i].thread, &ret);
+ 
+ 		if (ret != NULL)
+ 		{
+ 			TResult *r = (TResult *) ret;
+ 			total_xacts += r->xacts;
+ 			INSTR_TIME_ADD(conn_total_time, r->conn_time);
+ 			free(ret);
+ 		}
+ 	}
+ 	disconnect_all(state, nclients);
+ 
+ 	/* get end time */
+ 	INSTR_TIME_SET_CURRENT(total_time);
+ 	INSTR_TIME_SUBTRACT(total_time, start_time);
+ 	printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
+ 	if (LOGFILE)
+ 		fclose(LOGFILE);
+ 
+ 	return 0;
+ }
+ 
+ static void *
+ threadRun(void *arg)
+ {
+ 	TState	   *thread = (TState *) arg;
+ 	CState	   *state = thread->state;
+ 	TResult	   *result;
+ 	instr_time	start, end;
+ 	int			nstate = thread->nstate;
+ 	int			remains = nstate;	/* number of remaining clients */
+ 	int			i;
+ 
+ 	result = malloc(sizeof(TResult));
+ 	INSTR_TIME_SET_ZERO(result->conn_time);
+ 
+ 	if (is_connect == 0)
+ 	{
+ 		/* make connections to the database */
+ 		for (i = 0; i < nstate; i++)
+ 		{
+ 			if ((state[i].con = doConnect()) == NULL)
+ 				goto done;
+ 		}
+ 	}
+ 
+ 	/* time after thread and connections set up */
+ 	INSTR_TIME_SET_CURRENT(result->conn_time);
+ 	INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+ 
  	/* send start up queries in async manner */
! 	for (i = 0; i < nstate; i++)
  	{
! 		CState	   *st = &state[i];
! 		Command   **commands = sql_files[st->use_file];
! 		int			prev_ecnt = st->ecnt;
  
! 		st->use_file = getrand(0, num_files - 1);
! 		if (!doCustom(st, &result->conn_time))
! 			remains--;		/* I've aborted */
  
! 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  		{
! 			fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
  			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
  		}
  	}
  
! 	while (remains > 0)
  	{
! 		fd_set			input_mask;
! 		int				maxsock;		/* max socket number to be waited */
! 		int64			now_usec = 0;
! 		int64			min_usec;
  
  		FD_ZERO(&input_mask);
  
  		maxsock = -1;
! 		min_usec = INT64_MAX;
! 		for (i = 0; i < nstate; i++)
  		{
! 			CState	   *st = &state[i];
! 			Command   **commands = sql_files[st->use_file];
! 			int			sock;
  
! 			if (st->sleeping)
  			{
  				int			this_usec;
  
! 				if (min_usec == INT64_MAX)
  				{
! 					instr_time	now;
! 					INSTR_TIME_SET_CURRENT(now);
! 					now_usec = INSTR_TIME_GET_MICROSEC(now);
  				}
  
! 				this_usec = st->until - now_usec;
! 				if (min_usec > this_usec)
  					min_usec = this_usec;
  			}
! 			else if (st->con == NULL)
  			{
! 				continue;
! 			}
! 			else if (commands[st->state]->type == META_COMMAND)
! 			{
! 				min_usec = 0;	/* the connection is ready to run */
! 				break;
! 			}
  
! 			sock = PQsocket(st->con);
! 			if (sock < 0)
! 			{
! 				fprintf(stderr, "bad socket: %s\n", strerror(errno));
! 				goto done;
  			}
+ 
+ 			FD_SET(sock, &input_mask);
+ 			if (maxsock < sock)
+ 				maxsock = sock;
  		}
  
! 		if (min_usec > 0 && maxsock != -1)
  		{
! 			int		nsocks;			/* return from select(2) */
! 
! 			if (min_usec != INT64_MAX)
  			{
+ 				struct timeval	timeout;
  				timeout.tv_sec = min_usec / 1000000;
  				timeout.tv_usec = min_usec % 1000000;
! 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
  			}
  			else
! 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
  			if (nsocks < 0)
  			{
  				if (errno == EINTR)
  					continue;
  				/* must be something wrong */
  				fprintf(stderr, "select failed: %s\n", strerror(errno));
! 				goto done;
  			}
  		}
  
  		/* ok, backend returns reply */
! 		for (i = 0; i < nstate; i++)
  		{
! 			CState	   *st = &state[i];
! 			Command   **commands = sql_files[st->use_file];
! 			int			prev_ecnt = st->ecnt;
  
! 			if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
! 						  || commands[st->state]->type == META_COMMAND))
  			{
! 				if (!doCustom(st, &result->conn_time))
! 					remains--;		/* I've aborted */
  			}
  
! 			if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
  				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
  			}
  		}
  	}
+ 
+ done:
+ 	INSTR_TIME_SET_CURRENT(start);
+ 	disconnect_all(state, nstate);
+ 	result->xacts = 0;
+ 	for (i = 0; i < nstate; i++)
+ 		result->xacts += state[i].cnt;
+ 	INSTR_TIME_SET_CURRENT(end);
+ 	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+ 	return result;
  }
  
  
*************** setalarm(int seconds)
*** 2081,2086 ****
--- 2151,2228 ----
  	pqsignal(SIGALRM, handle_sig_alarm);
  	alarm(seconds);
  }
+ 
+ #ifndef ENABLE_THREAD_SAFETY
+ 
+ /*
+  * implements pthread using fork.
+  */
+ 
+ typedef struct fork_pthread
+ {
+ 	pid_t	pid;
+ 	int		pipes[2];
+ } fork_pthread;
+ 
+ static int
+ pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void * arg)
+ {
+ 	fork_pthread   *th;
+ 	void		   *ret;
+ 
+ 	th = (fork_pthread *) malloc(sizeof(fork_pthread));
+ 	pipe(th->pipes);
+ 
+ 	th->pid = fork();
+ 	if (th->pid == -1)	/* error */
+ 	{
+ 		free(th);
+ 		return errno;
+ 	}
+ 	if (th->pid != 0)	/* parent process */
+ 	{
+ 		close(th->pipes[1]);
+ 		*thread = th;
+ 		return 0;
+ 	}
+ 
+ 	/* child process */
+ 	close(th->pipes[0]);
+ 
+ 	/* set alarm again because the child does not inherit timers */
+ 	if (duration > 0)
+ 		setalarm(duration);
+ 
+ 	ret = start_routine(arg);
+ 	write(th->pipes[1], ret, sizeof(TResult));
+ 	close(th->pipes[1]);
+ 	free(th);
+ 	exit(0);
+ }
+ 
+ static int
+ pthread_join(pthread_t th, void **thread_return)
+ {
+ 	int		status;
+ 
+ 	while (waitpid(th->pid, &status, 0) != th->pid)
+ 	{
+ 		if (errno != EINTR)
+ 			return errno;
+ 	}
+ 
+ 	if (thread_return != NULL)
+ 	{
+ 		*thread_return = malloc(sizeof(TResult));
+ 		read(th->pipes[0], *thread_return, sizeof(TResult));
+ 	}
+ 
+ 	free(th);
+ 	return 0;
+ }
+ 
+ #endif
+ 
  #else							/* WIN32 */
  
  static VOID CALLBACK
*************** setalarm(int seconds)
*** 2107,2110 ****
--- 2249,2318 ----
  	}
  }
  
+ /* partial pthread implementation for Windows */
+ 
+ typedef struct win32_pthread
+ {
+ 	HANDLE		handle;
+ 	void	   *(*routine)(void *);
+ 	void	   *arg;
+ 	void	   *result;
+ } win32_pthread;
+ 
+ static unsigned __stdcall
+ win32_pthread_run(void *arg)
+ {
+ 	win32_pthread *th = (win32_pthread *) arg;
+ 
+ 	th->result = th->routine(th->arg);
+ 
+ 	return 0;
+ }
+ 
+ static int
+ pthread_create(pthread_t *thread,
+ 			   pthread_attr_t *attr,
+ 			   void * (*start_routine)(void *),
+ 			   void * arg)
+ {
+ 	int				save_errno;
+ 	win32_pthread   *th;
+ 
+ 	th = (win32_pthread *) malloc(sizeof(win32_pthread));
+ 	th->routine = start_routine;
+ 	th->arg = arg;
+ 	th->result = NULL;
+ 
+ 	th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
+ 	if (th->handle == NULL)
+ 	{
+ 		save_errno = errno;
+ 		free(th);
+ 		return save_errno;
+ 	}
+ 
+ 	*thread = th;
+ 	return 0;
+ }
+ 
+ static int
+ pthread_join(pthread_t th, void **thread_return)
+ {
+ 	if (th == NULL || th->handle == NULL)
+ 		return errno = EINVAL;
+ 
+ 	if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
+ 	{
+ 		_dosmaperr(GetLastError());
+ 		return errno;
+ 	}
+ 
+ 	if (thread_return)
+ 		*thread_return = th->result;
+ 
+ 	CloseHandle(th->handle);
+ 	free(th);
+ 	return 0;
+ }
+ 
  #endif   /* WIN32 */
diff -cpr HEAD/doc/src/sgml/pgbench.sgml pgbench-mt_20090709/doc/src/sgml/pgbench.sgml
*** HEAD/doc/src/sgml/pgbench.sgml	Mon May 11 11:33:20 2009
--- pgbench-mt_20090709/doc/src/sgml/pgbench.sgml	Thu Jul  9 17:22:03 2009
*************** pgbench <optional> <replaceable>options<
*** 172,177 ****
--- 172,185 ----
        </entry>
       </row>
       <row>
+       <entry><literal>-j</literal> <replaceable>threads</></entry>
+       <entry>
+        Number of worker threads. Clients are divided equally among those
+        threads and executed in them. The number of clients must be a multiple
+        of the number of threads. Default is 1.
+       </entry>
+      </row>
+      <row>
        <entry><literal>-t</literal> <replaceable>transactions</></entry>
        <entry>
         Number of transactions each client runs.  Default is 10.
#13Robert Haas
robertmhaas@gmail.com
In reply to: Itagaki Takahiro (#12)
Re: multi-threaded pgbench

On Thu, Jul 9, 2009 at 4:51 AM, Itagaki
Takahiro<itagaki.takahiro@oss.ntt.co.jp> wrote:

Here is an updated version of multi-threaded pgbench patch.

Greg (Smith), do you have time to review this version? If not, I will
assign a round-robin reviewer when one becomes available.

...Robert

#14Greg Stark
gsstark@mit.edu
In reply to: Robert Haas (#13)
Re: multi-threaded pgbench

On Sat, Jul 18, 2009 at 8:25 PM, Robert Haas<robertmhaas@gmail.com> wrote:

On Thu, Jul 9, 2009 at 4:51 AM, Itagaki
Takahiro<itagaki.takahiro@oss.ntt.co.jp> wrote:

Here is an updated version of multi-threaded pgbench patch.

Greg (Smith), do you have time to review this version?  If not, I will
assign a round-robin reviewer when one becomes available.

Incidentally you could assign me something if you want.

I gave feedback on Simon/Your join removal and the Append min/max
patch. I don't think either has really reached any conclusive
"finished" state though. I suppose I should mark your patch as
"returned with feedback" even if it's mostly just "good work, keep
going"? And the other patch isn't actually in this commitfest but I
think we're still discussing what it should do.

--
greg
http://mit.edu/~gsstark/resume.pdf

#15Josh Berkus
josh@agliodbs.com
In reply to: Robert Haas (#13)
Re: multi-threaded pgbench

Greg (Smith), do you have time to review this version? If not, I will
assign a round-robin reviewer when one becomes available.

I can do a concurrency test of this next week.

--
Josh Berkus
PostgreSQL Experts Inc.
www.pgexperts.com

#16Robert Haas
robertmhaas@gmail.com
In reply to: Greg Stark (#14)
Re: multi-threaded pgbench

On Jul 18, 2009, at 3:40 PM, Greg Stark <gsstark@mit.edu> wrote:

On Sat, Jul 18, 2009 at 8:25 PM, Robert Haas<robertmhaas@gmail.com>
wrote:

On Thu, Jul 9, 2009 at 4:51 AM, Itagaki
Takahiro<itagaki.takahiro@oss.ntt.co.jp> wrote:

Here is an updated version of multi-threaded pgbench patch.

Greg (Smith), do you have time to review this version? If not, I
will
assign a round-robin reviewer when one becomes available.

Incidentally you could assign me something if you want.

OK.

I gave feedback on Simon/Your join removal and the Append min/max
patch. I don't think either has really reached any conclusive
"finished" state though. I suppose I should mark your patch as
"returned with feedback" even if it's mostly just "good work, keep
going"? And the other patch isn't actually in this commitfest but I
think we're still discussing what it should do.

Well, I think we really need Tom to look at join removal. If he
doesn't have any better ideas for how to structure the code it's not
clear to me that we shouldn't just commit what I already did and then
start future work from there. But this seems like an issue for that
thread rather than this one.

Wrt append min/max I think we should postpone further discussion until
end of commitfest, since it was submitted mid-CommitFest.

...Robert

#17Robert Haas
robertmhaas@gmail.com
In reply to: Josh Berkus (#15)
Re: multi-threaded pgbench

On Sun, Jul 19, 2009 at 12:50 AM, Josh Berkus<josh@agliodbs.com> wrote:

Greg (Smith), do you have time to review this version?  If not, I will
assign a round-robin reviewer when one becomes available.

I can do a concurrency test of this next week.

Sounds good.

...Robert

#18Greg Smith
gsmith@gregsmith.com
In reply to: Itagaki Takahiro (#12)
Re: multi-threaded pgbench

I just took multi-threaded pgbench for an initial spin, looks good overall
with only a couple of small rough edges.

The latest code works differently depending on whether you compiled with
--enable-thread-safety or not; it defines some structures based on fork if
it's not enabled:

#elif defined(ENABLE_THREAD_SAFETY)
#include <pthread.h>
#else
#include <sys/wait.h>
typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void
* (*start_routine)(void *), void * arg);
static int pthread_join(pthread_t th, void **thread_return);
#endif

That second code path, when --enable-thread-safety is turned off, crashes
and burns on my Linux system:

gcc -O2 -Wall -Wmissing-prototypes -Wpointer-arith
-Wdeclaration-after-statement -Wendif-labels -fno-strict-aliasing -fwrapv
-I../../src/interfaces/libpq -I. -I../../src/include -D_GNU_SOURCE -c -o
pgbench.o pgbench.c -MMD -MP -MF .deps/pgbench.Po
pgbench.c:72: error: conflicting types for pthread_t
/usr/include/bits/pthreadtypes.h:50: error: previous declaration of
pthread_t was here
pgbench.c:73: error: conflicting types for pthread_attr_t
/usr/include/bits/pthreadtypes.h:57: error: previous declaration of
pthread_attr_t was here

So that's the first problem to sort out, I was planning to test that path
as well as the regular threaded one. Since I'd expect there to be Linux
packages built both with and without thread safety enabled, they both
should work, even though people should always be turning safety on
nowadays.

We should try to get a Windows based tester here too at some point,
there's a completely different set of thread wrapper code for that OS that
could use a look by somebody more familiar than me with that platform.

The second thing that concerns me is that there's a limitation in the code
where the number of clients must be a multiple of the number of workers.
When I tried to gradually step up the client volume the tests wouldn't
run:

$ ./pgbench -j 16 -S -c 24 -t 10000 pgbench
number of clients (24) must be a multiple number of threads (16)

Once the larger issues are worked out, it would be much friendlier if it
were possible to pass new threads a client count so that the last in the
pool could service a smaller number. The logic for that is kind of a
pain--in this case you'd want 8 threads running 2 clients each while 8 ran
1 client--but it would really be much friendlier and more flexible that way.

On to performance. My test system has 16 cores of Xeon X5550 @ 2.67GHz.
I created a little pgbench database (-s 10) and used the default
postgresql.conf parameters for everything but max_connections for a rough
initial test.

Performance on this box drops off pretty fast once you get past 16
clients; using the original, unpatched pgbench:

c tps
16 86887
24 70685
32 63787
64 64712
128 60602

A quick test of the new version suggests that there's no glaring
performance regression running it with a single client thread:

$ ./pgbench.orig -S -c 64 -t 10000 pgbench
tps = 64712.451737 (including connections establishing)

$ ./pgbench -S -c 64 -t 10000 pgbench
tps = 63806.494046 (including connections establishing)

So I moved on to testing with a worker thread per CPU:

./pgbench -j 16 -S -c 16 -t 100000 pgbench
./pgbench -j 16 -S -c 32 -t 50000 pgbench
./pgbench -j 16 -S -c 64 -t 10000 pgbench
./pgbench -j 16 -S -c 128 -t 10000 pgbench

And got considerably better results:

c tps
16 96223
32 89014
64 82487
128 74217

That's as much as a 40% speedup @ 32 clients, and even a decent win at
lower counts.

The patch looks like it accomplishes its performance goals quite well
here. I'll be glad to run some more extensive performance tests, but I'd
like to at least see the version without --enable-thread-safety fixed
first so that I can queue up and compare both versions when I go through
that.

--
* Greg Smith gsmith@gregsmith.com http://www.gregsmith.com Baltimore, MD

#19Itagaki Takahiro
itagaki.takahiro@oss.ntt.co.jp
In reply to: Greg Smith (#18)
Re: multi-threaded pgbench

Greg Smith <gsmith@gregsmith.com> wrote:

That second code path, when --enable-thread-safety is turned off, crashes
and burns on my Linux system:

It comes from a conflict of identifiers.
Renaming the identifiers with #define can solve the errors:

#define pthread_t pg_pthread_t
#define pthread_attr_t pg_pthread_attr_t
#define pthread_create pg_pthread_create
#define pthread_join pg_pthread_join
typedef struct fork_pthread *pthread_t;
...

Another idea is to not use the pthread names at all, and instead add a
'pg_thread' wrapper module on top of pthread.

We can choose either implementation... Which is better?

$ ./pgbench -j 16 -S -c 24 -t 10000 pgbench
number of clients (24) must be a multiple number of threads (16)

That is hard on platforms where threads are emulated with fork, because
multiple threads need to access the job queue. We would need to put the queue
in inter-process shared memory, but that introduces additional complexity.

Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center

#20Itagaki Takahiro
itagaki.takahiro@oss.ntt.co.jp
In reply to: Itagaki Takahiro (#19)
1 attachment(s)
Re: multi-threaded pgbench

Itagaki Takahiro <itagaki.takahiro@oss.ntt.co.jp> wrote:

Greg Smith <gsmith@gregsmith.com> wrote:

That second code path, when --enable-thread-safety is turned off, crashes
and burns on my Linux system:

It comes from a conflict of identifiers.
Renaming the identifiers with #define can solve the errors:
#define pthread_t pg_pthread_t

Here is a patch that fixes the compile errors by renaming identifiers
when thread-safety is disabled on Linux.

I also fixed file descriptor leaks at the end of the benchmark.

Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center

Attachments:

pgbench-mt_20090724.patchapplication/octet-stream; name=pgbench-mt_20090724.patchDownload
diff -cpr head/contrib/pgbench/pgbench.c pgbench-mt/contrib/pgbench/pgbench.c
*** head/contrib/pgbench/pgbench.c	2009-06-11 23:48:51.000000000 +0900
--- pgbench-mt/contrib/pgbench/pgbench.c	2009-07-24 10:11:07.625570376 +0900
***************
*** 30,35 ****
--- 30,36 ----
  
  #include "libpq-fe.h"
  #include "pqsignal.h"
+ #include "portability/instr_time.h"
  
  #include <ctype.h>
  
***************
*** 55,60 ****
--- 56,95 ----
  #include <sys/resource.h>		/* for getrlimit */
  #endif
  
+ #ifndef INT64_MAX
+ #define INT64_MAX	INT64CONST(0x7FFFFFFFFFFFFFFF)
+ #endif
+ 
+ /*
+  * Multi-platform pthread implementations
+  */
+ 
+ #ifdef WIN32
+ /* Use native win32 threads on Windows */
+ typedef struct win32_pthread   *pthread_t;
+ typedef int						pthread_attr_t;
+ static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
+ static int pthread_join(pthread_t th, void **thread_return);
+ 
+ #elif defined(ENABLE_THREAD_SAFETY)
+ /* Use platform-dependent pthread */
+ #include <pthread.h>
+ 
+ #else
+ 
+ #include <sys/wait.h>
+ /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
+ #define pthread_t				pg_pthread_t
+ #define pthread_attr_t			pg_pthread_attr_t
+ #define pthread_create			pg_pthread_create
+ #define pthread_join			pg_pthread_join
+ typedef struct fork_pthread	   *pthread_t;
+ typedef int						pthread_attr_t;
+ static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
+ static int pthread_join(pthread_t th, void **thread_return);
+ 
+ #endif
+ 
  extern char *optarg;
  extern int	optind;
  
*************** extern int	optind;
*** 71,77 ****
  
  #define DEFAULT_NXACTS	10		/* default nxacts */
  
- int			nclients = 1;		/* default number of simulated clients */
  int			nxacts = 0;			/* number of transactions per client */
  int			duration = 0;		/* duration in seconds */
  
--- 106,111 ----
*************** FILE	   *LOGFILE = NULL;
*** 99,106 ****
  
  bool		use_log;			/* log transaction latencies to a file */
  
- int			remains;			/* number of remaining clients */
- 
  int			is_connect;			/* establish connection  for each transaction */
  
  char	   *pghost = "";
--- 133,138 ----
*************** typedef struct
*** 135,149 ****
  	int			listen;			/* 0 indicates that an async query has been
  								 * sent */
  	int			sleeping;		/* 1 indicates that the client is napping */
! 	struct timeval until;		/* napping until */
  	Variable   *variables;		/* array of variable definitions */
  	int			nvariables;
! 	struct timeval txn_begin;	/* used for measuring latencies */
  	int			use_file;		/* index in sql_files for this client */
  	bool		prepared[MAX_FILES];
  } CState;
  
  /*
   * queries read from files
   */
  #define SQL_COMMAND		1
--- 167,200 ----
  	int			listen;			/* 0 indicates that an async query has been
  								 * sent */
  	int			sleeping;		/* 1 indicates that the client is napping */
! 	int64		until;			/* napping until (usec) */
  	Variable   *variables;		/* array of variable definitions */
  	int			nvariables;
! 	instr_time	txn_begin;		/* used for measuring latencies */
  	int			use_file;		/* index in sql_files for this client */
  	bool		prepared[MAX_FILES];
  } CState;
  
  /*
+  * Thread state and result
+  */
+ typedef struct
+ {
+ 	pthread_t		thread;		/* thread handle */
+ 	CState		   *state;		/* array of CState */
+ 	int				nstate;		/* length of state */
+ 	instr_time		start_time;	/* thread start time */
+ } TState;
+ 
+ #define INVALID_THREAD		((pthread_t) 0)
+ 
+ typedef struct
+ {
+ 	instr_time		conn_time;
+ 	int				xacts;
+ } TResult;
+ 
+ /*
   * queries read from files
   */
  #define SQL_COMMAND		1
*************** typedef struct
*** 168,175 ****
  	char	   *argv[MAX_ARGS]; /* command list */
  } Command;
  
! Command   **sql_files[MAX_FILES];		/* SQL script files */
! int			num_files;			/* number of script files */
  
  /* default scenario */
  static char *tpc_b = {
--- 219,227 ----
  	char	   *argv[MAX_ARGS]; /* command list */
  } Command;
  
! static Command	  **sql_files[MAX_FILES];	/* SQL script files */
! static int			num_files;				/* number of script files */
! static int			debug = 0;				/* debug flag */
  
  /* default scenario */
  static char *tpc_b = {
*************** static char *select_only = {
*** 212,255 ****
  	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
  };
  
- /* Connection overhead time */
- static struct timeval conn_total_time = {0, 0};
- 
  /* Function prototypes */
  static void setalarm(int seconds);
! 
! 
! /* Calculate total time */
! static void
! addTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
! {
! 	int			sec = t1->tv_sec + t2->tv_sec;
! 	int			usec = t1->tv_usec + t2->tv_usec;
! 
! 	if (usec >= 1000000)
! 	{
! 		usec -= 1000000;
! 		sec++;
! 	}
! 	result->tv_sec = sec;
! 	result->tv_usec = usec;
! }
! 
! /* Calculate time difference */
! static void
! diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
! {
! 	int			sec = t1->tv_sec - t2->tv_sec;
! 	int			usec = t1->tv_usec - t2->tv_usec;
! 
! 	if (usec < 0)
! 	{
! 		usec += 1000000;
! 		sec--;
! 	}
! 	result->tv_sec = sec;
! 	result->tv_usec = usec;
! }
  
  static void
  usage(const char *progname)
--- 264,272 ----
  	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
  };
  
  /* Function prototypes */
  static void setalarm(int seconds);
! static void* threadRun(void *arg);
  
  static void
  usage(const char *progname)
*************** usage(const char *progname)
*** 267,272 ****
--- 284,290 ----
  		   "  -D VARNAME=VALUE\n"
  		   "               define variable for use by custom script\n"
  		   "  -f FILENAME  read transaction script from FILENAME\n"
+ 		   "  -j NUM       number of threads (default: 1)\n"
  		   "  -l           write transaction times to log file\n"
  		   "  -M {simple|extended|prepared}\n"
  		   "               protocol for submitting queries to server (default: simple)\n"
*************** discard_response(CState *state)
*** 376,404 ****
  	} while (res);
  }
  
- /* check to see if the SQL result was good */
- static int
- check(CState *state, PGresult *res, int n)
- {
- 	CState	   *st = &state[n];
- 
- 	switch (PQresultStatus(res))
- 	{
- 		case PGRES_COMMAND_OK:
- 		case PGRES_TUPLES_OK:
- 			/* OK */
- 			break;
- 		default:
- 			fprintf(stderr, "Client %d aborted in state %d: %s",
- 					n, st->state, PQerrorMessage(st->con));
- 			remains--;			/* I've aborted */
- 			PQfinish(st->con);
- 			st->con = NULL;
- 			return (-1);
- 	}
- 	return (0);					/* OK */
- }
- 
  static int
  compareVariables(const void *v1, const void *v2)
  {
--- 394,399 ----
*************** preparedStatementName(char *buffer, int 
*** 595,605 ****
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static void
! doCustom(CState *state, int n, int debug)
  {
  	PGresult   *res;
- 	CState	   *st = &state[n];
  	Command   **commands;
  
  top:
--- 590,613 ----
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static bool
! clientDone(CState *st, bool ok)
! {
! 	(void) ok;	/* unused */
! 
! 	if (st->con != NULL)
! 	{
! 		PQfinish(st->con);
! 		st->con = NULL;
! 	}
! 	return false;	/* always false */
! }
! 
! /* return false iff client should be disconnected */
! static bool
! doCustom(CState *st, instr_time *conn_time)
  {
  	PGresult   *res;
  	Command   **commands;
  
  top:
*************** top:
*** 607,622 ****
  
  	if (st->sleeping)
  	{							/* are we sleeping? */
! 		int			usec;
! 		struct timeval now;
  
! 		gettimeofday(&now, NULL);
! 		usec = (st->until.tv_sec - now.tv_sec) * 1000000 +
! 			st->until.tv_usec - now.tv_usec;
! 		if (usec <= 0)
  			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
  		else
! 			return;				/* Still sleeping, nothing to do here */
  	}
  
  	if (st->listen)
--- 615,627 ----
  
  	if (st->sleeping)
  	{							/* are we sleeping? */
! 		instr_time	now;
  
! 		INSTR_TIME_SET_CURRENT(now);
! 		if (st->until <= INSTR_TIME_GET_MICROSEC(now))
  			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
  		else
! 			return true;		/* Still sleeping, nothing to do here */
  	}
  
  	if (st->listen)
*************** top:
*** 624,640 ****
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d receiving\n", n);
  			if (!PQconsumeInput(st->con))
  			{					/* there's something wrong */
! 				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
! 				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
! 				return;
  			}
  			if (PQisBusy(st->con))
! 				return;			/* don't have the whole result yet */
  		}
  
  		/*
--- 629,642 ----
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d receiving\n", st->id);
  			if (!PQconsumeInput(st->con))
  			{					/* there's something wrong */
! 				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
! 				return clientDone(st, false);
  			}
  			if (PQisBusy(st->con))
! 				return true;	/* don't have the whole result yet */
  		}
  
  		/*
*************** top:
*** 642,666 ****
  		 */
  		if (use_log && commands[st->state + 1] == NULL)
  		{
! 			double		diff;
! 			struct timeval now;
! 
! 			gettimeofday(&now, NULL);
! 			diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
! 				(int) (now.tv_usec - st->txn_begin.tv_usec);
! 
! 			fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n",
! 					st->id, st->cnt, diff, st->use_file,
! 					(long) now.tv_sec, (long) now.tv_usec);
  		}
  
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			res = PQgetResult(st->con);
! 			if (check(state, res, n))
  			{
! 				PQclear(res);
! 				return;
  			}
  			PQclear(res);
  			discard_response(st);
--- 644,678 ----
  		 */
  		if (use_log && commands[st->state + 1] == NULL)
  		{
! 			instr_time	diff;
! 			double		sec;
! 			double		msec;
! 			double		usec;
! 
! 			INSTR_TIME_SET_CURRENT(diff);
! 			INSTR_TIME_SUBTRACT(diff, st->txn_begin);
! 			sec = INSTR_TIME_GET_DOUBLE(diff);
! 			msec = INSTR_TIME_GET_MILLISEC(diff);
! 			usec = (double) INSTR_TIME_GET_MICROSEC(diff);
! 
! 			fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n",
! 					st->id, st->cnt, usec, st->use_file,
! 					sec, usec - sec * 1000.0);
  		}
  
  		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			res = PQgetResult(st->con);
! 			switch (PQresultStatus(res))
  			{
! 				case PGRES_COMMAND_OK:
! 				case PGRES_TUPLES_OK:
! 					break;	/* OK */
! 				default:
! 					fprintf(stderr, "Client %d aborted in state %d: %s",
! 						st->id, st->state, PQerrorMessage(st->con));
! 					PQclear(res);
! 					return clientDone(st, false);
  			}
  			PQclear(res);
  			discard_response(st);
*************** top:
*** 676,690 ****
  
  			++st->cnt;
  			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 			{
! 				remains--;		/* I've done */
! 				if (st->con != NULL)
! 				{
! 					PQfinish(st->con);
! 					st->con = NULL;
! 				}
! 				return;
! 			}
  		}
  
  		/* increment state counter */
--- 688,694 ----
  
  			++st->cnt;
  			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 				return clientDone(st, true);	/* exit success */
  		}
  
  		/* increment state counter */
*************** top:
*** 699,725 ****
  
  	if (st->con == NULL)
  	{
! 		struct timeval t1,
! 					t2,
! 					t3;
  
! 		gettimeofday(&t1, NULL);
  		if ((st->con = doConnect()) == NULL)
  		{
! 			fprintf(stderr, "Client %d aborted in establishing connection.\n",
! 					n);
! 			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
! 			return;
  		}
! 		gettimeofday(&t2, NULL);
! 		diffTime(&t2, &t1, &t3);
! 		addTime(&conn_total_time, &t3, &conn_total_time);
  	}
  
  	if (use_log && st->state == 0)
! 		gettimeofday(&(st->txn_begin), NULL);
  
  	if (commands[st->state]->type == SQL_COMMAND)
  	{
--- 703,722 ----
  
  	if (st->con == NULL)
  	{
! 		instr_time	start, end;
  
! 		INSTR_TIME_SET_CURRENT(start);
  		if ((st->con = doConnect()) == NULL)
  		{
! 			fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
! 			return clientDone(st, false);
  		}
! 		INSTR_TIME_SET_CURRENT(end);
! 		INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
  	}
  
  	if (use_log && st->state == 0)
! 		INSTR_TIME_SET_CURRENT(st->txn_begin);
  
  	if (commands[st->state]->type == SQL_COMMAND)
  	{
*************** top:
*** 735,745 ****
  			{
  				fprintf(stderr, "out of memory\n");
  				st->ecnt++;
! 				return;
  			}
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, sql);
  			r = PQsendQuery(st->con, sql);
  			free(sql);
  		}
--- 732,742 ----
  			{
  				fprintf(stderr, "out of memory\n");
  				st->ecnt++;
! 				return true;
  			}
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
  			r = PQsendQuery(st->con, sql);
  			free(sql);
  		}
*************** top:
*** 751,757 ****
  			getQueryParams(st, command, params);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, sql);
  			r = PQsendQueryParams(st->con, sql, command->argc - 1,
  								  NULL, params, NULL, NULL, 0);
  		}
--- 748,754 ----
  			getQueryParams(st, command, params);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
  			r = PQsendQueryParams(st->con, sql, command->argc - 1,
  								  NULL, params, NULL, NULL, 0);
  		}
*************** top:
*** 785,791 ****
  			preparedStatementName(name, st->use_file, st->state);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", n, name);
  			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
  									params, NULL, NULL, 0);
  		}
--- 782,788 ----
  			preparedStatementName(name, st->use_file, st->state);
  
  			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, name);
  			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
  									params, NULL, NULL, 0);
  		}
*************** top:
*** 795,801 ****
  		if (r == 0)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]);
  			st->ecnt++;
  		}
  		else
--- 792,798 ----
  		if (r == 0)
  		{
  			if (debug)
! 				fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
  			st->ecnt++;
  		}
  		else
*************** top:
*** 809,815 ****
  
  		if (debug)
  		{
! 			fprintf(stderr, "client %d executing \\%s", n, argv[0]);
  			for (i = 1; i < argc; i++)
  				fprintf(stderr, " %s", argv[i]);
  			fprintf(stderr, "\n");
--- 806,812 ----
  
  		if (debug)
  		{
! 			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
  			for (i = 1; i < argc; i++)
  				fprintf(stderr, " %s", argv[i]);
  			fprintf(stderr, "\n");
*************** top:
*** 828,834 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return;
  				}
  				min = atoi(var);
  			}
--- 825,831 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return true;
  				}
  				min = atoi(var);
  			}
*************** top:
*** 850,856 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return;
  				}
  				max = atoi(var);
  			}
--- 847,853 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return true;
  				}
  				max = atoi(var);
  			}
*************** top:
*** 861,867 ****
  			{
  				fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
  				st->ecnt++;
! 				return;
  			}
  
  #ifdef DEBUG
--- 858,864 ----
  			{
  				fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
  				st->ecnt++;
! 				return true;
  			}
  
  #ifdef DEBUG
*************** top:
*** 873,879 ****
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return;
  			}
  
  			st->listen = 1;
--- 870,876 ----
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return true;
  			}
  
  			st->listen = 1;
*************** top:
*** 891,897 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return;
  				}
  				ope1 = atoi(var);
  			}
--- 888,894 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
  					st->ecnt++;
! 					return true;
  				}
  				ope1 = atoi(var);
  			}
*************** top:
*** 908,914 ****
  					{
  						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
  						st->ecnt++;
! 						return;
  					}
  					ope2 = atoi(var);
  				}
--- 905,911 ----
  					{
  						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
  						st->ecnt++;
! 						return true;
  					}
  					ope2 = atoi(var);
  				}
*************** top:
*** 927,933 ****
  					{
  						fprintf(stderr, "%s: division by zero\n", argv[0]);
  						st->ecnt++;
! 						return;
  					}
  					snprintf(res, sizeof(res), "%d", ope1 / ope2);
  				}
--- 924,930 ----
  					{
  						fprintf(stderr, "%s: division by zero\n", argv[0]);
  						st->ecnt++;
! 						return true;
  					}
  					snprintf(res, sizeof(res), "%d", ope1 / ope2);
  				}
*************** top:
*** 935,941 ****
  				{
  					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return;
  				}
  			}
  
--- 932,938 ----
  				{
  					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
  					st->ecnt++;
! 					return true;
  				}
  			}
  
*************** top:
*** 943,949 ****
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return;
  			}
  
  			st->listen = 1;
--- 940,946 ----
  			{
  				fprintf(stderr, "%s: out of memory\n", argv[0]);
  				st->ecnt++;
! 				return true;
  			}
  
  			st->listen = 1;
*************** top:
*** 952,958 ****
  		{
  			char	   *var;
  			int			usec;
! 			struct timeval now;
  
  			if (*argv[1] == ':')
  			{
--- 949,955 ----
  		{
  			char	   *var;
  			int			usec;
! 			instr_time now;
  
  			if (*argv[1] == ':')
  			{
*************** top:
*** 960,966 ****
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
  					st->ecnt++;
! 					return;
  				}
  				usec = atoi(var);
  			}
--- 957,963 ----
  				{
  					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
  					st->ecnt++;
! 					return true;
  				}
  				usec = atoi(var);
  			}
*************** top:
*** 977,985 ****
  			else
  				usec *= 1000000;
  
! 			gettimeofday(&now, NULL);
! 			st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000;
! 			st->until.tv_usec = (now.tv_usec + usec) % 1000000;
  			st->sleeping = 1;
  
  			st->listen = 1;
--- 974,981 ----
  			else
  				usec *= 1000000;
  
! 			INSTR_TIME_SET_CURRENT(now);
! 			st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
  			st->sleeping = 1;
  
  			st->listen = 1;
*************** top:
*** 987,1004 ****
  
  		goto top;
  	}
  }
  
  /* discard connections */
  static void
! disconnect_all(CState *state)
  {
  	int			i;
  
! 	for (i = 0; i < nclients; i++)
  	{
  		if (state[i].con)
  			PQfinish(state[i].con);
  	}
  }
  
--- 983,1005 ----
  
  		goto top;
  	}
+ 
+ 	return true;
  }
  
  /* discard connections */
  static void
! disconnect_all(CState *state, int length)
  {
  	int			i;
  
! 	for (i = 0; i < length; i++)
  	{
  		if (state[i].con)
+ 		{
  			PQfinish(state[i].con);
+ 			state[i].con = NULL;
+ 		}
  	}
  }
  
*************** process_commands(char *buf)
*** 1264,1269 ****
--- 1265,1288 ----
  				return NULL;
  			}
  
+ 			/*
+ 			 * Split argument into number and unit for "sleep 1ms" or so.
+ 			 * We don't have to terminate the number argument with null
+ 			 * because it will parsed with atoi, that ignores trailing
+ 			 * non-digit characters.
+ 			 */
+ 			if (my_commands->argv[1][0] != ':')
+ 			{
+ 				char	*c = my_commands->argv[1];
+ 				while (isdigit(*c)) { c++; }
+ 				if (*c)
+ 				{
+ 					my_commands->argv[2] = c;
+ 					if (my_commands->argc < 3)
+ 						my_commands->argc = 3;
+ 				}
+ 			}
+ 
  			if (my_commands->argc >= 3)
  			{
  				if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
*************** process_builtin(char *tb)
*** 1450,1474 ****
  
  /* print out results */
  static void
! printResults(
! 			 int ttype, CState *state,
! 			 struct timeval * start_time, struct timeval * end_time)
  {
! 	double		t1,
! 				t2;
! 	int			i;
! 	int			normal_xacts = 0;
  	char	   *s;
  
! 	for (i = 0; i < nclients; i++)
! 		normal_xacts += state[i].cnt;
! 
! 	t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec);
! 	t1 = normal_xacts * 1000000.0 / t1;
! 
! 	t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 +
! 		(end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec);
! 	t2 = normal_xacts * 1000000.0 / t2;
  
  	if (ttype == 0)
  		s = "TPC-B (sort of)";
--- 1469,1486 ----
  
  /* print out results */
  static void
! printResults(int ttype, int normal_xacts, int nclients, int nthreads,
! 			 instr_time total_time, instr_time conn_total_time)
  {
! 	double		time_include,
! 				tps_include,
! 				tps_exclude;
  	char	   *s;
  
! 	time_include = INSTR_TIME_GET_DOUBLE(total_time);
! 	tps_include = normal_xacts / time_include;
! 	tps_exclude = normal_xacts / (time_include -
! 		(INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
  
  	if (ttype == 0)
  		s = "TPC-B (sort of)";
*************** printResults(
*** 1483,1488 ****
--- 1495,1501 ----
  	printf("scaling factor: %d\n", scale);
  	printf("query mode: %s\n", QUERYMODE[querymode]);
  	printf("number of clients: %d\n", nclients);
+ 	printf("number of threads: %d\n", nthreads);
  	if (duration <= 0)
  	{
  		printf("number of transactions per client: %d\n", nxacts);
*************** printResults(
*** 1495,1502 ****
  		printf("number of transactions actually processed: %d\n",
  			   normal_xacts);
  	}
! 	printf("tps = %f (including connections establishing)\n", t1);
! 	printf("tps = %f (excluding connections establishing)\n", t2);
  }
  
  
--- 1508,1515 ----
  		printf("number of transactions actually processed: %d\n",
  			   normal_xacts);
  	}
! 	printf("tps = %f (including connections establishing)\n", tps_include);
! 	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
  }
  
  
*************** int
*** 1504,1532 ****
  main(int argc, char **argv)
  {
  	int			c;
  	int			is_init_mode = 0;		/* initialize mode? */
  	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
  	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
- 	int			debug = 0;		/* debug flag */
  	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
  								 * 2: skip update of branches and tellers */
  	char	   *filename = NULL;
  	bool		scale_given = false;
  
  	CState	   *state;			/* status of clients */
  
! 	struct timeval start_time;	/* start up time */
! 	struct timeval end_time;	/* end time */
  
  	int			i;
  
- 	fd_set		input_mask;
- 	int			nsocks;			/* return from select(2) */
- 	int			maxsock;		/* max socket number to be waited */
- 	struct timeval now;
- 	struct timeval timeout;
- 	int			min_usec;
- 
  #ifdef HAVE_GETRLIMIT
  	struct rlimit rlim;
  #endif
--- 1517,1542 ----
  main(int argc, char **argv)
  {
  	int			c;
+ 	int			nclients = 1;		/* default number of simulated clients */
+ 	int			nthreads = 1;		/* default number of threads */
  	int			is_init_mode = 0;		/* initialize mode? */
  	int			is_no_vacuum = 0;		/* no vacuum at all before testing? */
  	int			do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
  	int			ttype = 0;		/* transaction type. 0: TPC-B, 1: SELECT only,
  								 * 2: skip update of branches and tellers */
  	char	   *filename = NULL;
  	bool		scale_given = false;
  
  	CState	   *state;			/* status of clients */
+ 	TState	   *threads;		/* array of thread */
  
! 	instr_time	start_time;		/* start up time */
! 	instr_time	total_time;
! 	instr_time	conn_total_time;
! 	int			total_xacts;
  
  	int			i;
  
  #ifdef HAVE_GETRLIMIT
  	struct rlimit rlim;
  #endif
*************** main(int argc, char **argv)
*** 1576,1582 ****
  
  	memset(state, 0, sizeof(*state));
  
! 	while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1)
  	{
  		switch (c)
  		{
--- 1586,1592 ----
  
  	memset(state, 0, sizeof(*state));
  
! 	while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
  	{
  		switch (c)
  		{
*************** main(int argc, char **argv)
*** 1629,1634 ****
--- 1639,1652 ----
  				}
  #endif   /* HAVE_GETRLIMIT */
  				break;
+ 			case 'j':	/* jobs */
+ 				nthreads = atoi(optarg);
+ 				if (nthreads <= 0)
+ 				{
+ 					fprintf(stderr, "invalid number of threads: %d\n", nthreads);
+ 					exit(1);
+ 				}
+ 				break;
  			case 'C':
  				is_connect = 1;
  				break;
*************** main(int argc, char **argv)
*** 1749,1755 ****
  	if (nxacts <= 0 && duration <= 0)
  		nxacts = DEFAULT_NXACTS;
  
! 	remains = nclients;
  
  	if (nclients > 1)
  	{
--- 1767,1777 ----
  	if (nxacts <= 0 && duration <= 0)
  		nxacts = DEFAULT_NXACTS;
  
! 	if (nclients % nthreads != 0)
! 	{
! 		fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads);
! 		exit(1);
! 	}
  
  	if (nclients > 1)
  	{
*************** main(int argc, char **argv)
*** 1767,1772 ****
--- 1789,1795 ----
  		{
  			int			j;
  
+ 			state[i].id = i;
  			for (j = 0; j < state[0].nvariables; j++)
  			{
  				if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
*************** main(int argc, char **argv)
*** 1876,1908 ****
  	PQfinish(con);
  
  	/* set random seed */
! 	gettimeofday(&start_time, NULL);
! 	srandom((unsigned int) start_time.tv_usec);
! 
! 	/* get start up time */
! 	gettimeofday(&start_time, NULL);
! 
! 	/* set alarm if duration is specified. */
! 	if (duration > 0)
! 		setalarm(duration);
! 
! 	if (is_connect == 0)
! 	{
! 		struct timeval t,
! 					now;
! 
! 		/* make connections to the database */
! 		for (i = 0; i < nclients; i++)
! 		{
! 			state[i].id = i;
! 			if ((state[i].con = doConnect()) == NULL)
! 				exit(1);
! 		}
! 		/* time after connections set up */
! 		gettimeofday(&now, NULL);
! 		diffTime(&now, &start_time, &t);
! 		addTime(&conn_total_time, &t, &conn_total_time);
! 	}
  
  	/* process bultin SQL scripts */
  	switch (ttype)
--- 1899,1906 ----
  	PQfinish(con);
  
  	/* set random seed */
! 	INSTR_TIME_SET_CURRENT(start_time);
! 	srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
  
  	/* process bultin SQL scripts */
  	switch (ttype)
*************** main(int argc, char **argv)
*** 1926,2065 ****
  			break;
  	}
  
  	/* send start up queries in async manner */
! 	for (i = 0; i < nclients; i++)
  	{
! 		Command   **commands = sql_files[state[i].use_file];
! 		int			prev_ecnt = state[i].ecnt;
  
! 		state[i].use_file = getrand(0, num_files - 1);
! 		doCustom(state, i, debug);
  
! 		if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
  		{
! 			fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
  			remains--;			/* I've aborted */
! 			PQfinish(state[i].con);
! 			state[i].con = NULL;
  		}
  	}
  
! 	for (;;)
  	{
! 		if (remains <= 0)
! 		{						/* all done ? */
! 			disconnect_all(state);
! 			/* get end time */
! 			gettimeofday(&end_time, NULL);
! 			printResults(ttype, state, &start_time, &end_time);
! 			if (LOGFILE)
! 				fclose(LOGFILE);
! 			exit(0);
! 		}
  
  		FD_ZERO(&input_mask);
  
  		maxsock = -1;
! 		min_usec = -1;
! 		for (i = 0; i < nclients; i++)
  		{
! 			Command   **commands = sql_files[state[i].use_file];
  
! 			if (state[i].sleeping)
  			{
  				int			this_usec;
- 				int			sock = PQsocket(state[i].con);
  
! 				if (min_usec < 0)
  				{
! 					gettimeofday(&now, NULL);
! 					min_usec = 0;
  				}
  
! 				this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 +
! 					state[i].until.tv_usec - now.tv_usec;
! 
! 				if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
  					min_usec = this_usec;
- 
- 				FD_SET		(sock, &input_mask);
- 
- 				if (maxsock < sock)
- 					maxsock = sock;
  			}
! 			else if (state[i].con && commands[state[i].state]->type != META_COMMAND)
  			{
! 				int			sock = PQsocket(state[i].con);
! 
! 				if (sock < 0)
! 				{
! 					disconnect_all(state);
! 					exit(1);
! 				}
! 				FD_SET		(sock, &input_mask);
  
! 				if (maxsock < sock)
! 					maxsock = sock;
  			}
  		}
  
! 		if (maxsock != -1)
  		{
! 			if (min_usec >= 0)
  			{
  				timeout.tv_sec = min_usec / 1000000;
  				timeout.tv_usec = min_usec % 1000000;
! 
! 				nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
! 								(fd_set *) NULL, &timeout);
  			}
  			else
! 				nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
! 								(fd_set *) NULL, (struct timeval *) NULL);
  			if (nsocks < 0)
  			{
  				if (errno == EINTR)
  					continue;
  				/* must be something wrong */
- 				disconnect_all(state);
  				fprintf(stderr, "select failed: %s\n", strerror(errno));
! 				exit(1);
! 			}
! #ifdef NOT_USED
! 			else if (nsocks == 0)
! 			{					/* timeout */
! 				fprintf(stderr, "select timeout\n");
! 				for (i = 0; i < nclients; i++)
! 				{
! 					fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
! 							i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
! 				}
! 				exit(0);
  			}
- #endif
  		}
  
  		/* ok, backend returns reply */
! 		for (i = 0; i < nclients; i++)
  		{
! 			Command   **commands = sql_files[state[i].use_file];
! 			int			prev_ecnt = state[i].ecnt;
  
! 			if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
! 						  || commands[state[i].state]->type == META_COMMAND))
  			{
! 				doCustom(state, i, debug);
  			}
  
! 			if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state);
  				remains--;		/* I've aborted */
! 				PQfinish(state[i].con);
! 				state[i].con = NULL;
  			}
  		}
  	}
  }
  
  
--- 1924,2150 ----
  			break;
  	}
  
+ 	/* get start up time */
+ 	INSTR_TIME_SET_CURRENT(start_time);
+ 
+ 	/* set alarm if duration is specified. */
+ 	if (duration > 0)
+ 		setalarm(duration);
+ 
+ 	/* start threads */
+ 	threads = (TState *) malloc(sizeof(TState) * nthreads);
+ 	for (i = 0; i < nthreads; i++)
+ 	{
+ 		threads[i].state = &state[nclients / nthreads * i];
+ 		threads[i].nstate = nclients / nthreads;
+ 		INSTR_TIME_SET_CURRENT(threads[i].start_time);
+ 
+ 		/* the first thread (i = 0) is executed by main thread */
+ 		if (i > 0)
+ 		{
+ 			int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+ 			if (err != 0 || threads[i].thread == INVALID_THREAD)
+ 			{
+ 				fprintf(stderr, "cannot create thread: %s\n", strerror(err));
+ 				exit(1);
+ 			}
+ 		}
+ 		else
+ 		{
+ 			threads[i].thread = INVALID_THREAD;
+ 		}
+ 	}
+ 
+ 	/* wait for threads and accumulate results */
+ 	total_xacts = 0;
+ 	INSTR_TIME_SET_ZERO(conn_total_time);
+ 	for (i = 0; i < nthreads; i++)
+ 	{
+ 		void *ret = NULL;
+ 
+ 		if (threads[i].thread == INVALID_THREAD)
+ 			ret = threadRun(&threads[i]);
+ 		else
+ 			pthread_join(threads[i].thread, &ret);
+ 
+ 		if (ret != NULL)
+ 		{
+ 			TResult *r = (TResult *) ret;
+ 			total_xacts += r->xacts;
+ 			INSTR_TIME_ADD(conn_total_time, r->conn_time);
+ 			free(ret);
+ 		}
+ 	}
+ 	disconnect_all(state, nclients);
+ 
+ 	/* get end time */
+ 	INSTR_TIME_SET_CURRENT(total_time);
+ 	INSTR_TIME_SUBTRACT(total_time, start_time);
+ 	printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
+ 	if (LOGFILE)
+ 		fclose(LOGFILE);
+ 
+ 	return 0;
+ }
+ 
+ static void *
+ threadRun(void *arg)
+ {
+ 	TState	   *thread = (TState *) arg;
+ 	CState	   *state = thread->state;
+ 	TResult	   *result;
+ 	instr_time	start, end;
+ 	int			nstate = thread->nstate;
+ 	int			remains = nstate;	/* number of remaining clients */
+ 	int			i;
+ 
+ 	result = malloc(sizeof(TResult));
+ 	INSTR_TIME_SET_ZERO(result->conn_time);
+ 
+ 	if (is_connect == 0)
+ 	{
+ 		/* make connections to the database */
+ 		for (i = 0; i < nstate; i++)
+ 		{
+ 			if ((state[i].con = doConnect()) == NULL)
+ 				goto done;
+ 		}
+ 	}
+ 
+ 	/* time after thread and connections set up */
+ 	INSTR_TIME_SET_CURRENT(result->conn_time);
+ 	INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+ 
  	/* send start up queries in async manner */
! 	for (i = 0; i < nstate; i++)
  	{
! 		CState	   *st = &state[i];
! 		Command   **commands = sql_files[st->use_file];
! 		int			prev_ecnt = st->ecnt;
  
! 		st->use_file = getrand(0, num_files - 1);
! 		if (!doCustom(st, &result->conn_time))
! 			remains--;		/* I've aborted */
  
! 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  		{
! 			fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
  			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
  		}
  	}
  
! 	while (remains > 0)
  	{
! 		fd_set			input_mask;
! 		int				maxsock;		/* max socket number to be waited */
! 		int64			now_usec = 0;
! 		int64			min_usec;
  
  		FD_ZERO(&input_mask);
  
  		maxsock = -1;
! 		min_usec = INT64_MAX;
! 		for (i = 0; i < nstate; i++)
  		{
! 			CState	   *st = &state[i];
! 			Command   **commands = sql_files[st->use_file];
! 			int			sock;
  
! 			if (st->sleeping)
  			{
  				int			this_usec;
  
! 				if (min_usec == INT64_MAX)
  				{
! 					instr_time	now;
! 					INSTR_TIME_SET_CURRENT(now);
! 					now_usec = INSTR_TIME_GET_MICROSEC(now);
  				}
  
! 				this_usec = st->until - now_usec;
! 				if (min_usec > this_usec)
  					min_usec = this_usec;
  			}
! 			else if (st->con == NULL)
  			{
! 				continue;
! 			}
! 			else if (commands[st->state]->type == META_COMMAND)
! 			{
! 				min_usec = 0;	/* the connection is ready to run */
! 				break;
! 			}
  
! 			sock = PQsocket(st->con);
! 			if (sock < 0)
! 			{
! 				fprintf(stderr, "bad socket: %s\n", strerror(errno));
! 				goto done;
  			}
+ 
+ 			FD_SET(sock, &input_mask);
+ 			if (maxsock < sock)
+ 				maxsock = sock;
  		}
  
! 		if (min_usec > 0 && maxsock != -1)
  		{
! 			int		nsocks;			/* return from select(2) */
! 
! 			if (min_usec != INT64_MAX)
  			{
+ 				struct timeval	timeout;
  				timeout.tv_sec = min_usec / 1000000;
  				timeout.tv_usec = min_usec % 1000000;
! 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
  			}
  			else
! 				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
  			if (nsocks < 0)
  			{
  				if (errno == EINTR)
  					continue;
  				/* must be something wrong */
  				fprintf(stderr, "select failed: %s\n", strerror(errno));
! 				goto done;
  			}
  		}
  
  		/* ok, backend returns reply */
! 		for (i = 0; i < nstate; i++)
  		{
! 			CState	   *st = &state[i];
! 			Command   **commands = sql_files[st->use_file];
! 			int			prev_ecnt = st->ecnt;
  
! 			if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
! 						  || commands[st->state]->type == META_COMMAND))
  			{
! 				if (!doCustom(st, &result->conn_time))
! 					remains--;		/* I've aborted */
  			}
  
! 			if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
  				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
  			}
  		}
  	}
+ 
+ done:
+ 	INSTR_TIME_SET_CURRENT(start);
+ 	disconnect_all(state, nstate);
+ 	result->xacts = 0;
+ 	for (i = 0; i < nstate; i++)
+ 		result->xacts += state[i].cnt;
+ 	INSTR_TIME_SET_CURRENT(end);
+ 	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+ 	return result;
  }
  
  
*************** setalarm(int seconds)
*** 2081,2086 ****
--- 2166,2252 ----
  	pqsignal(SIGALRM, handle_sig_alarm);
  	alarm(seconds);
  }
+ 
+ #ifndef ENABLE_THREAD_SAFETY
+ 
+ /*
+  * implements pthread using fork.
+  */
+ 
+ typedef struct fork_pthread
+ {
+ 	pid_t	pid;
+ 	int		pipes[2];
+ } fork_pthread;
+ 
+ static int
+ pthread_create(pthread_t *thread,
+ 			   pthread_attr_t *attr,
+ 			   void * (*start_routine)(void *),
+ 			   void *arg)
+ {
+ 	fork_pthread   *th;
+ 	void		   *ret;
+ 
+ 	th = (fork_pthread *) malloc(sizeof(fork_pthread));
+ 	pipe(th->pipes);
+ 
+ 	th->pid = fork();
+ 	if (th->pid == -1)	/* error */
+ 	{
+ 		free(th);
+ 		return errno;
+ 	}
+ 	if (th->pid != 0)	/* parent process */
+ 	{
+ 		close(th->pipes[1]);
+ 		*thread = th;
+ 		return 0;
+ 	}
+ 
+ 	/* child process */
+ 	close(th->pipes[0]);
+ 
+ 	/* set alarm again because the child does not inherit timers */
+ 	if (duration > 0)
+ 		setalarm(duration);
+ 
+ 	ret = start_routine(arg);
+ 	write(th->pipes[1], ret, sizeof(TResult));
+ 	close(th->pipes[1]);
+ 	free(th);
+ 	exit(0);
+ }
+ 
+ static int
+ pthread_join(pthread_t th, void **thread_return)
+ {
+ 	int		status;
+ 
+ 	while (waitpid(th->pid, &status, 0) != th->pid)
+ 	{
+ 		if (errno != EINTR)
+ 			return errno;
+ 	}
+ 
+ 	if (thread_return != NULL)
+ 	{
+ 		/* assume result is TResult */
+ 		*thread_return = malloc(sizeof(TResult));
+ 		if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
+ 		{
+ 			free(*thread_return);
+ 			*thread_return = NULL;
+ 		}
+ 	}
+ 	close(th->pipes[0]);
+ 
+ 	free(th);
+ 	return 0;
+ }
+ 
+ #endif
+ 
  #else							/* WIN32 */
  
  static VOID CALLBACK
*************** setalarm(int seconds)
*** 2107,2110 ****
--- 2273,2342 ----
  	}
  }
  
+ /* partial pthread implementation for Windows */
+ 
+ typedef struct win32_pthread
+ {
+ 	HANDLE		handle;
+ 	void	   *(*routine)(void *);
+ 	void	   *arg;
+ 	void	   *result;
+ } win32_pthread;
+ 
+ static unsigned __stdcall
+ win32_pthread_run(void *arg)
+ {
+ 	win32_pthread *th = (win32_pthread *) arg;
+ 
+ 	th->result = th->routine(th->arg);
+ 
+ 	return 0;
+ }
+ 
+ static int
+ pthread_create(pthread_t *thread,
+ 			   pthread_attr_t *attr,
+ 			   void * (*start_routine)(void *),
+ 			   void *arg)
+ {
+ 	int				save_errno;
+ 	win32_pthread   *th;
+ 
+ 	th = (win32_pthread *) malloc(sizeof(win32_pthread));
+ 	th->routine = start_routine;
+ 	th->arg = arg;
+ 	th->result = NULL;
+ 
+ 	th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
+ 	if (th->handle == NULL)
+ 	{
+ 		save_errno = errno;
+ 		free(th);
+ 		return save_errno;
+ 	}
+ 
+ 	*thread = th;
+ 	return 0;
+ }
+ 
+ static int
+ pthread_join(pthread_t th, void **thread_return)
+ {
+ 	if (th == NULL || th->handle == NULL)
+ 		return errno = EINVAL;
+ 
+ 	if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
+ 	{
+ 		_dosmaperr(GetLastError());
+ 		return errno;
+ 	}
+ 
+ 	if (thread_return)
+ 		*thread_return = th->result;
+ 
+ 	CloseHandle(th->handle);
+ 	free(th);
+ 	return 0;
+ }
+ 
  #endif   /* WIN32 */
diff -cpr head/doc/src/sgml/pgbench.sgml pgbench-mt/doc/src/sgml/pgbench.sgml
*** head/doc/src/sgml/pgbench.sgml	2009-05-08 07:01:18.000000000 +0900
--- pgbench-mt/doc/src/sgml/pgbench.sgml	2009-07-24 10:08:24.778572416 +0900
*************** pgbench <optional> <replaceable>options<
*** 172,177 ****
--- 172,185 ----
        </entry>
       </row>
       <row>
+       <entry><literal>-j</literal> <replaceable>threads</></entry>
+       <entry>
+        Number of worker threads. Clients are divided equally among those
+        threads and executed in them. The number of clients must be a multiple
+        of the number of threads. Default is 1.
+       </entry>
+      </row>
+      <row>
        <entry><literal>-t</literal> <replaceable>transactions</></entry>
        <entry>
         Number of transactions each client runs.  Default is 10.
#21Josh Williams
joshwilliams@ij.net
In reply to: Greg Smith (#18)
Re: multi-threaded pgbench

On Wed, 2009-07-22 at 22:23 -0400, Greg Smith wrote:

Onto performance. My test system has 16 cores of Xeon X5550 @
2.67GHz.
I created a little pgbench database (-s 10) and used the default
postgresql.conf parameters for everything but max_connections for a
rough
initial test.

To test on Windows, I set up a similar database on an 8-core 2.0GHz
E5335 (closest match I have.) It's compiled against a fresh CVS pull
from this morning, patched with the "20090724" updated version. I tried
to mirror the tests as much as possible, including the concurrent thread
counts despite having half the number of available cores. Doing that
didn't have much impact on the results, but more on that later.

Comparing the unpatched version to the new version running a single
client thread, there's no significant performance difference:

C:\pgsql85>bin\pgbenchorig.exe -S -c 8 -t 100000 pgbench
...
tps = 19061.234215 (including connections establishing)

C:\pgsql85>bin\pgbench.exe -S -c 8 -t 100000 pgbench
tps = 18852.928562 (including connections establishing)

As a basis of comparison the original pgbench was run with increasing
client counts, which shows the same drop off in throughput past the
16-client sweet spot:

con tps
8 18871
16 19161
24 18804
32 18670
64 17598
128 16664

However I was surprised to see these results for the patched version,
running 16 worker threads (apart from the 8 client run of course.)

C:\pgsql85>bin\pgbench.exe -S -j 16 -c 128 -t 100000 pgbench ...
con tps
8 18435 (-j 8)
16 18866
24 -----
32 17937
64 17016
128 15930

In all cases the patched version resulted in a lower performing output
than the unpatched version. It's clearly working, at least in that it's
launching the requested number of worker threads when looking at the
process. Adjusting the worker thread count down to match the number of
cores yielded identical results in the couple of test cases I ran.
Maybe pgbench itself is less of a bottleneck in this environment,
relatively speaking?

- Josh Williams

#22Greg Smith
gsmith@gregsmith.com
In reply to: Josh Williams (#21)
Re: multi-threaded pgbench

On Tue, 28 Jul 2009, Josh Williams wrote:

Maybe pgbench itself is less of a bottleneck in this environment,
relatively speaking?

On UNIXish systems, you know you've reached the conditions under which the
threaded pgbench would be helpful if the pgbench client program itself is
taking up a large percentage of a CPU just by itself. If your test system
is still setup, it might be interesting to try the 64 and 128 client cases
with Task Manager open, to see what percentage of the CPU the pgbench
driver program is using. If the pgbench client isn't already pegged at a
full CPU, I wouldn't necessarily expect threading it to help--it would just add
overhead that doesn't buy you anything, which seems to be what you're
measuring.

All the Linux tests suggest that limit tends to show up at over 20,000 TPS
nowadays, so maybe your Windows system is bottlenecking somewhere
completely different before it reaches saturation on the client.

In any case, Josh's review is exactly what I wanted to see here--the code
does compile and run successfully for someone besides its author under
Windows. Making it *effective* on that platform might end up being
outside the scope of what we want to chew on right now. I'll have updated
performance results to submit later this week against the updated patch.

--
* Greg Smith gsmith@gregsmith.com http://www.gregsmith.com Baltimore, MD

#23Josh Williams
joshwilliams@ij.net
In reply to: Greg Smith (#22)
Re: multi-threaded pgbench

On Tue, 2009-07-28 at 12:10 -0400, Greg Smith wrote:

If your test system
is still setup, it might be interesting to try the 64 and 128 client cases
with Task Manager open, to see what percentage of the CPU the pgbench
driver program is using. If the pgbench client isn't already pegged at a
full CPU, I wouldn't necessarily expect threading it to help--it would just add
overhead that doesn't buy you anything, which seems to be what you're
measuring.

That's a really good point, I do recall seeing pgbench taking only a
fraction of the CPU... Running it again, it hovers around 6 or 7
percent in both cases, so it's only using up around half a core.

Huh, running the patched version on a single thread with 128 clients
just got it to crash. Actually consistently, three times now. Will try
the same thing on the development box tomorrow morning to get some
better debugging information.

All the Linux tests suggest that limit tends to show up at over 20,000 TPS
nowadays, so maybe your Windows system is bottlenecking somewhere
completely different before it reaches saturation on the client.

I figured it was just indicating a limitation of the environment, where
Windows has some kind of inefficiency either in the PG port or just
something inherent in how the OS works. It does make me wonder where
exactly all that CPU time is going, though. OProfile, how I miss thee.
But that's a different discussion entirely.

- Josh Williams

#24Josh Williams
joshwilliams@ij.net
In reply to: Josh Williams (#23)
1 attachment(s)
Re: multi-threaded pgbench

On Tue, 2009-07-28 at 23:38 -0400, Josh Williams wrote:

Huh, running the patched version on a single thread with 128 clients
just got it to crash. Actually consistently, three times now. Will try
the same thing on the development box tomorrow morning to get some
better debugging information.

So yeah, buffer overrun.

In pgbench.c FD_SETSIZE is redefined to get around the Windows default
of 64. But this is done after bringing in winsock2.h (a couple levels
in as a result of first including postgres_fe.h). So any fd_set is
built with an array of 64 descriptors, while pgbench thinks it has 1024
available to work with.

This was introduced a while back; the multi-threaded patch just makes it
visible by giving it an important pointer to write over. Previously it
would just run over into the loop counter (and probably a couple other
things) and thus it'd continue on happily with the [sub]set it has.

In either case this seems to be a simple fix, to move that #define
earlier (see pgbench_win32.patch.)

- Josh Williams

Attachments:

pgbench_win32.patchtext/x-patch; charset=UTF-8; name=pgbench_win32.patchDownload
diff -c -r1.87 pgbench.c
*** contrib/pgbench/pgbench.c	11 Jun 2009 14:48:51 -0000	1.87
--- contrib/pgbench/pgbench.c	29 Jul 2009 21:18:18 -0000
***************
*** 26,31 ****
--- 26,36 ----
   * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
   *
   */
+ 
+ #ifdef WIN32
+ #define FD_SETSIZE 1024		/* set before winsock2.h is included */
+ #endif   /* ! WIN32 */
+ 
  #include "postgres_fe.h"
  
  #include "libpq-fe.h"
***************
*** 34,41 ****
  #include <ctype.h>
  
  #ifdef WIN32
- #undef FD_SETSIZE
- #define FD_SETSIZE 1024
  #include <win32.h>
  #else
  #include <signal.h>
--- 39,44 ----
#25Greg Smith
gsmith@gregsmith.com
In reply to: Josh Williams (#24)
Re: multi-threaded pgbench

This patch is wrapping up nicely. I re-tested against the updated
pgbench-mt_20090724 and now I get similar results whether or not
--enable-thread-safety is enabled on Linux, so that problem is gone.
Josh's successful Windows tests, along with his finding the bug he attached a
patch for, are also encouraging.

I re-ran my performance tests with the same basic setup (16 core system,
database scale=10, read-only tests) but this time increased shared_buffers
to 256MB just to see if results popped up significantly (they didn't).

Here's a comparison of the original pgbench select-only TPS against the
new version using 1 thread:

clients
threads 16 32 64 128
none 91763 69707 68465 63730
1 90797 70117 66324 63626

I ran these a few times and those are basically the same result. If
there's a regression using 1 thread instead of 1 process, which I thought
I was seeing at one point with j=1/c=128, under closer investigation it
would have to be much smaller than the run to run variation of pgbench
because it vanished when I collected many runs of data.

Running the new pgbench with thread safety turned on:

clients
threads 16 32 64 128
1 89503 67849 67120 63499
2 97883 91888 87556 84430
4 95319 96409 90445 83569
8 96002 95411 88988 82383
16 103798 95056 87701 82253
32 X 95869 88253 82253

Running it without thread safety turned on so it uses processes instead
(this is the case I couldn't report on before):

clients
threads 16 32 64 128
1 89706 68702 64545 62770
2 99224 91677 88812 82442
4 96124 96552 90245 83311
8 97066 96000 89149 83266
16 103276 96088 88276 82652
32 X 97405 90082 83672

Those two tables are also identical relative to the run to run pgbench
noise.

This looks ready for a committer review to me, I'm happy that the patch
performs as expected and it seems to work across two platforms.

To step back for a second, I'm testing a fairly optimistic situation--the
standard RHEL 2.6.18 kernel which doesn't have any major issues here--and
I see a decent sized speedup (>30%) in the worst case. I've reported
before that running pgbench on newer Linux kernels (>=2.6.23) is horribly
slow, and sure enough the original results kicking off this thread showed
the same thing: only 11600 TPS on a modern 8 core system. That's less
than 1/4 what that server is capable of, and this patch allows working
around that issue nicely. pgbench not scaling up is really a much worse
problem than my test results suggest.

--
* Greg Smith gsmith@gregsmith.com http://www.gregsmith.com Baltimore, MD

#26Magnus Hagander
magnus@hagander.net
In reply to: Josh Williams (#24)
Re: multi-threaded pgbench

On Wed, Jul 29, 2009 at 23:31, Josh Williams<joshwilliams@ij.net> wrote:

On Tue, 2009-07-28 at 23:38 -0400, Josh Williams wrote:

Huh, running the patched version on a single thread with 128 clients
just got it to crash.  Actually consistently, three times now.  Will try
the same thing on the development box tomorrow morning to get some
better debugging information.

So yeah, buffer overrun.

In pgbench.c FD_SETSIZE is redefined to get around the Windows default
of 64.  But this is done after bringing in winsock2.h (a couple levels
in as a result of first including postgres_fe.h).  So any fd_set is
built with an array of 64 descriptors, while pgbench thinks it has 1024
available to work with.

This was introduced a while back; the multi-threaded patch just makes it
visible by giving it an important pointer to write over.  Previously it
would just run over into the loop counter (and probably a couple other
things) and thus it'd continue on happily with the [sub]set it has.

Yikes.
Yeah, this is fallout from the hacking we did with moving the winsock
includes around a while back. At the time the #defines were added,
winsock came in through the win32.h file :S

In either case this seems to be a simple fix, to move that #define
earlier (see pgbench_win32.patch.)

Yes, and it seems to be entirely unrelated to the multithreaded patch.
Thus, applied as a separate patch.

--
Magnus Hagander
Self: http://www.hagander.net/
Work: http://www.redpill-linpro.com/