diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index 7ede6954aa..0c3704a2ff 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -4,7 +4,7 @@ * A simple benchmark program for PostgreSQL * Originally written by Tatsuo Ishii and enhanced by many contributors. * - * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.88 2009/07/30 09:28:00 mha Exp $ + * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.89 2009/08/03 15:18:14 ishii Exp $ * Copyright (c) 2000-2009, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -35,6 +35,7 @@ #include "libpq-fe.h" #include "pqsignal.h" +#include "portability/instr_time.h" #include @@ -58,6 +59,40 @@ #include /* for getrlimit */ #endif +#ifndef INT64_MAX +#define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF) +#endif + +/* + * Multi-platform pthread implementations + */ + +#ifdef WIN32 +/* Use native win32 threads on Windows */ +typedef struct win32_pthread *pthread_t; +typedef int pthread_attr_t; +static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg); +static int pthread_join(pthread_t th, void **thread_return); + +#elif defined(ENABLE_THREAD_SAFETY) +/* Use platform-dependent pthread */ +#include + +#else + +#include +/* Use emulation with fork. 
Rename pthread identifiers to avoid conflicts */ +#define pthread_t pg_pthread_t +#define pthread_attr_t pg_pthread_attr_t +#define pthread_create pg_pthread_create +#define pthread_join pg_pthread_join +typedef struct fork_pthread *pthread_t; +typedef int pthread_attr_t; +static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg); +static int pthread_join(pthread_t th, void **thread_return); + +#endif + extern char *optarg; extern int optind; @@ -74,7 +109,6 @@ extern int optind; #define DEFAULT_NXACTS 10 /* default nxacts */ -int nclients = 1; /* default number of simulated clients */ int nxacts = 0; /* number of transactions per client */ int duration = 0; /* duration in seconds */ @@ -102,8 +136,6 @@ FILE *LOGFILE = NULL; bool use_log; /* log transaction latencies to a file */ -int remains; /* number of remaining clients */ - int is_connect; /* establish connection for each transaction */ char *pghost = ""; @@ -138,14 +170,33 @@ typedef struct int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ - struct timeval until; /* napping until */ + int64 until; /* napping until (usec) */ Variable *variables; /* array of variable definitions */ int nvariables; - struct timeval txn_begin; /* used for measuring latencies */ + instr_time txn_begin; /* used for measuring latencies */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; +/* + * Thread state and result + */ +typedef struct +{ + pthread_t thread; /* thread handle */ + CState *state; /* array of CState */ + int nstate; /* length of state */ + instr_time start_time; /* thread start time */ +} TState; + +#define INVALID_THREAD ((pthread_t) 0) + +typedef struct +{ + instr_time conn_time; + int xacts; +} TResult; + /* * queries read from files */ @@ -171,8 +222,9 @@ typedef struct char *argv[MAX_ARGS]; /* command list */ } Command; -Command 
**sql_files[MAX_FILES]; /* SQL script files */ -int num_files; /* number of script files */ +static Command **sql_files[MAX_FILES]; /* SQL script files */ +static int num_files; /* number of script files */ +static int debug = 0; /* debug flag */ /* default scenario */ static char *tpc_b = { @@ -215,44 +267,9 @@ static char *select_only = { "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" }; -/* Connection overhead time */ -static struct timeval conn_total_time = {0, 0}; - /* Function prototypes */ static void setalarm(int seconds); - - -/* Calculate total time */ -static void -addTime(struct timeval * t1, struct timeval * t2, struct timeval * result) -{ - int sec = t1->tv_sec + t2->tv_sec; - int usec = t1->tv_usec + t2->tv_usec; - - if (usec >= 1000000) - { - usec -= 1000000; - sec++; - } - result->tv_sec = sec; - result->tv_usec = usec; -} - -/* Calculate time difference */ -static void -diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result) -{ - int sec = t1->tv_sec - t2->tv_sec; - int usec = t1->tv_usec - t2->tv_usec; - - if (usec < 0) - { - usec += 1000000; - sec--; - } - result->tv_sec = sec; - result->tv_usec = usec; -} +static void* threadRun(void *arg); static void usage(const char *progname) @@ -270,6 +287,7 @@ usage(const char *progname) " -D VARNAME=VALUE\n" " define variable for use by custom script\n" " -f FILENAME read transaction script from FILENAME\n" + " -j NUM number of threads (default: 1)\n" " -l write transaction times to log file\n" " -M {simple|extended|prepared}\n" " protocol for submitting queries to server (default: simple)\n" @@ -379,29 +397,6 @@ discard_response(CState *state) } while (res); } -/* check to see if the SQL result was good */ -static int -check(CState *state, PGresult *res, int n) -{ - CState *st = &state[n]; - - switch (PQresultStatus(res)) - { - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: - /* OK */ - break; - default: - fprintf(stderr, "Client %d aborted in state %d: %s", - n, 
st->state, PQerrorMessage(st->con)); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return (-1); - } - return (0); /* OK */ -} - static int compareVariables(const void *v1, const void *v2) { @@ -598,11 +593,24 @@ preparedStatementName(char *buffer, int file, int state) sprintf(buffer, "P%d_%d", file, state); } -static void -doCustom(CState *state, int n, int debug) +static bool +clientDone(CState *st, bool ok) +{ + (void) ok; /* unused */ + + if (st->con != NULL) + { + PQfinish(st->con); + st->con = NULL; + } + return false; /* always false */ +} + +/* return false iff client should be disconnected */ +static bool +doCustom(CState *st, instr_time *conn_time) { PGresult *res; - CState *st = &state[n]; Command **commands; top: @@ -610,16 +618,13 @@ top: if (st->sleeping) { /* are we sleeping? */ - int usec; - struct timeval now; + instr_time now; - gettimeofday(&now, NULL); - usec = (st->until.tv_sec - now.tv_sec) * 1000000 + - st->until.tv_usec - now.tv_usec; - if (usec <= 0) + INSTR_TIME_SET_CURRENT(now); + if (st->until <= INSTR_TIME_GET_MICROSEC(now)) st->sleeping = 0; /* Done sleeping, go ahead with next command */ else - return; /* Still sleeping, nothing to do here */ + return true; /* Still sleeping, nothing to do here */ } if (st->listen) @@ -627,17 +632,14 @@ top: if (commands[st->state]->type == SQL_COMMAND) { if (debug) - fprintf(stderr, "client %d receiving\n", n); + fprintf(stderr, "client %d receiving\n", st->id); if (!PQconsumeInput(st->con)) { /* there's something wrong */ - fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return; + fprintf(stderr, "Client %d aborted in state %d. 
Probably the backend died while processing.\n", st->id, st->state); + return clientDone(st, false); } if (PQisBusy(st->con)) - return; /* don't have the whole result yet */ + return true; /* don't have the whole result yet */ } /* @@ -645,25 +647,35 @@ top: */ if (use_log && commands[st->state + 1] == NULL) { - double diff; - struct timeval now; + instr_time diff; + double sec; + double msec; + double usec; - gettimeofday(&now, NULL); - diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 + - (int) (now.tv_usec - st->txn_begin.tv_usec); + INSTR_TIME_SET_CURRENT(diff); + INSTR_TIME_SUBTRACT(diff, st->txn_begin); + sec = INSTR_TIME_GET_DOUBLE(diff); + msec = INSTR_TIME_GET_MILLISEC(diff); + usec = (double) INSTR_TIME_GET_MICROSEC(diff); - fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n", - st->id, st->cnt, diff, st->use_file, - (long) now.tv_sec, (long) now.tv_usec); + fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n", + st->id, st->cnt, usec, st->use_file, + sec, usec - sec * 1000.0); } if (commands[st->state]->type == SQL_COMMAND) { res = PQgetResult(st->con); - if (check(state, res, n)) + switch (PQresultStatus(res)) { - PQclear(res); - return; + case PGRES_COMMAND_OK: + case PGRES_TUPLES_OK: + break; /* OK */ + default: + fprintf(stderr, "Client %d aborted in state %d: %s", + st->id, st->state, PQerrorMessage(st->con)); + PQclear(res); + return clientDone(st, false); } PQclear(res); discard_response(st); @@ -679,15 +691,7 @@ top: ++st->cnt; if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) - { - remains--; /* I've done */ - if (st->con != NULL) - { - PQfinish(st->con); - st->con = NULL; - } - return; - } + return clientDone(st, true); /* exit success */ } /* increment state counter */ @@ -702,27 +706,20 @@ top: if (st->con == NULL) { - struct timeval t1, - t2, - t3; + instr_time start, end; - gettimeofday(&t1, NULL); + INSTR_TIME_SET_CURRENT(start); if ((st->con = doConnect()) == NULL) { - fprintf(stderr, "Client %d aborted in establishing connection.\n", 
- n); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return; + fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id); + return clientDone(st, false); } - gettimeofday(&t2, NULL); - diffTime(&t2, &t1, &t3); - addTime(&conn_total_time, &t3, &conn_total_time); + INSTR_TIME_SET_CURRENT(end); + INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); } if (use_log && st->state == 0) - gettimeofday(&(st->txn_begin), NULL); + INSTR_TIME_SET_CURRENT(st->txn_begin); if (commands[st->state]->type == SQL_COMMAND) { @@ -738,11 +735,11 @@ top: { fprintf(stderr, "out of memory\n"); st->ecnt++; - return; + return true; } if (debug) - fprintf(stderr, "client %d sending %s\n", n, sql); + fprintf(stderr, "client %d sending %s\n", st->id, sql); r = PQsendQuery(st->con, sql); free(sql); } @@ -754,7 +751,7 @@ top: getQueryParams(st, command, params); if (debug) - fprintf(stderr, "client %d sending %s\n", n, sql); + fprintf(stderr, "client %d sending %s\n", st->id, sql); r = PQsendQueryParams(st->con, sql, command->argc - 1, NULL, params, NULL, NULL, 0); } @@ -788,7 +785,7 @@ top: preparedStatementName(name, st->use_file, st->state); if (debug) - fprintf(stderr, "client %d sending %s\n", n, name); + fprintf(stderr, "client %d sending %s\n", st->id, name); r = PQsendQueryPrepared(st->con, name, command->argc - 1, params, NULL, NULL, 0); } @@ -798,7 +795,7 @@ top: if (r == 0) { if (debug) - fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]); + fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]); st->ecnt++; } else @@ -812,7 +809,7 @@ top: if (debug) { - fprintf(stderr, "client %d executing \\%s", n, argv[0]); + fprintf(stderr, "client %d executing \\%s", st->id, argv[0]); for (i = 1; i < argc; i++) fprintf(stderr, " %s", argv[i]); fprintf(stderr, "\n"); @@ -831,7 +828,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); st->ecnt++; - return; + return true; } min = atoi(var); } @@ -853,7 
+850,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]); st->ecnt++; - return; + return true; } max = atoi(var); } @@ -864,7 +861,7 @@ top: { fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max); st->ecnt++; - return; + return true; } #ifdef DEBUG @@ -876,7 +873,7 @@ top: { fprintf(stderr, "%s: out of memory\n", argv[0]); st->ecnt++; - return; + return true; } st->listen = 1; @@ -894,7 +891,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); st->ecnt++; - return; + return true; } ope1 = atoi(var); } @@ -911,7 +908,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]); st->ecnt++; - return; + return true; } ope2 = atoi(var); } @@ -930,7 +927,7 @@ top: { fprintf(stderr, "%s: division by zero\n", argv[0]); st->ecnt++; - return; + return true; } snprintf(res, sizeof(res), "%d", ope1 / ope2); } @@ -938,7 +935,7 @@ top: { fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]); st->ecnt++; - return; + return true; } } @@ -946,7 +943,7 @@ top: { fprintf(stderr, "%s: out of memory\n", argv[0]); st->ecnt++; - return; + return true; } st->listen = 1; @@ -955,7 +952,7 @@ top: { char *var; int usec; - struct timeval now; + instr_time now; if (*argv[1] == ':') { @@ -963,7 +960,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]); st->ecnt++; - return; + return true; } usec = atoi(var); } @@ -980,9 +977,8 @@ top: else usec *= 1000000; - gettimeofday(&now, NULL); - st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000; - st->until.tv_usec = (now.tv_usec + usec) % 1000000; + INSTR_TIME_SET_CURRENT(now); + st->until = INSTR_TIME_GET_MICROSEC(now) + usec; st->sleeping = 1; st->listen = 1; @@ -990,18 +986,23 @@ top: goto top; } + + return true; } /* discard connections */ static void -disconnect_all(CState *state) +disconnect_all(CState *state, int length) { int i; - for (i = 0; i < nclients; i++) + for (i = 0; i < length; i++) { if (state[i].con) + { 
PQfinish(state[i].con); + state[i].con = NULL; + } } } @@ -1267,6 +1268,24 @@ process_commands(char *buf) return NULL; } + /* + * Split argument into number and unit for "sleep 1ms" or so. + * We don't have to terminate the number argument with null + * because it will be parsed with atoi, which ignores trailing + * non-digit characters. + */ + if (my_commands->argv[1][0] != ':') + { + char *c = my_commands->argv[1]; + while (isdigit(*c)) { c++; } + if (*c) + { + my_commands->argv[2] = c; + if (my_commands->argc < 3) + my_commands->argc = 3; + } + } + if (my_commands->argc >= 3) { if (pg_strcasecmp(my_commands->argv[2], "us") != 0 && @@ -1453,25 +1472,18 @@ process_builtin(char *tb) /* print out results */ static void -printResults( - int ttype, CState *state, - struct timeval * start_time, struct timeval * end_time) +printResults(int ttype, int normal_xacts, int nclients, int nthreads, + instr_time total_time, instr_time conn_total_time) { - double t1, - t2; - int i; - int normal_xacts = 0; + double time_include, + tps_include, + tps_exclude; char *s; - for (i = 0; i < nclients; i++) - normal_xacts += state[i].cnt; - - t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec); - t1 = normal_xacts * 1000000.0 / t1; - - t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 + - (end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec); - t2 = normal_xacts * 1000000.0 / t2; + time_include = INSTR_TIME_GET_DOUBLE(total_time); + tps_include = normal_xacts / time_include; + tps_exclude = normal_xacts / (time_include - + (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads)); if (ttype == 0) s = "TPC-B (sort of)"; @@ -1486,6 +1498,7 @@ printResults( printf("scaling factor: %d\n", scale); printf("query mode: %s\n", QUERYMODE[querymode]); printf("number of clients: %d\n", nclients); + printf("number of threads: %d\n", nthreads); if (duration <= 0) { printf("number of transactions per client: %d\n", 
nxacts); @@ -1498,8 +1511,8 @@ printResults( printf("number of transactions actually processed: %d\n", normal_xacts); } - printf("tps = %f (including connections establishing)\n", t1); - printf("tps = %f (excluding connections establishing)\n", t2); + printf("tps = %f (including connections establishing)\n", tps_include); + printf("tps = %f (excluding connections establishing)\n", tps_exclude); } @@ -1507,29 +1520,26 @@ int main(int argc, char **argv) { int c; + int nclients = 1; /* default number of simulated clients */ + int nthreads = 1; /* default number of threads */ int is_init_mode = 0; /* initialize mode? */ int is_no_vacuum = 0; /* no vacuum at all before testing? */ int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ - int debug = 0; /* debug flag */ int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only, * 2: skip update of branches and tellers */ char *filename = NULL; bool scale_given = false; CState *state; /* status of clients */ + TState *threads; /* array of thread */ - struct timeval start_time; /* start up time */ - struct timeval end_time; /* end time */ + instr_time start_time; /* start up time */ + instr_time total_time; + instr_time conn_total_time; + int total_xacts; int i; - fd_set input_mask; - int nsocks; /* return from select(2) */ - int maxsock; /* max socket number to be waited */ - struct timeval now; - struct timeval timeout; - int min_usec; - #ifdef HAVE_GETRLIMIT struct rlimit rlim; #endif @@ -1579,7 +1589,7 @@ main(int argc, char **argv) memset(state, 0, sizeof(*state)); - while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1) + while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1) { switch (c) { @@ -1632,6 +1642,14 @@ main(int argc, char **argv) } #endif /* HAVE_GETRLIMIT */ break; + case 'j': /* jobs */ + nthreads = atoi(optarg); + if (nthreads <= 0) + { + fprintf(stderr, "invalid number of threads: %d\n", nthreads); + exit(1); + } + break; case 'C': is_connect = 1; 
break; @@ -1752,7 +1770,11 @@ main(int argc, char **argv) if (nxacts <= 0 && duration <= 0) nxacts = DEFAULT_NXACTS; - remains = nclients; + if (nclients % nthreads != 0) + { + fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads); + exit(1); + } if (nclients > 1) { @@ -1770,6 +1792,7 @@ main(int argc, char **argv) { int j; + state[i].id = i; for (j = 0; j < state[0].nvariables; j++) { if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false) @@ -1879,33 +1902,8 @@ main(int argc, char **argv) PQfinish(con); /* set random seed */ - gettimeofday(&start_time, NULL); - srandom((unsigned int) start_time.tv_usec); - - /* get start up time */ - gettimeofday(&start_time, NULL); - - /* set alarm if duration is specified. */ - if (duration > 0) - setalarm(duration); - - if (is_connect == 0) - { - struct timeval t, - now; - - /* make connections to the database */ - for (i = 0; i < nclients; i++) - { - state[i].id = i; - if ((state[i].con = doConnect()) == NULL) - exit(1); - } - /* time after connections set up */ - gettimeofday(&now, NULL); - diffTime(&now, &start_time, &t); - addTime(&conn_total_time, &t, &conn_total_time); - } + INSTR_TIME_SET_CURRENT(start_time); + srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)); /* process bultin SQL scripts */ switch (ttype) @@ -1929,140 +1927,227 @@ main(int argc, char **argv) break; } - /* send start up queries in async manner */ - for (i = 0; i < nclients; i++) + /* get start up time */ + INSTR_TIME_SET_CURRENT(start_time); + + /* set alarm if duration is specified. 
*/ + if (duration > 0) + setalarm(duration); + + /* start threads */ + threads = (TState *) malloc(sizeof(TState) * nthreads); + for (i = 0; i < nthreads; i++) { - Command **commands = sql_files[state[i].use_file]; - int prev_ecnt = state[i].ecnt; + threads[i].state = &state[nclients / nthreads * i]; + threads[i].nstate = nclients / nthreads; + INSTR_TIME_SET_CURRENT(threads[i].start_time); - state[i].use_file = getrand(0, num_files - 1); - doCustom(state, i, debug); - - if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) + /* the first thread (i = 0) is executed by main thread */ + if (i > 0) { - fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state); - remains--; /* I've aborted */ - PQfinish(state[i].con); - state[i].con = NULL; + int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]); + if (err != 0 || threads[i].thread == INVALID_THREAD) + { + fprintf(stderr, "cannot create thread: %s\n", strerror(err)); + exit(1); + } + } + else + { + threads[i].thread = INVALID_THREAD; } } - for (;;) + /* wait for threads and accumulate results */ + total_xacts = 0; + INSTR_TIME_SET_ZERO(conn_total_time); + for (i = 0; i < nthreads; i++) { - if (remains <= 0) - { /* all done ? 
*/ - disconnect_all(state); - /* get end time */ - gettimeofday(&end_time, NULL); - printResults(ttype, state, &start_time, &end_time); - if (LOGFILE) - fclose(LOGFILE); - exit(0); + void *ret = NULL; + + if (threads[i].thread == INVALID_THREAD) + ret = threadRun(&threads[i]); + else + pthread_join(threads[i].thread, &ret); + + if (ret != NULL) + { + TResult *r = (TResult *) ret; + total_xacts += r->xacts; + INSTR_TIME_ADD(conn_total_time, r->conn_time); + free(ret); } + } + disconnect_all(state, nclients); + + /* get end time */ + INSTR_TIME_SET_CURRENT(total_time); + INSTR_TIME_SUBTRACT(total_time, start_time); + printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time); + if (LOGFILE) + fclose(LOGFILE); + + return 0; +} + +static void * +threadRun(void *arg) +{ + TState *thread = (TState *) arg; + CState *state = thread->state; + TResult *result; + instr_time start, end; + int nstate = thread->nstate; + int remains = nstate; /* number of remaining clients */ + int i; + + result = malloc(sizeof(TResult)); + INSTR_TIME_SET_ZERO(result->conn_time); + + if (is_connect == 0) + { + /* make connections to the database */ + for (i = 0; i < nstate; i++) + { + if ((state[i].con = doConnect()) == NULL) + goto done; + } + } + + /* time after thread and connections set up */ + INSTR_TIME_SET_CURRENT(result->conn_time); + INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time); + + /* send start up queries in async manner */ + for (i = 0; i < nstate; i++) + { + CState *st = &state[i]; + Command **commands = sql_files[st->use_file]; + int prev_ecnt = st->ecnt; + + st->use_file = getrand(0, num_files - 1); + if (!doCustom(st, &result->conn_time)) + remains--; /* I've aborted */ + + if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) + { + fprintf(stderr, "Client %d aborted in state %d. 
Execution meta-command failed.\n", i, st->state); + remains--; /* I've aborted */ + PQfinish(st->con); + st->con = NULL; + } + } + + while (remains > 0) + { + fd_set input_mask; + int maxsock; /* max socket number to be waited */ + int64 now_usec = 0; + int64 min_usec; FD_ZERO(&input_mask); maxsock = -1; - min_usec = -1; - for (i = 0; i < nclients; i++) + min_usec = INT64_MAX; + for (i = 0; i < nstate; i++) { - Command **commands = sql_files[state[i].use_file]; + CState *st = &state[i]; + Command **commands = sql_files[st->use_file]; + int sock; - if (state[i].sleeping) + if (st->sleeping) { int this_usec; - int sock = PQsocket(state[i].con); - if (min_usec < 0) + if (min_usec == INT64_MAX) { - gettimeofday(&now, NULL); - min_usec = 0; + instr_time now; + INSTR_TIME_SET_CURRENT(now); + now_usec = INSTR_TIME_GET_MICROSEC(now); } - this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 + - state[i].until.tv_usec - now.tv_usec; - - if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec)) + this_usec = st->until - now_usec; + if (min_usec > this_usec) min_usec = this_usec; - - FD_SET (sock, &input_mask); - - if (maxsock < sock) - maxsock = sock; } - else if (state[i].con && commands[state[i].state]->type != META_COMMAND) + else if (st->con == NULL) { - int sock = PQsocket(state[i].con); - - if (sock < 0) - { - disconnect_all(state); - exit(1); - } - FD_SET (sock, &input_mask); - - if (maxsock < sock) - maxsock = sock; + continue; } + else if (commands[st->state]->type == META_COMMAND) + { + min_usec = 0; /* the connection is ready to run */ + break; + } + + sock = PQsocket(st->con); + if (sock < 0) + { + fprintf(stderr, "bad socket: %s\n", strerror(errno)); + goto done; + } + + FD_SET(sock, &input_mask); + if (maxsock < sock) + maxsock = sock; } - if (maxsock != -1) + if (min_usec > 0 && maxsock != -1) { - if (min_usec >= 0) + int nsocks; /* return from select(2) */ + + if (min_usec != INT64_MAX) { + struct timeval timeout; timeout.tv_sec = min_usec / 1000000; 
timeout.tv_usec = min_usec % 1000000; - - nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, - (fd_set *) NULL, &timeout); + nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); } else - nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, - (fd_set *) NULL, (struct timeval *) NULL); + nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); if (nsocks < 0) { if (errno == EINTR) continue; /* must be something wrong */ - disconnect_all(state); fprintf(stderr, "select failed: %s\n", strerror(errno)); - exit(1); + goto done; } -#ifdef NOT_USED - else if (nsocks == 0) - { /* timeout */ - fprintf(stderr, "select timeout\n"); - for (i = 0; i < nclients; i++) - { - fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n", - i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen); - } - exit(0); - } -#endif } /* ok, backend returns reply */ - for (i = 0; i < nclients; i++) + for (i = 0; i < nstate; i++) { - Command **commands = sql_files[state[i].use_file]; - int prev_ecnt = state[i].ecnt; + CState *st = &state[i]; + Command **commands = sql_files[st->use_file]; + int prev_ecnt = st->ecnt; - if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask) - || commands[state[i].state]->type == META_COMMAND)) + if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) + || commands[st->state]->type == META_COMMAND)) { - doCustom(state, i, debug); + if (!doCustom(st, &result->conn_time)) + remains--; /* I've aborted */ } - if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) + if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) { - fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state); + fprintf(stderr, "Client %d aborted in state %d. 
Execution of meta-command failed.\n", i, st->state); remains--; /* I've aborted */ - PQfinish(state[i].con); - state[i].con = NULL; + PQfinish(st->con); + st->con = NULL; } } } + +done: + INSTR_TIME_SET_CURRENT(start); + disconnect_all(state, nstate); + result->xacts = 0; + for (i = 0; i < nstate; i++) + result->xacts += state[i].cnt; + INSTR_TIME_SET_CURRENT(end); + INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); + return result; } @@ -2084,6 +2169,87 @@ setalarm(int seconds) pqsignal(SIGALRM, handle_sig_alarm); alarm(seconds); } + +#ifndef ENABLE_THREAD_SAFETY + +/* + * implements pthread using fork. + */ + +typedef struct fork_pthread +{ + pid_t pid; + int pipes[2]; +} fork_pthread; + +static int +pthread_create(pthread_t *thread, + pthread_attr_t *attr, + void * (*start_routine)(void *), + void *arg) +{ + fork_pthread *th; + void *ret; + + th = (fork_pthread *) malloc(sizeof(fork_pthread)); + pipe(th->pipes); + + th->pid = fork(); + if (th->pid == -1) /* error */ + { + free(th); + return errno; + } + if (th->pid != 0) /* parent process */ + { + close(th->pipes[1]); + *thread = th; + return 0; + } + + /* child process */ + close(th->pipes[0]); + + /* set alarm again because the child does not inherit timers */ + if (duration > 0) + setalarm(duration); + + ret = start_routine(arg); + write(th->pipes[1], ret, sizeof(TResult)); + close(th->pipes[1]); + free(th); + exit(0); +} + +static int +pthread_join(pthread_t th, void **thread_return) +{ + int status; + + while (waitpid(th->pid, &status, 0) != th->pid) + { + if (errno != EINTR) + return errno; + } + + if (thread_return != NULL) + { + /* assume result is TResult */ + *thread_return = malloc(sizeof(TResult)); + if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult)) + { + free(*thread_return); + *thread_return = NULL; + } + } + close(th->pipes[0]); + + free(th); + return 0; +} + +#endif + #else /* WIN32 */ static VOID CALLBACK @@ -2110,4 +2276,70 @@ setalarm(int seconds) } } +/* partial 
pthread implementation for Windows */ + +typedef struct win32_pthread +{ + HANDLE handle; + void *(*routine)(void *); + void *arg; + void *result; +} win32_pthread; + +static unsigned __stdcall +win32_pthread_run(void *arg) +{ + win32_pthread *th = (win32_pthread *) arg; + + th->result = th->routine(th->arg); + + return 0; +} + +static int +pthread_create(pthread_t *thread, + pthread_attr_t *attr, + void * (*start_routine)(void *), + void *arg) +{ + int save_errno; + win32_pthread *th; + + th = (win32_pthread *) malloc(sizeof(win32_pthread)); + th->routine = start_routine; + th->arg = arg; + th->result = NULL; + + th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL); + if (th->handle == NULL) + { + save_errno = errno; + free(th); + return save_errno; + } + + *thread = th; + return 0; +} + +static int +pthread_join(pthread_t th, void **thread_return) +{ + if (th == NULL || th->handle == NULL) + return errno = EINVAL; + + if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0) + { + _dosmaperr(GetLastError()); + return errno; + } + + if (thread_return) + *thread_return = th->result; + + CloseHandle(th->handle); + free(th); + return 0; +} + #endif /* WIN32 */ diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml index 5c30e8499f..c34f7acbbb 100644 --- a/doc/src/sgml/pgbench.sgml +++ b/doc/src/sgml/pgbench.sgml @@ -1,4 +1,4 @@ - + pgbench @@ -171,6 +171,14 @@ pgbench options dbname sessions. Default is 1. + + -j threads + + Number of worker threads. Clients are divided equally among those + threads and executed in them. The number of clients must be a multiple + of the number of threads. Default is 1. + + -t transactions