X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=contrib%2Fpgbench%2Fpgbench.c;h=41312e88c64bb10ccc652992f797bac4d2395d05;hb=49639a7b2c52263f2580463394e08b29059d5882;hp=6cb15bd2cab8ec72efcab099a6de498138f0c902;hpb=ba708ea3dc84e35900a43015236f90b1a67d3ca3;p=postgresql diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index 6cb15bd2ca..41312e88c6 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -1,33 +1,45 @@ /* - * $Header: /cvsroot/pgsql/contrib/pgbench/pgbench.c,v 1.10 2001/09/09 03:15:56 ishii Exp $ + * pgbench.c * - * pgbench: a simple TPC-B like benchmark program for PostgreSQL - * written by Tatsuo Ishii + * A simple benchmark program for PostgreSQL + * Originally written by Tatsuo Ishii and enhanced by many contributors. * - * Copyright (c) 2000 Tatsuo Ishii + * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.79 2008/03/19 03:33:21 ishii Exp $ + * Copyright (c) 2000-2008, PostgreSQL Global Development Group + * ALL RIGHTS RESERVED; + * + * Permission to use, copy, modify, and distribute this software and its + * documentation for any purpose, without fee, and without a written agreement + * is hereby granted, provided that the above copyright notice and this + * paragraph and the following two paragraphs appear in all copies. + * + * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR + * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING + * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS + * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS + * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO + * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * - * Permission to use, copy, modify, and distribute this software and - * its documentation for any purpose and without fee is hereby - * granted, provided that the above copyright notice appear in all - * copies and that both that copyright notice and this permission - * notice appear in supporting documentation, and that the name of the - * author not be used in advertising or publicity pertaining to - * distribution of the software without specific, written prior - * permission. The author makes no representations about the - * suitability of this software for any purpose. It is provided "as - * is" without express or implied warranty. */ #include "postgres_fe.h" #include "libpq-fe.h" -#include +#include #ifdef WIN32 -#include "win32.h" +#undef FD_SETSIZE +#define FD_SETSIZE 1024 +#include #else #include #include +#endif /* ! WIN32 */ #ifdef HAVE_GETOPT_H #include @@ -37,25 +49,38 @@ #include #endif -/* for getrlimit */ -#include +#ifdef HAVE_SYS_RESOURCE_H +#include /* for getrlimit */ +#endif + +extern char *optarg; +extern int optind; -#endif /* WIN32 */ /******************************************************************** * some configurable parameters */ -#define MAXCLIENTS 1024 /* max number of clients allowed */ +/* max number of clients allowed */ +#ifdef FD_SETSIZE +#define MAXCLIENTS (FD_SETSIZE - 10) +#else +#define MAXCLIENTS 1024 +#endif int nclients = 1; /* default number of simulated clients */ -int nxacts = 10; /* default number of transactions per - * clients */ +int nxacts = 10; /* default number of transactions per clients */ /* - * scaling factor. for example, tps = 10 will make 1000000 tuples of + * scaling factor. for example, scale = 10 will make 1000000 tuples of * accounts table. */ -int tps = 1; +int scale = 1; + +/* + * fillfactor. for example, fillfactor = 90 will use only 90 percent + * space during inserts and leave 10 percent free. + */ +int fillfactor = 100; /* * end of configurable parameters @@ -65,73 +90,235 @@ int tps = 1; #define ntellers 10 #define naccounts 100000 -int remains; /* number of remained clients */ +FILE *LOGFILE = NULL; + +bool use_log; /* log transaction latencies to a file */ -int is_connect; /* establish connection for each transactoin */ +int remains; /* number of remaining clients */ + +int is_connect; /* establish connection for each transaction */ char *pghost = ""; -char *pgport = NULL; +char *pgport = ""; char *pgoptions = NULL; char *pgtty = NULL; char *login = NULL; -char *pwd = NULL; char *dbName; +/* variable definitions */ +typedef struct +{ + char *name; /* variable name */ + char *value; /* its value */ +} Variable; + +#define MAX_FILES 128 /* max number of SQL script files allowed */ + +/* + * structures used in custom query mode + */ + typedef struct { PGconn *con; /* connection handle to DB */ + int id; /* client No. */ int state; /* state No. */ int cnt; /* xacts count */ int ecnt; /* error count */ - int listen; /* none 0 indicates that an async query - * has been sent */ - int aid; /* account id for this transaction */ - int bid; /* branch id for this transaction */ - int tid; /* teller id for this transaction */ - int delta; - int abalance; -} CState; + int listen; /* 0 indicates that an async query has been + * sent */ + int sleeping; /* 1 indicates that the client is napping */ + struct timeval until; /* napping until */ + Variable *variables; /* array of variable definitions */ + int nvariables; + struct timeval txn_begin; /* used for measuring latencies */ + int use_file; /* index in sql_files for this client */ + bool prepared[MAX_FILES]; +} CState; + +/* + * queries read from files + */ +#define SQL_COMMAND 1 +#define META_COMMAND 2 +#define MAX_ARGS 10 + +typedef enum QueryMode +{ + QUERY_SIMPLE, /* simple query */ + QUERY_EXTENDED, /* extended query */ + QUERY_PREPARED, /* extended query with prepared statements */ + NUM_QUERYMODE +} QueryMode; + +static QueryMode querymode = QUERY_SIMPLE; +static const char *QUERYMODE[] = { "simple", "extended", "prepared" }; + +typedef struct +{ + int type; /* command type (SQL_COMMAND or META_COMMAND) */ + int argc; /* number of commands */ + char *argv[MAX_ARGS]; /* command list */ +} Command; + +Command **sql_files[MAX_FILES]; /* SQL script files */ +int num_files; /* its number */ + +/* default scenario */ +static char *tpc_b = { + "\\set nbranches :scale\n" + "\\set ntellers 10 * :scale\n" + "\\set naccounts 100000 * :scale\n" + "\\setrandom aid 1 :naccounts\n" + "\\setrandom bid 1 :nbranches\n" + "\\setrandom tid 1 :ntellers\n" + "\\setrandom delta -5000 5000\n" + "BEGIN;\n" + "UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n" + "SELECT abalance FROM accounts WHERE aid = :aid;\n" + "UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n" + "UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n" + "INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" + "END;\n" +}; + +/* -N case */ +static char *simple_update = { + "\\set nbranches :scale\n" + "\\set ntellers 10 * :scale\n" + "\\set naccounts 100000 * :scale\n" + "\\setrandom aid 1 :naccounts\n" + "\\setrandom bid 1 :nbranches\n" + "\\setrandom tid 1 :ntellers\n" + "\\setrandom delta -5000 5000\n" + "BEGIN;\n" + "UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n" + "SELECT abalance FROM accounts WHERE aid = :aid;\n" + "INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" + "END;\n" +}; + +/* -S case */ +static char *select_only = { + "\\set naccounts 100000 * :scale\n" + "\\setrandom aid 1 :naccounts\n" + "SELECT abalance FROM accounts WHERE aid = :aid;\n" +}; + +/* Connection overhead time */ +static struct timeval conn_total_time = {0, 0}; + +/* Calculate total time */ +static void +addTime(struct timeval *t1, struct timeval *t2, struct timeval *result) +{ + int sec = t1->tv_sec + t2->tv_sec; + int usec = t1->tv_usec + t2->tv_usec; + if (usec >= 1000000) + { + usec -= 1000000; + sec++; + } + result->tv_sec = sec; + result->tv_usec = usec; +} + +/* Calculate time difference */ +static void +diffTime(struct timeval *t1, struct timeval *t2, struct timeval *result) +{ + int sec = t1->tv_sec - t2->tv_sec; + int usec = t1->tv_usec - t2->tv_usec; + if (usec < 0) + { + usec += 1000000; + sec--; + } + result->tv_sec = sec; + result->tv_usec = usec; +} static void -usage() +usage(void) { - fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-U login][-P password][-d][dbname]\n"); - fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n"); + fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-D varname=value][-n][-C][-v][-S][-N][-M querymode][-f filename][-l][-U login][-d][dbname]\n"); + fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor] [-F fillfactor] [-U login][-d][dbname]\n"); } -/* random number generator */ +/* random number generator: uniform distribution from min to max inclusive */ static int getrand(int min, int max) { - return (min + (int) (max * 1.0 * rand() / (RAND_MAX + 1.0))); + /* + * Odd coding is so that min and max have approximately the same chance of + * being selected as do numbers between them. + */ + return min + (int) (((max - min + 1) * (double) random()) / (MAX_RANDOM_VALUE + 1.0)); +} + +/* call PQexec() and exit() on failure */ +static void +executeStatement(PGconn *con, const char *sql) +{ + PGresult *res; + + res = PQexec(con, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "%s", PQerrorMessage(con)); + exit(1); + } + PQclear(res); } /* set up a connection to the backend */ -static PGconn *doConnect() +static PGconn * +doConnect(void) { - PGconn *con; - - con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName, - login, pwd); - if (con == NULL) - { - fprintf(stderr, "Connection to database '%s' failed.\n", dbName); - fprintf(stderr, "Memory allocatin problem?\n"); - return(NULL); - } - - if (PQstatus(con) == CONNECTION_BAD) - { - fprintf(stderr, "Connection to database '%s' failed.\n", dbName); - - if (PQerrorMessage(con)) - fprintf(stderr, "%s", PQerrorMessage(con)); - else - fprintf(stderr, "No explanation from the backend\n"); + PGconn *conn; + static char *password = NULL; + bool new_pass; - return(NULL); - } - return (con); + /* + * Start the connection. Loop until we have a password if requested by + * backend. + */ + do + { + new_pass = false; + + conn = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName, + login, password); + if (!conn) + { + fprintf(stderr, "Connection to database \"%s\" failed\n", + dbName); + return NULL; + } + + if (PQstatus(conn) == CONNECTION_BAD && + PQconnectionNeedsPassword(conn) && + password == NULL && + !feof(stdin)) + { + PQfinish(conn); + password = simple_prompt("Password: ", 100, false); + new_pass = true; + } + } while (new_pass); + + /* check to see that the backend connection was successfully made */ + if (PQstatus(conn) == CONNECTION_BAD) + { + fprintf(stderr, "Connection to database \"%s\" failed:\n%s", + dbName, PQerrorMessage(conn)); + PQfinish(conn); + return NULL; + } + + executeStatement(conn, "SET search_path = public"); + + return conn; } /* throw away response from backend */ @@ -150,35 +337,250 @@ discard_response(CState * state) /* check to see if the SQL result was good */ static int -check(CState * state, PGresult *res, int n, int good) +check(CState * state, PGresult *res, int n) { CState *st = &state[n]; - if (res && PQresultStatus(res) != good) + switch (PQresultStatus(res)) + { + case PGRES_COMMAND_OK: + case PGRES_TUPLES_OK: + /* OK */ + break; + default: + fprintf(stderr, "Client %d aborted in state %d: %s", + n, st->state, PQerrorMessage(st->con)); + remains--; /* I've aborted */ + PQfinish(st->con); + st->con = NULL; + return (-1); + } + return (0); /* OK */ +} + +static int +compareVariables(const void *v1, const void *v2) +{ + return strcmp(((const Variable *) v1)->name, + ((const Variable *) v2)->name); +} + +static char * +getVariable(CState * st, char *name) +{ + Variable key, + *var; + + /* On some versions of Solaris, bsearch of zero items dumps core */ + if (st->nvariables <= 0) + return NULL; + + key.name = name; + var = (Variable *) bsearch((void *) &key, + (void *) st->variables, + st->nvariables, + sizeof(Variable), + compareVariables); + if (var != NULL) + return var->value; + else + return NULL; +} + +static int +putVariable(CState * st, char *name, char *value) +{ + Variable key, + *var; + + key.name = name; + /* On some versions of Solaris, bsearch of zero items dumps core */ + if (st->nvariables > 0) + var = (Variable *) bsearch((void *) &key, + (void *) st->variables, + st->nvariables, + sizeof(Variable), + compareVariables); + else + var = NULL; + + if (var == NULL) { - fprintf(stderr, "Client %d aborted in state %d: %s", n, st->state, PQerrorMessage(st->con)); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return (-1); + Variable *newvars; + + if (st->variables) + newvars = (Variable *) realloc(st->variables, + (st->nvariables + 1) * sizeof(Variable)); + else + newvars = (Variable *) malloc(sizeof(Variable)); + + if (newvars == NULL) + return false; + + st->variables = newvars; + + var = &newvars[st->nvariables]; + + var->name = NULL; + var->value = NULL; + + if ((var->name = strdup(name)) == NULL + || (var->value = strdup(value)) == NULL) + { + free(var->name); + free(var->value); + return false; + } + + st->nvariables++; + + qsort((void *) st->variables, st->nvariables, sizeof(Variable), + compareVariables); } - return (0); /* OK */ + else + { + char *val; + + if ((val = strdup(value)) == NULL) + return false; + + free(var->value); + var->value = val; + } + + return true; +} + +static char * +parseVariable(const char *sql, int *eaten) +{ + int i = 0; + char *name; + + do + { + i++; + } while (isalnum((unsigned char) sql[i]) || sql[i] == '_'); + if (i == 1) + return NULL; + + name = malloc(i); + if (name == NULL) + return NULL; + memcpy(name, &sql[1], i - 1); + name[i - 1] = '\0'; + + *eaten = i; + return name; +} + +static char * +replaceVariable(char **sql, char *param, int len, char *value) +{ + int valueln = strlen(value); + + if (valueln > len) + { + char *tmp; + size_t offset = param - *sql; + + tmp = realloc(*sql, strlen(*sql) - len + valueln + 1); + if (tmp == NULL) + { + free(*sql); + return NULL; + } + *sql = tmp; + param = *sql + offset; + } + + if (valueln != len) + memmove(param + valueln, param + len, strlen(param + len) + 1); + strncpy(param, value, valueln); + + return param + valueln; +} + +static char * +assignVariables(CState * st, char *sql) +{ + char *p, + *name, + *val; + + p = sql; + while ((p = strchr(p, ':')) != NULL) + { + int eaten; + + name = parseVariable(p, &eaten); + if (name == NULL) + { + while (*p == ':') { p++; } + continue; + } + + val = getVariable(st, name); + free(name); + if (val == NULL) + { + p++; + continue; + } + + if ((p = replaceVariable(&sql, p, eaten, val)) == NULL) + return NULL; + } + + return sql; } -/* process a transaction */ static void -doOne(CState * state, int n, int debug) +getQueryParams(CState *st, const Command *command, const char **params) +{ + int i; + + for (i = 0; i < command->argc - 1; i++) + params[i] = getVariable(st, command->argv[i+1]); +} + +#define MAX_PREPARE_NAME 32 +static void +preparedStatementName(char *buffer, int file, int state) +{ + sprintf(buffer, "P%d_%d", file, state); +} + +static void +doCustom(CState * state, int n, int debug) { - char sql[256]; PGresult *res; CState *st = &state[n]; + Command **commands; + +top: + commands = sql_files[st->use_file]; + + if (st->sleeping) + { /* are we sleeping? */ + int usec; + struct timeval now; + + gettimeofday(&now, NULL); + usec = (st->until.tv_sec - now.tv_sec) * 1000000 + + st->until.tv_usec - now.tv_usec; + if (usec <= 0) + st->sleeping = 0; /* Done sleeping, go ahead with next command */ + else + return; /* Still sleeping, nothing to do here */ + } if (st->listen) { /* are we receiver? */ - if (debug) - fprintf(stderr, "client %d receiving\n", n); - while (PQisBusy(st->con) == TRUE) + if (commands[st->state]->type == SQL_COMMAND) { + if (debug) + fprintf(stderr, "client %d receiving\n", n); if (!PQconsumeInput(st->con)) { /* there's something wrong */ fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state); @@ -187,233 +589,356 @@ doOne(CState * state, int n, int debug) st->con = NULL; return; } + if (PQisBusy(st->con)) + return; /* don't have the whole result yet */ } - switch (st->state) + /* + * transaction finished: record the time it took in the log + */ + if (use_log && commands[st->state + 1] == NULL) { - case 0: /* response to "begin" */ - res = PQgetResult(st->con); - if (check(state, res, n, PGRES_COMMAND_OK)) - return; - PQclear(res); - discard_response(st); - break; - case 1: /* response to "update accounts..." */ - res = PQgetResult(st->con); - if (check(state, res, n, PGRES_COMMAND_OK)) - return; - PQclear(res); - discard_response(st); - break; - case 2: /* response to "select abalance ..." */ - res = PQgetResult(st->con); - if (check(state, res, n, PGRES_TUPLES_OK)) - return; - PQclear(res); - discard_response(st); - break; - case 3: /* response to "update tellers ..." */ - res = PQgetResult(st->con); - if (check(state, res, n, PGRES_COMMAND_OK)) - return; - PQclear(res); - discard_response(st); - break; - case 4: /* response to "update branches ..." */ - res = PQgetResult(st->con); - if (check(state, res, n, PGRES_COMMAND_OK)) - return; - PQclear(res); - discard_response(st); - break; - case 5: /* response to "insert into history ..." */ - res = PQgetResult(st->con); - if (check(state, res, n, PGRES_COMMAND_OK)) - return; - PQclear(res); - discard_response(st); - break; - case 6: /* response to "end" */ - res = PQgetResult(st->con); - if (check(state, res, n, PGRES_COMMAND_OK)) - return; + double diff; + struct timeval now; + + gettimeofday(&now, NULL); + diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 + + (int) (now.tv_usec - st->txn_begin.tv_usec); + + fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n", + st->id, st->cnt, diff, st->use_file, + (long) now.tv_sec, (long) now.tv_usec); + } + + if (commands[st->state]->type == SQL_COMMAND) + { + res = PQgetResult(st->con); + if (check(state, res, n)) + { PQclear(res); - discard_response(st); + return; + } + PQclear(res); + discard_response(st); + } - if (is_connect) - { - PQfinish(st->con); - st->con = NULL; - } + if (commands[st->state + 1] == NULL) + { + if (is_connect) + { + PQfinish(st->con); + st->con = NULL; + } - if (++st->cnt >= nxacts) + if (++st->cnt >= nxacts) + { + remains--; /* I've done */ + if (st->con != NULL) { - remains--; /* I've done */ - if (st->con != NULL) - { - PQfinish(st->con); - st->con = NULL; - } - return; + PQfinish(st->con); + st->con = NULL; } - break; + return; + } } /* increment state counter */ st->state++; - if (st->state > 6) + if (commands[st->state] == NULL) + { st->state = 0; + st->use_file = getrand(0, num_files - 1); + commands = sql_files[st->use_file]; + } } if (st->con == NULL) { - if ((st->con = doConnect()) == NULL) - { - fprintf(stderr, "Client %d aborted in establishing connection.\n", - n); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return; - } - } - - switch (st->state) - { - case 0: /* about to start */ - strcpy(sql, "begin"); - st->aid = getrand(1, naccounts * tps); - st->bid = getrand(1, nbranches * tps); - st->tid = getrand(1, ntellers * tps); - st->delta = getrand(1, 1000); - break; - case 1: - sprintf(sql, "update accounts set abalance = abalance + %d where aid = %d\n", st->delta, st->aid); - break; - case 2: - sprintf(sql, "select abalance from accounts where aid = %d", st->aid); - break; - case 3: - sprintf(sql, "update tellers set tbalance = tbalance + %d where tid = %d\n", - st->delta, st->tid); - break; - case 4: - sprintf(sql, "update branches set bbalance = bbalance + %d where bid = %d", st->delta, st->bid); - break; - case 5: - sprintf(sql, "insert into history(tid,bid,aid,delta,time) values(%d,%d,%d,%d,'now')", - st->tid, st->bid, st->aid, st->delta); - break; - case 6: - strcpy(sql, "end"); - break; + struct timeval t1, t2, t3; + + gettimeofday(&t1, NULL); + if ((st->con = doConnect()) == NULL) + { + fprintf(stderr, "Client %d aborted in establishing connection.\n", + n); + remains--; /* I've aborted */ + PQfinish(st->con); + st->con = NULL; + return; + } + gettimeofday(&t2, NULL); + diffTime(&t2, &t1, &t3); + addTime(&conn_total_time, &t3, &conn_total_time); } - if (debug) - fprintf(stderr, "client %d sending %s\n", n, sql); - if (PQsendQuery(st->con, sql) == 0) + if (use_log && st->state == 0) + gettimeofday(&(st->txn_begin), NULL); + + if (commands[st->state]->type == SQL_COMMAND) { - if (debug) - fprintf(stderr, "PQsendQuery(%s)failed\n", sql); - st->ecnt++; + const Command *command = commands[st->state]; + int r; + + if (querymode == QUERY_SIMPLE) + { + char *sql; + + if ((sql = strdup(command->argv[0])) == NULL + || (sql = assignVariables(st, sql)) == NULL) + { + fprintf(stderr, "out of memory\n"); + st->ecnt++; + return; + } + + if (debug) + fprintf(stderr, "client %d sending %s\n", n, sql); + r = PQsendQuery(st->con, sql); + free(sql); + } + else if (querymode == QUERY_EXTENDED) + { + const char *sql = command->argv[0]; + const char *params[MAX_ARGS]; + + getQueryParams(st, command, params); + + if (debug) + fprintf(stderr, "client %d sending %s\n", n, sql); + r = PQsendQueryParams(st->con, sql, command->argc - 1, + NULL, params, NULL, NULL, 0); + } + else if (querymode == QUERY_PREPARED) + { + char name[MAX_PREPARE_NAME]; + const char *params[MAX_ARGS]; + + if (!st->prepared[st->use_file]) + { + int j; + + for (j = 0; commands[j] != NULL; j++) + { + PGresult *res; + char name[MAX_PREPARE_NAME]; + + if (commands[j]->type != SQL_COMMAND) + continue; + preparedStatementName(name, st->use_file, j); + res = PQprepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + fprintf(stderr, "%s", PQerrorMessage(st->con)); + PQclear(res); + } + st->prepared[st->use_file] = true; + } + + getQueryParams(st, command, params); + preparedStatementName(name, st->use_file, st->state); + + if (debug) + fprintf(stderr, "client %d sending %s\n", n, name); + r = PQsendQueryPrepared(st->con, name, command->argc - 1, + params, NULL, NULL, 0); + } + else /* unknown sql mode */ + r = 0; + + if (r == 0) + { + if (debug) + fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]); + st->ecnt++; + } + else + st->listen = 1; /* flags that should be listened */ } - else + else if (commands[st->state]->type == META_COMMAND) { - st->listen++; /* flags that should be listned */ - } -} + int argc = commands[st->state]->argc, + i; + char **argv = commands[st->state]->argv; -/* process a select only transaction */ -static void -doSelectOnly(CState * state, int n, int debug) -{ - char sql[256]; - PGresult *res; - CState *st = &state[n]; - - if (st->listen) - { /* are we receiver? */ if (debug) - fprintf(stderr, "client %d receiving\n", n); - while (PQisBusy(st->con) == TRUE) { - if (!PQconsumeInput(st->con)) - { /* there's something wrong */ - fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return; - } + fprintf(stderr, "client %d executing \\%s", n, argv[0]); + for (i = 1; i < argc; i++) + fprintf(stderr, " %s", argv[i]); + fprintf(stderr, "\n"); } - switch (st->state) + if (pg_strcasecmp(argv[0], "setrandom") == 0) { - case 0: /* response to "select abalance ..." */ - res = PQgetResult(st->con); - if (check(state, res, n, PGRES_TUPLES_OK)) + char *var; + int min, + max; + char res[64]; + + if (*argv[2] == ':') + { + if ((var = getVariable(st, argv[2] + 1)) == NULL) + { + fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); + st->ecnt++; return; - PQclear(res); - discard_response(st); + } + min = atoi(var); + } + else + min = atoi(argv[2]); + +#ifdef NOT_USED + if (min < 0) + { + fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min); + st->ecnt++; + return; + } +#endif + + if (*argv[3] == ':') + { + if ((var = getVariable(st, argv[3] + 1)) == NULL) + { + fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]); + st->ecnt++; + return; + } + max = atoi(var); + } + else + max = atoi(argv[3]); + + if (max < min || max > MAX_RANDOM_VALUE) + { + fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max); + st->ecnt++; + return; + } + +#ifdef DEBUG + printf("min: %d max: %d random: %d\n", min, max, getrand(min, max)); +#endif + snprintf(res, sizeof(res), "%d", getrand(min, max)); + + if (putVariable(st, argv[1], res) == false) + { + fprintf(stderr, "%s: out of memory\n", argv[0]); + st->ecnt++; + return; + } - if (is_connect) + st->listen = 1; + } + else if (pg_strcasecmp(argv[0], "set") == 0) + { + char *var; + int ope1, + ope2; + char res[64]; + + if (*argv[2] == ':') + { + if ((var = getVariable(st, argv[2] + 1)) == NULL) { - PQfinish(st->con); - st->con = NULL; + fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); + st->ecnt++; + return; } + ope1 = atoi(var); + } + else + ope1 = atoi(argv[2]); - if (++st->cnt >= nxacts) + if (argc < 5) + snprintf(res, sizeof(res), "%d", ope1); + else + { + if (*argv[4] == ':') { - remains--; /* I've done */ - if (st->con != NULL) + if ((var = getVariable(st, argv[4] + 1)) == NULL) { - PQfinish(st->con); - st->con = NULL; + fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]); + st->ecnt++; + return; } + ope2 = atoi(var); + } + else + ope2 = atoi(argv[4]); + + if (strcmp(argv[3], "+") == 0) + snprintf(res, sizeof(res), "%d", ope1 + ope2); + else if (strcmp(argv[3], "-") == 0) + snprintf(res, sizeof(res), "%d", ope1 - ope2); + else if (strcmp(argv[3], "*") == 0) + snprintf(res, sizeof(res), "%d", ope1 * ope2); + else if (strcmp(argv[3], "/") == 0) + { + if (ope2 == 0) + { + fprintf(stderr, "%s: division by zero\n", argv[0]); + st->ecnt++; + return; + } + snprintf(res, sizeof(res), "%d", ope1 / ope2); + } + else + { + fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]); + st->ecnt++; return; } - break; + } + + if (putVariable(st, argv[1], res) == false) + { + fprintf(stderr, "%s: out of memory\n", argv[0]); + st->ecnt++; + return; + } + + st->listen = 1; } + else if (pg_strcasecmp(argv[0], "sleep") == 0) + { + char *var; + int usec; + struct timeval now; - /* increment state counter */ - st->state++; - if (st->state > 0) - st->state = 0; - } + if (*argv[1] == ':') + { + if ((var = getVariable(st, argv[1] + 1)) == NULL) + { + fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]); + st->ecnt++; + return; + } + usec = atoi(var); + } + else + usec = atoi(argv[1]); - if (st->con == NULL) - { - if ((st->con = doConnect()) == NULL) - { - fprintf(stderr, "Client %d aborted in establishing connection.\n", - n); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return; - } - } + if (argc > 2) + { + if (pg_strcasecmp(argv[2], "ms") == 0) + usec *= 1000; + else if (pg_strcasecmp(argv[2], "s") == 0) + usec *= 1000000; + } + else + usec *= 1000000; - switch (st->state) - { - case 0: - st->aid = getrand(1, naccounts * tps); - sprintf(sql, "select abalance from accounts where aid = %d", st->aid); - break; - } + gettimeofday(&now, NULL); + st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000; + st->until.tv_usec = (now.tv_usec + usec) % 1000000; + st->sleeping = 1; - if (debug) - fprintf(stderr, "client %d sending %s\n", n, sql); + st->listen = 1; + } - if (PQsendQuery(st->con, sql) == 0) - { - if (debug) - fprintf(stderr, "PQsendQuery(%s)failed\n", sql); - st->ecnt++; - } - else - { - st->listen++; /* flags that should be listned */ + goto top; } } @@ -432,215 +957,545 @@ disconnect_all(CState * state) /* create tables and setup data */ static void -init() +init(void) { PGconn *con; PGresult *res; + /* + * Note: TPC-B requires at least 100 bytes per row, and the "filler" + * fields in these table declarations were intended to comply with that. + * But because they default to NULLs, they don't actually take any + * space. We could fix that by giving them non-null default values. + * However, that would completely break comparability of pgbench + * results with prior versions. Since pgbench has never pretended + * to be fully TPC-B compliant anyway, we stick with the historical + * behavior. + */ static char *DDLs[] = { - "drop table branches", - "create table branches(bid int, primary key(bid),bbalance int,filler char(88))", - "drop table tellers", - "create table tellers(tid int, primary key(tid),bid int,tbalance int,filler char(84))", - "drop table accounts", - "create table accounts(aid int,primary key(aid),bid int,abalance int,filler char(84))", - "drop table history", - "create table history(tid int,bid int,aid int,delta int,time timestamp,filler char(22))"}; + "drop table if exists branches", + "create table branches(bid int not null,bbalance int,filler char(88)) with (fillfactor=%d)", + "drop table if exists tellers", + "create table tellers(tid int not null,bid int,tbalance int,filler char(84)) with (fillfactor=%d)", + "drop table if exists accounts", + "create table accounts(aid int not null,bid int,abalance int,filler char(84)) with (fillfactor=%d)", + "drop table if exists history", + "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"}; + static char *DDLAFTERs[] = { + "alter table branches add primary key (bid)", + "alter table tellers add primary key (tid)", + "alter table accounts add primary key (aid)"}; + + char sql[256]; - int i; + int i; + + if ((con = doConnect()) == NULL) + exit(1); + + for (i = 0; i < lengthof(DDLs); i++) + { + /* + * set fillfactor for branches, tellers and accounts tables + */ + if ((strstr(DDLs[i], "create table branches") == DDLs[i]) || + (strstr(DDLs[i], "create table tellers") == DDLs[i]) || + (strstr(DDLs[i], "create table accounts") == DDLs[i])) + { + char ddl_stmt[128]; + + snprintf(ddl_stmt, 128, DDLs[i], fillfactor); + executeStatement(con, ddl_stmt); + continue; + } + else + executeStatement(con, DDLs[i]); + } + + executeStatement(con, "begin"); + + for (i = 0; i < nbranches * scale; i++) + { + snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1); + executeStatement(con, sql); + } + + for (i = 0; i < ntellers * scale; i++) + { + snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)" + ,i + 1, i / ntellers + 1); + executeStatement(con, sql); + } + + executeStatement(con, "commit"); + + /* + * fill the accounts table with some data + */ + fprintf(stderr, "creating tables...\n"); + + executeStatement(con, "begin"); + executeStatement(con, "truncate accounts"); + + res = PQexec(con, "copy accounts from stdin"); + if (PQresultStatus(res) != PGRES_COPY_IN) + { + fprintf(stderr, "%s", PQerrorMessage(con)); + exit(1); + } + PQclear(res); + + for (i = 0; i < naccounts * scale; i++) + { + int j = i + 1; + + snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0); + if (PQputline(con, sql)) + { + fprintf(stderr, "PQputline failed\n"); + exit(1); + } + + if (j % 10000 == 0) + fprintf(stderr, "%d tuples done.\n", j); + } + if (PQputline(con, "\\.\n")) + { + fprintf(stderr, "very last PQputline failed\n"); + exit(1); + } + if (PQendcopy(con)) + { + fprintf(stderr, "PQendcopy failed\n"); + exit(1); + } + executeStatement(con, "commit"); + + /* + * create indexes + */ + fprintf(stderr, "set primary key...\n"); + for (i = 0; i < lengthof(DDLAFTERs); i++) + executeStatement(con, DDLAFTERs[i]); + + /* vacuum */ + fprintf(stderr, "vacuum..."); + executeStatement(con, "vacuum analyze"); + + fprintf(stderr, "done.\n"); + PQfinish(con); +} + +/* + * Parse the raw sql and replace :param to $n. + */ +static bool +parseQuery(Command *cmd, const char *raw_sql) +{ + char *sql, + *p; + + sql = strdup(raw_sql); + if (sql == NULL) + return false; + cmd->argc = 1; + + p = sql; + while ((p = strchr(p, ':')) != NULL) + { + char var[12]; + char *name; + int eaten; + + name = parseVariable(p, &eaten); + if (name == NULL) + { + while (*p == ':') { p++; } + continue; + } + + if (cmd->argc >= MAX_ARGS) + { + fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql); + return false; + } + + sprintf(var, "$%d", cmd->argc); + if ((p = replaceVariable(&sql, p, eaten, var)) == NULL) + return false; + + cmd->argv[cmd->argc] = name; + cmd->argc++; + } + + cmd->argv[0] = sql; + return true; +} + +static Command * +process_commands(char *buf) +{ + const char delim[] = " \f\n\r\t\v"; + + Command *my_commands; + int j; + char *p, + *tok; + + if ((p = strchr(buf, '\n')) != NULL) + *p = '\0'; + + p = buf; + while (isspace((unsigned char) *p)) + p++; + + if (*p == '\0' || strncmp(p, "--", 2) == 0) + { + return NULL; + } + + my_commands = (Command *) malloc(sizeof(Command)); + if (my_commands == NULL) + { + return NULL; + } + + my_commands->argc = 0; + + if (*p == '\\') + { + my_commands->type = META_COMMAND; + + j = 0; + tok = strtok(++p, delim); + + while (tok != NULL) + { + if ((my_commands->argv[j] = strdup(tok)) == NULL) + return NULL; + + my_commands->argc++; + + j++; + tok = strtok(NULL, delim); + } + + if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0) + { + if (my_commands->argc < 4) + { + fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]); + return NULL; + } + + for (j = 4; j < my_commands->argc; j++) + fprintf(stderr, "%s: extra argument \"%s\" ignored\n", + my_commands->argv[0], my_commands->argv[j]); + } + else if (pg_strcasecmp(my_commands->argv[0], "set") == 0) + { + if (my_commands->argc < 3) + { + fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]); + return NULL; + } + + for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++) + fprintf(stderr, "%s: extra argument \"%s\" ignored\n", + my_commands->argv[0], my_commands->argv[j]); + } + else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0) + { + if (my_commands->argc < 2) + { + fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]); + return NULL; + } - if ((con = doConnect()) == NULL) - exit(1); + if (my_commands->argc >= 3) + { + if (pg_strcasecmp(my_commands->argv[2], "us") != 0 && + pg_strcasecmp(my_commands->argv[2], "ms") != 0 && + pg_strcasecmp(my_commands->argv[2], "s")) + { + fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n", + my_commands->argv[0], my_commands->argv[2]); + return NULL; + } + } - for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++) - { - res = PQexec(con, DDLs[i]); - if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK) + for (j = 3; j < my_commands->argc; j++) + fprintf(stderr, "%s: extra argument \"%s\" ignored\n", + my_commands->argv[0], my_commands->argv[j]); + } + else { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); + fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]); + return NULL; } - PQclear(res); } - - res = PQexec(con, "begin"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + else { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); - } + my_commands->type = SQL_COMMAND; - for (i = 0; i < nbranches * tps; i++) - { - sprintf(sql, "insert into branches(bid,bbalance) values(%d,0)", i + 1); - res = PQexec(con, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + switch (querymode) { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); + case QUERY_SIMPLE: + if ((my_commands->argv[0] = strdup(p)) == NULL) + return NULL; + my_commands->argc++; + break; + case QUERY_EXTENDED: + case QUERY_PREPARED: + if (!parseQuery(my_commands, p)) + return NULL; + break; + default: + return NULL; } - PQclear(res); } - for (i = 0; i < ntellers * tps; i++) + return my_commands; +} + +static int +process_file(char *filename) +{ +#define COMMANDS_ALLOC_NUM 128 + + Command **my_commands; + FILE *fd; + int lineno; + char buf[BUFSIZ]; + int alloc_num; + + if (num_files >= MAX_FILES) { - sprintf(sql, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)" - ,i + 1, i / ntellers + 1); - res = PQexec(con, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); - } - PQclear(res); + fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES); + exit(1); } - res = PQexec(con, "end"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + alloc_num = COMMANDS_ALLOC_NUM; + my_commands = (Command **) malloc(sizeof(Command *) * alloc_num); + if (my_commands == NULL) + return false; + + if (strcmp(filename, "-") == 0) + fd = stdin; + else if ((fd = fopen(filename, "r")) == NULL) { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); + fprintf(stderr, "%s: %s\n", filename, strerror(errno)); + return false; } + lineno = 0; - /* - * occupy accounts table with some data - */ - fprintf(stderr, "creating tables...\n"); - for (i = 0; i < naccounts * tps; i++) + while (fgets(buf, sizeof(buf), fd) != NULL) { - int j = i + 1; + Command *commands; + int i; - if (j % 10000 == 1) + i = 0; + while (isspace((unsigned char) buf[i])) + i++; + + if (buf[i] != '\0' && strncmp(&buf[i], "--", 2) != 0) { - res = PQexec(con, "copy accounts from stdin"); - if (PQresultStatus(res) != PGRES_COPY_IN) + commands = process_commands(&buf[i]); + if (commands == NULL) { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); + fclose(fd); + return false; } - PQclear(res); } + else + continue; - sprintf(sql, "%d\t%d\t%d\t\n", j, j / naccounts, 0); - if (PQputline(con, sql)) - { - fprintf(stderr, "PQputline failed\n"); - exit(1); - } + my_commands[lineno] = commands; + lineno++; - if (j % 10000 == 0) + if (lineno >= alloc_num) { - /* - * every 10000 tuples, we commit the copy command. - * this should avoid generating too much WAL logs - */ - fprintf(stderr, "%d tuples done.\n", j); - if (PQputline(con, "\\.\n")) + alloc_num += COMMANDS_ALLOC_NUM; + my_commands = realloc(my_commands, sizeof(Command *) * alloc_num); + if (my_commands == NULL) { - fprintf(stderr, "very last PQputline failed\n"); - exit(1); + fclose(fd); + return false; } + } + } + fclose(fd); - if (PQendcopy(con)) - { - fprintf(stderr, "PQendcopy failed\n"); - exit(1); - } - /* - * do a checkpoint to purge the old WAL logs - */ - res = PQexec(con, "checkpoint"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + my_commands[lineno] = NULL; + + sql_files[num_files++] = my_commands; + + return true; +} + +static Command ** +process_builtin(char *tb) +{ +#define COMMANDS_ALLOC_NUM 128 + + Command **my_commands; + int lineno; + char buf[BUFSIZ]; + int alloc_num; + + if (*tb == '\0') + return NULL; + + alloc_num = COMMANDS_ALLOC_NUM; + my_commands = (Command **) malloc(sizeof(Command *) * alloc_num); + if (my_commands == NULL) + return NULL; + + lineno = 0; + + for (;;) + { + char *p; + Command *commands; + + p = buf; + while (*tb && *tb != '\n') + *p++ = *tb++; + + if (*tb == '\0') + break; + + if (*tb == '\n') + tb++; + + *p = '\0'; + + commands = process_commands(buf); + if (commands == NULL) + { + return NULL; + } + + my_commands[lineno] = commands; + lineno++; + + if (lineno >= alloc_num) + { + alloc_num += COMMANDS_ALLOC_NUM; + my_commands = realloc(my_commands, sizeof(Command *) * alloc_num); + if (my_commands == NULL) { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); + return NULL; } } } - /* vacuum */ - fprintf(stderr, "vacuum..."); - res = PQexec(con, "vacuum analyze"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); - } - fprintf(stderr, "done.\n"); + my_commands[lineno] = NULL; - PQfinish(con); + return my_commands; } /* print out results */ static void printResults( int ttype, CState * state, - struct timeval * tv1, struct timeval * tv2, - struct timeval * tv3) + struct timeval * start_time, struct timeval * end_time) { double t1, t2; int i; int normal_xacts = 0; + char *s; for (i = 0; i < nclients; i++) normal_xacts += state[i].cnt; - t1 = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec); + t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec); t1 = normal_xacts * 1000000.0 / t1; - t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec); + t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 + + (end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec); t2 = normal_xacts * 1000000.0 / t2; - printf("transaction type: %s\n", ttype == 0 ? "TPC-B (sort of)" : "SELECT only"); - printf("scaling factor: %d\n", tps); + if (ttype == 0) + s = "TPC-B (sort of)"; + else if (ttype == 2) + s = "Update only accounts"; + else if (ttype == 1) + s = "SELECT only"; + else + s = "Custom query"; + + printf("transaction type: %s\n", s); + printf("scaling factor: %d\n", scale); + printf("query mode: %s\n", QUERYMODE[querymode]); printf("number of clients: %d\n", nclients); printf("number of transactions per client: %d\n", nxacts); printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients); - printf("tps = %f(including connections establishing)\n", t1); - printf("tps = %f(excluding connections establishing)\n", t2); + printf("tps = %f (including connections establishing)\n", t1); + printf("tps = %f (excluding connections establishing)\n", t2); } int main(int argc, char **argv) { - extern char *optarg; - extern int optind, - opterr, - optopt; int c; int is_init_mode = 0; /* initialize mode? */ - int is_no_vacuum = 0; /* no vacuum at all before - * testing? */ - int is_full_vacuum = 0; /* do full vacuum before testing? */ + int is_no_vacuum = 0; /* no vacuum at all before testing? */ + int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ int debug = 0; /* debug flag */ - int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT - * only */ + int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only, + * 2: skip update of branches and tellers */ + char *filename = NULL; - static CState state[MAXCLIENTS]; /* clients status */ + CState *state; /* status of clients */ - struct timeval tv1; /* start up time */ - struct timeval tv2; /* after establishing all connections to - * the backend */ - struct timeval tv3; /* end time */ + struct timeval start_time; /* start up time */ + struct timeval end_time; /* end time */ int i; fd_set input_mask; int nsocks; /* return from select(2) */ int maxsock; /* max socket number to be waited */ + struct timeval now; + struct timeval timeout; + int min_usec; -#ifndef __CYGWIN__ +#ifdef HAVE_GETRLIMIT struct rlimit rlim; - #endif PGconn *con; PGresult *res; + char *env; + + char val[64]; + +#ifdef WIN32 + /* stderr is buffered on Win32. */ + setvbuf(stderr, NULL, _IONBF, 0); +#endif + + if ((env = getenv("PGHOST")) != NULL && *env != '\0') + pghost = env; + if ((env = getenv("PGPORT")) != NULL && *env != '\0') + pgport = env; + else if ((env = getenv("PGUSER")) != NULL && *env != '\0') + login = env; + + state = (CState *) malloc(sizeof(CState)); + if (state == NULL) + { + fprintf(stderr, "Couldn't allocate memory for state\n"); + exit(1); + } + + memset(state, 0, sizeof(*state)); - while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CS")) != EOF) + while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:CNSlf:D:F:M:")) != -1) { switch (c) { @@ -654,7 +1509,7 @@ main(int argc, char **argv) is_no_vacuum++; break; case 'v': - is_full_vacuum++; + do_vacuum_accounts++; break; case 'p': pgport = optarg; @@ -665,40 +1520,42 @@ main(int argc, char **argv) case 'S': ttype = 1; break; + case 'N': + ttype = 2; + break; case 'c': nclients = atoi(optarg); if (nclients <= 0 || nclients > MAXCLIENTS) { - fprintf(stderr, "wrong number of clients: %d\n", nclients); + fprintf(stderr, "invalid number of clients: %d\n", nclients); exit(1); } -#ifndef __CYGWIN__ -#ifdef RLIMIT_NOFILE /* most platform uses RLIMIT_NOFILE */ +#ifdef HAVE_GETRLIMIT +#ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */ if (getrlimit(RLIMIT_NOFILE, &rlim) == -1) - { #else /* but BSD doesn't ... */ if (getrlimit(RLIMIT_OFILE, &rlim) == -1) +#endif /* RLIMIT_NOFILE */ { -#endif /* HAVE_RLIMIT_NOFILE */ - fprintf(stderr, "getrlimit failed. reason: %s\n", strerror(errno)); + fprintf(stderr, "getrlimit failed: %s\n", strerror(errno)); exit(1); } if (rlim.rlim_cur <= (nclients + 2)) { - fprintf(stderr, "You need at least %d open files resource but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur); - fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n"); + fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur); + fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n"); exit(1); } -#endif /* #ifndef __CYGWIN__ */ +#endif /* HAVE_GETRLIMIT */ + break; + case 'C': + is_connect = 1; break; - case 'C': - is_connect = 1; - break; case 's': - tps = atoi(optarg); - if (tps <= 0) + scale = atoi(optarg); + if (scale <= 0) { - fprintf(stderr, "wrong scaling factor: %d\n", tps); + fprintf(stderr, "invalid scaling factor: %d\n", scale); exit(1); } break; @@ -706,15 +1563,62 @@ main(int argc, char **argv) nxacts = atoi(optarg); if (nxacts <= 0) { - fprintf(stderr, "wrong number of transactions: %d\n", nxacts); + fprintf(stderr, "invalid number of transactions: %d\n", nxacts); exit(1); } break; case 'U': login = optarg; break; - case 'P': - pwd = optarg; + case 'l': + use_log = true; + break; + case 'f': + ttype = 3; + filename = optarg; + if (process_file(filename) == false || *sql_files[num_files - 1] == NULL) + exit(1); + break; + case 'D': + { + char *p; + + if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0') + { + fprintf(stderr, "invalid variable definition: %s\n", optarg); + exit(1); + } + + *p++ = '\0'; + if (putVariable(&state[0], optarg, p) == false) + { + fprintf(stderr, "Couldn't allocate memory for variable\n"); + exit(1); + } + } + break; + case 'F': + fillfactor = atoi(optarg); + if ((fillfactor < 10) || (fillfactor > 100)) + { + fprintf(stderr, "invalid fillfactor: %d\n", fillfactor); + exit(1); + } + break; + case 'M': + if (num_files > 0) + { + fprintf(stderr, "querymode(-M) should be specifiled before transaction scripts(-f)\n"); + exit(1); + } + for (querymode = 0; querymode < NUM_QUERYMODE; querymode++) + if (strcmp(optarg, QUERYMODE[querymode]) == 0) + break; + if (querymode >= NUM_QUERYMODE) + { + fprintf(stderr, "invalid querymode(-M): %s\n", optarg); + exit(1); + } break; default: usage(); @@ -727,19 +1631,80 @@ main(int argc, char **argv) dbName = argv[optind]; else { - dbName = getenv("USER"); - if (dbName == NULL) + if ((env = getenv("PGDATABASE")) != NULL && *env != '\0') + dbName = env; + else if (login != NULL && *login != '\0') + dbName = login; + else dbName = ""; } if (is_init_mode) { - init(); + init(); exit(0); } remains = nclients; + if (getVariable(&state[0], "scale") == NULL) + { + snprintf(val, sizeof(val), "%d", scale); + if (putVariable(&state[0], "scale", val) == false) + { + fprintf(stderr, "Couldn't allocate memory for variable\n"); + exit(1); + } + } + + if (nclients > 1) + { + state = (CState *) realloc(state, sizeof(CState) * nclients); + if (state == NULL) + { + fprintf(stderr, "Couldn't allocate memory for state\n"); + exit(1); + } + + memset(state + 1, 0, sizeof(*state) * (nclients - 1)); + + snprintf(val, sizeof(val), "%d", scale); + + for (i = 1; i < nclients; i++) + { + int j; + + for (j = 0; j < state[0].nvariables; j++) + { + if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false) + { + fprintf(stderr, "Couldn't allocate memory for variable\n"); + exit(1); + } + } + + if (putVariable(&state[i], "scale", val) == false) + { + fprintf(stderr, "Couldn't allocate memory for variable\n"); + exit(1); + } + } + } + + if (use_log) + { + char logpath[64]; + + snprintf(logpath, 64, "pgbench_log.%d", (int) getpid()); + LOGFILE = fopen(logpath, "w"); + + if (LOGFILE == NULL) + { + fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno)); + exit(1); + } + } + if (debug) { printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n", @@ -749,7 +1714,7 @@ main(int argc, char **argv) /* opening connection... */ con = doConnect(); if (con == NULL) - exit(1); + exit(1); if (PQstatus(con) == CONNECTION_BAD) { @@ -758,102 +1723,126 @@ main(int argc, char **argv) exit(1); } - /* - * get the scaling factor that should be same as count(*) from - * branches... - */ - res = PQexec(con, "select count(*) from branches"); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); - } - tps = atoi(PQgetvalue(res, 0, 0)); - if (tps < 0) - { - fprintf(stderr, "count(*) from branches invalid (%d)\n", tps); - exit(1); - } - PQclear(res); - - if (!is_no_vacuum) + if (ttype != 3) { - fprintf(stderr, "starting vacuum..."); - res = PQexec(con, "vacuum branches"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + /* + * get the scaling factor that should be same as count(*) from + * branches if this is not a custom query + */ + res = PQexec(con, "select count(*) from branches"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) { fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); } - PQclear(res); - - res = PQexec(con, "vacuum tellers"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + scale = atoi(PQgetvalue(res, 0, 0)); + if (scale < 0) { - fprintf(stderr, "%s", PQerrorMessage(con)); + fprintf(stderr, "count(*) from branches invalid (%d)\n", scale); exit(1); } PQclear(res); - res = PQexec(con, "delete from history"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + snprintf(val, sizeof(val), "%d", scale); + if (putVariable(&state[0], "scale", val) == false) { - fprintf(stderr, "%s", PQerrorMessage(con)); + fprintf(stderr, "Couldn't allocate memory for variable\n"); exit(1); } - PQclear(res); - res = PQexec(con, "vacuum history"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + + if (nclients > 1) { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); + for (i = 1; i < nclients; i++) + { + if (putVariable(&state[i], "scale", val) == false) + { + fprintf(stderr, "Couldn't allocate memory for variable\n"); + exit(1); + } + } } - PQclear(res); + } + if (!is_no_vacuum) + { + fprintf(stderr, "starting vacuum..."); + executeStatement(con, "vacuum branches"); + executeStatement(con, "vacuum tellers"); + executeStatement(con, "delete from history"); + executeStatement(con, "vacuum history"); fprintf(stderr, "end.\n"); - if (is_full_vacuum) + if (do_vacuum_accounts) { - fprintf(stderr, "starting full vacuum..."); - res = PQexec(con, "vacuum analyze accounts"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - fprintf(stderr, "%s", PQerrorMessage(con)); - exit(1); - } - PQclear(res); + fprintf(stderr, "starting vacuum accounts..."); + executeStatement(con, "vacuum analyze accounts"); fprintf(stderr, "end.\n"); } } PQfinish(con); /* set random seed */ - gettimeofday(&tv1, 0); - srand((uint) tv1.tv_usec); + gettimeofday(&start_time, NULL); + srandom((unsigned int) start_time.tv_usec); /* get start up time */ - gettimeofday(&tv1, 0); + gettimeofday(&start_time, NULL); if (is_connect == 0) { - /* make connections to the database */ - for (i = 0; i < nclients; i++) - { - if ((state[i].con = doConnect()) == NULL) - exit(1); - } + struct timeval t, now; + + /* make connections to the database */ + for (i = 0; i < nclients; i++) + { + state[i].id = i; + if ((state[i].con = doConnect()) == NULL) + exit(1); + } + /* time after connections set up */ + gettimeofday(&now, NULL); + diffTime(&now, &start_time, &t); + addTime(&conn_total_time, &t, &conn_total_time); } - /* time after connections set up */ - gettimeofday(&tv2, 0); + /* process bultin SQL scripts */ + switch (ttype) + { + case 0: + sql_files[0] = process_builtin(tpc_b); + num_files = 1; + break; + + case 1: + sql_files[0] = process_builtin(select_only); + num_files = 1; + break; + + case 2: + sql_files[0] = process_builtin(simple_update); + num_files = 1; + break; + + default: + break; + } - /* send start up quries in async manner */ + /* send start up queries in async manner */ for (i = 0; i < nclients; i++) { - if (ttype == 0) - doOne(state, i, debug); - else if (ttype == 1) - doSelectOnly(state, i, debug); + Command **commands = sql_files[state[i].use_file]; + int prev_ecnt = state[i].ecnt; + + state[i].use_file = getrand(0, num_files - 1); + doCustom(state, i, debug); + + if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) + { + fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state); + remains--; /* I've aborted */ + PQfinish(state[i].con); + state[i].con = NULL; + } } for (;;) @@ -862,23 +1851,48 @@ main(int argc, char **argv) { /* all done ? */ disconnect_all(state); /* get end time */ - gettimeofday(&tv3, 0); - printResults(ttype, state, &tv1, &tv2, &tv3); + gettimeofday(&end_time, NULL); + printResults(ttype, state, &start_time, &end_time); + if (LOGFILE) + fclose(LOGFILE); exit(0); } FD_ZERO(&input_mask); - maxsock = 0; + maxsock = -1; + min_usec = -1; for (i = 0; i < nclients; i++) { - if (state[i].con) + Command **commands = sql_files[state[i].use_file]; + + if (state[i].sleeping) + { + int this_usec; + int sock = PQsocket(state[i].con); + + if (min_usec < 0) + { + gettimeofday(&now, NULL); + min_usec = 0; + } + + this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 + + state[i].until.tv_usec - now.tv_usec; + + if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec)) + min_usec = this_usec; + + FD_SET(sock, &input_mask); + if (maxsock < sock) + maxsock = sock; + } + else if (state[i].con && commands[state[i].state]->type != META_COMMAND) { int sock = PQsocket(state[i].con); if (sock < 0) { - fprintf(stderr, "Client %d: PQsock failed\n", i); disconnect_all(state); exit(1); } @@ -888,36 +1902,60 @@ main(int argc, char **argv) } } - if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, - (fd_set *) NULL, (struct timeval *) NULL)) < 0) + if (maxsock != -1) { - if (errno == EINTR) - continue; - /* must be something wrong */ - disconnect_all(state); - fprintf(stderr, "select failed: %s\n", strerror(errno)); - exit(1); - } - else if (nsocks == 0) - { /* timeout */ - fprintf(stderr, "select timeout\n"); - for (i = 0; i < nclients; i++) + if (min_usec >= 0) { - fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n", - i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen); + timeout.tv_sec = min_usec / 1000000; + timeout.tv_usec = min_usec % 1000000; + + nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, + (fd_set *) NULL, &timeout); } - exit(0); + else + nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, + (fd_set *) NULL, (struct timeval *) NULL); + if (nsocks < 0) + { + if (errno == EINTR) + continue; + /* must be something wrong */ + disconnect_all(state); + fprintf(stderr, "select failed: %s\n", strerror(errno)); + exit(1); + } +#ifdef NOT_USED + else if (nsocks == 0) + { /* timeout */ + fprintf(stderr, "select timeout\n"); + for (i = 0; i < nclients; i++) + { + fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n", + i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen); + } + exit(0); + } +#endif } /* ok, backend returns reply */ for (i = 0; i < nclients; i++) { - if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask)) + Command **commands = sql_files[state[i].use_file]; + int prev_ecnt = state[i].ecnt; + + if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask) + || commands[state[i].state]->type == META_COMMAND)) { - if (ttype == 0) - doOne(state, i, debug); - else if (ttype == 1) - doSelectOnly(state, i, debug); + doCustom(state, i, debug); + } + + if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) + { + fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state); + remains--; /* I've aborted */ + PQfinish(state[i].con); + state[i].con = NULL; } } }