diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 56c37d5..8d7f8aa 100644
*** a/src/bin/pgbench/pgbench.c
--- b/src/bin/pgbench/pgbench.c
***************
*** 235,251 **** typedef struct StatsData
  } StatsData;
  
  /*
!  * Connection state
   */
  typedef struct
  {
  	PGconn	   *con;			/* connection handle to DB */
  	int			id;				/* client No. */
! 	int			state;			/* state No. */
! 	bool		listen;			/* whether an async query has been sent */
! 	bool		sleeping;		/* whether the client is napping */
! 	bool		throttling;		/* whether nap is for throttling */
! 	bool		is_throttled;	/* whether transaction throttling is done */
  	Variable   *variables;		/* array of variable definitions */
  	int			nvariables;		/* number of variables */
  	bool		vars_sorted;	/* are variables sorted by name? */
--- 235,305 ----
  } StatsData;
  
  /*
!  * Connection state machine states.
!  */
! typedef enum
! {
! 	/*
! 	 * In CSTATE_START_THROTTLE state, we calculate when to begin the next
! 	 * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
! 	 * sleeps until that moment.  (If throttling is not enabled, doCustom()
! 	 * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
! 	 */
! 	CSTATE_START_THROTTLE,
! 	CSTATE_THROTTLE,
! 
! 	/*
! 	 * CSTATE_START_TX performs start-of-transaction processing.  Establishes
! 	 * a new connection for the transaction, in --connect mode, and records
! 	 * the transaction start time.
! 	 */
! 	CSTATE_START_TX,
! 
! 	/*
! 	 * We loop through these states, to process each command in the
! 	 * script:
! 	 *
! 	 * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
! 	 * command, the command is sent to the server, and we move to
! 	 * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is
! 	 * set, and we enter the CSTATE_SLEEP state to wait for it to expire.
! 	 * Other meta-commands are executed immediately, and we proceed to next
! 	 * command.
! 	 *
! 	 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
! 	 * for the current command.
! 	 */
! 	CSTATE_START_COMMAND,
! 	CSTATE_WAIT_RESULT,
! 	CSTATE_SLEEP,					/* wait until txn_scheduled, for \sleep */
! 
! 	/*
! 	 * CSTATE_END_TX performs end-of-transaction processing.  Calculates
! 	 * latency, and logs the transaction.  In --connect mode, closes the
! 	 * current connection.  Chooses the next script to execute and starts
! 	 * over in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we
! 	 * have no more work to do.
! 	 */
! 	CSTATE_END_TX,
! 
! 	/*
! 	 * Final states.  CSTATE_ABORTED: script execution was aborted because a
! 	 * command (or the connection) failed.  CSTATE_FINISHED means success.
! 	 */
! 	CSTATE_ABORTED,
! 	CSTATE_FINISHED
! } ConnectionStateEnum;
! 
! /*
!  * Connection state.
   */
  typedef struct
  {
  	PGconn	   *con;			/* connection handle to DB */
  	int			id;				/* client No. */
! 	ConnectionStateEnum state;	/* state machine's current state. */
! 	int			command;		/* command No. */
! 
  	Variable   *variables;		/* array of variable definitions */
  	int			nvariables;		/* number of variables */
  	bool		vars_sorted;	/* are variables sorted by name? */
***************
*** 1381,1387 **** evalFunc(TState *thread, CState *st,
  				Assert(nargs == 1);
  
  				fprintf(stderr, "debug(script=%d,command=%d): ",
! 						st->use_file, st->state + 1);
  
  				if (varg->type == PGBT_INT)
  					fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
--- 1435,1441 ----
  				Assert(nargs == 1);
  
  				fprintf(stderr, "debug(script=%d,command=%d): ",
! 						st->use_file, st->command + 1);
  
  				if (varg->type == PGBT_INT)
  					fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
***************
*** 1732,1746 **** preparedStatementName(char *buffer, int file, int state)
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static bool
! clientDone(CState *st)
  {
! 	if (st->con != NULL)
! 	{
! 		PQfinish(st->con);
! 		st->con = NULL;
! 	}
! 	return false;				/* always false */
  }
  
  /* return a script number with a weighted choice. */
--- 1786,1796 ----
  	sprintf(buffer, "P%d_%d", file, state);
  }
  
! static void					/* report a failed meta-command on stderr */
! metaCommandFailed(CState *st)
  {
! 	fprintf(stderr, "client %d aborted in command %d; execution of meta-command failed\n",
! 			st->id, st->command);	/* st->command is a 0-based index */
  }
  
  /* return a script number with a weighted choice. */
***************
*** 1762,2185 **** chooseScript(TState *thread)
  	return i - 1;
  }
  
! /* return false iff client should be disconnected */
  static bool
! doCustom(TState *thread, CState *st, StatsData *agg)
  {
! 	PGresult   *res;
! 	Command   **commands;
! 	bool		trans_needs_throttle = false;
! 	instr_time	now;
  
! 	/*
! 	 * gettimeofday() isn't free, so we get the current timestamp lazily the
! 	 * first time it's needed, and reuse the same value throughout this
! 	 * function after that. This also ensures that e.g. the calculated latency
! 	 * reported in the log file and in the totals are the same. Zero means
! 	 * "not set yet". Reset "now" when we step to the next command with "goto
! 	 * top", though.
! 	 */
! top:
! 	INSTR_TIME_SET_ZERO(now);
  
! 	commands = sql_script[st->use_file].commands;
  
! 	/*
! 	 * Handle throttling once per transaction by sleeping.  It is simpler to
! 	 * do this here rather than at the end, because so much complicated logic
! 	 * happens below when statements finish.
! 	 */
! 	if (throttle_delay && !st->is_throttled)
  	{
! 		/*
! 		 * Generate a delay such that the series of delays will approximate a
! 		 * Poisson distribution centered on the throttle_delay time.
! 		 *
! 		 * If transactions are too slow or a given wait is shorter than a
! 		 * transaction, the next transaction will start right away.
! 		 */
! 		int64		wait = getPoissonRand(thread, throttle_delay);
  
! 		thread->throttle_trigger += wait;
! 		st->txn_scheduled = thread->throttle_trigger;
  
! 		/* stop client if next transaction is beyond pgbench end of execution */
! 		if (duration > 0 && st->txn_scheduled > end_time)
! 			return clientDone(st);
  
! 		/*
! 		 * If this --latency-limit is used, and this slot is already late so
! 		 * that the transaction will miss the latency limit even if it
! 		 * completed immediately, we skip this time slot and iterate till the
! 		 * next slot that isn't late yet.
! 		 */
! 		if (latency_limit)
  		{
! 			int64		now_us;
  
! 			if (INSTR_TIME_IS_ZERO(now))
! 				INSTR_TIME_SET_CURRENT(now);
! 			now_us = INSTR_TIME_GET_MICROSEC(now);
! 			while (thread->throttle_trigger < now_us - latency_limit)
  			{
! 				processXactStats(thread, st, &now, true, agg);
! 				/* next rendez-vous */
! 				wait = getPoissonRand(thread, throttle_delay);
! 				thread->throttle_trigger += wait;
! 				st->txn_scheduled = thread->throttle_trigger;
  			}
  		}
  
! 		st->sleeping = true;
! 		st->throttling = true;
! 		st->is_throttled = true;
  		if (debug)
! 			fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
! 					st->id, wait);
  	}
  
! 	if (st->sleeping)
! 	{							/* are we sleeping? */
! 		if (INSTR_TIME_IS_ZERO(now))
! 			INSTR_TIME_SET_CURRENT(now);
! 		if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
! 			return true;		/* Still sleeping, nothing to do here */
! 		/* Else done sleeping, go ahead with next command */
! 		st->sleeping = false;
! 		st->throttling = false;
  	}
  
! 	if (st->listen)
! 	{							/* are we receiver? */
! 		if (commands[st->state]->type == SQL_COMMAND)
  		{
! 			if (debug)
! 				fprintf(stderr, "client %d receiving\n", st->id);
! 			if (!PQconsumeInput(st->con))
! 			{					/* there's something wrong */
! 				fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
! 				return clientDone(st);
! 			}
! 			if (PQisBusy(st->con))
! 				return true;	/* don't have the whole result yet */
  		}
  
! 		/*
! 		 * command finished: accumulate per-command execution times in
! 		 * thread-local data structure, if per-command latencies are requested
! 		 */
! 		if (is_latencies)
! 		{
! 			if (INSTR_TIME_IS_ZERO(now))
! 				INSTR_TIME_SET_CURRENT(now);
  
! 			/* XXX could use a mutex here, but we choose not to */
! 			addToSimpleStats(&commands[st->state]->stats,
! 							 INSTR_TIME_GET_DOUBLE(now) -
! 							 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
! 		}
  
! 		/* transaction finished: calculate latency and log the transaction */
! 		if (commands[st->state + 1] == NULL)
! 		{
! 			if (progress || throttle_delay || latency_limit ||
! 				per_script_stats || use_log)
! 				processXactStats(thread, st, &now, false, agg);
! 			else
! 				thread->stats.cnt++;
! 		}
  
! 		if (commands[st->state]->type == SQL_COMMAND)
  		{
  			/*
! 			 * Read and discard the query result; note this is not included in
! 			 * the statement latency numbers.
  			 */
! 			res = PQgetResult(st->con);
! 			switch (PQresultStatus(res))
! 			{
! 				case PGRES_COMMAND_OK:
! 				case PGRES_TUPLES_OK:
! 				case PGRES_EMPTY_QUERY:
! 					break;		/* OK */
! 				default:
! 					fprintf(stderr, "client %d aborted in state %d: %s",
! 							st->id, st->state, PQerrorMessage(st->con));
! 					PQclear(res);
! 					return clientDone(st);
! 			}
! 			PQclear(res);
! 			discard_response(st);
! 		}
  
! 		if (commands[st->state + 1] == NULL)
! 		{
! 			if (is_connect)
! 			{
! 				PQfinish(st->con);
! 				st->con = NULL;
! 			}
  
! 			++st->cnt;
! 			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 				return clientDone(st);	/* exit success */
! 		}
  
! 		/* increment state counter */
! 		st->state++;
! 		if (commands[st->state] == NULL)
! 		{
! 			st->state = 0;
! 			st->use_file = chooseScript(thread);
! 			commands = sql_script[st->use_file].commands;
! 			if (debug)
! 				fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
! 						sql_script[st->use_file].desc);
! 			st->is_throttled = false;
  
  			/*
! 			 * No transaction is underway anymore, which means there is
! 			 * nothing to listen to right now.  When throttling rate limits
! 			 * are active, a sleep will happen next, as the next transaction
! 			 * starts.  And then in any case the next SQL command will set
! 			 * listen back to true.
  			 */
! 			st->listen = false;
! 			trans_needs_throttle = (throttle_delay > 0);
! 		}
! 	}
  
! 	if (st->con == NULL)
! 	{
! 		instr_time	start,
! 					end;
  
! 		INSTR_TIME_SET_CURRENT(start);
! 		if ((st->con = doConnect()) == NULL)
! 		{
! 			fprintf(stderr, "client %d aborted while establishing connection\n",
! 					st->id);
! 			return clientDone(st);
! 		}
! 		INSTR_TIME_SET_CURRENT(end);
! 		INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
! 
! 		/* Reset session-local state */
! 		st->listen = false;
! 		st->sleeping = false;
! 		st->throttling = false;
! 		st->is_throttled = false;
! 		memset(st->prepared, 0, sizeof(st->prepared));
! 	}
  
! 	/*
! 	 * This ensures that a throttling delay is inserted before proceeding with
! 	 * sql commands, after the first transaction. The first transaction
! 	 * throttling is performed when first entering doCustom.
! 	 */
! 	if (trans_needs_throttle)
! 	{
! 		trans_needs_throttle = false;
! 		goto top;
! 	}
  
! 	/* Record transaction start time under logging, progress or throttling */
! 	if ((use_log || progress || throttle_delay || latency_limit ||
! 		 per_script_stats) && st->state == 0)
! 	{
! 		INSTR_TIME_SET_CURRENT(st->txn_begin);
  
! 		/*
! 		 * When not throttling, this is also the transaction's scheduled start
! 		 * time.
! 		 */
! 		if (!throttle_delay)
! 			st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
! 	}
  
! 	/* Record statement start time if per-command latencies are requested */
! 	if (is_latencies)
! 		INSTR_TIME_SET_CURRENT(st->stmt_begin);
  
! 	if (commands[st->state]->type == SQL_COMMAND)
! 	{
! 		const Command *command = commands[st->state];
! 		int			r;
  
! 		if (querymode == QUERY_SIMPLE)
! 		{
! 			char	   *sql;
  
! 			sql = pg_strdup(command->argv[0]);
! 			sql = assignVariables(st, sql);
  
! 			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
! 			r = PQsendQuery(st->con, sql);
! 			free(sql);
! 		}
! 		else if (querymode == QUERY_EXTENDED)
! 		{
! 			const char *sql = command->argv[0];
! 			const char *params[MAX_ARGS];
  
! 			getQueryParams(st, command, params);
  
! 			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, sql);
! 			r = PQsendQueryParams(st->con, sql, command->argc - 1,
! 								  NULL, params, NULL, NULL, 0);
! 		}
! 		else if (querymode == QUERY_PREPARED)
! 		{
! 			char		name[MAX_PREPARE_NAME];
! 			const char *params[MAX_ARGS];
  
! 			if (!st->prepared[st->use_file])
! 			{
! 				int			j;
  
! 				for (j = 0; commands[j] != NULL; j++)
! 				{
! 					PGresult   *res;
! 					char		name[MAX_PREPARE_NAME];
! 
! 					if (commands[j]->type != SQL_COMMAND)
! 						continue;
! 					preparedStatementName(name, st->use_file, j);
! 					res = PQprepare(st->con, name,
! 						  commands[j]->argv[0], commands[j]->argc - 1, NULL);
! 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 						fprintf(stderr, "%s", PQerrorMessage(st->con));
! 					PQclear(res);
  				}
! 				st->prepared[st->use_file] = true;
! 			}
  
! 			getQueryParams(st, command, params);
! 			preparedStatementName(name, st->use_file, st->state);
  
! 			if (debug)
! 				fprintf(stderr, "client %d sending %s\n", st->id, name);
! 			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
! 									params, NULL, NULL, 0);
! 		}
! 		else	/* unknown sql mode */
! 			r = 0;
  
! 		if (r == 0)
! 		{
! 			if (debug)
! 				fprintf(stderr, "client %d could not send %s\n",
! 						st->id, command->argv[0]);
! 			st->ecnt++;
! 		}
! 		else
! 			st->listen = true;	/* flags that should be listened */
! 	}
! 	else if (commands[st->state]->type == META_COMMAND)
! 	{
! 		int			argc = commands[st->state]->argc,
! 					i;
! 		char	  **argv = commands[st->state]->argv;
  
! 		if (debug)
! 		{
! 			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
! 			for (i = 1; i < argc; i++)
! 				fprintf(stderr, " %s", argv[i]);
! 			fprintf(stderr, "\n");
! 		}
  
! 		if (pg_strcasecmp(argv[0], "set") == 0)
! 		{
! 			PgBenchExpr *expr = commands[st->state]->expr;
! 			PgBenchValue result;
  
! 			if (!evaluateExpr(thread, st, expr, &result))
! 			{
! 				st->ecnt++;
! 				return true;
! 			}
  
! 			if (!putVariableNumber(st, argv[0], argv[1], &result))
! 			{
! 				st->ecnt++;
! 				return true;
! 			}
  
! 			st->listen = true;
! 		}
! 		else if (pg_strcasecmp(argv[0], "sleep") == 0)
! 		{
! 			char	   *var;
! 			int			usec;
! 			instr_time	now;
  
! 			if (*argv[1] == ':')
! 			{
! 				if ((var = getVariable(st, argv[1] + 1)) == NULL)
  				{
! 					fprintf(stderr, "%s: undefined variable \"%s\"\n",
! 							argv[0], argv[1]);
! 					st->ecnt++;
! 					return true;
  				}
- 				usec = atoi(var);
- 			}
- 			else
- 				usec = atoi(argv[1]);
  
! 			if (argc > 2)
! 			{
! 				if (pg_strcasecmp(argv[2], "ms") == 0)
! 					usec *= 1000;
! 				else if (pg_strcasecmp(argv[2], "s") == 0)
! 					usec *= 1000000;
! 			}
! 			else
! 				usec *= 1000000;
  
! 			INSTR_TIME_SET_CURRENT(now);
! 			st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
! 			st->sleeping = true;
  
! 			st->listen = true;
! 		}
! 		else if (pg_strcasecmp(argv[0], "setshell") == 0)
! 		{
! 			bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
  
! 			if (timer_exceeded) /* timeout */
! 				return clientDone(st);
! 			else if (!ret)		/* on error */
! 			{
! 				st->ecnt++;
! 				return true;
! 			}
! 			else	/* succeeded */
! 				st->listen = true;
! 		}
! 		else if (pg_strcasecmp(argv[0], "shell") == 0)
! 		{
! 			bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);
  
! 			if (timer_exceeded) /* timeout */
! 				return clientDone(st);
! 			else if (!ret)		/* on error */
! 			{
! 				st->ecnt++;
! 				return true;
! 			}
! 			else	/* succeeded */
! 				st->listen = true;
  		}
- 
- 		/* after a meta command, immediately proceed with next command */
- 		goto top;
  	}
- 
- 	return true;
  }
  
  /*
--- 1812,2423 ----
  	return i - 1;
  }
  
! /* Send a SQL command using the chosen querymode; return false on failure */
  static bool
! sendCommand(CState *st, Command *command)
  {
! 	int			r;
  
! 	if (querymode == QUERY_SIMPLE)
! 	{
! 		char	   *sql;
  
! 		sql = pg_strdup(command->argv[0]);
! 		sql = assignVariables(st, sql);
  
! 		if (debug)
! 			fprintf(stderr, "client %d sending %s\n", st->id, sql);
! 		r = PQsendQuery(st->con, sql);
! 		free(sql);
! 	}
! 	else if (querymode == QUERY_EXTENDED)
  	{
! 		const char *sql = command->argv[0];
! 		const char *params[MAX_ARGS];
  
! 		getQueryParams(st, command, params);
  
! 		if (debug)
! 			fprintf(stderr, "client %d sending %s\n", st->id, sql);
! 		r = PQsendQueryParams(st->con, sql, command->argc - 1,
! 							  NULL, params, NULL, NULL, 0);
! 	}
! 	else if (querymode == QUERY_PREPARED)
! 	{
! 		char		name[MAX_PREPARE_NAME];
! 		const char *params[MAX_ARGS];
  
! 		if (!st->prepared[st->use_file])
  		{
! 			int			j;
! 			Command **commands = sql_script[st->use_file].commands;
  
! 			for (j = 0; commands[j] != NULL; j++)
  			{
! 				PGresult   *res;
! 				char		stmt_name[MAX_PREPARE_NAME];	/* don't shadow 'name' */
! 
! 				if (commands[j]->type != SQL_COMMAND)
! 					continue;
! 				preparedStatementName(stmt_name, st->use_file, j);
! 				res = PQprepare(st->con, stmt_name,
! 								commands[j]->argv[0], commands[j]->argc - 1, NULL);
! 				if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 					fprintf(stderr, "%s", PQerrorMessage(st->con));
! 				PQclear(res);
  			}
+ 			st->prepared[st->use_file] = true;
  		}
  
! 		getQueryParams(st, command, params);
! 		preparedStatementName(name, st->use_file, st->command);
! 
  		if (debug)
! 			fprintf(stderr, "client %d sending %s\n", st->id, name);
! 		r = PQsendQueryPrepared(st->con, name, command->argc - 1,
! 								params, NULL, NULL, 0);
  	}
+ 	else	/* unknown sql mode */
+ 		r = 0;
  
! 	if (r == 0)
! 	{
! 		if (debug)
! 			fprintf(stderr, "client %d could not send %s\n",
! 					st->id, command->argv[0]);
! 		st->ecnt++;
! 		return false;
  	}
+ 	else
+ 		return true;
+ }
+ 
+ /*
+  * "Execute" a \sleep: parse argv[1] (literal or :variable) and optional unit
+  * argv[2] ("ms", "s"; others leave it unscaled; none means seconds), storing
+  * the delay in microseconds in *usecs.  False if the variable is undefined.
+  */
+ static bool
+ evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+ {
+ 	char	   *var;
+ 	int			usec;
  
! 	if (*argv[1] == ':')
! 	{
! 		if ((var = getVariable(st, argv[1] + 1)) == NULL)
  		{
! 			fprintf(stderr, "%s: undefined variable \"%s\"\n",
! 					argv[0], argv[1]);
! 			return false;
  		}
+ 		usec = atoi(var);
+ 	}
+ 	else
+ 		usec = atoi(argv[1]);
  
! 	if (argc > 2)
! 	{
! 		if (pg_strcasecmp(argv[2], "ms") == 0)
! 			usec *= 1000;
! 		else if (pg_strcasecmp(argv[2], "s") == 0)
! 			usec *= 1000000;
! 	}
! 	else
! 		usec *= 1000000;
  
! 	*usecs = usec;	/* NOTE(review): the int scaling above can overflow */
! 	return true;
! }
  
! /*
!  * Advance the state machine of a connection, if possible.
!  */
! static void
! doCustom(TState *thread, CState *st, StatsData *agg)
! {
! 	PGresult   *res;
! 	Command	   *command;
! 	instr_time	now;
! 	bool		end_tx_processed = false;
! 
! 	/*
! 	 * gettimeofday() isn't free, so we get the current timestamp lazily the
! 	 * first time it's needed, and reuse the same value throughout this
! 	 * function after that.  This also ensures that e.g. the calculated latency
! 	 * reported in the log file and in the totals are the same. Zero means
! 	 * "not set yet".  Reset "now" when we execute shell commands or expressions,
! 	 * which might take a non-negligible amount of time, though.
! 	 */
! 	INSTR_TIME_SET_ZERO(now);
  
! 	/*
! 	 * Loop in the state machine, until we have to wait for a result from the
! 	 * server (or have to sleep, for throttling or for \sleep).
! 	 *
! 	 * Note: In the switch-statement below, 'break' will loop back here,
! 	 * meaning "continue in the state machine".  Return is used to return to
! 	 * the caller.  (XXX: Would it be better to replace the 'break's with
! 	 * 'continue's?)
! 	 */
! 	for (;;)
! 	{
! 		switch (st->state)
  		{
  			/*
! 			 * Handle throttling once per transaction by sleeping.
  			 */
! 			case CSTATE_START_THROTTLE:
! 				if (throttle_delay > 0)
! 				{
! 					/*
! 					 * Generate a delay such that the series of delays will
! 					 * approximate a Poisson distribution centered on the
! 					 * throttle_delay time.
! 					 *
! 					 * If transactions are too slow or a given wait is shorter
! 					 * than a transaction, the next transaction will start
! 					 * right away.
! 					 */
! 					int64		wait = getPoissonRand(thread, throttle_delay);
! 
! 					thread->throttle_trigger += wait;
! 					st->txn_scheduled = thread->throttle_trigger;
! 
! 					/*
! 					 * stop client if next transaction is beyond pgbench end
! 					 * of execution
! 					 */
! 					if (duration > 0 && st->txn_scheduled > end_time)
! 					{
! 						st->state = CSTATE_FINISHED;
! 						break;
! 					}
  
! 					/*
! 					 * If this --latency-limit is used, and this slot is
! 					 * already late so that the transaction will miss the
! 					 * latency limit even if it completed immediately, we skip
! 					 * this time slot and iterate till the next slot that
! 					 * isn't late yet.
! 					 */
! 					if (latency_limit)
! 					{
! 						int64		now_us;
  
! 						if (INSTR_TIME_IS_ZERO(now))
! 							INSTR_TIME_SET_CURRENT(now);
! 						now_us = INSTR_TIME_GET_MICROSEC(now);
! 						while (thread->throttle_trigger < now_us - latency_limit)
! 						{
! 							processXactStats(thread, st, &now, true, agg);
! 							/* next rendez-vous */
! 							wait = getPoissonRand(thread, throttle_delay);
! 							thread->throttle_trigger += wait;
! 							st->txn_scheduled = thread->throttle_trigger;
! 						}
! 					}
  
! 					st->state = CSTATE_THROTTLE;
! 					if (debug)
! 						fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
! 								st->id, wait);
! 				}
! 				else
! 				{
! 					/* no throttling, go directly to CSTATE_START_TX state */
! 					st->state = CSTATE_START_TX;
! 				}
! 				break;
  
  			/*
! 			 * Wait until it's time to start next transaction.
  			 */
! 			case CSTATE_THROTTLE:
! 				if (INSTR_TIME_IS_ZERO(now))
! 					INSTR_TIME_SET_CURRENT(now);
! 				if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
! 					return;			/* Still sleeping, nothing to do here */
! 
! 				/* Else done sleeping, start the transaction */
! 				st->state = CSTATE_START_TX;
! 				break;
  
! 			/* Start new transaction */
! 			case CSTATE_START_TX:
! 				/*
! 				 * Establish connection on first call, or if is_connect is
! 				 * true.
! 				 */
! 				if (st->con == NULL)
! 				{
! 					instr_time	start;
  
! 					if (INSTR_TIME_IS_ZERO(now))
! 						INSTR_TIME_SET_CURRENT(now);
! 					start = now;
! 					if ((st->con = doConnect()) == NULL)
! 					{
! 						fprintf(stderr, "client %d aborted while establishing connection\n",
! 							st->id);
! 						st->state = CSTATE_ABORTED;
! 						break;
! 					}
! 					INSTR_TIME_SET_CURRENT(now);
! 					INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
  
! 					/* Reset session-local state */
! 					memset(st->prepared, 0, sizeof(st->prepared));
! 				}
  
! 				/*
! 				 * Record transaction start time under logging, progress or
! 				 * throttling.
! 				 */
! 				if (use_log || progress || throttle_delay || latency_limit ||
! 					per_script_stats)
! 				{
! 					if (INSTR_TIME_IS_ZERO(now))
! 						INSTR_TIME_SET_CURRENT(now);
! 					st->txn_begin = now;
! 
! 					/*
! 					 * When not throttling, this is also the transaction's
! 					 * scheduled start time.
! 					 */
! 					if (!throttle_delay)
! 						st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
! 				}
  
! 				/* Begin with the first command */
! 				st->command = 0;
! 				st->state = CSTATE_START_COMMAND;
! 				break;
  
! 			/* Send a command to server (or execute a meta-command) */
! 			case CSTATE_START_COMMAND:
! 				command = sql_script[st->use_file].commands[st->command];
  
! 				/*
! 				 * If we reached the end of the script, move to end-of-xact
! 				 * processing.
! 				 */
! 				if (command == NULL)
! 				{
! 					st->state = CSTATE_END_TX;
! 					break;
! 				}
  
! 				/*
! 				 * Record statement start time if per-command latencies
! 				 * are requested
! 				 */
! 				if (is_latencies)
! 				{
! 					if (INSTR_TIME_IS_ZERO(now))
! 						INSTR_TIME_SET_CURRENT(now);
! 					st->stmt_begin = now;
! 				}
  
! 				if (command->type == SQL_COMMAND)
! 				{
! 					if (!sendCommand(st, command))
! 					{
! 						/* don't spin retrying a broken connection: abort */
! 						fprintf(stderr, "client %d aborted in command %d; could not send SQL command\n",
! 								st->id, st->command);
! 						st->state = CSTATE_ABORTED;
! 						break;
! 					}
! 					else
! 						st->state = CSTATE_WAIT_RESULT;
! 				}
! 				else if (command->type == META_COMMAND)
! 				{
! 					int			argc = command->argc,
! 								i;
! 					char	  **argv = command->argv;
  
! 					if (debug)
! 					{
! 						fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
! 						for (i = 1; i < argc; i++)
! 							fprintf(stderr, " %s", argv[i]);
! 						fprintf(stderr, "\n");
! 					}
  
! 					if (pg_strcasecmp(argv[0], "sleep") == 0)
! 					{
! 						/*
! 						 * A \sleep doesn't execute anything, we just get the
! 						 * delay from the argument, and enter the CSTATE_SLEEP
! 						 * state.  (The per-command latency will be recorded
! 						 * in CSTATE_SLEEP state, not here, after the delay has
! 						 * elapsed.)
! 						 */
! 						int			usec;
! 
! 						if (!evaluateSleep(st, argc, argv, &usec))
! 						{
! 							metaCommandFailed(st);
! 							st->state = CSTATE_ABORTED;
! 							break;
! 						}
  
! 						if (INSTR_TIME_IS_ZERO(now))
! 							INSTR_TIME_SET_CURRENT(now);
! 						st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
! 						st->state = CSTATE_SLEEP;
! 						break;
! 					}
! 					else
! 					{
! 						if (pg_strcasecmp(argv[0], "set") == 0)
! 						{
! 							PgBenchExpr *expr = command->expr;
! 							PgBenchValue result;
  
! 							if (!evaluateExpr(thread, st, expr, &result))
! 							{
! 								metaCommandFailed(st);
! 								st->state = CSTATE_ABORTED;
! 								break;
! 							}
  
! 							if (!putVariableNumber(st, argv[0], argv[1], &result))
! 							{
! 								metaCommandFailed(st);
! 								st->state = CSTATE_ABORTED;
! 								break;
! 							}
! 						}
! 						else if (pg_strcasecmp(argv[0], "setshell") == 0)
! 						{
! 							bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
! 
! 							if (timer_exceeded) /* timeout */
! 							{
! 								st->state = CSTATE_FINISHED;
! 								break;
! 							}
! 							else if (!ret)		/* on error */
! 							{
! 								st->state = CSTATE_ABORTED;
! 								metaCommandFailed(st);
! 								break;
! 							}
! 							else
! 							{
! 								/* succeeded */
! 							}
! 						}
! 						else if (pg_strcasecmp(argv[0], "shell") == 0)
! 						{
! 							bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);
! 
! 							if (timer_exceeded) /* timeout */
! 							{
! 								st->state = CSTATE_FINISHED;
! 								break;
! 							}
! 							else if (!ret)		/* on error */
! 							{
! 								st->state = CSTATE_ABORTED;
! 								metaCommandFailed(st);
! 								break;
! 							}
! 							else
! 							{
! 								/* succeeded */
! 							}
! 						}
! 						/*
! 						 * executing the expression or shell command might
! 						 * take a non-negligible amount of time, so reset
! 						 * 'now'
! 						 */
! 						INSTR_TIME_SET_ZERO(now);
! 
! 						/*
! 						 * meta-command executed: accumulate per-command
! 						 * execution times in thread-local data structure, if
! 						 * per-command latencies are requested
! 						 */
! 						if (is_latencies)
! 						{
! 							if (INSTR_TIME_IS_ZERO(now))
! 								INSTR_TIME_SET_CURRENT(now);
! 
! 							/* XXX could use a mutex here, but we choose not to */
! 							addToSimpleStats(&command->stats,
! 											 INSTR_TIME_GET_DOUBLE(now) -
! 											 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
! 						}
! 
! 						/* after a meta command, proceed with next command */
! 						st->command++;
! 						st->state = CSTATE_START_COMMAND;
! 					}
  				}
! 				break;
  
! 			/* Wait for the current SQL command to complete */
! 			case CSTATE_WAIT_RESULT:
! 				command = sql_script[st->use_file].commands[st->command];
! 				if (debug)
! 					fprintf(stderr, "client %d receiving\n", st->id);
! 				if (!PQconsumeInput(st->con))
! 				{					/* there's something wrong */
! 					fprintf(stderr, "client %d aborted in command %d; perhaps the backend died while processing\n", st->id, st->command);
! 					st->state = CSTATE_ABORTED;
! 					break;
! 				}
! 				if (PQisBusy(st->con))
! 					return;			/* don't have the whole result yet */
  
! 				/*
! 				 * command finished: accumulate per-command execution times in
! 				 * thread-local data structure, if per-command latencies are requested
! 				 */
! 				if (is_latencies)
! 				{
! 					if (INSTR_TIME_IS_ZERO(now))
! 						INSTR_TIME_SET_CURRENT(now);
  
! 					/* XXX could use a mutex here, but we choose not to */
! 					addToSimpleStats(&command->stats,
! 									 INSTR_TIME_GET_DOUBLE(now) -
! 									 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
! 				}
  
! 				/*
! 				 * Read and discard the query result; note this is not included in
! 				 * the statement latency numbers.
! 				 */
! 				/*
! 				 * XXX: If I'm reading the old code correctly, this used to not be included in
! 				 * the transaction latency either, for the last command in the transaction.
! 				 * Now it is.
! 				 */
! 				res = PQgetResult(st->con);
! 				switch (PQresultStatus(res))
! 				{
! 					case PGRES_COMMAND_OK:
! 					case PGRES_TUPLES_OK:
! 					case PGRES_EMPTY_QUERY:
! 						/* OK */
! 						PQclear(res);
! 						discard_response(st);
! 						st->command++;
! 						st->state = CSTATE_START_COMMAND;
! 						break;
! 					default:
! 						fprintf(stderr, "client %d aborted in command %d: %s",
! 								st->id, st->command, PQerrorMessage(st->con));
! 						PQclear(res);
! 						st->state = CSTATE_ABORTED;
! 						break;
! 				}
! 				break;
  
! 			/*
! 			 * Wait until txn_scheduled. This state is entered after a \sleep
! 			 * metacommand.
! 			 */
! 			case CSTATE_SLEEP:
! 				if (INSTR_TIME_IS_ZERO(now))
! 					INSTR_TIME_SET_CURRENT(now);
! 				if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
! 					return;			/* Still sleeping, nothing to do here */
  
! 				/* Else done sleeping. */
  
! 				/*
! 				 * Meta-command executed: accumulate per-command execution
! 				 * times in thread-local data structure, if per-command
! 				 * latencies are requested.
! 				 */
! 				if (is_latencies)
! 				{
! 					if (INSTR_TIME_IS_ZERO(now))
! 						INSTR_TIME_SET_CURRENT(now);
  
! 					/* XXX could use a mutex here, but we choose not to */
! 					command = sql_script[st->use_file].commands[st->command];
! 					addToSimpleStats(&command->stats,
! 									 INSTR_TIME_GET_DOUBLE(now) -
! 									 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
! 				}
  
! 				/* Go ahead with next command */
! 				st->command++;
! 				st->state = CSTATE_START_COMMAND;
! 				break;
! 
! 			/*
! 			 * End of transaction.
! 			 */
! 			case CSTATE_END_TX:
! 				/* transaction finished: calculate latency and log the transaction */
! 				if (progress || throttle_delay || latency_limit ||
! 					per_script_stats || use_log)
! 					processXactStats(thread, st, &now, false, agg);
! 				else
! 					thread->stats.cnt++;
! 
! 				if (is_connect)
  				{
! 					PQfinish(st->con);
! 					st->con = NULL;
! 					INSTR_TIME_SET_ZERO(now);
  				}
  
! 				++st->cnt;
! 				if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! 				{
! 					/* exit success */
! 					st->state = CSTATE_FINISHED;
! 					break;
! 				}
  
! 				/* Select next transaction to run. */
! 				st->use_file = chooseScript(thread);
! 				if (debug)
! 					fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
! 							sql_script[st->use_file].desc);
  
! 				/*
! 				 * No transaction is underway anymore.
! 				 */
! 				st->command = -1;
! 				st->state = CSTATE_START_THROTTLE;
  
! 				/*
! 				 * If we paced through all commands in the script in this loop,
! 				 * without returning to the caller even once, do it now.  This
! 				 * gives the thread a chance to process other connections, and
! 				 * to do progress reporting.  This can currently only happen
! 				 * if the script consists entirely of meta-commands.
! 				 */
! 				if (end_tx_processed)
! 					return;
! 				else
! 				{
! 					end_tx_processed = true;
! 					break;
! 				}
  
! 			/* Final states.  Close the connection if it's still open. */
! 			case CSTATE_ABORTED:
! 			case CSTATE_FINISHED:
! 				if (st->con != NULL)
! 				{
! 					PQfinish(st->con);
! 					st->con = NULL;
! 				}
! 				return;
  		}
  	}
  }
  
  /*
***************
*** 4178,4206 **** threadRun(void *arg)
  	initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
  	last = aggs;
  
! 	/* send start up queries in async manner */
  	for (i = 0; i < nstate; i++)
  	{
  		CState	   *st = &state[i];
- 		int			prev_ecnt = st->ecnt;
- 		Command   **commands;
  
  		st->use_file = chooseScript(thread);
! 		commands = sql_script[st->use_file].commands;
! 		if (debug)
! 			fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
! 					sql_script[st->use_file].desc);
! 		if (!doCustom(thread, st, &aggs))
! 			remains--;			/* I've aborted */
! 
! 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
! 		{
! 			fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
! 					i, st->state);
! 			remains--;			/* I've aborted */
! 			PQfinish(st->con);
! 			st->con = NULL;
! 		}
  	}
  
  	while (remains > 0)
--- 4416,4428 ----
  	initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
  	last = aggs;
  
! 	/* Initialize the state machines with the first script to run */
  	for (i = 0; i < nstate; i++)
  	{
  		CState	   *st = &state[i];
  
  		st->use_file = chooseScript(thread);
! 		st->state = CSTATE_START_THROTTLE;
  	}
  
  	while (remains > 0)
***************
*** 4217,4275 **** threadRun(void *arg)
  		for (i = 0; i < nstate; i++)
  		{
  			CState	   *st = &state[i];
- 			Command   **commands = sql_script[st->use_file].commands;
  			int			sock;
  
! 			if (st->con == NULL)
  			{
  				continue;
  			}
! 			else if (st->sleeping)
  			{
! 				if (st->throttling && timer_exceeded)
! 				{
! 					/* interrupt client which has not started a transaction */
! 					remains--;
! 					st->sleeping = false;
! 					st->throttling = false;
! 					PQfinish(st->con);
! 					st->con = NULL;
! 					continue;
! 				}
! 				else	/* just a nap from the script */
! 				{
! 					int			this_usec;
! 
! 					if (min_usec == PG_INT64_MAX)
! 					{
! 						instr_time	now;
  
! 						INSTR_TIME_SET_CURRENT(now);
! 						now_usec = INSTR_TIME_GET_MICROSEC(now);
! 					}
  
! 					this_usec = st->txn_scheduled - now_usec;
! 					if (min_usec > this_usec)
! 						min_usec = this_usec;
  				}
  			}
! 			else if (commands[st->state]->type == META_COMMAND)
  			{
! 				min_usec = 0;	/* the connection is ready to run */
  				break;
  			}
! 
! 			sock = PQsocket(st->con);
! 			if (sock < 0)
  			{
! 				fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
! 				goto done;
  			}
- 
- 			FD_SET(sock, &input_mask);
- 
- 			if (maxsock < sock)
- 				maxsock = sock;
  		}
  
  		/* also wake up to print the next progress report on time */
--- 4439,4497 ----
  		for (i = 0; i < nstate; i++)
  		{
  			CState	   *st = &state[i];
  			int			sock;
  
! 			if (st->state == CSTATE_THROTTLE && timer_exceeded)
  			{
+ 				/* interrupt client which has not started a transaction */
+ 				st->state = CSTATE_FINISHED;
+ 				remains--;
+ 				PQfinish(st->con);
+ 				st->con = NULL;
  				continue;
  			}
! 			else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
  			{
! 				/* a nap from the script, or sleep for throttling */
! 				int			this_usec;
  
! 				if (min_usec == PG_INT64_MAX)
! 				{
! 					instr_time	now;
  
! 					INSTR_TIME_SET_CURRENT(now);
! 					now_usec = INSTR_TIME_GET_MICROSEC(now);
  				}
+ 
+ 				this_usec = st->txn_scheduled - now_usec;
+ 				if (min_usec > this_usec)
+ 					min_usec = this_usec;
  			}
! 			else if (st->state == CSTATE_WAIT_RESULT)
  			{
! 				/*
! 				 * waiting for result from server - nothing to do unless the
! 				 * socket is readable
! 				 */
! 				sock = PQsocket(st->con);
! 				if (sock < 0)
! 				{
! 					fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
! 					goto done;
! 				}
! 
! 				FD_SET(sock, &input_mask);
! 
! 				if (maxsock < sock)
! 					maxsock = sock;
  				break;
  			}
! 			else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
  			{
! 				/* the connection is ready to run */
! 				min_usec = 0;
! 				break;
  			}
  		}
  
  		/* also wake up to print the next progress report on time */
***************
*** 4295,4301 **** threadRun(void *arg)
  		 * specified in the script ends, or it's time to print a progress
  		 * report.
  		 */
! 		if (min_usec > 0 && maxsock != -1)
  		{
  			int			nsocks; /* return from select(2) */
  
--- 4517,4523 ----
  		 * specified in the script ends, or it's time to print a progress
  		 * report.
  		 */
! 		if (min_usec > 0 || maxsock != -1)
  		{
  			int			nsocks; /* return from select(2) */
  
***************
*** 4319,4332 **** threadRun(void *arg)
  			}
  		}
  
! 		/* ok, backend returns reply */
  		for (i = 0; i < nstate; i++)
  		{
  			CState	   *st = &state[i];
! 			Command   **commands = sql_script[st->use_file].commands;
! 			int			prev_ecnt = st->ecnt;
  
! 			if (st->con)
  			{
  				int			sock = PQsocket(st->con);
  
--- 4541,4553 ----
  			}
  		}
  
! 		/* ok, advance the state machine of each connection */
  		for (i = 0; i < nstate; i++)
  		{
  			CState	   *st = &state[i];
! 			bool		ready;
  
! 			if (st->state == CSTATE_WAIT_RESULT && st->con)
  			{
  				int			sock = PQsocket(st->con);
  
***************
*** 4336,4356 **** threadRun(void *arg)
  							PQerrorMessage(st->con));
  					goto done;
  				}
! 				if (FD_ISSET(sock, &input_mask) ||
! 					commands[st->state]->type == META_COMMAND)
! 				{
! 					if (!doCustom(thread, st, &aggs))
! 						remains--;		/* I've aborted */
! 				}
  			}
  
! 			if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
  			{
! 				fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
! 						i, st->state);
! 				remains--;		/* I've aborted */
! 				PQfinish(st->con);
! 				st->con = NULL;
  			}
  		}
  
--- 4557,4575 ----
  							PQerrorMessage(st->con));
  					goto done;
  				}
! 
! 				ready = FD_ISSET(sock, &input_mask);
  			}
+ 			else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ 				ready = false;
+ 			else
+ 				ready = true;
  
! 			if (ready)
  			{
! 				doCustom(thread, st, &aggs);
! 				if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
! 					remains--;
  			}
  		}
  
