* A simple benchmark program for PostgreSQL
* Originally written by Tatsuo Ishii and enhanced by many contributors.
*
- * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.78 2008/03/19 00:29:35 ishii Exp $
- * Copyright (c) 2000-2008, PostgreSQL Global Development Group
+ * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.99 2010/07/06 19:18:55 momjian Exp $
+ * Copyright (c) 2000-2010, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
+
+#ifdef WIN32
+#define FD_SETSIZE 1024 /* set before winsock2.h is included */
+#endif /* ! WIN32 */
+
#include "postgres_fe.h"
#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "portability/instr_time.h"
#include <ctype.h>
-#ifdef WIN32
-#undef FD_SETSIZE
-#define FD_SETSIZE 1024
-#include <win32.h>
-#else
+#ifndef WIN32
#include <sys/time.h>
#include <unistd.h>
#endif /* ! WIN32 */
#include <sys/resource.h> /* for getrlimit */
#endif
+#ifndef INT64_MAX
+#define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF)
+#endif
+
+/*
+ * Multi-platform pthread implementations
+ */
+
+#ifdef WIN32
+/* Use native win32 threads on Windows */
+typedef struct win32_pthread *pthread_t;
+typedef int pthread_attr_t;
+
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
+#elif defined(ENABLE_THREAD_SAFETY)
+/* Use platform-dependent pthread capability */
+#include <pthread.h>
+#else
+/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
+
+#include <sys/wait.h>
+
+#define pthread_t pg_pthread_t
+#define pthread_attr_t pg_pthread_attr_t
+#define pthread_create pg_pthread_create
+#define pthread_join pg_pthread_join
+
+typedef struct fork_pthread *pthread_t;
+typedef int pthread_attr_t;
+
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
+#endif
+
extern char *optarg;
extern int optind;
#define MAXCLIENTS 1024
#endif
-int nclients = 1; /* default number of simulated clients */
-int nxacts = 10; /* default number of transactions per clients */
+#define DEFAULT_NXACTS 10 /* default nxacts */
+
+int nxacts = 0; /* number of transactions per client */
+int duration = 0; /* duration in seconds */
/*
- * scaling factor. for example, scale = 10 will make 1000000 tuples of
- * accounts table.
+ * scaling factor. for example, scale = 10 will make 1000000 tuples in
+ * pgbench_accounts table.
*/
int scale = 1;
* end of configurable parameters
*********************************************************************/
-#define nbranches 1
+#define nbranches 1 /* Makes little sense to change this. Change
+ * -s instead */
#define ntellers 10
#define naccounts 100000
-FILE *LOGFILE = NULL;
-
bool use_log; /* log transaction latencies to a file */
-
-int remains; /* number of remaining clients */
-
-int is_connect; /* establish connection for each transaction */
+bool is_connect; /* establish connection for each transaction */
+int main_pid; /* main process id used in log filename */
char *pghost = "";
char *pgport = "";
char *login = NULL;
char *dbName;
+volatile bool timer_exceeded = false; /* flag from signal handler */
+
/* variable definitions */
typedef struct
{
char *name; /* variable name */
char *value; /* its value */
-} Variable;
+} Variable;
+
+#define MAX_FILES 128 /* max number of SQL script files allowed */
+#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
/*
* structures used in custom query mode
int listen; /* 0 indicates that an async query has been
* sent */
int sleeping; /* 1 indicates that the client is napping */
- struct timeval until; /* napping until */
+ int64 until; /* napping until (usec) */
Variable *variables; /* array of variable definitions */
int nvariables;
- struct timeval txn_begin; /* used for measuring latencies */
+ instr_time txn_begin; /* used for measuring latencies */
int use_file; /* index in sql_files for this client */
-} CState;
+ bool prepared[MAX_FILES];
+} CState;
+
+/*
+ * Thread state and result
+ */
+typedef struct
+{
+ int tid; /* thread id */
+ pthread_t thread; /* thread handle */
+ CState *state; /* array of CState */
+ int nstate; /* length of state[] */
+ instr_time start_time; /* thread start time */
+} TState;
+
+#define INVALID_THREAD ((pthread_t) 0)
+
+typedef struct
+{
+ instr_time conn_time;
+ int xacts;
+} TResult;
/*
* queries read from files
#define META_COMMAND 2
#define MAX_ARGS 10
+typedef enum QueryMode
+{
+ QUERY_SIMPLE, /* simple query */
+ QUERY_EXTENDED, /* extended query */
+ QUERY_PREPARED, /* extended query with prepared statements */
+ NUM_QUERYMODE
+} QueryMode;
+
+static QueryMode querymode = QUERY_SIMPLE;
+static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
+
typedef struct
{
int type; /* command type (SQL_COMMAND or META_COMMAND) */
int argc; /* number of commands */
char *argv[MAX_ARGS]; /* command list */
-} Command;
-
-#define MAX_FILES 128 /* max number of SQL script files allowed */
+} Command;
-Command **sql_files[MAX_FILES]; /* SQL script files */
-int num_files; /* its number */
+static Command **sql_files[MAX_FILES]; /* SQL script files */
+static int num_files; /* number of script files */
+static int debug = 0; /* debug flag */
/* default scenario */
static char *tpc_b = {
- "\\set nbranches :scale\n"
- "\\set ntellers 10 * :scale\n"
- "\\set naccounts 100000 * :scale\n"
+ "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
+ "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
+ "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
"\\setrandom aid 1 :naccounts\n"
"\\setrandom bid 1 :nbranches\n"
"\\setrandom tid 1 :ntellers\n"
"\\setrandom delta -5000 5000\n"
"BEGIN;\n"
- "UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
- "SELECT abalance FROM accounts WHERE aid = :aid;\n"
- "UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
- "UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
- "INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
+ "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
+ "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
+ "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
+ "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
+ "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n"
};
/* -N case */
static char *simple_update = {
- "\\set nbranches :scale\n"
- "\\set ntellers 10 * :scale\n"
- "\\set naccounts 100000 * :scale\n"
+ "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
+ "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
+ "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
"\\setrandom aid 1 :naccounts\n"
"\\setrandom bid 1 :nbranches\n"
"\\setrandom tid 1 :ntellers\n"
"\\setrandom delta -5000 5000\n"
"BEGIN;\n"
- "UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
- "SELECT abalance FROM accounts WHERE aid = :aid;\n"
- "INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
+ "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
+ "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
+ "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n"
};
/* -S case */
static char *select_only = {
- "\\set naccounts 100000 * :scale\n"
+ "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
"\\setrandom aid 1 :naccounts\n"
- "SELECT abalance FROM accounts WHERE aid = :aid;\n"
+ "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
};
-/* Connection overhead time */
-static struct timeval conn_total_time = {0, 0};
-
-/* Calculate total time */
-static void
-addTime(struct timeval *t1, struct timeval *t2, struct timeval *result)
-{
- int sec = t1->tv_sec + t2->tv_sec;
- int usec = t1->tv_usec + t2->tv_usec;
- if (usec >= 1000000)
- {
- usec -= 1000000;
- sec++;
- }
- result->tv_sec = sec;
- result->tv_usec = usec;
-}
-
-/* Calculate time difference */
-static void
-diffTime(struct timeval *t1, struct timeval *t2, struct timeval *result)
-{
- int sec = t1->tv_sec - t2->tv_sec;
- int usec = t1->tv_usec - t2->tv_usec;
- if (usec < 0)
- {
- usec += 1000000;
- sec--;
- }
- result->tv_sec = sec;
- result->tv_usec = usec;
-}
+/* Function prototypes */
+static void setalarm(int seconds);
+static void *threadRun(void *arg);
static void
-usage(void)
+usage(const char *progname)
{
- fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-D varname=value][-n][-C][-v][-S][-N][-f filename][-l][-U login][-d][dbname]\n");
- fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor] [-F fillfactor] [-U login][-d][dbname]\n");
+ printf("%s is a benchmarking tool for PostgreSQL.\n\n"
+ "Usage:\n"
+ " %s [OPTIONS]... [DBNAME]\n"
+ "\nInitialization options:\n"
+ " -i invokes initialization mode\n"
+ " -F NUM fill factor\n"
+ " -s NUM scaling factor\n"
+ "\nBenchmarking options:\n"
+ " -c NUM number of concurrent database clients (default: 1)\n"
+ " -C establish new connection for each transaction\n"
+ " -D VARNAME=VALUE\n"
+ " define variable for use by custom script\n"
+ " -f FILENAME read transaction script from FILENAME\n"
+ " -j NUM number of threads (default: 1)\n"
+ " -l write transaction times to log file\n"
+ " -M {simple|extended|prepared}\n"
+ " protocol for submitting queries to server (default: simple)\n"
+ " -n do not run VACUUM before tests\n"
+ " -N do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
+ " -s NUM report this scale factor in output\n"
+ " -S perform SELECT-only transactions\n"
+ " -t NUM number of transactions each client runs (default: 10)\n"
+ " -T NUM duration of benchmark test in seconds\n"
+ " -v vacuum all four standard tables before tests\n"
+ "\nCommon options:\n"
+ " -d print debugging output\n"
+ " -h HOSTNAME database server host or socket directory\n"
+ " -p PORT database server port number\n"
+ " -U USERNAME connect as specified database user\n"
+ " --help show this help, then exit\n"
+ " --version output version information, then exit\n"
+ "\n"
+ "Report bugs to <pgsql-bugs@postgresql.org>.\n",
+ progname, progname);
}
/* random number generator: uniform distribution from min to max inclusive */
if (PQstatus(conn) == CONNECTION_BAD &&
PQconnectionNeedsPassword(conn) &&
- password == NULL &&
- !feof(stdin))
+ password == NULL)
{
PQfinish(conn);
password = simple_prompt("Password: ", 100, false);
return NULL;
}
- executeStatement(conn, "SET search_path = public");
-
return conn;
}
/* throw away response from backend */
static void
-discard_response(CState * state)
+discard_response(CState *state)
{
PGresult *res;
} while (res);
}
-/* check to see if the SQL result was good */
-static int
-check(CState * state, PGresult *res, int n)
-{
- CState *st = &state[n];
-
- switch (PQresultStatus(res))
- {
- case PGRES_COMMAND_OK:
- case PGRES_TUPLES_OK:
- /* OK */
- break;
- default:
- fprintf(stderr, "Client %d aborted in state %d: %s",
- n, st->state, PQerrorMessage(st->con));
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- return (-1);
- }
- return (0); /* OK */
-}
-
static int
compareVariables(const void *v1, const void *v2)
{
}
static char *
-getVariable(CState * st, char *name)
+getVariable(CState *st, char *name)
{
Variable key,
*var;
return NULL;
}
+/* check whether the name consists of alphabets, numerals and underscores. */
+static bool
+isLegalVariableName(const char *name)
+{
+ int i;
+
+ for (i = 0; name[i] != '\0'; i++)
+ {
+ if (!isalnum((unsigned char) name[i]) && name[i] != '_')
+ return false;
+ }
+
+ return true;
+}
+
static int
-putVariable(CState * st, char *name, char *value)
+putVariable(CState *st, const char *context, char *name, char *value)
{
Variable key,
*var;
{
Variable *newvars;
+ /*
+ * Check for the name only when declaring a new variable to avoid
+ * overhead.
+ */
+ if (!isLegalVariableName(name))
+ {
+ fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
+ return false;
+ }
+
if (st->variables)
newvars = (Variable *) realloc(st->variables,
(st->nvariables + 1) * sizeof(Variable));
newvars = (Variable *) malloc(sizeof(Variable));
if (newvars == NULL)
- return false;
+ goto out_of_memory;
st->variables = newvars;
var->name = NULL;
var->value = NULL;
- if ((var->name = strdup(name)) == NULL
- || (var->value = strdup(value)) == NULL)
+ if ((var->name = strdup(name)) == NULL ||
+ (var->value = strdup(value)) == NULL)
{
free(var->name);
free(var->value);
}
return true;
+
+out_of_memory:
+ fprintf(stderr, "%s: out of memory for variable '%s'\n", context, name);
+ return false;
}
static char *
-assignVariables(CState * st, char *sql)
+parseVariable(const char *sql, int *eaten)
+{
+ int i = 0;
+ char *name;
+
+ do
+ {
+ i++;
+ } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
+ if (i == 1)
+ return NULL;
+
+ name = malloc(i);
+ if (name == NULL)
+ return NULL;
+ memcpy(name, &sql[1], i - 1);
+ name[i - 1] = '\0';
+
+ *eaten = i;
+ return name;
+}
+
+static char *
+replaceVariable(char **sql, char *param, int len, char *value)
+{
+ int valueln = strlen(value);
+
+ if (valueln > len)
+ {
+ char *tmp;
+ size_t offset = param - *sql;
+
+ tmp = realloc(*sql, strlen(*sql) - len + valueln + 1);
+ if (tmp == NULL)
+ {
+ free(*sql);
+ return NULL;
+ }
+ *sql = tmp;
+ param = *sql + offset;
+ }
+
+ if (valueln != len)
+ memmove(param + valueln, param + len, strlen(param + len) + 1);
+ strncpy(param, value, valueln);
+
+ return param + valueln;
+}
+
+static char *
+assignVariables(CState *st, char *sql)
{
- int i,
- j;
char *p,
*name,
*val;
- void *tmp;
- i = 0;
- while ((p = strchr(&sql[i], ':')) != NULL)
+ p = sql;
+ while ((p = strchr(p, ':')) != NULL)
{
- i = j = p - sql;
- do
+ int eaten;
+
+ name = parseVariable(p, &eaten);
+ if (name == NULL)
{
- i++;
- } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
- if (i == j + 1)
+ while (*p == ':')
+ {
+ p++;
+ }
continue;
+ }
- name = malloc(i - j);
- if (name == NULL)
- return NULL;
- memcpy(name, &sql[j + 1], i - (j + 1));
- name[i - (j + 1)] = '\0';
val = getVariable(st, name);
free(name);
if (val == NULL)
+ {
+ p++;
continue;
+ }
+
+ if ((p = replaceVariable(&sql, p, eaten, val)) == NULL)
+ return NULL;
+ }
+
+ return sql;
+}
+
+static void
+getQueryParams(CState *st, const Command *command, const char **params)
+{
+ int i;
- if (strlen(val) > i - j)
+ for (i = 0; i < command->argc - 1; i++)
+ params[i] = getVariable(st, command->argv[i + 1]);
+}
+
+/*
+ * Run a shell command. The result is assigned to the variable if not NULL.
+ * Return true if succeeded, or false on error.
+ */
+static bool
+runShellCommand(CState *st, char *variable, char **argv, int argc)
+{
+ char command[SHELL_COMMAND_SIZE];
+ int i,
+ len = 0;
+ FILE *fp;
+ char res[64];
+ char *endptr;
+ int retval;
+
+ /*
+ * Join arguments with whilespace separaters. Arguments starting with
+ * exactly one colon are treated as variables: name - append a string
+ * "name" :var - append a variable named 'var'. ::name - append a string
+ * ":name"
+ */
+ for (i = 0; i < argc; i++)
+ {
+ char *arg;
+ int arglen;
+
+ if (argv[i][0] != ':')
{
- tmp = realloc(sql, strlen(sql) - (i - j) + strlen(val) + 1);
- if (tmp == NULL)
- {
- free(sql);
- return NULL;
- }
- sql = tmp;
+ arg = argv[i]; /* a string literal */
+ }
+ else if (argv[i][1] == ':')
+ {
+ arg = argv[i] + 1; /* a string literal starting with colons */
+ }
+ else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
+ {
+ fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
+ return false;
+ }
+
+ arglen = strlen(arg);
+ if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
+ {
+ fprintf(stderr, "%s: too long shell command\n", argv[0]);
+ return false;
}
- if (strlen(val) != i - j)
- memmove(&sql[j + strlen(val)], &sql[i], strlen(&sql[i]) + 1);
+ if (i > 0)
+ command[len++] = ' ';
+ memcpy(command + len, arg, arglen);
+ len += arglen;
+ }
- strncpy(&sql[j], val, strlen(val));
+ command[len] = '\0';
- if (strlen(val) < i - j)
+ /* Fast path for non-assignment case */
+ if (variable == NULL)
+ {
+ if (system(command))
{
- tmp = realloc(sql, strlen(sql) + 1);
- if (tmp == NULL)
- {
- free(sql);
- return NULL;
- }
- sql = tmp;
+ if (!timer_exceeded)
+ fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
+ return false;
}
+ return true;
+ }
+
+ /* Execute the command with pipe and read the standard output. */
+ if ((fp = popen(command, "r")) == NULL)
+ {
+ fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
+ return false;
+ }
+ if (fgets(res, sizeof(res), fp) == NULL)
+ {
+ if (!timer_exceeded)
+ fprintf(stderr, "%s: cannot read the result\n", argv[0]);
+ return false;
+ }
+ if (pclose(fp) < 0)
+ {
+ fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
+ return false;
+ }
- i = j + strlen(val);
+ /* Check whether the result is an integer and assign it to the variable */
+ retval = (int) strtol(res, &endptr, 10);
+ while (*endptr != '\0' && isspace((unsigned char) *endptr))
+ endptr++;
+ if (*res == '\0' || *endptr != '\0')
+ {
+ fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
+ return false;
}
+ snprintf(res, sizeof(res), "%d", retval);
+ if (!putVariable(st, "setshell", variable, res))
+ return false;
- return sql;
+#ifdef DEBUG
+ printf("shell parameter name: %s, value: %s\n", argv[1], res);
+#endif
+ return true;
}
+#define MAX_PREPARE_NAME 32
static void
-doCustom(CState * state, int n, int debug)
+preparedStatementName(char *buffer, int file, int state)
+{
+ sprintf(buffer, "P%d_%d", file, state);
+}
+
+static bool
+clientDone(CState *st, bool ok)
+{
+ (void) ok; /* unused */
+
+ if (st->con != NULL)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ }
+ return false; /* always false */
+}
+
+/* return false iff client should be disconnected */
+static bool
+doCustom(CState *st, instr_time *conn_time, FILE *logfile)
{
PGresult *res;
- CState *st = &state[n];
Command **commands;
top:
if (st->sleeping)
{ /* are we sleeping? */
- int usec;
- struct timeval now;
+ instr_time now;
- gettimeofday(&now, NULL);
- usec = (st->until.tv_sec - now.tv_sec) * 1000000 +
- st->until.tv_usec - now.tv_usec;
- if (usec <= 0)
+ INSTR_TIME_SET_CURRENT(now);
+ if (st->until <= INSTR_TIME_GET_MICROSEC(now))
st->sleeping = 0; /* Done sleeping, go ahead with next command */
else
- return; /* Still sleeping, nothing to do here */
+ return true; /* Still sleeping, nothing to do here */
}
if (st->listen)
if (commands[st->state]->type == SQL_COMMAND)
{
if (debug)
- fprintf(stderr, "client %d receiving\n", n);
+ fprintf(stderr, "client %d receiving\n", st->id);
if (!PQconsumeInput(st->con))
{ /* there's something wrong */
- fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- return;
+ fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
+ return clientDone(st, false);
}
if (PQisBusy(st->con))
- return; /* don't have the whole result yet */
+ return true; /* don't have the whole result yet */
}
/*
* transaction finished: record the time it took in the log
*/
- if (use_log && commands[st->state + 1] == NULL)
+ if (logfile && commands[st->state + 1] == NULL)
{
- double diff;
- struct timeval now;
-
- gettimeofday(&now, NULL);
- diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
- (int) (now.tv_usec - st->txn_begin.tv_usec);
-
- fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n",
- st->id, st->cnt, diff, st->use_file,
+ instr_time now;
+ instr_time diff;
+ double usec;
+
+ INSTR_TIME_SET_CURRENT(now);
+ diff = now;
+ INSTR_TIME_SUBTRACT(diff, st->txn_begin);
+ usec = (double) INSTR_TIME_GET_MICROSEC(diff);
+
+#ifndef WIN32
+ /* This is more than we really ought to know about instr_time */
+ fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
+ st->id, st->cnt, usec, st->use_file,
(long) now.tv_sec, (long) now.tv_usec);
+#else
+ /* On Windows, instr_time doesn't provide a timestamp anyway */
+ fprintf(logfile, "%d %d %.0f %d 0 0\n",
+ st->id, st->cnt, usec, st->use_file);
+#endif
}
if (commands[st->state]->type == SQL_COMMAND)
{
res = PQgetResult(st->con);
- if (check(state, res, n))
+ switch (PQresultStatus(res))
{
- PQclear(res);
- return;
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ break; /* OK */
+ default:
+ fprintf(stderr, "Client %d aborted in state %d: %s",
+ st->id, st->state, PQerrorMessage(st->con));
+ PQclear(res);
+ return clientDone(st, false);
}
PQclear(res);
discard_response(st);
st->con = NULL;
}
- if (++st->cnt >= nxacts)
- {
- remains--; /* I've done */
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
- return;
- }
+ ++st->cnt;
+ if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
+ return clientDone(st, true); /* exit success */
}
/* increment state counter */
if (st->con == NULL)
{
- struct timeval t1, t2, t3;
+ instr_time start,
+ end;
- gettimeofday(&t1, NULL);
+ INSTR_TIME_SET_CURRENT(start);
if ((st->con = doConnect()) == NULL)
{
- fprintf(stderr, "Client %d aborted in establishing connection.\n",
- n);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- return;
+ fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
+ return clientDone(st, false);
}
- gettimeofday(&t2, NULL);
- diffTime(&t2, &t1, &t3);
- addTime(&conn_total_time, &t3, &conn_total_time);
+ INSTR_TIME_SET_CURRENT(end);
+ INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
}
- if (use_log && st->state == 0)
- gettimeofday(&(st->txn_begin), NULL);
+ if (logfile && st->state == 0)
+ INSTR_TIME_SET_CURRENT(st->txn_begin);
if (commands[st->state]->type == SQL_COMMAND)
{
- char *sql;
+ const Command *command = commands[st->state];
+ int r;
- if ((sql = strdup(commands[st->state]->argv[0])) == NULL
- || (sql = assignVariables(st, sql)) == NULL)
+ if (querymode == QUERY_SIMPLE)
{
- fprintf(stderr, "out of memory\n");
- st->ecnt++;
- return;
+ char *sql;
+
+ if ((sql = strdup(command->argv[0])) == NULL
+ || (sql = assignVariables(st, sql)) == NULL)
+ {
+ fprintf(stderr, "out of memory\n");
+ st->ecnt++;
+ return true;
+ }
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQuery(st->con, sql);
+ free(sql);
}
+ else if (querymode == QUERY_EXTENDED)
+ {
+ const char *sql = command->argv[0];
+ const char *params[MAX_ARGS];
- if (debug)
- fprintf(stderr, "client %d sending %s\n", n, sql);
- if (PQsendQuery(st->con, sql) == 0)
+ getQueryParams(st, command, params);
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQueryParams(st->con, sql, command->argc - 1,
+ NULL, params, NULL, NULL, 0);
+ }
+ else if (querymode == QUERY_PREPARED)
+ {
+ char name[MAX_PREPARE_NAME];
+ const char *params[MAX_ARGS];
+
+ if (!st->prepared[st->use_file])
+ {
+ int j;
+
+ for (j = 0; commands[j] != NULL; j++)
+ {
+ PGresult *res;
+ char name[MAX_PREPARE_NAME];
+
+ if (commands[j]->type != SQL_COMMAND)
+ continue;
+ preparedStatementName(name, st->use_file, j);
+ res = PQprepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ fprintf(stderr, "%s", PQerrorMessage(st->con));
+ PQclear(res);
+ }
+ st->prepared[st->use_file] = true;
+ }
+
+ getQueryParams(st, command, params);
+ preparedStatementName(name, st->use_file, st->state);
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, name);
+ r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+ params, NULL, NULL, 0);
+ }
+ else /* unknown sql mode */
+ r = 0;
+
+ if (r == 0)
{
if (debug)
- fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
+ fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
st->ecnt++;
}
else
- {
st->listen = 1; /* flags that should be listened */
- }
- free(sql);
}
else if (commands[st->state]->type == META_COMMAND)
{
if (debug)
{
- fprintf(stderr, "client %d executing \\%s", n, argv[0]);
+ fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
for (i = 1; i < argc; i++)
fprintf(stderr, " %s", argv[i]);
fprintf(stderr, "\n");
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
st->ecnt++;
- return;
+ return true;
}
min = atoi(var);
}
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
st->ecnt++;
- return;
+ return true;
}
max = atoi(var);
}
{
fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
st->ecnt++;
- return;
+ return true;
}
#ifdef DEBUG
#endif
snprintf(res, sizeof(res), "%d", getrand(min, max));
- if (putVariable(st, argv[1], res) == false)
+ if (!putVariable(st, argv[0], argv[1], res))
{
- fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++;
- return;
+ return true;
}
st->listen = 1;
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
st->ecnt++;
- return;
+ return true;
}
ope1 = atoi(var);
}
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
st->ecnt++;
- return;
+ return true;
}
ope2 = atoi(var);
}
{
fprintf(stderr, "%s: division by zero\n", argv[0]);
st->ecnt++;
- return;
+ return true;
}
snprintf(res, sizeof(res), "%d", ope1 / ope2);
}
{
fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
st->ecnt++;
- return;
+ return true;
}
}
- if (putVariable(st, argv[1], res) == false)
+ if (!putVariable(st, argv[0], argv[1], res))
{
- fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++;
- return;
+ return true;
}
st->listen = 1;
{
char *var;
int usec;
- struct timeval now;
+ instr_time now;
if (*argv[1] == ':')
{
{
fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
st->ecnt++;
- return;
+ return true;
}
usec = atoi(var);
}
else
usec *= 1000000;
- gettimeofday(&now, NULL);
- st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000;
- st->until.tv_usec = (now.tv_usec + usec) % 1000000;
+ INSTR_TIME_SET_CURRENT(now);
+ st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->sleeping = 1;
st->listen = 1;
}
+ else if (pg_strcasecmp(argv[0], "setshell") == 0)
+ {
+ bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+
+ if (timer_exceeded) /* timeout */
+ return clientDone(st, true);
+ else if (!ret) /* on error */
+ {
+ st->ecnt++;
+ return true;
+ }
+ else /* succeeded */
+ st->listen = 1;
+ }
+ else if (pg_strcasecmp(argv[0], "shell") == 0)
+ {
+ bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+ if (timer_exceeded) /* timeout */
+ return clientDone(st, true);
+ else if (!ret) /* on error */
+ {
+ st->ecnt++;
+ return true;
+ }
+ else /* succeeded */
+ st->listen = 1;
+ }
goto top;
}
+
+ return true;
}
/* discard connections */
static void
-disconnect_all(CState * state)
+disconnect_all(CState *state, int length)
{
int i;
- for (i = 0; i < nclients; i++)
+ for (i = 0; i < length; i++)
{
if (state[i].con)
+ {
PQfinish(state[i].con);
+ state[i].con = NULL;
+ }
}
}
static void
init(void)
{
- PGconn *con;
- PGresult *res;
/*
* Note: TPC-B requires at least 100 bytes per row, and the "filler"
* fields in these table declarations were intended to comply with that.
- * But because they default to NULLs, they don't actually take any
- * space. We could fix that by giving them non-null default values.
- * However, that would completely break comparability of pgbench
- * results with prior versions. Since pgbench has never pretended
- * to be fully TPC-B compliant anyway, we stick with the historical
- * behavior.
+ * But because they default to NULLs, they don't actually take any space.
+ * We could fix that by giving them non-null default values. However, that
+ * would completely break comparability of pgbench results with prior
+ * versions. Since pgbench has never pretended to be fully TPC-B
+ * compliant anyway, we stick with the historical behavior.
*/
static char *DDLs[] = {
- "drop table if exists branches",
- "create table branches(bid int not null,bbalance int,filler char(88)) with (fillfactor=%d)",
- "drop table if exists tellers",
- "create table tellers(tid int not null,bid int,tbalance int,filler char(84)) with (fillfactor=%d)",
- "drop table if exists accounts",
- "create table accounts(aid int not null,bid int,abalance int,filler char(84)) with (fillfactor=%d)",
- "drop table if exists history",
- "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
+ "drop table if exists pgbench_branches",
+ "create table pgbench_branches(bid int not null,bbalance int,filler char(88)) with (fillfactor=%d)",
+ "drop table if exists pgbench_tellers",
+ "create table pgbench_tellers(tid int not null,bid int,tbalance int,filler char(84)) with (fillfactor=%d)",
+ "drop table if exists pgbench_accounts",
+ "create table pgbench_accounts(aid int not null,bid int,abalance int,filler char(84)) with (fillfactor=%d)",
+ "drop table if exists pgbench_history",
+ "create table pgbench_history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"
+ };
static char *DDLAFTERs[] = {
- "alter table branches add primary key (bid)",
- "alter table tellers add primary key (tid)",
- "alter table accounts add primary key (aid)"};
-
+ "alter table pgbench_branches add primary key (bid)",
+ "alter table pgbench_tellers add primary key (tid)",
+ "alter table pgbench_accounts add primary key (aid)"
+ };
+ PGconn *con;
+ PGresult *res;
char sql[256];
-
int i;
if ((con = doConnect()) == NULL)
/*
* set fillfactor for branches, tellers and accounts tables
*/
- if ((strstr(DDLs[i], "create table branches") == DDLs[i]) ||
- (strstr(DDLs[i], "create table tellers") == DDLs[i]) ||
- (strstr(DDLs[i], "create table accounts") == DDLs[i]))
+ if ((strstr(DDLs[i], "create table pgbench_branches") == DDLs[i]) ||
+ (strstr(DDLs[i], "create table pgbench_tellers") == DDLs[i]) ||
+ (strstr(DDLs[i], "create table pgbench_accounts") == DDLs[i]))
{
char ddl_stmt[128];
for (i = 0; i < nbranches * scale; i++)
{
- snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
+ snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
executeStatement(con, sql);
}
for (i = 0; i < ntellers * scale; i++)
{
- snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
- ,i + 1, i / ntellers + 1);
+ snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
+ i + 1, i / ntellers + 1);
executeStatement(con, sql);
}
executeStatement(con, "commit");
/*
- * fill the accounts table with some data
+ * fill the pgbench_accounts table with some data
*/
fprintf(stderr, "creating tables...\n");
executeStatement(con, "begin");
- executeStatement(con, "truncate accounts");
+ executeStatement(con, "truncate pgbench_accounts");
- res = PQexec(con, "copy accounts from stdin");
+ res = PQexec(con, "copy pgbench_accounts from stdin");
if (PQresultStatus(res) != PGRES_COPY_IN)
{
fprintf(stderr, "%s", PQerrorMessage(con));
/* vacuum */
fprintf(stderr, "vacuum...");
- executeStatement(con, "vacuum analyze");
+ executeStatement(con, "vacuum analyze pgbench_branches");
+ executeStatement(con, "vacuum analyze pgbench_tellers");
+ executeStatement(con, "vacuum analyze pgbench_accounts");
+ executeStatement(con, "vacuum analyze pgbench_history");
fprintf(stderr, "done.\n");
PQfinish(con);
}
-static Command *
-process_commands(char *buf)
+/*
+ * Parse the raw sql and replace :param to $n.
+ */
+static bool
+parseQuery(Command *cmd, const char *raw_sql)
+{
+ char *sql,
+ *p;
+
+ sql = strdup(raw_sql);
+ if (sql == NULL)
+ return false;
+ cmd->argc = 1;
+
+ p = sql;
+ while ((p = strchr(p, ':')) != NULL)
+ {
+ char var[12];
+ char *name;
+ int eaten;
+
+ name = parseVariable(p, &eaten);
+ if (name == NULL)
+ {
+ while (*p == ':')
+ {
+ p++;
+ }
+ continue;
+ }
+
+ if (cmd->argc >= MAX_ARGS)
+ {
+ fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
+ return false;
+ }
+
+ sprintf(var, "$%d", cmd->argc);
+ if ((p = replaceVariable(&sql, p, eaten, var)) == NULL)
+ return false;
+
+ cmd->argv[cmd->argc] = name;
+ cmd->argc++;
+ }
+
+ cmd->argv[0] = sql;
+ return true;
+}
+
+static Command *
+process_commands(char *buf)
{
const char delim[] = " \f\n\r\t\v";
return NULL;
}
+ /*
+ * Split argument into number and unit to allow "sleep 1ms" etc.
+ * We don't have to terminate the number argument with null
+ * because it will be parsed with atoi, which ignores trailing
+ * non-digit characters.
+ */
+ if (my_commands->argv[1][0] != ':')
+ {
+ char *c = my_commands->argv[1];
+
+ while (isdigit((unsigned char) *c))
+ c++;
+ if (*c)
+ {
+ my_commands->argv[2] = c;
+ if (my_commands->argc < 3)
+ my_commands->argc = 3;
+ }
+ }
+
if (my_commands->argc >= 3)
{
if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
my_commands->argv[0], my_commands->argv[j]);
}
+ else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
+ {
+ if (my_commands->argc < 3)
+ {
+ fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
+ return NULL;
+ }
+ }
+ else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
+ {
+ if (my_commands->argc < 1)
+ {
+ fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
+ return NULL;
+ }
+ }
else
{
fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
{
my_commands->type = SQL_COMMAND;
- if ((my_commands->argv[0] = strdup(p)) == NULL)
- return NULL;
-
- my_commands->argc++;
+ switch (querymode)
+ {
+ case QUERY_SIMPLE:
+ if ((my_commands->argv[0] = strdup(p)) == NULL)
+ return NULL;
+ my_commands->argc++;
+ break;
+ case QUERY_EXTENDED:
+ case QUERY_PREPARED:
+ if (!parseQuery(my_commands, p))
+ return NULL;
+ break;
+ default:
+ return NULL;
+ }
}
return my_commands;
/* print out results */
static void
-printResults(
- int ttype, CState * state,
- struct timeval * start_time, struct timeval * end_time)
+printResults(int ttype, int normal_xacts, int nclients, int nthreads,
+ instr_time total_time, instr_time conn_total_time)
{
- double t1,
- t2;
- int i;
- int normal_xacts = 0;
+ double time_include,
+ tps_include,
+ tps_exclude;
char *s;
- for (i = 0; i < nclients; i++)
- normal_xacts += state[i].cnt;
-
- t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec);
- t1 = normal_xacts * 1000000.0 / t1;
-
- t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 +
- (end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec);
- t2 = normal_xacts * 1000000.0 / t2;
+ time_include = INSTR_TIME_GET_DOUBLE(total_time);
+ tps_include = normal_xacts / time_include;
+ tps_exclude = normal_xacts / (time_include -
+ (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
if (ttype == 0)
s = "TPC-B (sort of)";
else if (ttype == 2)
- s = "Update only accounts";
+ s = "Update only pgbench_accounts";
else if (ttype == 1)
s = "SELECT only";
else
printf("transaction type: %s\n", s);
printf("scaling factor: %d\n", scale);
+ printf("query mode: %s\n", QUERYMODE[querymode]);
printf("number of clients: %d\n", nclients);
- printf("number of transactions per client: %d\n", nxacts);
- printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
- printf("tps = %f (including connections establishing)\n", t1);
- printf("tps = %f (excluding connections establishing)\n", t2);
+ printf("number of threads: %d\n", nthreads);
+ if (duration <= 0)
+ {
+ printf("number of transactions per client: %d\n", nxacts);
+ printf("number of transactions actually processed: %d/%d\n",
+ normal_xacts, nxacts * nclients);
+ }
+ else
+ {
+ printf("duration: %d s\n", duration);
+ printf("number of transactions actually processed: %d\n",
+ normal_xacts);
+ }
+ printf("tps = %f (including connections establishing)\n", tps_include);
+ printf("tps = %f (excluding connections establishing)\n", tps_exclude);
}
main(int argc, char **argv)
{
int c;
+ int nclients = 1; /* default number of simulated clients */
+ int nthreads = 1; /* default number of threads */
int is_init_mode = 0; /* initialize mode? */
int is_no_vacuum = 0; /* no vacuum at all before testing? */
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
- int debug = 0; /* debug flag */
int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
* 2: skip update of branches and tellers */
char *filename = NULL;
+ bool scale_given = false;
CState *state; /* status of clients */
+ TState *threads; /* array of thread */
- struct timeval start_time; /* start up time */
- struct timeval end_time; /* end time */
+ instr_time start_time; /* start up time */
+ instr_time total_time;
+ instr_time conn_total_time;
+ int total_xacts;
int i;
- fd_set input_mask;
- int nsocks; /* return from select(2) */
- int maxsock; /* max socket number to be waited */
- struct timeval now;
- struct timeval timeout;
- int min_usec;
-
#ifdef HAVE_GETRLIMIT
struct rlimit rlim;
#endif
char val[64];
+ const char *progname;
+
+ progname = get_progname(argv[0]);
+
+ if (argc > 1)
+ {
+ if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+ {
+ usage(progname);
+ exit(0);
+ }
+ if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
+ {
+ puts("pgbench (PostgreSQL) " PG_VERSION);
+ exit(0);
+ }
+ }
+
#ifdef WIN32
/* stderr is buffered on Win32. */
setvbuf(stderr, NULL, _IONBF, 0);
memset(state, 0, sizeof(*state));
- while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:CNSlf:D:F:")) != -1)
+ while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
{
switch (c)
{
}
#endif /* HAVE_GETRLIMIT */
break;
+ case 'j': /* jobs */
+ nthreads = atoi(optarg);
+ if (nthreads <= 0)
+ {
+ fprintf(stderr, "invalid number of threads: %d\n", nthreads);
+ exit(1);
+ }
+ break;
case 'C':
- is_connect = 1;
+ is_connect = true;
break;
case 's':
+ scale_given = true;
scale = atoi(optarg);
if (scale <= 0)
{
}
break;
case 't':
+ if (duration > 0)
+ {
+ fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
+ exit(1);
+ }
nxacts = atoi(optarg);
if (nxacts <= 0)
{
exit(1);
}
break;
+ case 'T':
+ if (nxacts > 0)
+ {
+ fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
+ exit(1);
+ }
+ duration = atoi(optarg);
+ if (duration <= 0)
+ {
+ fprintf(stderr, "invalid duration: %d\n", duration);
+ exit(1);
+ }
+ break;
case 'U':
login = optarg;
break;
}
*p++ = '\0';
- if (putVariable(&state[0], optarg, p) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
+ if (!putVariable(&state[0], "option", optarg, p))
exit(1);
- }
}
break;
case 'F':
exit(1);
}
break;
+ case 'M':
+ if (num_files > 0)
+ {
+ fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
+ exit(1);
+ }
+ for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
+ if (strcmp(optarg, QUERYMODE[querymode]) == 0)
+ break;
+ if (querymode >= NUM_QUERYMODE)
+ {
+ fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
+ exit(1);
+ }
+ break;
default:
- usage();
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
break;
}
exit(0);
}
- remains = nclients;
+ /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
+ if (nxacts <= 0 && duration <= 0)
+ nxacts = DEFAULT_NXACTS;
- if (getVariable(&state[0], "scale") == NULL)
+ if (nclients % nthreads != 0)
{
- snprintf(val, sizeof(val), "%d", scale);
- if (putVariable(&state[0], "scale", val) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
- exit(1);
- }
+ fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
+ exit(1);
}
+ /*
+ * save main process id in the global variable because process id will be
+ * changed after fork.
+ */
+ main_pid = (int) getpid();
+
if (nclients > 1)
{
state = (CState *) realloc(state, sizeof(CState) * nclients);
memset(state + 1, 0, sizeof(*state) * (nclients - 1));
- snprintf(val, sizeof(val), "%d", scale);
-
+ /* copy any -D switch values to all clients */
for (i = 1; i < nclients; i++)
{
int j;
+ state[i].id = i;
for (j = 0; j < state[0].nvariables; j++)
{
- if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
+ if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
exit(1);
- }
- }
-
- if (putVariable(&state[i], "scale", val) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
- exit(1);
}
}
}
- if (use_log)
- {
- char logpath[64];
-
- snprintf(logpath, 64, "pgbench_log.%d", (int) getpid());
- LOGFILE = fopen(logpath, "w");
-
- if (LOGFILE == NULL)
- {
- fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
- exit(1);
- }
- }
-
if (debug)
{
- printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
- pghost, pgport, nclients, nxacts, dbName);
+ if (duration <= 0)
+ printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
+ pghost, pgport, nclients, nxacts, dbName);
+ else
+ printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
+ pghost, pgport, nclients, duration, dbName);
}
/* opening connection... */
{
/*
* get the scaling factor that should be same as count(*) from
- * branches if this is not a custom query
+ * pgbench_branches if this is not a custom query
*/
- res = PQexec(con, "select count(*) from branches");
+ res = PQexec(con, "select count(*) from pgbench_branches");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
scale = atoi(PQgetvalue(res, 0, 0));
if (scale < 0)
{
- fprintf(stderr, "count(*) from branches invalid (%d)\n", scale);
+ fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
exit(1);
}
PQclear(res);
- snprintf(val, sizeof(val), "%d", scale);
- if (putVariable(&state[0], "scale", val) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
- exit(1);
- }
+ /* warn if we override user-given -s switch */
+ if (scale_given)
+ fprintf(stderr,
+ "Scale option ignored, using pgbench_branches table count = %d\n",
+ scale);
+ }
- if (nclients > 1)
+ /*
+ * :scale variables normally get -s or database scale, but don't override
+ * an explicit -D switch
+ */
+ if (getVariable(&state[0], "scale") == NULL)
+ {
+ snprintf(val, sizeof(val), "%d", scale);
+ for (i = 0; i < nclients; i++)
{
- for (i = 1; i < nclients; i++)
- {
- if (putVariable(&state[i], "scale", val) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
- exit(1);
- }
- }
+ if (!putVariable(&state[i], "startup", "scale", val))
+ exit(1);
}
}
if (!is_no_vacuum)
{
fprintf(stderr, "starting vacuum...");
- executeStatement(con, "vacuum branches");
- executeStatement(con, "vacuum tellers");
- executeStatement(con, "delete from history");
- executeStatement(con, "vacuum history");
+ executeStatement(con, "vacuum pgbench_branches");
+ executeStatement(con, "vacuum pgbench_tellers");
+ executeStatement(con, "truncate pgbench_history");
fprintf(stderr, "end.\n");
if (do_vacuum_accounts)
{
- fprintf(stderr, "starting vacuum accounts...");
- executeStatement(con, "vacuum analyze accounts");
+ fprintf(stderr, "starting vacuum pgbench_accounts...");
+ executeStatement(con, "vacuum analyze pgbench_accounts");
fprintf(stderr, "end.\n");
}
}
PQfinish(con);
/* set random seed */
- gettimeofday(&start_time, NULL);
- srandom((unsigned int) start_time.tv_usec);
+ INSTR_TIME_SET_CURRENT(start_time);
+ srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
- /* get start up time */
- gettimeofday(&start_time, NULL);
-
- if (is_connect == 0)
- {
- struct timeval t, now;
-
- /* make connections to the database */
- for (i = 0; i < nclients; i++)
- {
- state[i].id = i;
- if ((state[i].con = doConnect()) == NULL)
- exit(1);
- }
- /* time after connections set up */
- gettimeofday(&now, NULL);
- diffTime(&now, &start_time, &t);
- addTime(&conn_total_time, &t, &conn_total_time);
- }
-
- /* process bultin SQL scripts */
+ /* process builtin SQL scripts */
switch (ttype)
{
case 0:
break;
}
+ /* get start up time */
+ INSTR_TIME_SET_CURRENT(start_time);
+
+ /* set alarm if duration is specified. */
+ if (duration > 0)
+ setalarm(duration);
+
+ /* start threads */
+ threads = (TState *) malloc(sizeof(TState) * nthreads);
+ for (i = 0; i < nthreads; i++)
+ {
+ threads[i].tid = i;
+ threads[i].state = &state[nclients / nthreads * i];
+ threads[i].nstate = nclients / nthreads;
+ INSTR_TIME_SET_CURRENT(threads[i].start_time);
+
+ /* the first thread (i = 0) is executed by main thread */
+ if (i > 0)
+ {
+ int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+
+ if (err != 0 || threads[i].thread == INVALID_THREAD)
+ {
+ fprintf(stderr, "cannot create thread: %s\n", strerror(err));
+ exit(1);
+ }
+ }
+ else
+ {
+ threads[i].thread = INVALID_THREAD;
+ }
+ }
+
+ /* wait for threads and accumulate results */
+ total_xacts = 0;
+ INSTR_TIME_SET_ZERO(conn_total_time);
+ for (i = 0; i < nthreads; i++)
+ {
+ void *ret = NULL;
+
+ if (threads[i].thread == INVALID_THREAD)
+ ret = threadRun(&threads[i]);
+ else
+ pthread_join(threads[i].thread, &ret);
+
+ if (ret != NULL)
+ {
+ TResult *r = (TResult *) ret;
+
+ total_xacts += r->xacts;
+ INSTR_TIME_ADD(conn_total_time, r->conn_time);
+ free(ret);
+ }
+ }
+ disconnect_all(state, nclients);
+
+ /* get end time */
+ INSTR_TIME_SET_CURRENT(total_time);
+ INSTR_TIME_SUBTRACT(total_time, start_time);
+ printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
+
+ return 0;
+}
+
+static void *
+threadRun(void *arg)
+{
+ TState *thread = (TState *) arg;
+ CState *state = thread->state;
+ TResult *result;
+ FILE *logfile = NULL; /* per-thread log file */
+ instr_time start,
+ end;
+ int nstate = thread->nstate;
+ int remains = nstate; /* number of remaining clients */
+ int i;
+
+ result = malloc(sizeof(TResult));
+ INSTR_TIME_SET_ZERO(result->conn_time);
+
+ /* open log file if requested */
+ if (use_log)
+ {
+ char logpath[64];
+
+ if (thread->tid == 0)
+ snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
+ else
+ snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
+ logfile = fopen(logpath, "w");
+
+ if (logfile == NULL)
+ {
+ fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
+ goto done;
+ }
+ }
+
+ if (!is_connect)
+ {
+ /* make connections to the database */
+ for (i = 0; i < nstate; i++)
+ {
+ if ((state[i].con = doConnect()) == NULL)
+ goto done;
+ }
+ }
+
+ /* time after thread and connections set up */
+ INSTR_TIME_SET_CURRENT(result->conn_time);
+ INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+
/* send start up queries in async manner */
- for (i = 0; i < nclients; i++)
+ for (i = 0; i < nstate; i++)
{
- Command **commands = sql_files[state[i].use_file];
- int prev_ecnt = state[i].ecnt;
+ CState *st = &state[i];
+ Command **commands = sql_files[st->use_file];
+ int prev_ecnt = st->ecnt;
- state[i].use_file = getrand(0, num_files - 1);
- doCustom(state, i, debug);
+ st->use_file = getrand(0, num_files - 1);
+ if (!doCustom(st, &result->conn_time, logfile))
+ remains--; /* I've aborted */
- if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
+ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
- fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
+ fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
remains--; /* I've aborted */
- PQfinish(state[i].con);
- state[i].con = NULL;
+ PQfinish(st->con);
+ st->con = NULL;
}
}
- for (;;)
+ while (remains > 0)
{
- if (remains <= 0)
- { /* all done ? */
- disconnect_all(state);
- /* get end time */
- gettimeofday(&end_time, NULL);
- printResults(ttype, state, &start_time, &end_time);
- if (LOGFILE)
- fclose(LOGFILE);
- exit(0);
- }
+ fd_set input_mask;
+ int maxsock; /* max socket number to be waited */
+ int64 now_usec = 0;
+ int64 min_usec;
FD_ZERO(&input_mask);
maxsock = -1;
- min_usec = -1;
- for (i = 0; i < nclients; i++)
+ min_usec = INT64_MAX;
+ for (i = 0; i < nstate; i++)
{
- Command **commands = sql_files[state[i].use_file];
+ CState *st = &state[i];
+ Command **commands = sql_files[st->use_file];
+ int sock;
- if (state[i].sleeping)
+ if (st->sleeping)
{
int this_usec;
- int sock = PQsocket(state[i].con);
- if (min_usec < 0)
+ if (min_usec == INT64_MAX)
{
- gettimeofday(&now, NULL);
- min_usec = 0;
- }
+ instr_time now;
- this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 +
- state[i].until.tv_usec - now.tv_usec;
+ INSTR_TIME_SET_CURRENT(now);
+ now_usec = INSTR_TIME_GET_MICROSEC(now);
+ }
- if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
+ this_usec = st->until - now_usec;
+ if (min_usec > this_usec)
min_usec = this_usec;
-
- FD_SET(sock, &input_mask);
- if (maxsock < sock)
- maxsock = sock;
}
- else if (state[i].con && commands[state[i].state]->type != META_COMMAND)
+ else if (st->con == NULL)
+ {
+ continue;
+ }
+ else if (commands[st->state]->type == META_COMMAND)
{
- int sock = PQsocket(state[i].con);
+ min_usec = 0; /* the connection is ready to run */
+ break;
+ }
- if (sock < 0)
- {
- disconnect_all(state);
- exit(1);
- }
- FD_SET(sock, &input_mask);
- if (maxsock < sock)
- maxsock = sock;
+ sock = PQsocket(st->con);
+ if (sock < 0)
+ {
+ fprintf(stderr, "bad socket: %s\n", strerror(errno));
+ goto done;
}
+
+ FD_SET(sock, &input_mask);
+
+ if (maxsock < sock)
+ maxsock = sock;
}
- if (maxsock != -1)
+ if (min_usec > 0 && maxsock != -1)
{
- if (min_usec >= 0)
+ int nsocks; /* return from select(2) */
+
+ if (min_usec != INT64_MAX)
{
+ struct timeval timeout;
+
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
-
- nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
- (fd_set *) NULL, &timeout);
+ nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
}
else
- nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
- (fd_set *) NULL, (struct timeval *) NULL);
+ nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
if (nsocks < 0)
{
if (errno == EINTR)
continue;
/* must be something wrong */
- disconnect_all(state);
fprintf(stderr, "select failed: %s\n", strerror(errno));
- exit(1);
- }
-#ifdef NOT_USED
- else if (nsocks == 0)
- { /* timeout */
- fprintf(stderr, "select timeout\n");
- for (i = 0; i < nclients; i++)
- {
- fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
- i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
- }
- exit(0);
+ goto done;
}
-#endif
}
/* ok, backend returns reply */
- for (i = 0; i < nclients; i++)
+ for (i = 0; i < nstate; i++)
{
- Command **commands = sql_files[state[i].use_file];
- int prev_ecnt = state[i].ecnt;
+ CState *st = &state[i];
+ Command **commands = sql_files[st->use_file];
+ int prev_ecnt = st->ecnt;
- if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
- || commands[state[i].state]->type == META_COMMAND))
+ if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
+ || commands[st->state]->type == META_COMMAND))
{
- doCustom(state, i, debug);
+ if (!doCustom(st, &result->conn_time, logfile))
+ remains--; /* I've aborted */
}
- if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
+ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
- fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state);
+ fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
remains--; /* I've aborted */
- PQfinish(state[i].con);
- state[i].con = NULL;
+ PQfinish(st->con);
+ st->con = NULL;
}
}
}
+
+done:
+ INSTR_TIME_SET_CURRENT(start);
+ disconnect_all(state, nstate);
+ result->xacts = 0;
+ for (i = 0; i < nstate; i++)
+ result->xacts += state[i].cnt;
+ INSTR_TIME_SET_CURRENT(end);
+ INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+ if (logfile)
+ fclose(logfile);
+ return result;
}
+
+
+/*
+ * Support for duration option: set timer_exceeded after so many seconds.
+ */
+
+#ifndef WIN32
+
+static void
+handle_sig_alarm(SIGNAL_ARGS)
+{
+ timer_exceeded = true;
+}
+
+static void
+setalarm(int seconds)
+{
+ pqsignal(SIGALRM, handle_sig_alarm);
+ alarm(seconds);
+}
+
+#ifndef ENABLE_THREAD_SAFETY
+
+/*
+ * implements pthread using fork.
+ */
+
+typedef struct fork_pthread
+{
+ pid_t pid;
+ int pipes[2];
+} fork_pthread;
+
+static int
+pthread_create(pthread_t *thread,
+ pthread_attr_t *attr,
+ void *(*start_routine) (void *),
+ void *arg)
+{
+ fork_pthread *th;
+ void *ret;
+ instr_time start_time;
+
+ th = (fork_pthread *) malloc(sizeof(fork_pthread));
+ pipe(th->pipes);
+
+ th->pid = fork();
+ if (th->pid == -1) /* error */
+ {
+ free(th);
+ return errno;
+ }
+ if (th->pid != 0) /* in parent process */
+ {
+ close(th->pipes[1]);
+ *thread = th;
+ return 0;
+ }
+
+ /* in child process */
+ close(th->pipes[0]);
+
+ /* set alarm again because the child does not inherit timers */
+ if (duration > 0)
+ setalarm(duration);
+
+ /*
+ * Set a different random seed in each child process. Otherwise they all
+ * inherit the parent's state and generate the same "random" sequence. (In
+ * the threaded case, the different threads will obtain subsets of the
+ * output of a single random() sequence, which should be okay for our
+ * purposes.)
+ */
+ INSTR_TIME_SET_CURRENT(start_time);
+ srandom(((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)) +
+ ((unsigned int) getpid()));
+
+ ret = start_routine(arg);
+ write(th->pipes[1], ret, sizeof(TResult));
+ close(th->pipes[1]);
+ free(th);
+ exit(0);
+}
+
+static int
+pthread_join(pthread_t th, void **thread_return)
+{
+ int status;
+
+ while (waitpid(th->pid, &status, 0) != th->pid)
+ {
+ if (errno != EINTR)
+ return errno;
+ }
+
+ if (thread_return != NULL)
+ {
+ /* assume result is TResult */
+ *thread_return = malloc(sizeof(TResult));
+ if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
+ {
+ free(*thread_return);
+ *thread_return = NULL;
+ }
+ }
+ close(th->pipes[0]);
+
+ free(th);
+ return 0;
+}
+#endif
+#else /* WIN32 */
+
+static VOID CALLBACK
+win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
+{
+ timer_exceeded = true;
+}
+
+static void
+setalarm(int seconds)
+{
+ HANDLE queue;
+ HANDLE timer;
+
+ /* This function will be called at most once, so we can cheat a bit. */
+ queue = CreateTimerQueue();
+ if (seconds > ((DWORD) -1) / 1000 ||
+ !CreateTimerQueueTimer(&timer, queue,
+ win32_timer_callback, NULL, seconds * 1000, 0,
+ WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
+ {
+ fprintf(stderr, "Failed to set timer\n");
+ exit(1);
+ }
+}
+
+/* partial pthread implementation for Windows */
+
+typedef struct win32_pthread
+{
+ HANDLE handle;
+ void *(*routine) (void *);
+ void *arg;
+ void *result;
+} win32_pthread;
+
+static unsigned __stdcall
+win32_pthread_run(void *arg)
+{
+ win32_pthread *th = (win32_pthread *) arg;
+
+ th->result = th->routine(th->arg);
+
+ return 0;
+}
+
+static int
+pthread_create(pthread_t *thread,
+ pthread_attr_t *attr,
+ void *(*start_routine) (void *),
+ void *arg)
+{
+ int save_errno;
+ win32_pthread *th;
+
+ th = (win32_pthread *) malloc(sizeof(win32_pthread));
+ th->routine = start_routine;
+ th->arg = arg;
+ th->result = NULL;
+
+ th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
+ if (th->handle == NULL)
+ {
+ save_errno = errno;
+ free(th);
+ return save_errno;
+ }
+
+ *thread = th;
+ return 0;
+}
+
+static int
+pthread_join(pthread_t th, void **thread_return)
+{
+ if (th == NULL || th->handle == NULL)
+ return errno = EINVAL;
+
+ if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
+ {
+ _dosmaperr(GetLastError());
+ return errno;
+ }
+
+ if (thread_return)
+ *thread_return = th->result;
+
+ CloseHandle(th->handle);
+ free(th);
+ return 0;
+}
+
+#endif /* WIN32 */