diff -cpr HEAD/contrib/pgbench/pgbench.c pgbench-mt_20090709/contrib/pgbench/pgbench.c *** HEAD/contrib/pgbench/pgbench.c Fri Jun 12 09:52:43 2009 --- pgbench-mt_20090709/contrib/pgbench/pgbench.c Thu Jul 9 17:22:03 2009 *************** *** 30,35 **** --- 30,36 ---- #include "libpq-fe.h" #include "pqsignal.h" + #include "portability/instr_time.h" #include *************** *** 55,60 **** --- 56,80 ---- #include /* for getrlimit */ #endif + #ifndef INT64_MAX + #define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF) + #endif + + #ifdef WIN32 + typedef struct win32_pthread *pthread_t; + typedef int pthread_attr_t; + static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void * arg); + static int pthread_join(pthread_t th, void **thread_return); + #elif defined(ENABLE_THREAD_SAFETY) + #include + #else + #include + typedef struct fork_pthread *pthread_t; + typedef int pthread_attr_t; + static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void * arg); + static int pthread_join(pthread_t th, void **thread_return); + #endif + extern char *optarg; extern int optind; *************** extern int optind; *** 71,77 **** #define DEFAULT_NXACTS 10 /* default nxacts */ - int nclients = 1; /* default number of simulated clients */ int nxacts = 0; /* number of transactions per client */ int duration = 0; /* duration in seconds */ --- 91,96 ---- *************** FILE *LOGFILE = NULL; *** 99,106 **** bool use_log; /* log transaction latencies to a file */ - int remains; /* number of remaining clients */ - int is_connect; /* establish connection for each transaction */ char *pghost = ""; --- 118,123 ---- *************** typedef struct *** 135,149 **** int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ ! struct timeval until; /* napping until */ Variable *variables; /* array of variable definitions */ int nvariables; ! 
struct timeval txn_begin; /* used for measuring latencies */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; /* * queries read from files */ #define SQL_COMMAND 1 --- 152,185 ---- int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ ! int64 until; /* napping until (usec) */ Variable *variables; /* array of variable definitions */ int nvariables; ! instr_time txn_begin; /* used for measuring latencies */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; /* + * Thread state and result + */ + typedef struct + { + pthread_t thread; /* thread handle */ + CState *state; /* array of CState */ + int nstate; /* length of state */ + instr_time start_time; /* thread start time */ + } TState; + + #define INVALID_THREAD ((pthread_t) 0) + + typedef struct + { + instr_time conn_time; + int xacts; + } TResult; + + /* * queries read from files */ #define SQL_COMMAND 1 *************** typedef struct *** 168,175 **** char *argv[MAX_ARGS]; /* command list */ } Command; ! Command **sql_files[MAX_FILES]; /* SQL script files */ ! int num_files; /* number of script files */ /* default scenario */ static char *tpc_b = { --- 204,212 ---- char *argv[MAX_ARGS]; /* command list */ } Command; ! static Command **sql_files[MAX_FILES]; /* SQL script files */ ! static int num_files; /* number of script files */ ! static int debug = 0; /* debug flag */ /* default scenario */ static char *tpc_b = { *************** static char *select_only = { *** 212,255 **** "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" }; - /* Connection overhead time */ - static struct timeval conn_total_time = {0, 0}; - /* Function prototypes */ static void setalarm(int seconds); ! ! ! /* Calculate total time */ ! static void ! addTime(struct timeval * t1, struct timeval * t2, struct timeval * result) ! { ! int sec = t1->tv_sec + t2->tv_sec; ! 
int usec = t1->tv_usec + t2->tv_usec; ! ! if (usec >= 1000000) ! { ! usec -= 1000000; ! sec++; ! } ! result->tv_sec = sec; ! result->tv_usec = usec; ! } ! ! /* Calculate time difference */ ! static void ! diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result) ! { ! int sec = t1->tv_sec - t2->tv_sec; ! int usec = t1->tv_usec - t2->tv_usec; ! ! if (usec < 0) ! { ! usec += 1000000; ! sec--; ! } ! result->tv_sec = sec; ! result->tv_usec = usec; ! } static void usage(const char *progname) --- 249,257 ---- "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" }; /* Function prototypes */ static void setalarm(int seconds); ! static void* threadRun(void *arg); static void usage(const char *progname) *************** usage(const char *progname) *** 267,272 **** --- 269,275 ---- " -D VARNAME=VALUE\n" " define variable for use by custom script\n" " -f FILENAME read transaction script from FILENAME\n" + " -j NUM number of threads (default: 1)\n" " -l write transaction times to log file\n" " -M {simple|extended|prepared}\n" " protocol for submitting queries to server (default: simple)\n" *************** discard_response(CState *state) *** 376,404 **** } while (res); } - /* check to see if the SQL result was good */ - static int - check(CState *state, PGresult *res, int n) - { - CState *st = &state[n]; - - switch (PQresultStatus(res)) - { - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: - /* OK */ - break; - default: - fprintf(stderr, "Client %d aborted in state %d: %s", - n, st->state, PQerrorMessage(st->con)); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return (-1); - } - return (0); /* OK */ - } - static int compareVariables(const void *v1, const void *v2) { --- 379,384 ---- *************** preparedStatementName(char *buffer, int *** 595,605 **** sprintf(buffer, "P%d_%d", file, state); } ! static void ! 
doCustom(CState *state, int n, int debug) { PGresult *res; - CState *st = &state[n]; Command **commands; top: --- 575,598 ---- sprintf(buffer, "P%d_%d", file, state); } ! static bool ! clientDone(CState *st, bool ok) ! { ! (void) ok; /* unused */ ! ! if (st->con != NULL) ! { ! PQfinish(st->con); ! st->con = NULL; ! } ! return false; /* always false */ ! } ! ! /* return false iff client should be disconnected */ ! static bool ! doCustom(CState *st, instr_time *conn_time) { PGresult *res; Command **commands; top: *************** top: *** 607,622 **** if (st->sleeping) { /* are we sleeping? */ ! int usec; ! struct timeval now; ! gettimeofday(&now, NULL); ! usec = (st->until.tv_sec - now.tv_sec) * 1000000 + ! st->until.tv_usec - now.tv_usec; ! if (usec <= 0) st->sleeping = 0; /* Done sleeping, go ahead with next command */ else ! return; /* Still sleeping, nothing to do here */ } if (st->listen) --- 600,612 ---- if (st->sleeping) { /* are we sleeping? */ ! instr_time now; ! INSTR_TIME_SET_CURRENT(now); ! if (st->until <= INSTR_TIME_GET_MICROSEC(now)) st->sleeping = 0; /* Done sleeping, go ahead with next command */ else ! return true; /* Still sleeping, nothing to do here */ } if (st->listen) *************** top: *** 624,640 **** if (commands[st->state]->type == SQL_COMMAND) { if (debug) ! fprintf(stderr, "client %d receiving\n", n); if (!PQconsumeInput(st->con)) { /* there's something wrong */ ! fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state); ! remains--; /* I've aborted */ ! PQfinish(st->con); ! st->con = NULL; ! return; } if (PQisBusy(st->con)) ! return; /* don't have the whole result yet */ } /* --- 614,627 ---- if (commands[st->state]->type == SQL_COMMAND) { if (debug) ! fprintf(stderr, "client %d receiving\n", st->id); if (!PQconsumeInput(st->con)) { /* there's something wrong */ ! fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state); ! 
return clientDone(st, false); } if (PQisBusy(st->con)) ! return true; /* don't have the whole result yet */ } /* *************** top: *** 642,666 **** */ if (use_log && commands[st->state + 1] == NULL) { ! double diff; ! struct timeval now; ! ! gettimeofday(&now, NULL); ! diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 + ! (int) (now.tv_usec - st->txn_begin.tv_usec); ! ! fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n", ! st->id, st->cnt, diff, st->use_file, ! (long) now.tv_sec, (long) now.tv_usec); } if (commands[st->state]->type == SQL_COMMAND) { res = PQgetResult(st->con); ! if (check(state, res, n)) { ! PQclear(res); ! return; } PQclear(res); discard_response(st); --- 629,663 ---- */ if (use_log && commands[st->state + 1] == NULL) { ! instr_time diff; ! double sec; ! double msec; ! double usec; ! ! INSTR_TIME_SET_CURRENT(diff); ! INSTR_TIME_SUBTRACT(diff, st->txn_begin); ! sec = INSTR_TIME_GET_DOUBLE(diff); ! msec = INSTR_TIME_GET_MILLISEC(diff); ! usec = (double) INSTR_TIME_GET_MICROSEC(diff); ! ! fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n", ! st->id, st->cnt, usec, st->use_file, ! sec, usec - sec * 1000.0); } if (commands[st->state]->type == SQL_COMMAND) { res = PQgetResult(st->con); ! switch (PQresultStatus(res)) { ! case PGRES_COMMAND_OK: ! case PGRES_TUPLES_OK: ! break; /* OK */ ! default: ! fprintf(stderr, "Client %d aborted in state %d: %s", ! st->id, st->state, PQerrorMessage(st->con)); ! PQclear(res); ! return clientDone(st, false); } PQclear(res); discard_response(st); *************** top: *** 676,690 **** ++st->cnt; if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) ! { ! remains--; /* I've done */ ! if (st->con != NULL) ! { ! PQfinish(st->con); ! st->con = NULL; ! } ! return; ! } } /* increment state counter */ --- 673,679 ---- ++st->cnt; if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) ! 
return clientDone(st, true); /* exit success */ } /* increment state counter */ *************** top: *** 699,725 **** if (st->con == NULL) { ! struct timeval t1, ! t2, ! t3; ! gettimeofday(&t1, NULL); if ((st->con = doConnect()) == NULL) { ! fprintf(stderr, "Client %d aborted in establishing connection.\n", ! n); ! remains--; /* I've aborted */ ! PQfinish(st->con); ! st->con = NULL; ! return; } ! gettimeofday(&t2, NULL); ! diffTime(&t2, &t1, &t3); ! addTime(&conn_total_time, &t3, &conn_total_time); } if (use_log && st->state == 0) ! gettimeofday(&(st->txn_begin), NULL); if (commands[st->state]->type == SQL_COMMAND) { --- 688,707 ---- if (st->con == NULL) { ! instr_time start, end; ! INSTR_TIME_SET_CURRENT(start); if ((st->con = doConnect()) == NULL) { ! fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id); ! return clientDone(st, false); } ! INSTR_TIME_SET_CURRENT(end); ! INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); } if (use_log && st->state == 0) ! INSTR_TIME_SET_CURRENT(st->txn_begin); if (commands[st->state]->type == SQL_COMMAND) { *************** top: *** 735,745 **** { fprintf(stderr, "out of memory\n"); st->ecnt++; ! return; } if (debug) ! fprintf(stderr, "client %d sending %s\n", n, sql); r = PQsendQuery(st->con, sql); free(sql); } --- 717,727 ---- { fprintf(stderr, "out of memory\n"); st->ecnt++; ! return true; } if (debug) ! fprintf(stderr, "client %d sending %s\n", st->id, sql); r = PQsendQuery(st->con, sql); free(sql); } *************** top: *** 751,757 **** getQueryParams(st, command, params); if (debug) ! fprintf(stderr, "client %d sending %s\n", n, sql); r = PQsendQueryParams(st->con, sql, command->argc - 1, NULL, params, NULL, NULL, 0); } --- 733,739 ---- getQueryParams(st, command, params); if (debug) ! 
fprintf(stderr, "client %d sending %s\n", st->id, sql); r = PQsendQueryParams(st->con, sql, command->argc - 1, NULL, params, NULL, NULL, 0); } *************** top: *** 785,791 **** preparedStatementName(name, st->use_file, st->state); if (debug) ! fprintf(stderr, "client %d sending %s\n", n, name); r = PQsendQueryPrepared(st->con, name, command->argc - 1, params, NULL, NULL, 0); } --- 767,773 ---- preparedStatementName(name, st->use_file, st->state); if (debug) ! fprintf(stderr, "client %d sending %s\n", st->id, name); r = PQsendQueryPrepared(st->con, name, command->argc - 1, params, NULL, NULL, 0); } *************** top: *** 795,801 **** if (r == 0) { if (debug) ! fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]); st->ecnt++; } else --- 777,783 ---- if (r == 0) { if (debug) ! fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]); st->ecnt++; } else *************** top: *** 809,815 **** if (debug) { ! fprintf(stderr, "client %d executing \\%s", n, argv[0]); for (i = 1; i < argc; i++) fprintf(stderr, " %s", argv[i]); fprintf(stderr, "\n"); --- 791,797 ---- if (debug) { ! fprintf(stderr, "client %d executing \\%s", st->id, argv[0]); for (i = 1; i < argc; i++) fprintf(stderr, " %s", argv[i]); fprintf(stderr, "\n"); *************** top: *** 828,834 **** { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); st->ecnt++; ! return; } min = atoi(var); } --- 810,816 ---- { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); st->ecnt++; ! return true; } min = atoi(var); } *************** top: *** 850,856 **** { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]); st->ecnt++; ! return; } max = atoi(var); } --- 832,838 ---- { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]); st->ecnt++; ! return true; } max = atoi(var); } *************** top: *** 861,867 **** { fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max); st->ecnt++; ! 
return; } #ifdef DEBUG --- 843,849 ---- { fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max); st->ecnt++; ! return true; } #ifdef DEBUG *************** top: *** 873,879 **** { fprintf(stderr, "%s: out of memory\n", argv[0]); st->ecnt++; ! return; } st->listen = 1; --- 855,861 ---- { fprintf(stderr, "%s: out of memory\n", argv[0]); st->ecnt++; ! return true; } st->listen = 1; *************** top: *** 891,897 **** { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); st->ecnt++; ! return; } ope1 = atoi(var); } --- 873,879 ---- { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); st->ecnt++; ! return true; } ope1 = atoi(var); } *************** top: *** 908,914 **** { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]); st->ecnt++; ! return; } ope2 = atoi(var); } --- 890,896 ---- { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]); st->ecnt++; ! return true; } ope2 = atoi(var); } *************** top: *** 927,933 **** { fprintf(stderr, "%s: division by zero\n", argv[0]); st->ecnt++; ! return; } snprintf(res, sizeof(res), "%d", ope1 / ope2); } --- 909,915 ---- { fprintf(stderr, "%s: division by zero\n", argv[0]); st->ecnt++; ! return true; } snprintf(res, sizeof(res), "%d", ope1 / ope2); } *************** top: *** 935,941 **** { fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]); st->ecnt++; ! return; } } --- 917,923 ---- { fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]); st->ecnt++; ! return true; } } *************** top: *** 943,949 **** { fprintf(stderr, "%s: out of memory\n", argv[0]); st->ecnt++; ! return; } st->listen = 1; --- 925,931 ---- { fprintf(stderr, "%s: out of memory\n", argv[0]); st->ecnt++; ! return true; } st->listen = 1; *************** top: *** 952,958 **** { char *var; int usec; ! struct timeval now; if (*argv[1] == ':') { --- 934,940 ---- { char *var; int usec; ! 
instr_time now; if (*argv[1] == ':') { *************** top: *** 960,966 **** { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]); st->ecnt++; ! return; } usec = atoi(var); } --- 942,948 ---- { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]); st->ecnt++; ! return true; } usec = atoi(var); } *************** top: *** 977,985 **** else usec *= 1000000; ! gettimeofday(&now, NULL); ! st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000; ! st->until.tv_usec = (now.tv_usec + usec) % 1000000; st->sleeping = 1; st->listen = 1; --- 959,966 ---- else usec *= 1000000; ! INSTR_TIME_SET_CURRENT(now); ! st->until = INSTR_TIME_GET_MICROSEC(now) + usec; st->sleeping = 1; st->listen = 1; *************** top: *** 987,1004 **** goto top; } } /* discard connections */ static void ! disconnect_all(CState *state) { int i; ! for (i = 0; i < nclients; i++) { if (state[i].con) PQfinish(state[i].con); } } --- 968,990 ---- goto top; } + + return true; } /* discard connections */ static void ! disconnect_all(CState *state, int length) { int i; ! for (i = 0; i < length; i++) { if (state[i].con) + { PQfinish(state[i].con); + state[i].con = NULL; + } } } *************** process_commands(char *buf) *** 1264,1269 **** --- 1250,1273 ---- return NULL; } + /* + * Split argument into number and unit for "sleep 1ms" or so. + * We don't have to terminate the number argument with null + * because it will be parsed with atoi, which ignores trailing + * non-digit characters. + */ + if (my_commands->argv[1][0] != ':') + { + char *c = my_commands->argv[1]; + while (isdigit(*c)) { c++; } + if (*c) + { + my_commands->argv[2] = c; + if (my_commands->argc < 3) + my_commands->argc = 3; + } + } + if (my_commands->argc >= 3) { if (pg_strcasecmp(my_commands->argv[2], "us") != 0 && *************** process_builtin(char *tb) *** 1450,1474 **** /* print out results */ static void ! printResults( ! int ttype, CState *state, ! struct timeval * start_time, struct timeval * end_time) { ! 
double t1, ! t2; ! int i; ! int normal_xacts = 0; char *s; ! for (i = 0; i < nclients; i++) ! normal_xacts += state[i].cnt; ! ! t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec); ! t1 = normal_xacts * 1000000.0 / t1; ! ! t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 + ! (end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec); ! t2 = normal_xacts * 1000000.0 / t2; if (ttype == 0) s = "TPC-B (sort of)"; --- 1454,1471 ---- /* print out results */ static void ! printResults(int ttype, int normal_xacts, int nclients, int nthreads, ! instr_time total_time, instr_time conn_total_time) { ! double time_include, ! tps_include, ! tps_exclude; char *s; ! time_include = INSTR_TIME_GET_DOUBLE(total_time); ! tps_include = normal_xacts / time_include; ! tps_exclude = normal_xacts / (time_include - ! (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads)); if (ttype == 0) s = "TPC-B (sort of)"; *************** printResults( *** 1483,1488 **** --- 1480,1486 ---- printf("scaling factor: %d\n", scale); printf("query mode: %s\n", QUERYMODE[querymode]); printf("number of clients: %d\n", nclients); + printf("number of threads: %d\n", nthreads); if (duration <= 0) { printf("number of transactions per client: %d\n", nxacts); *************** printResults( *** 1495,1502 **** printf("number of transactions actually processed: %d\n", normal_xacts); } ! printf("tps = %f (including connections establishing)\n", t1); ! printf("tps = %f (excluding connections establishing)\n", t2); } --- 1493,1500 ---- printf("number of transactions actually processed: %d\n", normal_xacts); } ! printf("tps = %f (including connections establishing)\n", tps_include); ! printf("tps = %f (excluding connections establishing)\n", tps_exclude); } *************** int *** 1504,1532 **** main(int argc, char **argv) { int c; int is_init_mode = 0; /* initialize mode? */ int is_no_vacuum = 0; /* no vacuum at all before testing? 
*/ int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ - int debug = 0; /* debug flag */ int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only, * 2: skip update of branches and tellers */ char *filename = NULL; bool scale_given = false; CState *state; /* status of clients */ ! struct timeval start_time; /* start up time */ ! struct timeval end_time; /* end time */ int i; - fd_set input_mask; - int nsocks; /* return from select(2) */ - int maxsock; /* max socket number to be waited */ - struct timeval now; - struct timeval timeout; - int min_usec; - #ifdef HAVE_GETRLIMIT struct rlimit rlim; #endif --- 1502,1527 ---- main(int argc, char **argv) { int c; + int nclients = 1; /* default number of simulated clients */ + int nthreads = 1; /* default number of threads */ int is_init_mode = 0; /* initialize mode? */ int is_no_vacuum = 0; /* no vacuum at all before testing? */ int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only, * 2: skip update of branches and tellers */ char *filename = NULL; bool scale_given = false; CState *state; /* status of clients */ + TState *threads; /* array of thread */ ! instr_time start_time; /* start up time */ ! instr_time total_time; ! instr_time conn_total_time; ! int total_xacts; int i; #ifdef HAVE_GETRLIMIT struct rlimit rlim; #endif *************** main(int argc, char **argv) *** 1576,1582 **** memset(state, 0, sizeof(*state)); ! while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1) { switch (c) { --- 1571,1577 ---- memset(state, 0, sizeof(*state)); ! 
while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1) { switch (c) { *************** main(int argc, char **argv) *** 1629,1634 **** --- 1624,1637 ---- } #endif /* HAVE_GETRLIMIT */ break; + case 'j': /* jobs */ + nthreads = atoi(optarg); + if (nthreads <= 0) + { + fprintf(stderr, "invalid number of threads: %d\n", nthreads); + exit(1); + } + break; case 'C': is_connect = 1; break; *************** main(int argc, char **argv) *** 1749,1755 **** if (nxacts <= 0 && duration <= 0) nxacts = DEFAULT_NXACTS; ! remains = nclients; if (nclients > 1) { --- 1752,1762 ---- if (nxacts <= 0 && duration <= 0) nxacts = DEFAULT_NXACTS; ! if (nclients % nthreads != 0) ! { ! fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads); ! exit(1); ! } if (nclients > 1) { *************** main(int argc, char **argv) *** 1767,1772 **** --- 1774,1780 ---- { int j; + state[i].id = i; for (j = 0; j < state[0].nvariables; j++) { if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false) *************** main(int argc, char **argv) *** 1876,1908 **** PQfinish(con); /* set random seed */ ! gettimeofday(&start_time, NULL); ! srandom((unsigned int) start_time.tv_usec); ! ! /* get start up time */ ! gettimeofday(&start_time, NULL); ! ! /* set alarm if duration is specified. */ ! if (duration > 0) ! setalarm(duration); ! ! if (is_connect == 0) ! { ! struct timeval t, ! now; ! ! /* make connections to the database */ ! for (i = 0; i < nclients; i++) ! { ! state[i].id = i; ! if ((state[i].con = doConnect()) == NULL) ! exit(1); ! } ! /* time after connections set up */ ! gettimeofday(&now, NULL); ! diffTime(&now, &start_time, &t); ! addTime(&conn_total_time, &t, &conn_total_time); ! } /* process bultin SQL scripts */ switch (ttype) --- 1884,1891 ---- PQfinish(con); /* set random seed */ ! INSTR_TIME_SET_CURRENT(start_time); ! 
srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)); /* process bultin SQL scripts */ switch (ttype) *************** main(int argc, char **argv) *** 1926,2065 **** break; } /* send start up queries in async manner */ ! for (i = 0; i < nclients; i++) { ! Command **commands = sql_files[state[i].use_file]; ! int prev_ecnt = state[i].ecnt; ! state[i].use_file = getrand(0, num_files - 1); ! doCustom(state, i, debug); ! if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) { ! fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state); remains--; /* I've aborted */ ! PQfinish(state[i].con); ! state[i].con = NULL; } } ! for (;;) { ! if (remains <= 0) ! { /* all done ? */ ! disconnect_all(state); ! /* get end time */ ! gettimeofday(&end_time, NULL); ! printResults(ttype, state, &start_time, &end_time); ! if (LOGFILE) ! fclose(LOGFILE); ! exit(0); ! } FD_ZERO(&input_mask); maxsock = -1; ! min_usec = -1; ! for (i = 0; i < nclients; i++) { ! Command **commands = sql_files[state[i].use_file]; ! if (state[i].sleeping) { int this_usec; - int sock = PQsocket(state[i].con); ! if (min_usec < 0) { ! gettimeofday(&now, NULL); ! min_usec = 0; } ! this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 + ! state[i].until.tv_usec - now.tv_usec; ! ! if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec)) min_usec = this_usec; - - FD_SET (sock, &input_mask); - - if (maxsock < sock) - maxsock = sock; } ! else if (state[i].con && commands[state[i].state]->type != META_COMMAND) { ! int sock = PQsocket(state[i].con); ! ! if (sock < 0) ! { ! disconnect_all(state); ! exit(1); ! } ! FD_SET (sock, &input_mask); ! if (maxsock < sock) ! maxsock = sock; } } ! if (maxsock != -1) { ! if (min_usec >= 0) { timeout.tv_sec = min_usec / 1000000; timeout.tv_usec = min_usec % 1000000; ! ! nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, ! (fd_set *) NULL, &timeout); } else ! 
nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, ! (fd_set *) NULL, (struct timeval *) NULL); if (nsocks < 0) { if (errno == EINTR) continue; /* must be something wrong */ - disconnect_all(state); fprintf(stderr, "select failed: %s\n", strerror(errno)); ! exit(1); } - #ifdef NOT_USED - else if (nsocks == 0) - { /* timeout */ - fprintf(stderr, "select timeout\n"); - for (i = 0; i < nclients; i++) - { - fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n", - i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen); - } - exit(0); - } - #endif } /* ok, backend returns reply */ ! for (i = 0; i < nclients; i++) { ! Command **commands = sql_files[state[i].use_file]; ! int prev_ecnt = state[i].ecnt; ! if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask) ! || commands[state[i].state]->type == META_COMMAND)) { ! doCustom(state, i, debug); } ! if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) { ! fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state); remains--; /* I've aborted */ ! PQfinish(state[i].con); ! state[i].con = NULL; } } } } --- 1909,2135 ---- break; } + /* get start up time */ + INSTR_TIME_SET_CURRENT(start_time); + + /* set alarm if duration is specified. 
*/ + if (duration > 0) + setalarm(duration); + + /* start threads */ + threads = (TState *) malloc(sizeof(TState) * nthreads); + for (i = 0; i < nthreads; i++) + { + threads[i].state = &state[nclients / nthreads * i]; + threads[i].nstate = nclients / nthreads; + INSTR_TIME_SET_CURRENT(threads[i].start_time); + + /* the first thread (i = 0) is executed by main thread */ + if (i > 0) + { + int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]); + if (err != 0 || threads[i].thread == INVALID_THREAD) + { + fprintf(stderr, "cannot create thread: %s\n", strerror(err)); + exit(1); + } + } + else + { + threads[i].thread = INVALID_THREAD; + } + } + + /* wait for threads and accumulate results */ + total_xacts = 0; + INSTR_TIME_SET_ZERO(conn_total_time); + for (i = 0; i < nthreads; i++) + { + void *ret = NULL; + + if (threads[i].thread == INVALID_THREAD) + ret = threadRun(&threads[i]); + else + pthread_join(threads[i].thread, &ret); + + if (ret != NULL) + { + TResult *r = (TResult *) ret; + total_xacts += r->xacts; + INSTR_TIME_ADD(conn_total_time, r->conn_time); + free(ret); + } + } + disconnect_all(state, nclients); + + /* get end time */ + INSTR_TIME_SET_CURRENT(total_time); + INSTR_TIME_SUBTRACT(total_time, start_time); + printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time); + if (LOGFILE) + fclose(LOGFILE); + + return 0; + } + + static void * + threadRun(void *arg) + { + TState *thread = (TState *) arg; + CState *state = thread->state; + TResult *result; + instr_time start, end; + int nstate = thread->nstate; + int remains = nstate; /* number of remaining clients */ + int i; + + result = malloc(sizeof(TResult)); + INSTR_TIME_SET_ZERO(result->conn_time); + + if (is_connect == 0) + { + /* make connections to the database */ + for (i = 0; i < nstate; i++) + { + if ((state[i].con = doConnect()) == NULL) + goto done; + } + } + + /* time after thread and connections set up */ + INSTR_TIME_SET_CURRENT(result->conn_time); + 
INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time); + /* send start up queries in async manner */ ! for (i = 0; i < nstate; i++) { ! CState *st = &state[i]; ! Command **commands = sql_files[st->use_file]; ! int prev_ecnt = st->ecnt; ! st->use_file = getrand(0, num_files - 1); ! if (!doCustom(st, &result->conn_time)) ! remains--; /* I've aborted */ ! if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) { ! fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state); remains--; /* I've aborted */ ! PQfinish(st->con); ! st->con = NULL; } } ! while (remains > 0) { ! fd_set input_mask; ! int maxsock; /* max socket number to be waited */ ! int64 now_usec = 0; ! int64 min_usec; FD_ZERO(&input_mask); maxsock = -1; ! min_usec = INT64_MAX; ! for (i = 0; i < nstate; i++) { ! CState *st = &state[i]; ! Command **commands = sql_files[st->use_file]; ! int sock; ! if (st->sleeping) { int this_usec; ! if (min_usec == INT64_MAX) { ! instr_time now; ! INSTR_TIME_SET_CURRENT(now); ! now_usec = INSTR_TIME_GET_MICROSEC(now); } ! this_usec = st->until - now_usec; ! if (min_usec > this_usec) min_usec = this_usec; } ! else if (st->con == NULL) { ! continue; ! } ! else if (commands[st->state]->type == META_COMMAND) ! { ! min_usec = 0; /* the connection is ready to run */ ! break; ! } ! sock = PQsocket(st->con); ! if (sock < 0) ! { ! fprintf(stderr, "bad socket: %s\n", strerror(errno)); ! goto done; } + + FD_SET(sock, &input_mask); + if (maxsock < sock) + maxsock = sock; } ! if (min_usec > 0 && maxsock != -1) { ! int nsocks; /* return from select(2) */ ! ! if (min_usec != INT64_MAX) { + struct timeval timeout; timeout.tv_sec = min_usec / 1000000; timeout.tv_usec = min_usec % 1000000; ! nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); } else ! 
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); if (nsocks < 0) { if (errno == EINTR) continue; /* must be something wrong */ fprintf(stderr, "select failed: %s\n", strerror(errno)); ! goto done; } } /* ok, backend returns reply */ ! for (i = 0; i < nstate; i++) { ! CState *st = &state[i]; ! Command **commands = sql_files[st->use_file]; ! int prev_ecnt = st->ecnt; ! if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) ! || commands[st->state]->type == META_COMMAND)) { ! if (!doCustom(st, &result->conn_time)) ! remains--; /* I've aborted */ } ! if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) { ! fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state); remains--; /* I've aborted */ ! PQfinish(st->con); ! st->con = NULL; } } } + + done: + INSTR_TIME_SET_CURRENT(start); + disconnect_all(state, nstate); + result->xacts = 0; + for (i = 0; i < nstate; i++) + result->xacts += state[i].cnt; + INSTR_TIME_SET_CURRENT(end); + INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); + return result; } *************** setalarm(int seconds) *** 2081,2086 **** --- 2151,2228 ---- pqsignal(SIGALRM, handle_sig_alarm); alarm(seconds); } + + #ifndef ENABLE_THREAD_SAFETY + + /* + * implements pthread using fork. 
+ */ + + typedef struct fork_pthread + { + pid_t pid; + int pipes[2]; + } fork_pthread; + + static int + pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void * arg) + { + fork_pthread *th; + void *ret; + + th = (fork_pthread *) malloc(sizeof(fork_pthread)); + pipe(th->pipes); + + th->pid = fork(); + if (th->pid == -1) /* error */ + { + free(th); + return errno; + } + if (th->pid != 0) /* parent process */ + { + close(th->pipes[1]); + *thread = th; + return 0; + } + + /* child process */ + close(th->pipes[0]); + + /* set alarm again because the child does not inherit timers */ + if (duration > 0) + setalarm(duration); + + ret = start_routine(arg); + write(th->pipes[1], ret, sizeof(TResult)); + close(th->pipes[1]); + free(th); + exit(0); + } + + static int + pthread_join(pthread_t th, void **thread_return) + { + int status; + + while (waitpid(th->pid, &status, 0) != th->pid) + { + if (errno != EINTR) + return errno; + } + + if (thread_return != NULL) + { + *thread_return = malloc(sizeof(TResult)); + read(th->pipes[0], *thread_return, sizeof(TResult)); + } + + free(th); + return 0; + } + + #endif + #else /* WIN32 */ static VOID CALLBACK *************** setalarm(int seconds) *** 2107,2110 **** --- 2249,2318 ---- } } + /* partial pthread implementation for Windows */ + + typedef struct win32_pthread + { + HANDLE handle; + void *(*routine)(void *); + void *arg; + void *result; + } win32_pthread; + + static unsigned __stdcall + win32_pthread_run(void *arg) + { + win32_pthread *th = (win32_pthread *) arg; + + th->result = th->routine(th->arg); + + return 0; + } + + static int + pthread_create(pthread_t *thread, + pthread_attr_t *attr, + void * (*start_routine)(void *), + void * arg) + { + int save_errno; + win32_pthread *th; + + th = (win32_pthread *) malloc(sizeof(win32_pthread)); + th->routine = start_routine; + th->arg = arg; + th->result = NULL; + + th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL); + 
if (th->handle == NULL) + { + save_errno = errno; + free(th); + return save_errno; + } + + *thread = th; + return 0; + } + + static int + pthread_join(pthread_t th, void **thread_return) + { + if (th == NULL || th->handle == NULL) + return errno = EINVAL; + + if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0) + { + _dosmaperr(GetLastError()); + return errno; + } + + if (thread_return) + *thread_return = th->result; + + CloseHandle(th->handle); + free(th); + return 0; + } + #endif /* WIN32 */ diff -cpr HEAD/doc/src/sgml/pgbench.sgml pgbench-mt_20090709/doc/src/sgml/pgbench.sgml *** HEAD/doc/src/sgml/pgbench.sgml Mon May 11 11:33:20 2009 --- pgbench-mt_20090709/doc/src/sgml/pgbench.sgml Thu Jul 9 17:22:03 2009 *************** pgbench options< *** 172,177 **** --- 172,185 ---- + -j threads + + Number of worker threads. Clients are divided equally among the + threads, and each thread runs its own clients. The number of clients + must be a multiple of the number of threads. Default is 1. + + + -t transactions Number of transactions each client runs. Default is 10.