pgbench - minor fix for meta command only scripts
While testing meta-command pgbench only scripts, I noticed that there is
an infinite loop in threadRun, which means that other tasks such as
reporting progress do not get a chance.
The attached patch breaks this loop by always returning at the end of a
script.
On "pgbench -T 3 -P 1 -f noop.sql", before this patch, the progress is not
shown, after it is.
--
Fabien.
Attachments:
pgbench-no-sql-fix-1.sql (application/x-sql)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 87fb006..d06f581 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -2172,8 +2172,14 @@ top:
st->listen = true;
}
- /* after a meta command, immediately proceed with next command */
- goto top;
+ /*
+ * After a meta command, immediately proceed with next command...
+ * although not if last. This exception ensures that a meta command
+ * only script does not always loop in doCustom, so that other tasks
+ * in threadRun, eg progress reporting or switching client, get a chance.
+ */
+ if (commands[st->state + 1] != NULL)
+ goto top;
}
return true;
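The loop described in this message can be illustrated with a minimal toy model in C. This is a sketch, not pgbench code: ToyCommand, toy_do_custom and the step budget are hypothetical names, and the budget only stands in for "never returns to threadRun" so that the example terminates. The model also advances the command index in the same pass, whereas the real doCustom does that when it loops back to its top.

```c
#include <assert.h>
#include <stddef.h>

typedef enum { TOY_SQL, TOY_META } ToyType;

typedef struct
{
    ToyType type;
} ToyCommand;

/*
 * Toy stand-in for one call to doCustom() (hypothetical, simplified).
 * "commands" is a NULL-terminated array, as in pgbench. Returns the number
 * of commands run before control goes back to the caller, or -1 if the step
 * budget ran out first, which models a call that never returns.
 */
static int
toy_do_custom(ToyCommand **commands, int *state, int stop_at_last, int budget)
{
    int steps = 0;

    assert(budget > 0);
    for (;;)
    {
        const ToyCommand *cmd;
        int was_last;

        if (steps >= budget)
            return -1;          /* still looping: caller never got control */

        cmd = commands[*state];
        was_last = (commands[*state + 1] == NULL);
        steps++;

        /* advance to the next command, wrapping at the end of the script */
        *state = was_last ? 0 : *state + 1;

        if (cmd->type == TOY_SQL)
            return steps;       /* SQL: return and wait for the result */

        if (stop_at_last && was_last)
            return steps;       /* patched behavior: leave after last meta */

        /* otherwise this is the unconditional "goto top" */
    }
}
```

With a script made only of TOY_META entries and stop_at_last set to 0, the call exhausts any budget; with stop_at_last set to 1 it returns after one pass over the script, which is the point at which threadRun could report progress.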
On Sat, Jul 9, 2016 at 4:09 PM, Fabien COELHO <coelho@cri.ensmp.fr> wrote:
While testing meta-command pgbench only scripts, I noticed that there is an
infinite loop in threadRun, which means that other tasks such as reporting
progress do not get a chance.
The attached patch breaks this loop by always returning at the end of a
script.
On "pgbench -T 3 -P 1 -f noop.sql", before this patch, the progress is not
shown, after it is.
You may want to name your patches with .patch or .diff. Using .sql is
disturbing style :)
Indeed, not reporting the progress back to the client in the case of a
script with only meta commands is non-intuitive.
- /* after a meta command, immediately proceed with next command */
- goto top;
+ /*
+ * After a meta command, immediately proceed with next command...
+ * although not if last. This exception ensures that a meta command
+ * only script does not always loop in doCustom, so that other tasks
+ * in threadRun, eg progress reporting or switching client, get a chance.
+ */
+ if (commands[st->state + 1] != NULL)
+ goto top;
This looks good to me. I'd just rewrite the comment block with
something like that, more simplified:
+ /*
+ * After a meta command, immediately proceed with next command.
+ * But if this is the last command, just leave.
+ */
--
Michael
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hello Michaël,
You may want to name your patches with .patch or .diff. Using .sql is
disturbing style :)
Indeed! :-)
Indeed, not reporting the progress back to the client in the case of a
script with only meta commands is non-intuitive.
This looks good to me. I'd just rewrite the comment block with
something like that, more simplified:
Ok. Here is an updated version, with a better suffix and a simplified
comment.
Thanks,
--
Fabien.
Attachments:
pgbench-no-sql-fix-2.patch (text/x-diff)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 87fb006..4e7449e 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -2172,8 +2172,12 @@ top:
st->listen = true;
}
- /* after a meta command, immediately proceed with next command */
- goto top;
+ /*
+ * After a meta command immediately proceed with next command,
+ * but if it is the last command, just leave.
+ */
+ if (commands[st->state + 1] != NULL)
+ goto top;
}
return true;
Fabien COELHO <coelho@cri.ensmp.fr> writes:
Ok. Here is an updated version, with a better suffix and a simplified
comment.
Doesn't this break the handling of latency calculations, or at least make
the results completely different for the last metacommand than what they
would be for a non-last command? It looks like it needs to loop back so
that the latency calculation is completed for the metacommand before it
can exit. Seems to me it would probably make more sense to fall out at
the end of the "transaction finished" if-block, around line 1923 in HEAD.
(The code structure in here seems like a complete mess to me, but probably
now is not the time to refactor it.)
regards, tom lane
Hello Tom,
Ok. Here is an updated version, with a better suffix and a simplified
comment.
Doesn't this break the handling of latency calculations, or at least make
the results completely different for the last metacommand than what they
would be for a non-last command? It looks like it needs to loop back so
that the latency calculation is completed for the metacommand before it
can exit. Seems to me it would probably make more sense to fall out at
the end of the "transaction finished" if-block, around line 1923 in HEAD.
Indeed, it would slightly disturb the stats computation by delaying the
recording of the end of the statement and of the transaction.
However line 1923 is a shortcut for ending pgbench, but at the end of a
transaction more stuff must be done, eg choosing the next script and
reconnecting, before exiting. The solution is more contrived.
The attached patch provides a solution which ensures the return in the
right condition and after the stat collection. The code structure requires
another ugly boolean to proceed so as to preserve doing the reconnection
between the decision that the return must be done and the place where it
can be done, after reconnecting.
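The ordering constraint described here (decide early, return late) can be sketched with a small toy in C. This is only an illustration under assumptions: ToyTrace, toy_trace and toy_end_of_script are hypothetical names, and the real doCustom does much more at each step. Only the flag name return_before_next_trans is taken from the patch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef struct
{
    char log[64];               /* ordered events, each followed by ';' */
} ToyTrace;

/* Append one event name to the trace (hypothetical helper). */
static void
toy_trace(ToyTrace *t, const char *event)
{
    size_t used = strlen(t->log);

    snprintf(t->log + used, sizeof(t->log) - used, "%s;", event);
}

/*
 * Toy model of the end-of-script path. The decision to return is taken
 * when the last command turns out to be a meta command, but the return
 * itself happens only after stats are recorded and after any reconnection.
 * Returns 1 if control goes back to the caller, 0 if the next transaction
 * would start right away.
 */
static int
toy_end_of_script(ToyTrace *t, int last_was_meta, int must_reconnect)
{
    int return_before_next_trans = 0;

    assert(t != NULL);

    toy_trace(t, "stats");      /* statement and transaction stats first */

    if (last_was_meta)
        return_before_next_trans = 1;   /* remember the decision */

    if (must_reconnect)
        toy_trace(t, "reconnect");      /* still done before leaving */

    if (return_before_next_trans)
    {
        toy_trace(t, "return");
        return 1;
    }

    toy_trace(t, "next");
    return 0;
}
```

Calling toy_end_of_script with last_was_meta and must_reconnect both set leaves the trace "stats;reconnect;return;", so the stats are complete and the connection is re-established before the caller regains control.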
(The code structure in here seems like a complete mess to me, but probably
now is not the time to refactor it.)
I fully agree that the code structure is a total mess:-( Maybe I'll try to
submit a simpler one some day.
Basically the doCustom function is not resilient: you cannot exit from
anywhere and hope that re-entering would achieve a consistent behavior.
While reading the code to find a better place for a return, I noted some
possible inconsistencies in recording stats, which are noted as comments
in the attached patch.
Calling chooseScript is done both from outside for initialization and from
inside doCustom, where it could be done once and more clearly in doCustom.
Boolean listen is not reset because the script is expected to execute
directly the start of the next statement. I succeeded in convincing myself
that it actually works, but it is unobvious to spot why. I think that a
simpler pattern would be welcome. Also, some other things (eg prepared)
are not reset in all cases, not sure why.
The goto should probably be replaced by a while.
...
--
Fabien.
Attachments:
pgbench-latency-t-2.patch (text/x-diff)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 87fb006..8c5df14 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -1766,7 +1766,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
{
PGresult *res;
Command **commands;
- bool trans_needs_throttle = false;
+ bool trans_needs_throttle = false,
+ return_before_next_trans = false;
instr_time now;
/*
@@ -1849,6 +1850,8 @@ top:
if (st->listen)
{ /* are we receiver? */
+ bool listened_a_meta = commands[st->state]->type == META_COMMAND;
+
if (commands[st->state]->type == SQL_COMMAND)
{
if (debug)
@@ -1892,6 +1895,7 @@ top:
/*
* Read and discard the query result; note this is not included in
* the statement latency numbers.
+ * Should this be done before recording the statement stats?
*/
res = PQgetResult(st->con);
switch (PQresultStatus(res))
@@ -1913,6 +1917,7 @@ top:
{
if (is_connect)
{
+ /* Should transaction stats recorded above count this time? */
PQfinish(st->con);
st->con = NULL;
}
@@ -1942,12 +1947,17 @@ top:
* listen back to true.
*/
st->listen = false;
+
+ if (listened_a_meta)
+ return_before_next_trans = true;
+
trans_needs_throttle = (throttle_delay > 0);
}
}
if (st->con == NULL)
{
+ /* Why is connection time is out of transaction time stats? */
instr_time start,
end;
@@ -1969,6 +1979,10 @@ top:
memset(st->prepared, 0, sizeof(st->prepared));
}
+ /* ensure that meta-only scripts sometimes return */
+ if (return_before_next_trans)
+ return true;
+
/*
* This ensures that a throttling delay is inserted before proceeding with
* sql commands, after the first transaction. The first transaction
The attached patch provides a solution which ensures the return in the right
condition and after the stat collection. The code structure requires another
ugly boolean to proceed so as to preserve doing the reconnection between the
decision that the return must be done and the place where it can be done,
after reconnecting.
Oops, the attached patch had the right content but the wrong name :-(
Here it is again with a consistent name.
Sorry for the noise.
--
Fabien.
Attachments:
pgbench-no-sql-fix-3.patch (text/x-diff)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 87fb006..8c5df14 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -1766,7 +1766,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
{
PGresult *res;
Command **commands;
- bool trans_needs_throttle = false;
+ bool trans_needs_throttle = false,
+ return_before_next_trans = false;
instr_time now;
/*
@@ -1849,6 +1850,8 @@ top:
if (st->listen)
{ /* are we receiver? */
+ bool listened_a_meta = commands[st->state]->type == META_COMMAND;
+
if (commands[st->state]->type == SQL_COMMAND)
{
if (debug)
@@ -1892,6 +1895,7 @@ top:
/*
* Read and discard the query result; note this is not included in
* the statement latency numbers.
+ * Should this be done before recording the statement stats?
*/
res = PQgetResult(st->con);
switch (PQresultStatus(res))
@@ -1913,6 +1917,7 @@ top:
{
if (is_connect)
{
+ /* Should transaction stats recorded above count this time? */
PQfinish(st->con);
st->con = NULL;
}
@@ -1942,12 +1947,17 @@ top:
* listen back to true.
*/
st->listen = false;
+
+ if (listened_a_meta)
+ return_before_next_trans = true;
+
trans_needs_throttle = (throttle_delay > 0);
}
}
if (st->con == NULL)
{
+ /* Why is connection time is out of transaction time stats? */
instr_time start,
end;
@@ -1969,6 +1979,10 @@ top:
memset(st->prepared, 0, sizeof(st->prepared));
}
+ /* ensure that meta-only scripts sometimes return */
+ if (return_before_next_trans)
+ return true;
+
/*
* This ensures that a throttling delay is inserted before proceeding with
* sql commands, after the first transaction. The first transaction
On 07/13/2016 11:14 AM, Fabien COELHO wrote:
(The code structure in here seems like a complete mess to me, but probably
now is not the time to refactor it.)
I fully agree that the code structure is a total mess:-( Maybe I'll try to
submit a simpler one some day.
Basically the doCustom function is not resilient, you cannot exit from
anywhere and hope that re-entering would achieve a consistent behavior.
While reading the code to find a better place for a return, I noted some
possible inconsistencies in recording stats, which are noted as comments
in the attached patch.
Calling chooseScript is done both from outside for initialization and from
inside doCustom, where it could be done once and more clearly in doCustom.
Boolean listen is not reset because the script is expected to execute
directly the start of the next statement. I succeeded in convincing myself
that it actually works, but it is unobvious to spot why. I think that a
simpler pattern would be welcome. Also, some other things (eg prepared)
are not reset in all cases, not sure why.
The goto should probably be replaced by a while.
...
Yeah, it really is quite a mess. I tried to review your patch, and I
think it's correct, but I couldn't totally convince myself, because of
the existing messiness of the logic. So I bit the bullet and started
refactoring.
I came up with the attached. It refactors the logic in doCustom() into a
state machine. I think this is much clearer, what do you think?
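As a rough illustration of the state-machine idea, here is a much reduced toy in C. It is a sketch under assumptions, not the attached patch: ToyClient, toy_advance and the TOY_* states are hypothetical, throttling, \sleep and error handling are left out, and returning to the caller after each finished transaction is a choice made for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    TOY_START_TX,
    TOY_START_COMMAND,
    TOY_WAIT_RESULT,
    TOY_END_TX,
    TOY_FINISHED
} ToyState;

typedef struct
{
    ToyState state;             /* current state of the machine */
    int command;                /* index of the current command */
    int ncommands;              /* number of commands in the script */
    const bool *is_sql;         /* is_sql[i]: command i is a SQL command */
    bool result_ready;          /* set by the caller when a result arrived */
    int tx_done;                /* transactions completed so far */
    int tx_limit;               /* stop after this many transactions */
} ToyClient;

/*
 * Advance the toy machine as far as possible. Inside the switch, "break"
 * loops back to the top; "return" hands control back to the caller.
 */
static void
toy_advance(ToyClient *c)
{
    bool ended_tx = false;

    assert(c != NULL);
    for (;;)
    {
        switch (c->state)
        {
            case TOY_START_TX:
                c->command = 0;
                c->state = TOY_START_COMMAND;
                break;
            case TOY_START_COMMAND:
                if (c->command >= c->ncommands)
                    c->state = TOY_END_TX;
                else if (c->is_sql[c->command])
                {
                    c->result_ready = false;    /* "send" the query */
                    c->state = TOY_WAIT_RESULT;
                }
                else
                    c->command++;   /* meta command: runs immediately */
                break;
            case TOY_WAIT_RESULT:
                if (!c->result_ready)
                    return;         /* nothing to do until a result comes */
                c->command++;
                c->state = TOY_START_COMMAND;
                break;
            case TOY_END_TX:
                c->tx_done++;
                c->state = (c->tx_done >= c->tx_limit) ? TOY_FINISHED
                                                       : TOY_START_TX;
                ended_tx = true;
                break;
            case TOY_FINISHED:
                return;
        }
        if (ended_tx)
            return;             /* let the caller do its other tasks */
    }
}
```

In this toy, a script containing only meta commands completes exactly one transaction per call, so the caller regains control between transactions, while a script with a SQL command returns as soon as it has to wait for a result.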
@@ -1892,6 +1895,7 @@ top:
/*
* Read and discard the query result; note this is not included in
* the statement latency numbers.
+ * Should this be done before recording the statement stats?
*/
res = PQgetResult(st->con);
switch (PQresultStatus(res))
Well, the comment right there says "note this is not included in the
statement latency numbers", so apparently it's intentional. Whether it's
a good idea or not, I don't know :-). It does seem a bit surprising.
But what seems more bogus to me is that we do that after recording the
*transaction* stats, if this was the last command. So the PQgetResult()
of the last command in the transaction is not included in the
transaction stats, even though the PQgetResult() calls for any previous
commands are. (Perhaps that's what you meant too?)
I changed that in my patch, it would've been inconvenient to keep that
old behavior, and it doesn't make any sense to me anyway.
- Heikki
Attachments:
refactor-pgbench-doCustom.patch (text/x-patch)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 56c37d5..8d7f8aa 100644
*** a/src/bin/pgbench/pgbench.c
--- b/src/bin/pgbench/pgbench.c
***************
*** 235,251 **** typedef struct StatsData
} StatsData;
/*
! * Connection state
*/
typedef struct
{
PGconn *con; /* connection handle to DB */
int id; /* client No. */
! int state; /* state No. */
! bool listen; /* whether an async query has been sent */
! bool sleeping; /* whether the client is napping */
! bool throttling; /* whether nap is for throttling */
! bool is_throttled; /* whether transaction throttling is done */
Variable *variables; /* array of variable definitions */
int nvariables; /* number of variables */
bool vars_sorted; /* are variables sorted by name? */
--- 235,305 ----
} StatsData;
/*
! * Connection state machine states.
! */
! typedef enum
! {
! /*
! * In CSTATE_START_THROTTLE state, we calculate when to begin the next
! * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
! * sleeps until that moment. (If throttling is not enabled, doCustom()
! * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
! */
! CSTATE_START_THROTTLE,
! CSTATE_THROTTLE,
!
! /*
! * CSTATE_START_TX performs start-of-transaction processing. Establishes
! * a new connection for the transaction, in --connect mode, and records
! * the transaction start time.
! */
! CSTATE_START_TX,
!
! /*
! * We loop through these states, to process each command in the
! * script:
! *
! * CSTATE_START_COMMAND starts the execution of a command. On a SQL
! * command, the command is sent to the server, and we move to
! * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is
! * set, and we enter the CSTATE_SLEEP state to wait for it to expire.
! * Other meta-commands are executed immediately, and we proceed to next
! * command.
! *
! * CSTATE_WAIT_RESULT waits until we get a result set back from the server
! * for the current command.
! */
! CSTATE_START_COMMAND,
! CSTATE_WAIT_RESULT,
! CSTATE_SLEEP, /* wait until txn_scheduled, for \sleep */
!
! /*
! * CSTATE_END_TX performs end-of-transaction processing. Calculates
! * latency, and logs the transaction. In --connect mode, closes the
! * current connection. Chooses the next script to execute and starts
! * over in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we
! * have no more work to do.
! */
! CSTATE_END_TX,
!
! /*
! * Final states. CSTATE_ABORTED means that the script execution was
! * aborted because a command failed, CSTATE_FINISHED means success.
! */
! CSTATE_ABORTED,
! CSTATE_FINISHED,
! } ConnectionStateEnum;
!
! /*
! * Connection state.
*/
typedef struct
{
PGconn *con; /* connection handle to DB */
int id; /* client No. */
! ConnectionStateEnum state; /* state machine's current state. */
! int command; /* command No. */
!
Variable *variables; /* array of variable definitions */
int nvariables; /* number of variables */
bool vars_sorted; /* are variables sorted by name? */
***************
*** 1381,1387 **** evalFunc(TState *thread, CState *st,
Assert(nargs == 1);
fprintf(stderr, "debug(script=%d,command=%d): ",
! st->use_file, st->state + 1);
if (varg->type == PGBT_INT)
fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
--- 1435,1441 ----
Assert(nargs == 1);
fprintf(stderr, "debug(script=%d,command=%d): ",
! st->use_file, st->command + 1);
if (varg->type == PGBT_INT)
fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
***************
*** 1732,1746 **** preparedStatementName(char *buffer, int file, int state)
sprintf(buffer, "P%d_%d", file, state);
}
! static bool
! clientDone(CState *st)
{
! if (st->con != NULL)
! {
! PQfinish(st->con);
! st->con = NULL;
! }
! return false; /* always false */
}
/* return a script number with a weighted choice. */
--- 1786,1796 ----
sprintf(buffer, "P%d_%d", file, state);
}
! static void
! metaCommandFailed(CState *st)
{
! fprintf(stderr, "client %d aborted in command %d; execution of meta-command failed\n",
! st->id, st->command);
}
/* return a script number with a weighted choice. */
***************
*** 1762,2185 **** chooseScript(TState *thread)
return i - 1;
}
! /* return false iff client should be disconnected */
static bool
! doCustom(TState *thread, CState *st, StatsData *agg)
{
! PGresult *res;
! Command **commands;
! bool trans_needs_throttle = false;
! instr_time now;
! /*
! * gettimeofday() isn't free, so we get the current timestamp lazily the
! * first time it's needed, and reuse the same value throughout this
! * function after that. This also ensures that e.g. the calculated latency
! * reported in the log file and in the totals are the same. Zero means
! * "not set yet". Reset "now" when we step to the next command with "goto
! * top", though.
! */
! top:
! INSTR_TIME_SET_ZERO(now);
! commands = sql_script[st->use_file].commands;
! /*
! * Handle throttling once per transaction by sleeping. It is simpler to
! * do this here rather than at the end, because so much complicated logic
! * happens below when statements finish.
! */
! if (throttle_delay && !st->is_throttled)
{
! /*
! * Generate a delay such that the series of delays will approximate a
! * Poisson distribution centered on the throttle_delay time.
! *
! * If transactions are too slow or a given wait is shorter than a
! * transaction, the next transaction will start right away.
! */
! int64 wait = getPoissonRand(thread, throttle_delay);
! thread->throttle_trigger += wait;
! st->txn_scheduled = thread->throttle_trigger;
! /* stop client if next transaction is beyond pgbench end of execution */
! if (duration > 0 && st->txn_scheduled > end_time)
! return clientDone(st);
! /*
! * If this --latency-limit is used, and this slot is already late so
! * that the transaction will miss the latency limit even if it
! * completed immediately, we skip this time slot and iterate till the
! * next slot that isn't late yet.
! */
! if (latency_limit)
{
! int64 now_us;
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! now_us = INSTR_TIME_GET_MICROSEC(now);
! while (thread->throttle_trigger < now_us - latency_limit)
{
! processXactStats(thread, st, &now, true, agg);
! /* next rendez-vous */
! wait = getPoissonRand(thread, throttle_delay);
! thread->throttle_trigger += wait;
! st->txn_scheduled = thread->throttle_trigger;
}
}
! st->sleeping = true;
! st->throttling = true;
! st->is_throttled = true;
if (debug)
! fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
! st->id, wait);
}
! if (st->sleeping)
! { /* are we sleeping? */
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
! return true; /* Still sleeping, nothing to do here */
! /* Else done sleeping, go ahead with next command */
! st->sleeping = false;
! st->throttling = false;
}
! if (st->listen)
! { /* are we receiver? */
! if (commands[st->state]->type == SQL_COMMAND)
{
! if (debug)
! fprintf(stderr, "client %d receiving\n", st->id);
! if (!PQconsumeInput(st->con))
! { /* there's something wrong */
! fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
! return clientDone(st);
! }
! if (PQisBusy(st->con))
! return true; /* don't have the whole result yet */
}
! /*
! * command finished: accumulate per-command execution times in
! * thread-local data structure, if per-command latencies are requested
! */
! if (is_latencies)
! {
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! /* XXX could use a mutex here, but we choose not to */
! addToSimpleStats(&commands[st->state]->stats,
! INSTR_TIME_GET_DOUBLE(now) -
! INSTR_TIME_GET_DOUBLE(st->stmt_begin));
! }
! /* transaction finished: calculate latency and log the transaction */
! if (commands[st->state + 1] == NULL)
! {
! if (progress || throttle_delay || latency_limit ||
! per_script_stats || use_log)
! processXactStats(thread, st, &now, false, agg);
! else
! thread->stats.cnt++;
! }
! if (commands[st->state]->type == SQL_COMMAND)
{
/*
! * Read and discard the query result; note this is not included in
! * the statement latency numbers.
*/
! res = PQgetResult(st->con);
! switch (PQresultStatus(res))
! {
! case PGRES_COMMAND_OK:
! case PGRES_TUPLES_OK:
! case PGRES_EMPTY_QUERY:
! break; /* OK */
! default:
! fprintf(stderr, "client %d aborted in state %d: %s",
! st->id, st->state, PQerrorMessage(st->con));
! PQclear(res);
! return clientDone(st);
! }
! PQclear(res);
! discard_response(st);
! }
! if (commands[st->state + 1] == NULL)
! {
! if (is_connect)
! {
! PQfinish(st->con);
! st->con = NULL;
! }
! ++st->cnt;
! if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! return clientDone(st); /* exit success */
! }
! /* increment state counter */
! st->state++;
! if (commands[st->state] == NULL)
! {
! st->state = 0;
! st->use_file = chooseScript(thread);
! commands = sql_script[st->use_file].commands;
! if (debug)
! fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
! sql_script[st->use_file].desc);
! st->is_throttled = false;
/*
! * No transaction is underway anymore, which means there is
! * nothing to listen to right now. When throttling rate limits
! * are active, a sleep will happen next, as the next transaction
! * starts. And then in any case the next SQL command will set
! * listen back to true.
*/
! st->listen = false;
! trans_needs_throttle = (throttle_delay > 0);
! }
! }
! if (st->con == NULL)
! {
! instr_time start,
! end;
! INSTR_TIME_SET_CURRENT(start);
! if ((st->con = doConnect()) == NULL)
! {
! fprintf(stderr, "client %d aborted while establishing connection\n",
! st->id);
! return clientDone(st);
! }
! INSTR_TIME_SET_CURRENT(end);
! INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
!
! /* Reset session-local state */
! st->listen = false;
! st->sleeping = false;
! st->throttling = false;
! st->is_throttled = false;
! memset(st->prepared, 0, sizeof(st->prepared));
! }
! /*
! * This ensures that a throttling delay is inserted before proceeding with
! * sql commands, after the first transaction. The first transaction
! * throttling is performed when first entering doCustom.
! */
! if (trans_needs_throttle)
! {
! trans_needs_throttle = false;
! goto top;
! }
! /* Record transaction start time under logging, progress or throttling */
! if ((use_log || progress || throttle_delay || latency_limit ||
! per_script_stats) && st->state == 0)
! {
! INSTR_TIME_SET_CURRENT(st->txn_begin);
! /*
! * When not throttling, this is also the transaction's scheduled start
! * time.
! */
! if (!throttle_delay)
! st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
! }
! /* Record statement start time if per-command latencies are requested */
! if (is_latencies)
! INSTR_TIME_SET_CURRENT(st->stmt_begin);
! if (commands[st->state]->type == SQL_COMMAND)
! {
! const Command *command = commands[st->state];
! int r;
! if (querymode == QUERY_SIMPLE)
! {
! char *sql;
! sql = pg_strdup(command->argv[0]);
! sql = assignVariables(st, sql);
! if (debug)
! fprintf(stderr, "client %d sending %s\n", st->id, sql);
! r = PQsendQuery(st->con, sql);
! free(sql);
! }
! else if (querymode == QUERY_EXTENDED)
! {
! const char *sql = command->argv[0];
! const char *params[MAX_ARGS];
! getQueryParams(st, command, params);
! if (debug)
! fprintf(stderr, "client %d sending %s\n", st->id, sql);
! r = PQsendQueryParams(st->con, sql, command->argc - 1,
! NULL, params, NULL, NULL, 0);
! }
! else if (querymode == QUERY_PREPARED)
! {
! char name[MAX_PREPARE_NAME];
! const char *params[MAX_ARGS];
! if (!st->prepared[st->use_file])
! {
! int j;
! for (j = 0; commands[j] != NULL; j++)
! {
! PGresult *res;
! char name[MAX_PREPARE_NAME];
!
! if (commands[j]->type != SQL_COMMAND)
! continue;
! preparedStatementName(name, st->use_file, j);
! res = PQprepare(st->con, name,
! commands[j]->argv[0], commands[j]->argc - 1, NULL);
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! fprintf(stderr, "%s", PQerrorMessage(st->con));
! PQclear(res);
}
! st->prepared[st->use_file] = true;
! }
! getQueryParams(st, command, params);
! preparedStatementName(name, st->use_file, st->state);
! if (debug)
! fprintf(stderr, "client %d sending %s\n", st->id, name);
! r = PQsendQueryPrepared(st->con, name, command->argc - 1,
! params, NULL, NULL, 0);
! }
! else /* unknown sql mode */
! r = 0;
! if (r == 0)
! {
! if (debug)
! fprintf(stderr, "client %d could not send %s\n",
! st->id, command->argv[0]);
! st->ecnt++;
! }
! else
! st->listen = true; /* flags that should be listened */
! }
! else if (commands[st->state]->type == META_COMMAND)
! {
! int argc = commands[st->state]->argc,
! i;
! char **argv = commands[st->state]->argv;
! if (debug)
! {
! fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
! for (i = 1; i < argc; i++)
! fprintf(stderr, " %s", argv[i]);
! fprintf(stderr, "\n");
! }
! if (pg_strcasecmp(argv[0], "set") == 0)
! {
! PgBenchExpr *expr = commands[st->state]->expr;
! PgBenchValue result;
! if (!evaluateExpr(thread, st, expr, &result))
! {
! st->ecnt++;
! return true;
! }
! if (!putVariableNumber(st, argv[0], argv[1], &result))
! {
! st->ecnt++;
! return true;
! }
! st->listen = true;
! }
! else if (pg_strcasecmp(argv[0], "sleep") == 0)
! {
! char *var;
! int usec;
! instr_time now;
! if (*argv[1] == ':')
! {
! if ((var = getVariable(st, argv[1] + 1)) == NULL)
{
! fprintf(stderr, "%s: undefined variable \"%s\"\n",
! argv[0], argv[1]);
! st->ecnt++;
! return true;
}
- usec = atoi(var);
- }
- else
- usec = atoi(argv[1]);
! if (argc > 2)
! {
! if (pg_strcasecmp(argv[2], "ms") == 0)
! usec *= 1000;
! else if (pg_strcasecmp(argv[2], "s") == 0)
! usec *= 1000000;
! }
! else
! usec *= 1000000;
! INSTR_TIME_SET_CURRENT(now);
! st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
! st->sleeping = true;
! st->listen = true;
! }
! else if (pg_strcasecmp(argv[0], "setshell") == 0)
! {
! bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
! if (timer_exceeded) /* timeout */
! return clientDone(st);
! else if (!ret) /* on error */
! {
! st->ecnt++;
! return true;
! }
! else /* succeeded */
! st->listen = true;
! }
! else if (pg_strcasecmp(argv[0], "shell") == 0)
! {
! bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
! if (timer_exceeded) /* timeout */
! return clientDone(st);
! else if (!ret) /* on error */
! {
! st->ecnt++;
! return true;
! }
! else /* succeeded */
! st->listen = true;
}
-
- /* after a meta command, immediately proceed with next command */
- goto top;
}
-
- return true;
}
/*
--- 1812,2423 ----
return i - 1;
}
! /* Send a SQL command, using the chosen querymode */
static bool
! sendCommand(CState *st, Command *command)
{
! int r;
! if (querymode == QUERY_SIMPLE)
! {
! char *sql;
! sql = pg_strdup(command->argv[0]);
! sql = assignVariables(st, sql);
! if (debug)
! fprintf(stderr, "client %d sending %s\n", st->id, sql);
! r = PQsendQuery(st->con, sql);
! free(sql);
! }
! else if (querymode == QUERY_EXTENDED)
{
! const char *sql = command->argv[0];
! const char *params[MAX_ARGS];
! getQueryParams(st, command, params);
! if (debug)
! fprintf(stderr, "client %d sending %s\n", st->id, sql);
! r = PQsendQueryParams(st->con, sql, command->argc - 1,
! NULL, params, NULL, NULL, 0);
! }
! else if (querymode == QUERY_PREPARED)
! {
! char name[MAX_PREPARE_NAME];
! const char *params[MAX_ARGS];
! if (!st->prepared[st->use_file])
{
! int j;
! Command **commands = sql_script[st->use_file].commands;
! for (j = 0; commands[j] != NULL; j++)
{
! PGresult *res;
! char name[MAX_PREPARE_NAME];
!
! if (commands[j]->type != SQL_COMMAND)
! continue;
! preparedStatementName(name, st->use_file, j);
! res = PQprepare(st->con, name,
! commands[j]->argv[0], commands[j]->argc - 1, NULL);
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! fprintf(stderr, "%s", PQerrorMessage(st->con));
! PQclear(res);
}
+ st->prepared[st->use_file] = true;
}
! getQueryParams(st, command, params);
! preparedStatementName(name, st->use_file, st->command);
!
if (debug)
! fprintf(stderr, "client %d sending %s\n", st->id, name);
! r = PQsendQueryPrepared(st->con, name, command->argc - 1,
! params, NULL, NULL, 0);
}
+ else /* unknown sql mode */
+ r = 0;
! if (r == 0)
! {
! if (debug)
! fprintf(stderr, "client %d could not send %s\n",
! st->id, command->argv[0]);
! st->ecnt++;
! return false;
}
+ else
+ return true;
+ }
+
+ /*
+ * "execute" a sleep command. Parses the argument, and returns the
+ * requested amount of delay, in microseconds. Returns true on
+ * success, false on error.
+ */
+ static bool
+ evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+ {
+ char *var;
+ int usec;
! if (*argv[1] == ':')
! {
! if ((var = getVariable(st, argv[1] + 1)) == NULL)
{
! fprintf(stderr, "%s: undefined variable \"%s\"\n",
! argv[0], argv[1]);
! return false;
}
+ usec = atoi(var);
+ }
+ else
+ usec = atoi(argv[1]);
! if (argc > 2)
! {
! if (pg_strcasecmp(argv[2], "ms") == 0)
! usec *= 1000;
! else if (pg_strcasecmp(argv[2], "s") == 0)
! usec *= 1000000;
! }
! else
! usec *= 1000000;
! *usecs = usec;
! return true;
! }
! /*
! * Advance the state machine of a connection, if possible.
! */
! static void
! doCustom(TState *thread, CState *st, StatsData *agg)
! {
! PGresult *res;
! Command *command;
! instr_time now;
! bool end_tx_processed = false;
!
! /*
! * gettimeofday() isn't free, so we get the current timestamp lazily the
! * first time it's needed, and reuse the same value throughout this
! * function after that. This also ensures that e.g. the calculated latency
! * reported in the log file and in the totals are the same. Zero means
! * "not set yet". Reset "now" when we execute shell commands or expressions,
! * which might take a non-negligible amount of time, though.
! */
! INSTR_TIME_SET_ZERO(now);
! /*
! * Loop in the state machine, until we have to wait for a result from the
! * server (or have to sleep, for throttling or for \sleep).
! *
! * Note: In the switch-statement below, 'break' will loop back here,
! * meaning "continue in the state machine". Return is used to return to
! * the caller. (XXX: Would it be better to replace the 'break's with
! * 'continue's?)
! */
! for (;;)
! {
! switch(st->state)
{
/*
! * Handle throttling once per transaction by sleeping.
*/
! case CSTATE_START_THROTTLE:
! if (throttle_delay > 0)
! {
! /*
! * Generate a delay such that the series of delays will
! * approximate a Poisson distribution centered on the
! * throttle_delay time.
! *
! * If transactions are too slow or a given wait is shorter
! * than a transaction, the next transaction will start
! * right away.
! */
! int64 wait = getPoissonRand(thread, throttle_delay);
!
! thread->throttle_trigger += wait;
! st->txn_scheduled = thread->throttle_trigger;
!
! /*
! * stop client if next transaction is beyond pgbench end
! * of execution
! */
! if (duration > 0 && st->txn_scheduled > end_time)
! {
! st->state = CSTATE_FINISHED;
! break;
! }
! /*
! * If this --latency-limit is used, and this slot is
! * already late so that the transaction will miss the
! * latency limit even if it completed immediately, we skip
! * this time slot and iterate till the next slot that
! * isn't late yet.
! */
! if (latency_limit)
! {
! int64 now_us;
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! now_us = INSTR_TIME_GET_MICROSEC(now);
! while (thread->throttle_trigger < now_us - latency_limit)
! {
! processXactStats(thread, st, &now, true, agg);
! /* next rendez-vous */
! wait = getPoissonRand(thread, throttle_delay);
! thread->throttle_trigger += wait;
! st->txn_scheduled = thread->throttle_trigger;
! }
! }
! st->state = CSTATE_THROTTLE;
! if (debug)
! fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
! st->id, wait);
! }
! else
! {
! /* no throttling, go directly to CSTATE_START_TX state */
! st->state = CSTATE_START_TX;
! }
! break;
/*
! * Wait until it's time to start next transaction.
*/
! case CSTATE_THROTTLE:
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
! return; /* Still sleeping, nothing to do here */
!
! /* Else done sleeping, start the transaction */
! st->state = CSTATE_START_TX;
! break;
! /* Start new transaction */
! case CSTATE_START_TX:
! /*
! * Establish connection on first call, or if is_connect is
! * true.
! */
! if (st->con == NULL)
! {
! instr_time start;
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! start = now;
! if ((st->con = doConnect()) == NULL)
! {
! fprintf(stderr, "client %d aborted while establishing connection\n",
! st->id);
! st->state = CSTATE_ABORTED;
! break;
! }
! INSTR_TIME_SET_CURRENT(now);
! INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
! /* Reset session-local state */
! memset(st->prepared, 0, sizeof(st->prepared));
! }
! /*
! * Record transaction start time under logging, progress or
! * throttling.
! */
! if (use_log || progress || throttle_delay || latency_limit ||
! per_script_stats)
! {
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! st->txn_begin = now;
!
! /*
! * When not throttling, this is also the transaction's
! * scheduled start time.
! */
! if (!throttle_delay)
! st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
! }
! /* Begin with the first command */
! st->command = 0;
! st->state = CSTATE_START_COMMAND;
! break;
! /* Send a command to server (or execute a meta-command) */
! case CSTATE_START_COMMAND:
! command = sql_script[st->use_file].commands[st->command];
! /*
! * If we reached the end of the script, move to end-of-xact
! * processing.
! */
! if (command == NULL)
! {
! st->state = CSTATE_END_TX;
! break;
! }
! /*
! * Record statement start time if per-command latencies
! * are requested
! */
! if (is_latencies)
! {
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! st->stmt_begin = now;
! }
! if (command->type == SQL_COMMAND)
! {
! if (!sendCommand(st, command))
! {
! /*
! * Failed. Stay in CSTATE_START_COMMAND state, to
! * retry.
! */
! return;
! }
! else
! st->state = CSTATE_WAIT_RESULT;
! }
! else if (command->type == META_COMMAND)
! {
! int argc = command->argc,
! i;
! char **argv = command->argv;
! if (debug)
! {
! fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
! for (i = 1; i < argc; i++)
! fprintf(stderr, " %s", argv[i]);
! fprintf(stderr, "\n");
! }
! if (pg_strcasecmp(argv[0], "sleep") == 0)
! {
! /*
! * A \sleep doesn't execute anything, we just get the
! * delay from the argument, and enter the CSTATE_SLEEP
! * state. (The per-command latency will be recorded
! * in CSTATE_SLEEP state, not here, after the delay has
! * elapsed.)
! */
! int usec;
!
! if (!evaluateSleep(st, argc, argv, &usec))
! {
! metaCommandFailed(st);
! st->state = CSTATE_ABORTED;
! break;
! }
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
! st->state = CSTATE_SLEEP;
! break;
! }
! else
! {
! if (pg_strcasecmp(argv[0], "set") == 0)
! {
! PgBenchExpr *expr = command->expr;
! PgBenchValue result;
! if (!evaluateExpr(thread, st, expr, &result))
! {
! metaCommandFailed(st);
! st->state = CSTATE_ABORTED;
! break;
! }
! if (!putVariableNumber(st, argv[0], argv[1], &result))
! {
! metaCommandFailed(st);
! st->state = CSTATE_ABORTED;
! break;
! }
! }
! else if (pg_strcasecmp(argv[0], "setshell") == 0)
! {
! bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
!
! if (timer_exceeded) /* timeout */
! {
! st->state = CSTATE_FINISHED;
! break;
! }
! else if (!ret) /* on error */
! {
! st->state = CSTATE_ABORTED;
! metaCommandFailed(st);
! break;
! }
! else
! {
! /* succeeded */
! }
! }
! else if (pg_strcasecmp(argv[0], "shell") == 0)
! {
! bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
!
! if (timer_exceeded) /* timeout */
! {
! st->state = CSTATE_FINISHED;
! break;
! }
! else if (!ret) /* on error */
! {
! st->state = CSTATE_ABORTED;
! metaCommandFailed(st);
! break;
! }
! else
! {
! /* succeeded */
! }
! }
! /*
! * executing the expression or shell command might
! * take a non-negligible amount of time, so reset
! * 'now'
! */
! INSTR_TIME_SET_ZERO(now);
!
! /*
! * meta-command executed: accumulate per-command
! * execution times in thread-local data structure, if
! * per-command latencies are requested
! */
! if (is_latencies)
! {
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
!
! /* XXX could use a mutex here, but we choose not to */
! addToSimpleStats(&command->stats,
! INSTR_TIME_GET_DOUBLE(now) -
! INSTR_TIME_GET_DOUBLE(st->stmt_begin));
! }
!
! /* after a meta command, proceed with next command */
! st->command++;
! st->state = CSTATE_START_COMMAND;
! }
}
! break;
! /* Wait for the current SQL command to complete */
! case CSTATE_WAIT_RESULT:
! command = sql_script[st->use_file].commands[st->command];
! if (debug)
! fprintf(stderr, "client %d receiving\n", st->id);
! if (!PQconsumeInput(st->con))
! { /* there's something wrong */
! fprintf(stderr, "client %d aborted in command %d; perhaps the backend died while processing\n", st->id, st->command);
! st->state = CSTATE_ABORTED;
! break;
! }
! if (PQisBusy(st->con))
! return; /* don't have the whole result yet */
! /*
! * command finished: accumulate per-command execution times in
! * thread-local data structure, if per-command latencies are requested
! */
! if (is_latencies)
! {
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! /* XXX could use a mutex here, but we choose not to */
! addToSimpleStats(&command->stats,
! INSTR_TIME_GET_DOUBLE(now) -
! INSTR_TIME_GET_DOUBLE(st->stmt_begin));
! }
! /*
! * Read and discard the query result; note this is not included in
! * the statement latency numbers.
! */
! /*
! * XXX: If I'm reading the old code correctly, this used to not be included in
! * the transaction latency either, for the last command in the transaction.
! * Now it is.
! */
! res = PQgetResult(st->con);
! switch (PQresultStatus(res))
! {
! case PGRES_COMMAND_OK:
! case PGRES_TUPLES_OK:
! case PGRES_EMPTY_QUERY:
! /* OK */
! PQclear(res);
! discard_response(st);
! st->command++;
! st->state = CSTATE_START_COMMAND;
! break;
! default:
! fprintf(stderr, "client %d aborted in command %d: %s",
! st->id, st->command, PQerrorMessage(st->con));
! PQclear(res);
! st->state = CSTATE_ABORTED;
! break;
! }
! break;
! /*
! * Wait until txn_scheduled. This state is entered after a \sleep
! * metacommand.
! */
! case CSTATE_SLEEP:
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
! return; /* Still sleeping, nothing to do here */
! /* Else done sleeping. */
! /*
! * Meta-command executed: accumulate per-command execution
! * times in thread-local data structure, if per-command
! * latencies are requested.
! */
! if (is_latencies)
! {
! if (INSTR_TIME_IS_ZERO(now))
! INSTR_TIME_SET_CURRENT(now);
! /* XXX could use a mutex here, but we choose not to */
! command = sql_script[st->use_file].commands[st->command];
! addToSimpleStats(&command->stats,
! INSTR_TIME_GET_DOUBLE(now) -
! INSTR_TIME_GET_DOUBLE(st->stmt_begin));
! }
! /* Go ahead with next command */
! st->command++;
! st->state = CSTATE_START_COMMAND;
! break;
!
! /*
! * End of transaction.
! */
! case CSTATE_END_TX:
! /* transaction finished: calculate latency and log the transaction */
! if (progress || throttle_delay || latency_limit ||
! per_script_stats || use_log)
! processXactStats(thread, st, &now, false, agg);
! else
! thread->stats.cnt++;
!
! if (is_connect)
{
! PQfinish(st->con);
! st->con = NULL;
! INSTR_TIME_SET_ZERO(now);
}
! ++st->cnt;
! if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
! {
! /* exit success */
! st->state = CSTATE_FINISHED;
! break;
! }
! /* Select next transaction to run. */
! st->use_file = chooseScript(thread);
! if (debug)
! fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
! sql_script[st->use_file].desc);
! /*
! * No transaction is underway anymore.
! */
! st->command = -1;
! st->state = CSTATE_START_THROTTLE;
! /*
! * If we paced through all commands in the script in this loop,
! * without returning to the caller even once, do it now. This
! * gives the thread a chance to process other connections, and
! * to do progress reporting. This can currently only happen
! * if the script consists entirely of meta-commands.
! */
! if (end_tx_processed)
! return;
! else
! {
! end_tx_processed = true;
! break;
! }
! /* Final states. Close the connection if it's still open. */
! case CSTATE_ABORTED:
! case CSTATE_FINISHED:
! if (st->con != NULL)
! {
! PQfinish(st->con);
! st->con = NULL;
! }
! return;
}
}
}
/*
***************
*** 4178,4206 **** threadRun(void *arg)
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
last = aggs;
! /* send start up queries in async manner */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- int prev_ecnt = st->ecnt;
- Command **commands;
st->use_file = chooseScript(thread);
! commands = sql_script[st->use_file].commands;
! if (debug)
! fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
! sql_script[st->use_file].desc);
! if (!doCustom(thread, st, &aggs))
! remains--; /* I've aborted */
!
! if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
! {
! fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
! i, st->state);
! remains--; /* I've aborted */
! PQfinish(st->con);
! st->con = NULL;
! }
}
while (remains > 0)
--- 4416,4428 ----
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
last = aggs;
! /* Initialize the state machines with the first script to run */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
st->use_file = chooseScript(thread);
! st->state = CSTATE_START_THROTTLE;
}
while (remains > 0)
***************
*** 4217,4275 **** threadRun(void *arg)
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
int sock;
! if (st->con == NULL)
{
continue;
}
! else if (st->sleeping)
{
! if (st->throttling && timer_exceeded)
! {
! /* interrupt client which has not started a transaction */
! remains--;
! st->sleeping = false;
! st->throttling = false;
! PQfinish(st->con);
! st->con = NULL;
! continue;
! }
! else /* just a nap from the script */
! {
! int this_usec;
!
! if (min_usec == PG_INT64_MAX)
! {
! instr_time now;
! INSTR_TIME_SET_CURRENT(now);
! now_usec = INSTR_TIME_GET_MICROSEC(now);
! }
! this_usec = st->txn_scheduled - now_usec;
! if (min_usec > this_usec)
! min_usec = this_usec;
}
}
! else if (commands[st->state]->type == META_COMMAND)
{
! min_usec = 0; /* the connection is ready to run */
break;
}
!
! sock = PQsocket(st->con);
! if (sock < 0)
{
! fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
! goto done;
}
-
- FD_SET(sock, &input_mask);
-
- if (maxsock < sock)
- maxsock = sock;
}
/* also wake up to print the next progress report on time */
--- 4439,4497 ----
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
int sock;
! if (st->state == CSTATE_THROTTLE && timer_exceeded)
{
+ /* interrupt client which has not started a transaction */
+ st->state = CSTATE_FINISHED;
+ remains--;
+ PQfinish(st->con);
+ st->con = NULL;
continue;
}
! else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
! /* a nap from the script, or sleep for throttling */
! int this_usec;
! if (min_usec == PG_INT64_MAX)
! {
! instr_time now;
! INSTR_TIME_SET_CURRENT(now);
! now_usec = INSTR_TIME_GET_MICROSEC(now);
}
+
+ this_usec = st->txn_scheduled - now_usec;
+ if (min_usec > this_usec)
+ min_usec = this_usec;
}
! else if (st->state == CSTATE_WAIT_RESULT)
{
! /*
! * waiting for result from server - nothing to do unless the
! * socket is readable
! */
! sock = PQsocket(st->con);
! if (sock < 0)
! {
! fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
! goto done;
! }
!
! FD_SET(sock, &input_mask);
!
! if (maxsock < sock)
! maxsock = sock;
break;
}
! else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
{
! /* the connection is ready to run */
! min_usec = 0;
! break;
}
}
/* also wake up to print the next progress report on time */
***************
*** 4295,4301 **** threadRun(void *arg)
* specified in the script ends, or it's time to print a progress
* report.
*/
! if (min_usec > 0 && maxsock != -1)
{
int nsocks; /* return from select(2) */
--- 4517,4523 ----
* specified in the script ends, or it's time to print a progress
* report.
*/
! if (min_usec > 0 || maxsock != -1)
{
int nsocks; /* return from select(2) */
***************
*** 4319,4332 **** threadRun(void *arg)
}
}
! /* ok, backend returns reply */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
! Command **commands = sql_script[st->use_file].commands;
! int prev_ecnt = st->ecnt;
! if (st->con)
{
int sock = PQsocket(st->con);
--- 4541,4553 ----
}
}
! /* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
! bool ready;
! if (st->state == CSTATE_WAIT_RESULT && st->con)
{
int sock = PQsocket(st->con);
***************
*** 4336,4356 **** threadRun(void *arg)
PQerrorMessage(st->con));
goto done;
}
! if (FD_ISSET(sock, &input_mask) ||
! commands[st->state]->type == META_COMMAND)
! {
! if (!doCustom(thread, st, &aggs))
! remains--; /* I've aborted */
! }
}
! if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
! fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
! i, st->state);
! remains--; /* I've aborted */
! PQfinish(st->con);
! st->con = NULL;
}
}
--- 4557,4575 ----
PQerrorMessage(st->con));
goto done;
}
!
! ready = FD_ISSET(sock, &input_mask);
}
+ else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ ready = false;
+ else
+ ready = true;
! if (ready)
{
! doCustom(thread, st, &aggs);
! if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
! remains--;
}
}
Hello Heikki,
Yeah, it really is quite a mess. I tried to review your patch, and I think
it's correct, but I couldn't totally convince myself, because of the existing
messiness of the logic.
Alas:-(
So I bit the bullet and started refactoring.
Wow!
I came up with the attached. It refactors the logic in doCustom() into a
state machine.
Sounds good! This can only help.
I think this is much clearer, what do you think?
I think that something was really needed. I'm going to review and test
this patch very carefully, probably over the next weekend, and report back.
--
Fabien.
Hello Heikki,
Yeah, it really is quite a mess. I tried to review your patch, and I think
it's correct, but I couldn't totally convince myself, because of the existing
messiness of the logic. So I bit the bullet and started refactoring. I came up with the attached. It refactors the logic in doCustom() into a
state machine. I think this is much clearer, what do you think?
The patch did not apply to master because you committed the sleep fix
in between. I updated the patch so that the fix is included as well.
I think that this is really needed. The code is much clearer and simpler to
understand with the state machines & additional functions. This is a
definite improvement to the code base.
I've done quite a bit of testing with various options (-r, --rate,
--latency-limit, -C...) and got pretty reasonable results.
Although I cannot be absolutely sure that the refactoring does not
introduce any new bug, I'm convinced that it will be much easier to find
them:-)
Attached are some small changes to your version:
I have added the sleep_until fix.
I have fixed a bug introduced in the patch by replacing && with || in the
(min_usec > 0 && maxsock != -1) condition, which was inducing errors with
multiple threads & clients...
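To make the difference between the two operators concrete, here is a minimal sketch that evaluates both forms of the condition. The helper names wait_with_and and wait_with_or are hypothetical, and the sketch models only the test itself, not the select() logic around it in threadRun. It assumes, as the patch suggests, that maxsock is -1 when no socket has been registered and that min_usec is 0 when some client is ready to run.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: the "&&" form of the condition. */
static bool
wait_with_and(int64_t min_usec, int maxsock)
{
	assert(maxsock >= -1);		/* -1 is assumed to mean "no socket" */
	return min_usec > 0 && maxsock != -1;
}

/* Hypothetical helper: the "||" form of the condition. */
static bool
wait_with_or(int64_t min_usec, int maxsock)
{
	assert(maxsock >= -1);		/* -1 is assumed to mean "no socket" */
	return min_usec > 0 || maxsock != -1;
}
```

The two forms disagree in exactly two situations: a client is ready to run (min_usec == 0) while another one waits on a socket, and every client is sleeping with no socket registered (maxsock == -1).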
I have factored out several error messages into "commandFailed", which
replaces "metaCommandFailed", and added the script number to the error
messages as well. All messages are now specific to the failed command.
I have added two states to the machine:
- CSTATE_CHOOSE_SCRIPT, which simplifies threadRun: there is now one call
to chooseScript instead of the previous two.
- CSTATE_END_COMMAND, which handles is_latencies and proceeds to the
next command, thus merging the three instances of updating the stats
that were in the first version.
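The transitions around the two new states can be sketched as follows. This is a simplified model, not the patch's code: the state names are taken from the patch, but the functions after_choose_script and after_command_step are hypothetical names introduced here for illustration.

```c
#include <assert.h>
#include <stdbool.h>

/* State names as in the patch; only the ones used here are listed. */
typedef enum
{
	CSTATE_CHOOSE_SCRIPT,
	CSTATE_START_THROTTLE,
	CSTATE_START_TX,
	CSTATE_START_COMMAND,
	CSTATE_WAIT_RESULT,
	CSTATE_SLEEP,
	CSTATE_END_COMMAND
} ConnectionStateEnum;

/* Hypothetical: where CSTATE_CHOOSE_SCRIPT goes once a script is picked. */
static ConnectionStateEnum
after_choose_script(bool throttling)
{
	return throttling ? CSTATE_START_THROTTLE : CSTATE_START_TX;
}

/*
 * Hypothetical: next state once a wait is over.  Both waiting states
 * funnel into CSTATE_END_COMMAND, the single place where the stats are
 * updated, and CSTATE_END_COMMAND then starts the next command.
 */
static ConnectionStateEnum
after_command_step(ConnectionStateEnum state)
{
	assert(state == CSTATE_WAIT_RESULT ||
		   state == CSTATE_SLEEP ||
		   state == CSTATE_END_COMMAND);

	if (state == CSTATE_END_COMMAND)
		return CSTATE_START_COMMAND;
	return CSTATE_END_COMMAND;
}
```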
The latter state means that processing query results is included in the
per-statement latency, which is an improvement because before I was getting
some transaction latencies significantly larger than the apparent sum of the
per-statement latencies, which did not make much sense...
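A worked example of why the end point of the measurement matters. The timestamps below are made up, and the struct and function names are hypothetical. The sketch only illustrates that stopping the per-statement clock before the result is read leaves time that is counted in the transaction latency but in no statement.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical timestamps for one statement, in microseconds. */
typedef struct
{
	int64_t		begin;			/* statement sent */
	int64_t		result_ready;	/* server reply has arrived */
	int64_t		result_read;	/* reply has been read and discarded */
} StmtTimes;

/* Latency measured up to the arrival of the reply. */
static int64_t
latency_before_read(const StmtTimes *t)
{
	assert(t->begin <= t->result_ready);
	return t->result_ready - t->begin;
}

/* Latency measured after the reply has been processed. */
static int64_t
latency_after_read(const StmtTimes *t)
{
	assert(t->begin <= t->result_read);
	return t->result_read - t->begin;
}

/* Sum one per-statement measure over a transaction of n statements. */
static int64_t
sum_latencies(const StmtTimes *stmts, int n,
			  int64_t (*measure) (const StmtTimes *))
{
	int64_t		total = 0;

	for (int i = 0; i < n; i++)
		total += measure(&stmts[i]);
	return total;
}
```

With two back-to-back statements timed at (0, 100, 130) and (130, 200, 250), the transaction takes 250 us. The first measure sums to 170 us, the second to 250 us.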
I have added & updated a few comments. There are some places where the
break could be a fall through instead, not sure how desirable it is, I'm
fine with break.
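For reference, the control flow in question looks like this in miniature. This is a toy sketch with hypothetical names (ToyState, run_toy_machine), not the pgbench code: inside for (;;) { switch ... }, a break leaves only the switch, so the loop dispatches again on the new state, while return hands control back to the caller.

```c
#include <assert.h>

typedef enum
{
	TOY_START,
	TOY_WORK,
	TOY_DONE
} ToyState;

/*
 * Run the toy machine until it reaches TOY_DONE, and return the number
 * of times the switch was entered.
 */
static int
run_toy_machine(ToyState state)
{
	int			dispatches = 0;

	for (;;)
	{
		dispatches++;
		switch (state)
		{
			case TOY_START:
				state = TOY_WORK;
				break;			/* leaves the switch, loops back to dispatch */
			case TOY_WORK:
				state = TOY_DONE;
				break;
			case TOY_DONE:
				return dispatches;	/* leaves the function */
		}
		assert(dispatches < 100);	/* guard against a runaway loop */
	}
}
```

Letting one case fall through into the next would save a dispatch but end in the same state, which is why either style works here.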
Well, the comment right there says "note this is not included in the
statement latency numbers", so apparently it's intentional. Whether it's a
good idea or not, I don't know :-). It does seem a bit surprising.
Indeed, it also results in apparently inconsistent numbers, and it creates
a mess for recording the statement latency, because it means that in some
cases the latency is collected before the actual end of the command; see
the discussion about CSTATE_END_COMMAND above.
But what seems more bogus to me is that we do that after recording the
*transaction* stats, if this was the last command. So the PQgetResult() of
the last command in the transaction is not included in the transaction stats,
even though the PQgetResult() calls for any previous commands are. (Perhaps
that's what you meant too?) I changed that in my patch, it would've been inconvenient to keep that old
behavior, and it doesn't make any sense to me anyway.
Fine with me.
--
Fabien.
Attachments:
pgbench-refactor-2.patch (text/x-diff)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 8b24ad5..502e644 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -235,25 +235,97 @@ typedef struct StatsData
} StatsData;
/*
- * Connection state
+ * Connection state machine states.
+ */
+typedef enum
+{
+ /*
+ * The client must first choose a script to execute. Once chosen, it can
+ * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
+ * right away (state CSTATE_START_TX).
+ */
+ CSTATE_CHOOSE_SCRIPT,
+
+ /*
+ * In CSTATE_START_THROTTLE state, we calculate when to begin the next
+ * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
+ * sleeps until that moment. (If throttling is not enabled, doCustom()
+ * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
+ */
+ CSTATE_START_THROTTLE,
+ CSTATE_THROTTLE,
+
+ /*
+ * CSTATE_START_TX performs start-of-transaction processing. Establishes
+ * a new connection for the transaction, in --connect mode, and records
+ * the transaction start time.
+ */
+ CSTATE_START_TX,
+
+ /*
+ * We loop through these states, to process each command in the
+ * script:
+ *
+ * CSTATE_START_COMMAND starts the execution of a command. On a SQL
+ * command, the command is sent to the server, and we move to
+ * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is
+ * set, and we enter the CSTATE_SLEEP state to wait for it to expire.
+ * Other meta-commands are executed immediately, and we proceed to next
+ * command.
+ *
+ * CSTATE_WAIT_RESULT waits until we get a result set back from the server
+ * for the current command, and proceeds to CSTATE_END_COMMAND.
+ *
+ * CSTATE_SLEEP waits until the end of \sleep and proceeds to CSTATE_END_COMMAND.
+ *
+ * CSTATE_END_COMMAND does some house keeping stats, then jumps back to
+ * start the next command with CSTATE_START_COMMAND.
+ */
+ CSTATE_START_COMMAND,
+ CSTATE_WAIT_RESULT,
+ CSTATE_SLEEP,
+ CSTATE_END_COMMAND,
+
+ /*
+ * CSTATE_END_TX performs end-of-transaction processing. Calculates
+ * latency, and logs the transaction. In --connect mode, closes the
+ * current connection. Chooses the next script to execute and starts
+ * over in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we
+ * have no more work to do.
+ */
+ CSTATE_END_TX,
+
+ /*
+ * Final states. CSTATE_ABORTED means that the script execution was
+ * aborted because a command failed, CSTATE_FINISHED means success.
+ */
+ CSTATE_ABORTED,
+ CSTATE_FINISHED,
+} ConnectionStateEnum;
+
+/*
+ * Connection state.
*/
typedef struct
{
PGconn *con; /* connection handle to DB */
int id; /* client No. */
- int state; /* state No. */
- bool listen; /* whether an async query has been sent */
- bool sleeping; /* whether the client is napping */
- bool throttling; /* whether nap is for throttling */
- bool is_throttled; /* whether transaction throttling is done */
+ ConnectionStateEnum state; /* state machine's current state. */
+
+ int use_file; /* index in sql_script for this client */
+ int command; /* command number in script */
+
+ /* client variables */
Variable *variables; /* array of variable definitions */
int nvariables; /* number of variables */
bool vars_sorted; /* are variables sorted by name? */
+
+ /* various times about current transaction */
int64 txn_scheduled; /* scheduled start time of transaction (usec) */
int64 sleep_until; /* scheduled start time of next cmd (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */
- int use_file; /* index in sql_scripts for this client */
+
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
/* per client collected stats */
@@ -1382,7 +1454,7 @@ evalFunc(TState *thread, CState *st,
Assert(nargs == 1);
fprintf(stderr, "debug(script=%d,command=%d): ",
- st->use_file, st->state + 1);
+ st->use_file, st->command + 1);
if (varg->type == PGBT_INT)
fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
@@ -1733,15 +1805,12 @@ preparedStatementName(char *buffer, int file, int state)
sprintf(buffer, "P%d_%d", file, state);
}
-static bool
-clientDone(CState *st)
+static void
+commandFailed(CState *st, char *message)
{
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
- return false; /* always false */
+ fprintf(stderr,
+ "client %d aborted in command %d of script %d; %s\n",
+ st->id, st->command, st->use_file, message);
}
/* return a script number with a weighted choice. */
@@ -1763,425 +1832,593 @@ chooseScript(TState *thread)
return i - 1;
}
-/* return false iff client should be disconnected */
+/* Send a SQL command, using the chosen querymode */
static bool
+sendCommand(CState *st, Command *command)
+{
+ int r;
+
+ if (querymode == QUERY_SIMPLE)
+ {
+ char *sql;
+
+ sql = pg_strdup(command->argv[0]);
+ sql = assignVariables(st, sql);
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQuery(st->con, sql);
+ free(sql);
+ }
+ else if (querymode == QUERY_EXTENDED)
+ {
+ const char *sql = command->argv[0];
+ const char *params[MAX_ARGS];
+
+ getQueryParams(st, command, params);
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQueryParams(st->con, sql, command->argc - 1,
+ NULL, params, NULL, NULL, 0);
+ }
+ else if (querymode == QUERY_PREPARED)
+ {
+ char name[MAX_PREPARE_NAME];
+ const char *params[MAX_ARGS];
+
+ if (!st->prepared[st->use_file])
+ {
+ int j;
+ Command **commands = sql_script[st->use_file].commands;
+
+ for (j = 0; commands[j] != NULL; j++)
+ {
+ PGresult *res;
+ char name[MAX_PREPARE_NAME];
+
+ if (commands[j]->type != SQL_COMMAND)
+ continue;
+ preparedStatementName(name, st->use_file, j);
+ res = PQprepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ fprintf(stderr, "%s", PQerrorMessage(st->con));
+ PQclear(res);
+ }
+ st->prepared[st->use_file] = true;
+ }
+
+ getQueryParams(st, command, params);
+ preparedStatementName(name, st->use_file, st->command);
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, name);
+ r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+ params, NULL, NULL, 0);
+ }
+ else /* unknown sql mode */
+ r = 0;
+
+ if (r == 0)
+ {
+ if (debug)
+ fprintf(stderr, "client %d could not send %s\n",
+ st->id, command->argv[0]);
+ st->ecnt++;
+ return false;
+ }
+ else
+ return true;
+}
+
+/*
+ * "execute" a sleep command. Parses the argument, and returns the
+ * requested amount of delay, in microseconds. Returns true on
+ * success, false on error.
+ */
+static bool
+evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+{
+ char *var;
+ int usec;
+
+ if (*argv[1] == ':')
+ {
+ if ((var = getVariable(st, argv[1] + 1)) == NULL)
+ {
+ fprintf(stderr, "%s: undefined variable \"%s\"\n",
+ argv[0], argv[1]);
+ return false;
+ }
+ usec = atoi(var);
+ }
+ else
+ usec = atoi(argv[1]);
+
+ if (argc > 2)
+ {
+ if (pg_strcasecmp(argv[2], "ms") == 0)
+ usec *= 1000;
+ else if (pg_strcasecmp(argv[2], "s") == 0)
+ usec *= 1000000;
+ }
+ else
+ usec *= 1000000;
+
+ *usecs = usec;
+ return true;
+}
+
+/*
+ * Advance the state machine of a connection, if possible.
+ */
+static void
doCustom(TState *thread, CState *st, StatsData *agg)
{
PGresult *res;
- Command **commands;
- bool trans_needs_throttle = false;
+ Command *command;
instr_time now;
+ bool end_tx_processed = false;
+ int64 wait;
/*
* gettimeofday() isn't free, so we get the current timestamp lazily the
* first time it's needed, and reuse the same value throughout this
- * function after that. This also ensures that e.g. the calculated latency
+ * function after that. This also ensures that e.g. the calculated latency
* reported in the log file and in the totals are the same. Zero means
- * "not set yet". Reset "now" when we step to the next command with "goto
- * top", though.
+ * "not set yet". Reset "now" when we execute shell commands or expressions,
+ * which might take a non-negligible amount of time, though.
*/
-top:
INSTR_TIME_SET_ZERO(now);
- commands = sql_script[st->use_file].commands;
-
/*
- * Handle throttling once per transaction by sleeping. It is simpler to
- * do this here rather than at the end, because so much complicated logic
- * happens below when statements finish.
+ * Loop in the state machine, until we have to wait for a result from the
+ * server (or have to sleep, for throttling or for \sleep).
+ *
+ * Note: In the switch-statement below, 'break' will loop back here,
+ * meaning "continue in the state machine". Return is used to return to
+ * the caller. (XXX: Would it be better to replace the 'break's with
+ * 'continue's?)
*/
- if (throttle_delay && !st->is_throttled)
+ for (;;)
{
- /*
- * Generate a delay such that the series of delays will approximate a
- * Poisson distribution centered on the throttle_delay time.
- *
- * If transactions are too slow or a given wait is shorter than a
- * transaction, the next transaction will start right away.
- */
- int64 wait = getPoissonRand(thread, throttle_delay);
-
- thread->throttle_trigger += wait;
- st->txn_scheduled = thread->throttle_trigger;
-
- /* stop client if next transaction is beyond pgbench end of execution */
- if (duration > 0 && st->txn_scheduled > end_time)
- return clientDone(st);
-
- /*
- * If this --latency-limit is used, and this slot is already late so
- * that the transaction will miss the latency limit even if it
- * completed immediately, we skip this time slot and iterate till the
- * next slot that isn't late yet.
- */
- if (latency_limit)
+ switch(st->state)
{
- int64 now_us;
-
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- now_us = INSTR_TIME_GET_MICROSEC(now);
- while (thread->throttle_trigger < now_us - latency_limit)
- {
- processXactStats(thread, st, &now, true, agg);
- /* next rendez-vous */
+ /*
+ * Select transaction to run.
+ */
+ case CSTATE_CHOOSE_SCRIPT:
+
+ st->use_file = chooseScript(thread);
+
+ if (debug)
+ fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
+ sql_script[st->use_file].desc);
+
+ st->state = (throttle_delay > 0) ? CSTATE_START_THROTTLE : CSTATE_START_TX;
+
+ break;
+
+ /*
+ * Handle throttling once per transaction by sleeping.
+ */
+ case CSTATE_START_THROTTLE:
+ /*
+ * Generate a delay such that the series of delays will
+ * approximate a Poisson distribution centered on the
+ * throttle_delay time.
+ *
+ * If transactions are too slow or a given wait is shorter
+ * than a transaction, the next transaction will start
+ * right away.
+ */
+ /* Assert(throttle_delay > 0); */
wait = getPoissonRand(thread, throttle_delay);
+
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
- }
- }
-
- st->sleep_until = st->txn_scheduled;
- st->sleeping = true;
- st->throttling = true;
- st->is_throttled = true;
- if (debug)
- fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
- st->id, wait);
- }
- if (st->sleeping)
- { /* are we sleeping? */
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
- return true; /* Still sleeping, nothing to do here */
- /* Else done sleeping, go ahead with next command */
- st->sleeping = false;
- st->throttling = false;
- }
+ /*
+ * stop client if next transaction is beyond pgbench end
+ * of execution
+ */
+ if (duration > 0 && st->txn_scheduled > end_time)
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
- if (st->listen)
- { /* are we receiver? */
- if (commands[st->state]->type == SQL_COMMAND)
- {
- if (debug)
- fprintf(stderr, "client %d receiving\n", st->id);
- if (!PQconsumeInput(st->con))
- { /* there's something wrong */
- fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
- return clientDone(st);
- }
- if (PQisBusy(st->con))
- return true; /* don't have the whole result yet */
- }
+ /*
+ * If this --latency-limit is used, and this slot is
+ * already late so that the transaction will miss the
+ * latency limit even if it completed immediately, we skip
+ * this time slot and iterate till the next slot that
+ * isn't late yet.
+ */
+ if (latency_limit)
+ {
+ int64 now_us;
+
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ now_us = INSTR_TIME_GET_MICROSEC(now);
+ while (thread->throttle_trigger < now_us - latency_limit)
+ {
+ processXactStats(thread, st, &now, true, agg);
+ /* next rendez-vous */
+ wait = getPoissonRand(thread, throttle_delay);
+ thread->throttle_trigger += wait;
+ st->txn_scheduled = thread->throttle_trigger;
+ }
+ }
- /*
- * command finished: accumulate per-command execution times in
- * thread-local data structure, if per-command latencies are requested
- */
- if (is_latencies)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
-
- /* XXX could use a mutex here, but we choose not to */
- addToSimpleStats(&commands[st->state]->stats,
- INSTR_TIME_GET_DOUBLE(now) -
- INSTR_TIME_GET_DOUBLE(st->stmt_begin));
- }
+ st->state = CSTATE_THROTTLE;
+ if (debug)
+ fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
+ st->id, wait);
- /* transaction finished: calculate latency and log the transaction */
- if (commands[st->state + 1] == NULL)
- {
- if (progress || throttle_delay || latency_limit ||
- per_script_stats || use_log)
- processXactStats(thread, st, &now, false, agg);
- else
- thread->stats.cnt++;
- }
+ /* could pass through? could return? */
+ break;
- if (commands[st->state]->type == SQL_COMMAND)
- {
/*
- * Read and discard the query result; note this is not included in
- * the statement latency numbers.
+ * Wait until it's time to start next transaction.
*/
- res = PQgetResult(st->con);
- switch (PQresultStatus(res))
- {
- case PGRES_COMMAND_OK:
- case PGRES_TUPLES_OK:
- case PGRES_EMPTY_QUERY:
- break; /* OK */
- default:
- fprintf(stderr, "client %d aborted in state %d: %s",
- st->id, st->state, PQerrorMessage(st->con));
- PQclear(res);
- return clientDone(st);
- }
- PQclear(res);
- discard_response(st);
- }
+ case CSTATE_THROTTLE:
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+ return; /* Still sleeping, nothing to do here */
+
+ /* Else done sleeping, start the transaction */
+ st->state = CSTATE_START_TX;
+
+ /* could pass through */
+ break;
+
+ /* Start new transaction */
+ case CSTATE_START_TX:
+ /*
+ * Establish connection on first call, or if is_connect is
+ * true.
+ */
+ if (st->con == NULL)
+ {
+ instr_time start;
+
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ start = now;
+ if ((st->con = doConnect()) == NULL)
+ {
+ fprintf(stderr, "client %d aborted while establishing connection\n",
+ st->id);
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
+
+ /* Reset session-local state */
+ memset(st->prepared, 0, sizeof(st->prepared));
+ }
- if (commands[st->state + 1] == NULL)
- {
- if (is_connect)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
-
- ++st->cnt;
- if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
- return clientDone(st); /* exit success */
- }
+ /*
+ * Record transaction start time under logging, progress or
+ * throttling.
+ */
+ if (use_log || progress || throttle_delay || latency_limit ||
+ per_script_stats)
+ {
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->txn_begin = now;
+
+ /*
+ * When not throttling, this is also the transaction's
+ * scheduled start time.
+ */
+ if (!throttle_delay)
+ st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+ }
+
+ /* Begin with the first command */
+ st->command = 0;
+ st->state = CSTATE_START_COMMAND;
- /* increment state counter */
- st->state++;
- if (commands[st->state] == NULL)
- {
- st->state = 0;
- st->use_file = chooseScript(thread);
- commands = sql_script[st->use_file].commands;
- if (debug)
- fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
- sql_script[st->use_file].desc);
- st->is_throttled = false;
+ /* could pass through */
+ break;
/*
- * No transaction is underway anymore, which means there is
- * nothing to listen to right now. When throttling rate limits
- * are active, a sleep will happen next, as the next transaction
- * starts. And then in any case the next SQL command will set
- * listen back to true.
+ * Send a command to server (or execute a meta-command)
*/
- st->listen = false;
- trans_needs_throttle = (throttle_delay > 0);
- }
- }
-
- if (st->con == NULL)
- {
- instr_time start,
- end;
-
- INSTR_TIME_SET_CURRENT(start);
- if ((st->con = doConnect()) == NULL)
- {
- fprintf(stderr, "client %d aborted while establishing connection\n",
- st->id);
- return clientDone(st);
- }
- INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
-
- /* Reset session-local state */
- st->listen = false;
- st->sleeping = false;
- st->throttling = false;
- st->is_throttled = false;
- memset(st->prepared, 0, sizeof(st->prepared));
- }
-
- /*
- * This ensures that a throttling delay is inserted before proceeding with
- * sql commands, after the first transaction. The first transaction
- * throttling is performed when first entering doCustom.
- */
- if (trans_needs_throttle)
- {
- trans_needs_throttle = false;
- goto top;
- }
-
- /* Record transaction start time under logging, progress or throttling */
- if ((use_log || progress || throttle_delay || latency_limit ||
- per_script_stats) && st->state == 0)
- {
- INSTR_TIME_SET_CURRENT(st->txn_begin);
-
- /*
- * When not throttling, this is also the transaction's scheduled start
- * time.
- */
- if (!throttle_delay)
- st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
- }
-
- /* Record statement start time if per-command latencies are requested */
- if (is_latencies)
- INSTR_TIME_SET_CURRENT(st->stmt_begin);
-
- if (commands[st->state]->type == SQL_COMMAND)
- {
- const Command *command = commands[st->state];
- int r;
-
- if (querymode == QUERY_SIMPLE)
- {
- char *sql;
+ case CSTATE_START_COMMAND:
+ command = sql_script[st->use_file].commands[st->command];
+
+ /*
+ * If we reached the end of the script, move to end-of-xact
+ * processing.
+ */
+ if (command == NULL)
+ {
+ st->state = CSTATE_END_TX;
+ break;
+ }
- sql = pg_strdup(command->argv[0]);
- sql = assignVariables(st, sql);
+ /*
+ * Record statement start time if per-command latencies
+ * are requested
+ */
+ if (is_latencies)
+ {
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->stmt_begin = now;
+ }
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, sql);
- r = PQsendQuery(st->con, sql);
- free(sql);
- }
- else if (querymode == QUERY_EXTENDED)
- {
- const char *sql = command->argv[0];
- const char *params[MAX_ARGS];
+ if (command->type == SQL_COMMAND)
+ {
+ if (!sendCommand(st, command))
+ {
+ /*
+ * Failed. Stay in CSTATE_START_COMMAND state, to retry.
+ * ??? What the point or retrying? Should rather abort?
+ */
+ return;
+ }
+ else
+ st->state = CSTATE_WAIT_RESULT;
+ }
+ else if (command->type == META_COMMAND)
+ {
+ int argc = command->argc,
+ i;
+ char **argv = command->argv;
+
+ if (debug)
+ {
+ fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
+ for (i = 1; i < argc; i++)
+ fprintf(stderr, " %s", argv[i]);
+ fprintf(stderr, "\n");
+ }
+
+ if (pg_strcasecmp(argv[0], "sleep") == 0)
+ {
+ /*
+ * A \sleep doesn't execute anything, we just get the
+ * delay from the argument, and enter the CSTATE_SLEEP
+ * state. (The per-command latency will be recorded
+ * in CSTATE_SLEEP state, not here, after the delay has
+ * elapsed.)
+ */
+ int usec;
+
+ if (!evaluateSleep(st, argc, argv, &usec))
+ {
+ commandFailed(st, "execution of meta-command 'sleep' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
+ st->state = CSTATE_SLEEP;
+ break;
+ }
+ else
+ {
+ if (pg_strcasecmp(argv[0], "set") == 0)
+ {
+ PgBenchExpr *expr = command->expr;
+ PgBenchValue result;
+
+ if (!evaluateExpr(thread, st, expr, &result))
+ {
+ commandFailed(st, "evaluation of meta-command 'set' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+
+ if (!putVariableNumber(st, argv[0], argv[1], &result))
+ {
+ commandFailed(st, "assignment of meta-command 'set' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ }
+ else if (pg_strcasecmp(argv[0], "setshell") == 0)
+ {
+ bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+
+ if (timer_exceeded) /* timeout */
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+ else if (!ret) /* on error */
+ {
+ commandFailed(st, "execution of meta-command 'setshell' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else
+ {
+ /* succeeded */
+ }
+ }
+ else if (pg_strcasecmp(argv[0], "shell") == 0)
+ {
+ bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+
+ if (timer_exceeded) /* timeout */
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+ else if (!ret) /* on error */
+ {
+ commandFailed(st, "execution of meta-command 'shell' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else
+ {
+ /* succeeded */
+ }
+ }
+ /*
+ * executing the expression or shell command might
+ * take a non-neglicible amount of time, so reset
+ * 'now'
+ */
+ INSTR_TIME_SET_ZERO(now);
+
+ st->state = CSTATE_END_COMMAND;
+ }
+ }
+ break;
- getQueryParams(st, command, params);
+ /*
+ * Wait for the current SQL command to complete
+ */
+ case CSTATE_WAIT_RESULT:
+ command = sql_script[st->use_file].commands[st->command];
+ if (debug)
+ fprintf(stderr, "client %d receiving\n", st->id);
+ if (!PQconsumeInput(st->con))
+ { /* there's something wrong */
+ commandFailed(st, "perhaps the backend died while processing");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ if (PQisBusy(st->con))
+ return; /* don't have the whole result yet */
+
+ /*
+ * Read and discard the query result;
+ */
+ res = PQgetResult(st->con);
+ switch (PQresultStatus(res))
+ {
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ case PGRES_EMPTY_QUERY:
+ /* OK */
+ PQclear(res);
+ discard_response(st);
+ st->state = CSTATE_END_COMMAND;
+ break;
+ default:
+ commandFailed(st, PQerrorMessage(st->con));
+ PQclear(res);
+ st->state = CSTATE_ABORTED;
+ break;
+ }
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, sql);
- r = PQsendQueryParams(st->con, sql, command->argc - 1,
- NULL, params, NULL, NULL, 0);
- }
- else if (querymode == QUERY_PREPARED)
- {
- char name[MAX_PREPARE_NAME];
- const char *params[MAX_ARGS];
+ break;
- if (!st->prepared[st->use_file])
- {
- int j;
+ /*
+ * Wait until sleep is done.
+ * This state is entered after a \sleep metacommand.
+ * The behavior is similar to CSTATE_THROTTLE, but proceeds
+ * to CSTATE_START_COMMAND instead of CSTATE_START_TX.
+ */
+ case CSTATE_SLEEP:
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+ return; /* Still sleeping, nothing to do here */
+ /* Else done sleeping. */
+ st->state = CSTATE_END_COMMAND;
+ break;
- for (j = 0; commands[j] != NULL; j++)
+ /*
+ * End of command: do some house keeping and proceed to next.
+ */
+ case CSTATE_END_COMMAND:
+ /*
+ * command executed: accumulate per-command execution
+ * times in thread-local data structure, if per-command
+ * latencies are requested.
+ */
+ if (is_latencies)
{
- PGresult *res;
- char name[MAX_PREPARE_NAME];
-
- if (commands[j]->type != SQL_COMMAND)
- continue;
- preparedStatementName(name, st->use_file, j);
- res = PQprepare(st->con, name,
- commands[j]->argv[0], commands[j]->argc - 1, NULL);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- fprintf(stderr, "%s", PQerrorMessage(st->con));
- PQclear(res);
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+
+ /* XXX could use a mutex here, but we choose not to */
+ command = sql_script[st->use_file].commands[st->command];
+ addToSimpleStats(&command->stats,
+ INSTR_TIME_GET_DOUBLE(now) -
+ INSTR_TIME_GET_DOUBLE(st->stmt_begin));
}
- st->prepared[st->use_file] = true;
- }
-
- getQueryParams(st, command, params);
- preparedStatementName(name, st->use_file, st->state);
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, name);
- r = PQsendQueryPrepared(st->con, name, command->argc - 1,
- params, NULL, NULL, 0);
- }
- else /* unknown sql mode */
- r = 0;
-
- if (r == 0)
- {
- if (debug)
- fprintf(stderr, "client %d could not send %s\n",
- st->id, command->argv[0]);
- st->ecnt++;
- }
- else
- st->listen = true; /* flags that should be listened */
- }
- else if (commands[st->state]->type == META_COMMAND)
- {
- int argc = commands[st->state]->argc,
- i;
- char **argv = commands[st->state]->argv;
-
- if (debug)
- {
- fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
- for (i = 1; i < argc; i++)
- fprintf(stderr, " %s", argv[i]);
- fprintf(stderr, "\n");
- }
+ /* Go ahead with next command */
+ st->command++;
+ st->state = CSTATE_START_COMMAND;
+ break;
- if (pg_strcasecmp(argv[0], "set") == 0)
- {
- PgBenchExpr *expr = commands[st->state]->expr;
- PgBenchValue result;
+ /*
+ * End of transaction.
+ */
+ case CSTATE_END_TX:
+ /* transaction finished: calculate latency and log the transaction */
+ if (progress || throttle_delay || latency_limit ||
+ per_script_stats || use_log)
+ processXactStats(thread, st, &now, false, agg);
+ else
+ thread->stats.cnt++;
+
+ if (is_connect)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ INSTR_TIME_SET_ZERO(now);
+ }
- if (!evaluateExpr(thread, st, expr, &result))
- {
- st->ecnt++;
- return true;
- }
+ ++st->cnt;
+ if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
+ {
+ /* exit success */
+ st->state = CSTATE_FINISHED;
+ break;
+ }
- if (!putVariableNumber(st, argv[0], argv[1], &result))
- {
- st->ecnt++;
- return true;
- }
+ /*
+ * No transaction is underway anymore.
+ */
+ st->state = CSTATE_CHOOSE_SCRIPT;
+
+ /*
+ * If we paced through all commands in the script in this loop,
+ * without returning to the caller even once, do it now. This
+ * gives the thread a chance to process other connections, and
+ * to do progress reporting. This can currently only happen
+ * if the script consists entirely of meta-commands.
+ */
+ if (end_tx_processed)
+ return;
+ else
+ {
+ end_tx_processed = true;
+ break;
+ }
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "sleep") == 0)
- {
- char *var;
- int usec;
- instr_time now;
-
- if (*argv[1] == ':')
- {
- if ((var = getVariable(st, argv[1] + 1)) == NULL)
+ /*
+ * Final states. Close the connection if it's still open.
+ */
+ case CSTATE_ABORTED:
+ case CSTATE_FINISHED:
+ if (st->con != NULL)
{
- fprintf(stderr, "%s: undefined variable \"%s\"\n",
- argv[0], argv[1]);
- st->ecnt++;
- return true;
+ PQfinish(st->con);
+ st->con = NULL;
}
- usec = atoi(var);
- }
- else
- usec = atoi(argv[1]);
-
- if (argc > 2)
- {
- if (pg_strcasecmp(argv[2], "ms") == 0)
- usec *= 1000;
- else if (pg_strcasecmp(argv[2], "s") == 0)
- usec *= 1000000;
- }
- else
- usec *= 1000000;
-
- INSTR_TIME_SET_CURRENT(now);
- st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
- st->sleeping = true;
-
- st->listen = true;
+ return;
}
- else if (pg_strcasecmp(argv[0], "setshell") == 0)
- {
- bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
-
- if (timer_exceeded) /* timeout */
- return clientDone(st);
- else if (!ret) /* on error */
- {
- st->ecnt++;
- return true;
- }
- else /* succeeded */
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "shell") == 0)
- {
- bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
-
- if (timer_exceeded) /* timeout */
- return clientDone(st);
- else if (!ret) /* on error */
- {
- st->ecnt++;
- return true;
- }
- else /* succeeded */
- st->listen = true;
- }
-
- /* after a meta command, immediately proceed with next command */
- goto top;
}
-
- return true;
}
/*
@@ -4183,29 +4420,10 @@ threadRun(void *arg)
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
last = aggs;
- /* send start up queries in async manner */
+ /* initialize explicitely the state machines */
for (i = 0; i < nstate; i++)
{
- CState *st = &state[i];
- int prev_ecnt = st->ecnt;
- Command **commands;
-
- st->use_file = chooseScript(thread);
- commands = sql_script[st->use_file].commands;
- if (debug)
- fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
- sql_script[st->use_file].desc);
- if (!doCustom(thread, st, &aggs))
- remains--; /* I've aborted */
-
- if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
- {
- fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
- i, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- }
+ state[i].state = CSTATE_CHOOSE_SCRIPT;
}
while (remains > 0)
@@ -4222,59 +4440,60 @@ threadRun(void *arg)
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
int sock;
- if (st->con == NULL)
+ if (st->state == CSTATE_THROTTLE && timer_exceeded)
{
+ /* interrupt client which has not started a transaction */
+ st->state = CSTATE_FINISHED;
+ remains--;
+ PQfinish(st->con);
+ st->con = NULL;
continue;
}
- else if (st->sleeping)
+ else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
- if (st->throttling && timer_exceeded)
- {
- /* interrupt client which has not started a transaction */
- remains--;
- st->sleeping = false;
- st->throttling = false;
- PQfinish(st->con);
- st->con = NULL;
- continue;
- }
- else /* just a nap from the script */
- {
- int this_usec;
-
- if (min_usec == PG_INT64_MAX)
- {
- instr_time now;
+ /* a nap from the script, or under throttling */
+ int this_usec;
- INSTR_TIME_SET_CURRENT(now);
- now_usec = INSTR_TIME_GET_MICROSEC(now);
- }
+ if (min_usec == PG_INT64_MAX)
+ {
+ instr_time now;
- this_usec = st->txn_scheduled - now_usec;
- if (min_usec > this_usec)
- min_usec = this_usec;
+ INSTR_TIME_SET_CURRENT(now);
+ now_usec = INSTR_TIME_GET_MICROSEC(now);
}
+
+ this_usec = (st->state == CSTATE_SLEEP ?
+ st->sleep_until : st->txn_scheduled) - now_usec;
+ if (min_usec > this_usec)
+ min_usec = this_usec;
}
- else if (commands[st->state]->type == META_COMMAND)
+ else if (st->state == CSTATE_WAIT_RESULT)
{
- min_usec = 0; /* the connection is ready to run */
+ /*
+ * waiting for result from server - nothing to do unless the
+ * socket is readable
+ */
+ sock = PQsocket(st->con);
+ if (sock < 0)
+ {
+ fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
+ goto done;
+ }
+
+ FD_SET(sock, &input_mask);
+
+ if (maxsock < sock)
+ maxsock = sock;
break;
}
-
- sock = PQsocket(st->con);
- if (sock < 0)
+ else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
{
- fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
- goto done;
+ /* the connection is ready to run */
+ min_usec = 0;
+ break;
}
-
- FD_SET(sock, &input_mask);
-
- if (maxsock < sock)
- maxsock = sock;
}
/* also wake up to print the next progress report on time */
@@ -4324,14 +4543,13 @@ threadRun(void *arg)
}
}
- /* ok, backend returns reply */
+ /* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
- int prev_ecnt = st->ecnt;
+ bool ready;
- if (st->con)
+ if (st->state == CSTATE_WAIT_RESULT && st->con)
{
int sock = PQsocket(st->con);
@@ -4341,21 +4559,19 @@ threadRun(void *arg)
PQerrorMessage(st->con));
goto done;
}
- if (FD_ISSET(sock, &input_mask) ||
- commands[st->state]->type == META_COMMAND)
- {
- if (!doCustom(thread, st, &aggs))
- remains--; /* I've aborted */
- }
+
+ ready = FD_ISSET(sock, &input_mask);
}
+ else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ ready = false;
+ else
+ ready = true;
- if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
+ if (ready)
{
- fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
- i, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
+ doCustom(thread, st, &aggs);
+ if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ remains--;
}
}
On 09/24/2016 12:45 PM, Fabien COELHO wrote:
Although I cannot be absolutely sure that the refactoring does not
introduce any new bug, I'm convinced that it will be much easier to find
them:-)
:-)
Attached are some small changes to your version:
I have added the sleep_until fix.
I have fixed a bug introduced in the patch by changing && by || in the
(min_usec > 0 && maxsock != -1) condition which was inducing errors with
multi-threads & clients...
I have factored out several error messages into "commandFailed", in place of
"metaCommandFailed", and added the script number to the error
messages. All messages are now specific to the failed command.
I have added two states to the machine:
- CSTATE_CHOOSE_SCRIPT, which simplifies threadRun: there is now one call
to chooseScript instead of two.
- CSTATE_END_COMMAND, which manages is_latencies and proceeds to the
next command, thus merging the three instances of updating the stats
that were in the first version.
The latter state means that processing query results is included in the
per-statement latency, which is an improvement because before I was getting
some transaction latencies significantly larger than the apparent sum of the
per-statement latencies, which did not make much sense...
Ok. I agree that makes more sense.
I have added & updated a few comments.
Thanks! Committed.
There are some places where the break could be a pass through
instead, not sure how desirable it is, I'm fine with break.
I left them as "break". Pass-throughs are error-prone, and make it more
difficult to read, IMHO. The compiler will optimize it into a
pass-through anyway, if possible and worthwhile, so there should be no
performance difference.
- Heikki
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
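The state machine discussed in this message can be illustrated with a much reduced sketch. It is not the committed pgbench code: the type and function names (State, Client, advance) are hypothetical, every command is treated as a meta-command that completes at once, and only two ideas from the patch are kept. Each state ends with "break" and loops back to the switch, and the function returns to its caller no later than the second end-of-transaction reached in a single call, so a script made only of meta-commands cannot monopolize the thread.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, reduced set of states; pgbench.c has more. */
typedef enum
{
	ST_CHOOSE_SCRIPT,
	ST_START_TX,
	ST_START_COMMAND,
	ST_END_COMMAND,
	ST_END_TX,
	ST_FINISHED
} State;

typedef struct
{
	State		state;
	int			command;	/* index of the current command in the script */
	int			ncommands;	/* script length; all meta-commands in this sketch */
	int			cnt;		/* transactions completed so far */
	int			nxacts;		/* transactions to run before finishing */
} Client;

/* Advance one client until it finishes or should yield to the caller. */
static void
advance(Client *c)
{
	bool		end_tx_processed = false;

	assert(c->ncommands >= 0);

	for (;;)
	{
		switch (c->state)
		{
			case ST_CHOOSE_SCRIPT:
				c->state = ST_START_TX;
				break;
			case ST_START_TX:
				c->command = 0;
				c->state = ST_START_COMMAND;
				break;
			case ST_START_COMMAND:
				if (c->command >= c->ncommands)
					c->state = ST_END_TX;	/* end of script */
				else
					c->state = ST_END_COMMAND;	/* "executes" at once */
				break;
			case ST_END_COMMAND:
				c->command++;
				c->state = ST_START_COMMAND;
				break;
			case ST_END_TX:
				c->cnt++;
				if (c->cnt >= c->nxacts)
				{
					c->state = ST_FINISHED;
					break;
				}
				c->state = ST_CHOOSE_SCRIPT;
				/* yield if a whole transaction ran without returning */
				if (end_tx_processed)
					return;
				end_tx_processed = true;
				break;
			case ST_FINISHED:
				return;
		}
	}
}
```

With a three-command script and nxacts = 5, successive calls to advance() complete two transactions, then two more, then the last one, at which point the client is in ST_FINISHED.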
On Mon, Sep 26, 2016 at 1:01 AM, Heikki Linnakangas <hlinnaka@iki.fi> wrote:
On 09/24/2016 12:45 PM, Fabien COELHO wrote:
Attached are some small changes to your version:
I have added the sleep_until fix.
I have fixed a bug introduced in the patch by changing && by || in the
(min_usec > 0 && maxsock != -1) condition which was inducing errors with
multi-threads & clients...
I have factored out several error messages into "commandFailed", in place of
"metaCommandFailed", and added the script number to the error
messages. All messages are now specific to the failed command.
I have added two states to the machine:
- CSTATE_CHOOSE_SCRIPT, which simplifies threadRun: there is now one call
to chooseScript instead of two.
- CSTATE_END_COMMAND, which manages is_latencies and proceeds to the
next command, thus merging the three instances of updating the stats
that were in the first version.
The latter state means that processing query results is included in the
per-statement latency, which is an improvement because before I was getting
some transaction latencies significantly larger than the apparent sum of the
per-statement latencies, which did not make much sense...
Ok. I agree that makes more sense.
I have added & updated a few comments.
Thanks! Committed.
There are some places where the break could be a pass through
instead, not sure how desirable it is, I'm fine with break.
I left them as "break". Pass-throughs are error-prone, and make it more
difficult to read, IMHO. The compiler will optimize it into a pass-through
anyway, if possible and worthwhile, so there should be no performance
difference.
Since this commit (12788ae49e1933f463bc5), if I use --rate to throttle
the transaction rate, it does get throttled to about the indicated speed,
but pgbench consumes the entire CPU.
At the block of code starting
if (min_usec > 0 && maxsock != -1)
If maxsock == -1, then there is no sleep happening.
Cheers,
Jeff
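The case Jeff describes can be checked in isolation. The sketch below is not pgbench code: the function names are made up for illustration, and only the two conditions come from the thread (the one quoted above, and the "min_usec > 0" form suggested later in the discussion). It shows that with a pending delay and no socket to watch (maxsock == -1), the first condition decides not to wait at all, which is what turns the main loop into a busy loop under --rate.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Condition quoted in the report: wait only with a delay AND a socket. */
static bool
waits_with_and(int64_t min_usec, int maxsock)
{
	assert(maxsock >= -1);
	return min_usec > 0 && maxsock != -1;
}

/* Alternative raised later in the thread: wait whenever there is a delay. */
static bool
waits_with_delay_only(int64_t min_usec, int maxsock)
{
	assert(maxsock >= -1);
	return min_usec > 0;
}
```

For a throttled client that is only waiting for its next scheduled transaction, min_usec is positive and maxsock is -1: waits_with_and() is false, while waits_with_delay_only() is true.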
Hello Jeff,
I have fixed a bug introduced in the patch by changing && by || in the
(min_usec > 0 && maxsock != -1) condition which was inducing errors with
multi-threads & clients...
Since this commit (12788ae49e1933f463bc5), if I use --rate to throttle
the transaction rate, it does get throttled to about the indicated speed,
but pgbench consumes the entire CPU.
At the block of code starting
if (min_usec > 0 && maxsock != -1)
If maxsock == -1, then there is no sleep happening.
Argh, shame on me:-(
I cannot find the "induced errors" I was referring to in the message...
Sleeping is definitely needed to avoid a hard loop.
The attached patch fixes it and does not seem to introduce any special issue...
Should probably be backpatched.
Thanks for the debug!
--
Fabien.
Attachments:
pgbench-rate-bug-1.patchtext/x-diff; name=pgbench-rate-bug-1.patchDownload
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 364e254..3e23a6a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4587,7 +4587,7 @@ threadRun(void *arg)
* or it's time to print a progress report. Update input_mask to show
* which client(s) received data.
*/
- if (min_usec > 0 && maxsock != -1)
+ if (min_usec > 0 || maxsock != -1)
{
int nsocks; /* return from select(2) */
On Mon, Sep 4, 2017 at 1:56 PM, Fabien COELHO <coelho@cri.ensmp.fr> wrote:
Hello Jeff,
I have fixed a bug introduced in the patch by changing && by || in the
(min_usec > 0 && maxsock != -1) condition which was inducing errors with
multi-threads & clients...
Since this commit (12788ae49e1933f463bc5), if I use --rate to throttle
the transaction rate, it does get throttled to about the indicated speed,
but pgbench consumes the entire CPU.
At the block of code starting
if (min_usec > 0 && maxsock != -1)
If maxsock == -1, then there is no sleep happening.
Argh, shame on me:-(
I cannot find the "induced errors" I was referring to in the message...
Sleeping is definitely needed to avoid a hard loop.
The attached patch fixes it and does not seem to introduce any special issue...
Should probably be backpatched.
Thanks for the debug!
Thanks Fabien, that works for me.
But if min_usec <= 0, do we want to do whatever it is that we already know
is overdue, before stopping to do the select? If it is safe to go through
this code path when maxsock == -1, then should we just change it to this?
if (min_usec > 0)
Cheers,
Jeff
Hello Jeff,
Ok, the problem was a little bit more trivial than I thought.
The issue is that under a low rate there may be no transaction in
progress, however the wait procedure was relying on select's timeout. If
nothing is active there is nothing to wait for, thus it was an active loop
in this case...
I've introduced a usleep call in place of select for this particular
case. Hopefully this is portable.
ISTM that this bug exists since rate was introduced, so shame on me and
back-patching should be needed.
--
Fabien.
Attachments:
pgbench-rate-bug-2.patchtext/x-diff; name=pgbench-rate-bug-2.patchDownload
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index e37496c..068dbe6 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4582,20 +4582,30 @@ threadRun(void *arg)
* or it's time to print a progress report. Update input_mask to show
* which client(s) received data.
*/
- if (min_usec > 0 && maxsock != -1)
+ if (min_usec > 0)
{
- int nsocks; /* return from select(2) */
+ int nsocks = 0; /* return from select(2) if called, or usleep() */
if (min_usec != PG_INT64_MAX)
{
- struct timeval timeout;
+ if (maxsock != -1)
+ {
+ struct timeval timeout;
- timeout.tv_sec = min_usec / 1000000;
- timeout.tv_usec = min_usec % 1000000;
- nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+ timeout.tv_sec = min_usec / 1000000;
+ timeout.tv_usec = min_usec % 1000000;
+ nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+ }
+ else /* nothing active, simple sleep */
+ {
+ nsocks = usleep(min_usec);
+ }
}
- else
+ else /* no delay, select without timeout */
+ {
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+ }
+
if (nsocks < 0)
{
if (errno == EINTR)
@@ -4604,11 +4614,11 @@ threadRun(void *arg)
continue;
}
/* must be something wrong */
- fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ fprintf(stderr, "select() or usleep() failed: %s\n", strerror(errno));
goto done;
}
}
- else
+ else /* min_usec == 0 */
{
/* If we didn't call select(), don't try to read any data */
FD_ZERO(&input_mask);
On Mon, Sep 11, 2017 at 1:49 AM, Fabien COELHO <coelho@cri.ensmp.fr> wrote:
Hello Jeff,
Ok, the problem was a little bit more trivial than I thought.
The issue is that under a low rate there may be no transaction in
progress, however the wait procedure was relying on select's timeout. If
nothing is active there is nothing to wait for, thus it was an active loop
in this case...
I've introduced a usleep call in place of select for this particular case.
Hopefully this is portable.
Shouldn't we use pg_usleep to ensure portability? It is defined for
front-end code. But it returns void, so the error check will have to be
changed.
I didn't see the problem before the commit I originally indicated, so I
don't think it has to be back-patched to before v10.
Cheers,
Jeff
Hello Jeff,
Shouldn't we use pg_usleep to ensure portability? It is defined for
front-end code. But it returns void, so the error check will have to be
changed.
Attached v3 with pg_usleep called instead.
I didn't see the problem before the commit I originally indicated, so I
don't think it has to be back-patched to before v10.
Hmmm.... you've got a point, although I'm not sure how it could work
without sleeping explicitly. Maybe the path was calling select with an
empty wait list plus timeout, and select is kind enough to just sleep on
an empty list, or some other miracle. ISTM clearer to explicitly sleep in
that case.
--
Fabien.
Attachments:
pgbench-rate-bug-3.patchtext/x-diff; name=pgbench-rate-bug-3.patchDownload
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index e37496c..524fcc6 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4582,20 +4582,30 @@ threadRun(void *arg)
* or it's time to print a progress report. Update input_mask to show
* which client(s) received data.
*/
- if (min_usec > 0 && maxsock != -1)
+ if (min_usec > 0)
{
- int nsocks; /* return from select(2) */
+ int nsocks = 0; /* return from select(2) if called */
if (min_usec != PG_INT64_MAX)
{
- struct timeval timeout;
+ if (maxsock != -1)
+ {
+ struct timeval timeout;
- timeout.tv_sec = min_usec / 1000000;
- timeout.tv_usec = min_usec % 1000000;
- nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+ timeout.tv_sec = min_usec / 1000000;
+ timeout.tv_usec = min_usec % 1000000;
+ nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+ }
+ else /* nothing active, simple sleep */
+ {
+ pg_usleep(min_usec);
+ }
}
- else
+ else /* no explicit delay, select without timeout */
+ {
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+ }
+
if (nsocks < 0)
{
if (errno == EINTR)
@@ -4608,7 +4618,7 @@ threadRun(void *arg)
goto done;
}
}
- else
+ else /* min_usec == 0, i.e. something needs to be executed */
{
/* If we didn't call select(), don't try to read any data */
FD_ZERO(&input_mask);
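The shape of the wait logic in this patch can be sketched as a standalone helper. This is an illustration under stated assumptions, not the pgbench source: wait_for_work() and now_usec() are hypothetical names, nanosleep() stands in for pg_usleep() so that the sketch compiles outside the PostgreSQL tree, and the "no timeout" branch of the patch is left out. The point is only the dispatch: use select() with a timeout when at least one socket is watched, and a plain sleep when nothing is active.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <sys/select.h>
#include <sys/time.h>
#include <time.h>

/*
 * Wait up to min_usec microseconds (hypothetical helper). Returns the
 * result of select() when sockets are watched, or 0 after a plain sleep.
 */
static int
wait_for_work(int maxsock, fd_set *input_mask, int64_t min_usec)
{
	assert(min_usec > 0);

	if (maxsock != -1)
	{
		struct timeval timeout;

		timeout.tv_sec = min_usec / 1000000;
		timeout.tv_usec = min_usec % 1000000;
		return select(maxsock + 1, input_mask, NULL, NULL, &timeout);
	}
	else
	{
		/* nothing active, simple sleep (stand-in for pg_usleep) */
		struct timespec delay;

		delay.tv_sec = min_usec / 1000000;
		delay.tv_nsec = (min_usec % 1000000) * 1000;
		(void) nanosleep(&delay, NULL);	/* may return early on a signal */
		return 0;
	}
}

/* Monotonic clock reading in microseconds, used only to time the sketch. */
static int64_t
now_usec(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (int64_t) ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
```

Because the sleeping branch returns 0, a caller that checks "nsocks < 0" for errors, as the patched code does, never mistakes a plain sleep for a select() failure.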
On Tue, Sep 12, 2017 at 03:27:13AM +0200, Fabien COELHO wrote:
Shouldn't we use pg_usleep to ensure portability? It is defined for
front-end code. But it returns void, so the error check will have to be
changed.
Attached v3 with pg_usleep called instead.
I didn't see the problem before the commit I originally indicated, so I
don't think it has to be back-patched to before v10.
Hmmm.... you've got a point, although I'm not sure how it could work without
sleeping explicitly. Maybe the path was calling select with an empty wait
list plus timeout, and select is kind enough to just sleep on an empty list,
or some other miracle. ISTM clearer to explicitly sleep in that case.
[Action required within three days. This is a generic notification.]
The above-described topic is currently a PostgreSQL 10 open item. Heikki,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.
[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
On Mon, Sep 11, 2017 at 6:27 PM, Fabien COELHO <coelho@cri.ensmp.fr> wrote:
Hello Jeff,
Shouldn't we use pg_usleep to ensure portability? It is defined for
front-end code. But it returns void, so the error check will have to be
changed.
Attached v3 with pg_usleep called instead.
I didn't see the problem before the commit I originally indicated, so I
don't think it has to be back-patched to before v10.
Hmmm.... you've got a point, although I'm not sure how it could work
without sleeping explicitly. Maybe the path was calling select with an
empty wait list plus timeout, and select is kind enough to just sleep on an
empty list, or some other miracle.
Not really a miracle, calling select with an empty list of file handles is
a standard way to sleep on Unix-like platforms. (Indeed, that is how
pg_usleep is implemented on non-Windows platforms, see
"src/port/pgsleep.c"). The problem is that it is reportedly not portable
to Windows. But I tested pgbench.exe for 9.6.5-1 from EDB installer, and I
don't see excessive CPU usage for a throttled run, and it throttles to
about the correct speed. So maybe the non-portability is more rumor than
reality. So I don't know if this needs backpatching or not. But it should
be fixed for v10, as there it becomes a demonstrably live issue.
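As a rough illustration of the idiom described above, here is a minimal sketch of sleeping through select() with no file descriptors. The helper names sleep_with_select and elapsed_usec are hypothetical and not taken from pgbench or src/port/pgsleep.c; the sketch assumes a POSIX platform and, as noted above, may not behave the same way on Windows.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>

/*
 * Hypothetical helper (not from pgbench): sleep for usec microseconds by
 * calling select() with no file descriptors, so only the timeout applies.
 * Returns 0 when the timeout expires, -1 on error (for example EINTR).
 */
static int
sleep_with_select(long usec)
{
	struct timeval timeout;

	assert(usec >= 0);
	timeout.tv_sec = usec / 1000000L;
	timeout.tv_usec = usec % 1000000L;

	/* nfds = 0 and NULL sets: nothing to watch, so select() just waits */
	return select(0, NULL, NULL, NULL, &timeout);
}

/* Hypothetical helper: wall-clock microseconds elapsed since *start. */
static long
elapsed_usec(const struct timeval *start)
{
	struct timeval now;

	gettimeofday(&now, NULL);
	return (long) (now.tv_sec - start->tv_sec) * 1000000L +
		(long) (now.tv_usec - start->tv_usec);
}
```

Calling sleep_with_select(20000) should block for roughly 20 ms without consuming CPU, which is the behavior a throttled run relies on when no connection is waiting for a result.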
ISTM clearer to explicitly sleep in that case.
Yes.
Cheers,
Jeff
reality. So I don't know if this needs backpatching or not. But it
should be fixed for v10, as there it becomes a demonstrably live issue.
Yes.
--
Fabien.
On Mon, Sep 11, 2017 at 4:49 AM, Fabien COELHO <coelho@cri.ensmp.fr> wrote:
Ok, the problem was a little bit more trivial than I thought.
The issue is that under a low rate there may be no transaction in progress,
however the wait procedure was relying on select's timeout. If nothing is
active there is nothing to wait for, thus it was an active loop in this
case...
I've introduced a usleep call in place of select for this particular case.
Hopefully this is portable.
ISTM that this bug exists since rate was introduced, so shame on me and
back-patching should be needed.
I took a look at this and found that the proposed patch applies
cleanly all the way back to 9.5, but the regression is reported to
have begun with a commit that starts in v10. I haven't probed into
this in any depth, but are we sure that
12788ae49e1933f463bc59a6efe46c4a01701b76 is in fact where this problem
originated?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Hello Robert,
ISTM that this bug exists since rate was introduced, so shame on me and
back-patching should be needed.
I took a look at this and found that the proposed patch applies
cleanly all the way back to 9.5, but the regression is reported to
have begun with a commit that starts in v10. I haven't probed into
this in any depth, but are we sure that
12788ae49e1933f463bc59a6efe46c4a01701b76 is in fact where this problem
originated?
Yes.
I just rechecked that the problem occurs at 12788ae but not at the
preceding da6c4f6ca8.
Now, the situation before the restructuring is that it worked, but given the
spaghetti code it was very hard to guess why, let alone to fix issues when
it did not...
My late at night fuzzy interpretation is as follows:
The issue is in the code above the fix I submitted, which checks what has
to be selected. In the previous version ISTM that the condition was lax,
so it filled the input_mask even if the client was not waiting for
anything, so it was calling select later, which was really just
implementing the timeout. With the updated version the input mask and
maxsock are only set if there is really something to wait for, and if not
then it falls through and loops actively instead of doing a simple
sleep/timeout.
So I would say that the previous version worked because of a side effect
which may or may not have been intentional at the time, and was revealed
by checking the condition better.
Basically I'd say that the restructuring patch fixed a defect which
triggered the bug. Programming is fun:-)
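The distinction described above can be sketched as follows. This is only an illustration under stated assumptions, not the pgbench code: wait_for_work and WaitResult are hypothetical names, maxsock < 0 stands for "no socket was added to the input mask", and nanosleep is used here in place of pg_usleep so that the example is self-contained.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <sys/select.h>
#include <time.h>

/* Hypothetical result codes, for illustration only. */
typedef enum
{
	WAIT_SLEPT,					/* nothing to watch, slept explicitly */
	WAIT_SELECTED,				/* select() returned (data or timeout) */
	WAIT_ERROR
} WaitResult;

/*
 * Hypothetical wait step: if no socket is being waited on (maxsock < 0),
 * sleep explicitly for min_usec instead of falling through and spinning;
 * otherwise let select() wait on the sockets with min_usec as timeout.
 */
static WaitResult
wait_for_work(int maxsock, fd_set *input_mask, long min_usec)
{
	assert(min_usec >= 0);

	if (maxsock < 0)
	{
		struct timespec ts;

		ts.tv_sec = min_usec / 1000000L;
		ts.tv_nsec = (min_usec % 1000000L) * 1000L;
		if (nanosleep(&ts, NULL) < 0)
			return WAIT_ERROR;
		return WAIT_SLEPT;
	}
	else
	{
		struct timeval timeout;

		timeout.tv_sec = min_usec / 1000000L;
		timeout.tv_usec = min_usec % 1000000L;
		if (select(maxsock + 1, input_mask, NULL, NULL, &timeout) < 0)
			return WAIT_ERROR;
		return WAIT_SELECTED;
	}
}
```

In this sketch the explicit sleep branch plays the role that the lax condition used to play by accident: the thread gives up the CPU until the next scheduled transaction instead of looping.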
--
Fabien.
On 09/29/2017 08:43 PM, Fabien COELHO wrote:
reality. So I don't know if this needs backpatching or not. But it
should be fixed for v10, as there it becomes a demonstrably live issue.
Yes.
Patch looks good to me, so committed to master and v10. Thanks!
- Heikki