* A simple benchmark program for PostgreSQL
* Originally written by Tatsuo Ishii and enhanced by many contributors.
*
- * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.93 2009/12/15 07:17:57 itagaki Exp $
- * Copyright (c) 2000-2009, PostgreSQL Global Development Group
+ * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.99 2010/07/06 19:18:55 momjian Exp $
+ * Copyright (c) 2000-2010, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
*/
#ifdef WIN32
-#define FD_SETSIZE 1024 /* set before winsock2.h is included */
+#define FD_SETSIZE 1024 /* set before winsock2.h is included */
#endif /* ! WIN32 */
#include "postgres_fe.h"
#include "libpq-fe.h"
-#include "pqsignal.h"
+#include "libpq/pqsignal.h"
#include "portability/instr_time.h"
#include <ctype.h>
-#ifdef WIN32
-#include <win32.h>
-#else
-#include <signal.h>
+#ifndef WIN32
#include <sys/time.h>
#include <unistd.h>
#endif /* ! WIN32 */
#ifdef WIN32
/* Use native win32 threads on Windows */
-typedef struct win32_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
-static int pthread_join(pthread_t th, void **thread_return);
+typedef struct win32_pthread *pthread_t;
+typedef int pthread_attr_t;
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
-
#else
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
#define pthread_create pg_pthread_create
#define pthread_join pg_pthread_join
-typedef struct fork_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
-static int pthread_join(pthread_t th, void **thread_return);
+typedef struct fork_pthread *pthread_t;
+typedef int pthread_attr_t;
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
#endif
extern char *optarg;
* end of configurable parameters
*********************************************************************/
-#define nbranches 1 /* Makes little sense to change this. Change -s instead */
+#define nbranches 1 /* Makes little sense to change this. Change
+ * -s instead */
#define ntellers 10
#define naccounts 100000
-FILE *LOGFILE = NULL;
-
bool use_log; /* log transaction latencies to a file */
-
-int is_connect; /* establish connection for each transaction */
+bool is_connect; /* establish connection for each transaction */
+int main_pid; /* main process id used in log filename */
char *pghost = "";
char *pgport = "";
} Variable;
#define MAX_FILES 128 /* max number of SQL script files allowed */
-#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
+#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
/*
* structures used in custom query mode
*/
typedef struct
{
- pthread_t thread; /* thread handle */
- CState *state; /* array of CState */
- int nstate; /* length of state[] */
- instr_time start_time; /* thread start time */
+ int tid; /* thread id */
+ pthread_t thread; /* thread handle */
+ CState *state; /* array of CState */
+ int nstate; /* length of state[] */
+ instr_time start_time; /* thread start time */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
typedef struct
{
- instr_time conn_time;
- int xacts;
+ instr_time conn_time;
+ int xacts;
} TResult;
/*
char *argv[MAX_ARGS]; /* command list */
} Command;
-static Command **sql_files[MAX_FILES]; /* SQL script files */
-static int num_files; /* number of script files */
-static int debug = 0; /* debug flag */
+static Command **sql_files[MAX_FILES]; /* SQL script files */
+static int num_files; /* number of script files */
+static int debug = 0; /* debug flag */
/* default scenario */
static char *tpc_b = {
/* Function prototypes */
static void setalarm(int seconds);
-static void* threadRun(void *arg);
+static void *threadRun(void *arg);
static void
usage(const char *progname)
return NULL;
}
+/* check whether the name consists of alphabets, numerals and underscores. */
+static bool
+isLegalVariableName(const char *name)
+{
+ int i;
+
+ for (i = 0; name[i] != '\0'; i++)
+ {
+ if (!isalnum((unsigned char) name[i]) && name[i] != '_')
+ return false;
+ }
+
+ return true;
+}
+
static int
-putVariable(CState *st, char *name, char *value)
+putVariable(CState *st, const char *context, char *name, char *value)
{
Variable key,
*var;
{
Variable *newvars;
+ /*
+ * Check for the name only when declaring a new variable to avoid
+ * overhead.
+ */
+ if (!isLegalVariableName(name))
+ {
+ fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
+ return false;
+ }
+
if (st->variables)
newvars = (Variable *) realloc(st->variables,
(st->nvariables + 1) * sizeof(Variable));
newvars = (Variable *) malloc(sizeof(Variable));
if (newvars == NULL)
- return false;
+ goto out_of_memory;
st->variables = newvars;
}
return true;
+
+out_of_memory:
+ fprintf(stderr, "%s: out of memory for variable '%s'\n", context, name);
+ return false;
}
static char *
static bool
runShellCommand(CState *st, char *variable, char **argv, int argc)
{
- char command[SHELL_COMMAND_SIZE];
- int i,
- len = 0;
- FILE *fp;
- char res[64];
- char *endptr;
- int retval;
+ char command[SHELL_COMMAND_SIZE];
+ int i,
+ len = 0;
+ FILE *fp;
+ char res[64];
+ char *endptr;
+ int retval;
/*
* Join arguments with whilespace separaters. Arguments starting with
- * exactly one colon are treated as variables:
- * name - append a string "name"
- * :var - append a variable named 'var'.
- * ::name - append a string ":name"
+ * exactly one colon are treated as variables: name - append a string
+ * "name" :var - append a variable named 'var'. ::name - append a string
+ * ":name"
*/
for (i = 0; i < argc; i++)
{
- char *arg;
- int arglen;
+ char *arg;
+ int arglen;
if (argv[i][0] != ':')
{
- arg = argv[i]; /* a string literal */
+ arg = argv[i]; /* a string literal */
}
else if (argv[i][1] == ':')
{
return false;
}
snprintf(res, sizeof(res), "%d", retval);
- if (!putVariable(st, variable, res))
- {
- fprintf(stderr, "%s: out of memory\n", argv[0]);
+ if (!putVariable(st, "setshell", variable, res))
return false;
- }
#ifdef DEBUG
printf("shell parameter name: %s, value: %s\n", argv[1], res);
static bool
clientDone(CState *st, bool ok)
{
- (void) ok; /* unused */
+ (void) ok; /* unused */
if (st->con != NULL)
{
PQfinish(st->con);
st->con = NULL;
}
- return false; /* always false */
+ return false; /* always false */
}
/* return false iff client should be disconnected */
static bool
-doCustom(CState *st, instr_time *conn_time)
+doCustom(CState *st, instr_time *conn_time, FILE *logfile)
{
PGresult *res;
Command **commands;
/*
* transaction finished: record the time it took in the log
*/
- if (use_log && commands[st->state + 1] == NULL)
+ if (logfile && commands[st->state + 1] == NULL)
{
instr_time now;
instr_time diff;
#ifndef WIN32
/* This is more than we really ought to know about instr_time */
- fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n",
+ fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
st->id, st->cnt, usec, st->use_file,
(long) now.tv_sec, (long) now.tv_usec);
#else
/* On Windows, instr_time doesn't provide a timestamp anyway */
- fprintf(LOGFILE, "%d %d %.0f %d 0 0\n",
+ fprintf(logfile, "%d %d %.0f %d 0 0\n",
st->id, st->cnt, usec, st->use_file);
#endif
}
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
- break; /* OK */
+ break; /* OK */
default:
fprintf(stderr, "Client %d aborted in state %d: %s",
- st->id, st->state, PQerrorMessage(st->con));
+ st->id, st->state, PQerrorMessage(st->con));
PQclear(res);
return clientDone(st, false);
}
if (st->con == NULL)
{
- instr_time start, end;
+ instr_time start,
+ end;
INSTR_TIME_SET_CURRENT(start);
if ((st->con = doConnect()) == NULL)
INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
}
- if (use_log && st->state == 0)
+ if (logfile && st->state == 0)
INSTR_TIME_SET_CURRENT(st->txn_begin);
if (commands[st->state]->type == SQL_COMMAND)
#endif
snprintf(res, sizeof(res), "%d", getrand(min, max));
- if (putVariable(st, argv[1], res) == false)
+ if (!putVariable(st, argv[0], argv[1], res))
{
- fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++;
return true;
}
}
}
- if (putVariable(st, argv[1], res) == false)
+ if (!putVariable(st, argv[0], argv[1], res))
{
- fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++;
return true;
}
{
char *var;
int usec;
- instr_time now;
+ instr_time now;
if (*argv[1] == ':')
{
}
else if (pg_strcasecmp(argv[0], "setshell") == 0)
{
- bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+ bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
- if (timer_exceeded) /* timeout */
+ if (timer_exceeded) /* timeout */
return clientDone(st, true);
else if (!ret) /* on error */
{
}
else if (pg_strcasecmp(argv[0], "shell") == 0)
{
- bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+ bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
- if (timer_exceeded) /* timeout */
+ if (timer_exceeded) /* timeout */
return clientDone(st, true);
else if (!ret) /* on error */
{
*/
if (my_commands->argv[1][0] != ':')
{
- char *c = my_commands->argv[1];
+ char *c = my_commands->argv[1];
while (isdigit((unsigned char) *c))
c++;
time_include = INSTR_TIME_GET_DOUBLE(total_time);
tps_include = normal_xacts / time_include;
tps_exclude = normal_xacts / (time_include -
- (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
+ (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
if (ttype == 0)
s = "TPC-B (sort of)";
main(int argc, char **argv)
{
int c;
- int nclients = 1; /* default number of simulated clients */
- int nthreads = 1; /* default number of threads */
+ int nclients = 1; /* default number of simulated clients */
+ int nthreads = 1; /* default number of threads */
int is_init_mode = 0; /* initialize mode? */
int is_no_vacuum = 0; /* no vacuum at all before testing? */
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
}
#endif /* HAVE_GETRLIMIT */
break;
- case 'j': /* jobs */
+ case 'j': /* jobs */
nthreads = atoi(optarg);
if (nthreads <= 0)
{
}
break;
case 'C':
- is_connect = 1;
+ is_connect = true;
break;
case 's':
scale_given = true;
}
*p++ = '\0';
- if (putVariable(&state[0], optarg, p) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
+ if (!putVariable(&state[0], "option", optarg, p))
exit(1);
- }
}
break;
case 'F':
exit(1);
}
+ /*
+ * save main process id in the global variable because process id will be
+ * changed after fork.
+ */
+ main_pid = (int) getpid();
+
if (nclients > 1)
{
state = (CState *) realloc(state, sizeof(CState) * nclients);
state[i].id = i;
for (j = 0; j < state[0].nvariables; j++)
{
- if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
+ if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
exit(1);
- }
}
}
}
- if (use_log)
- {
- char logpath[64];
-
- snprintf(logpath, 64, "pgbench_log.%d", (int) getpid());
- LOGFILE = fopen(logpath, "w");
-
- if (LOGFILE == NULL)
- {
- fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
- exit(1);
- }
- }
-
if (debug)
{
if (duration <= 0)
snprintf(val, sizeof(val), "%d", scale);
for (i = 0; i < nclients; i++)
{
- if (putVariable(&state[i], "scale", val) == false)
- {
- fprintf(stderr, "Couldn't allocate memory for variable\n");
+ if (!putVariable(&state[i], "startup", "scale", val))
exit(1);
- }
}
}
threads = (TState *) malloc(sizeof(TState) * nthreads);
for (i = 0; i < nthreads; i++)
{
+ threads[i].tid = i;
threads[i].state = &state[nclients / nthreads * i];
threads[i].nstate = nclients / nthreads;
INSTR_TIME_SET_CURRENT(threads[i].start_time);
/* the first thread (i = 0) is executed by main thread */
if (i > 0)
{
- int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+ int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+
if (err != 0 || threads[i].thread == INVALID_THREAD)
{
fprintf(stderr, "cannot create thread: %s\n", strerror(err));
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
- void *ret = NULL;
+ void *ret = NULL;
if (threads[i].thread == INVALID_THREAD)
ret = threadRun(&threads[i]);
if (ret != NULL)
{
- TResult *r = (TResult *) ret;
+ TResult *r = (TResult *) ret;
+
total_xacts += r->xacts;
INSTR_TIME_ADD(conn_total_time, r->conn_time);
free(ret);
INSTR_TIME_SET_CURRENT(total_time);
INSTR_TIME_SUBTRACT(total_time, start_time);
printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
- if (LOGFILE)
- fclose(LOGFILE);
return 0;
}
{
TState *thread = (TState *) arg;
CState *state = thread->state;
- TResult *result;
- instr_time start, end;
+ TResult *result;
+ FILE *logfile = NULL; /* per-thread log file */
+ instr_time start,
+ end;
int nstate = thread->nstate;
- int remains = nstate; /* number of remaining clients */
+ int remains = nstate; /* number of remaining clients */
int i;
result = malloc(sizeof(TResult));
INSTR_TIME_SET_ZERO(result->conn_time);
- if (is_connect == 0)
+ /* open log file if requested */
+ if (use_log)
+ {
+ char logpath[64];
+
+ if (thread->tid == 0)
+ snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
+ else
+ snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
+ logfile = fopen(logpath, "w");
+
+ if (logfile == NULL)
+ {
+ fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
+ goto done;
+ }
+ }
+
+ if (!is_connect)
{
/* make connections to the database */
for (i = 0; i < nstate; i++)
int prev_ecnt = st->ecnt;
st->use_file = getrand(0, num_files - 1);
- if (!doCustom(st, &result->conn_time))
- remains--; /* I've aborted */
+ if (!doCustom(st, &result->conn_time, logfile))
+ remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
{
while (remains > 0)
{
- fd_set input_mask;
- int maxsock; /* max socket number to be waited */
- int64 now_usec = 0;
- int64 min_usec;
+ fd_set input_mask;
+ int maxsock; /* max socket number to be waited */
+ int64 now_usec = 0;
+ int64 min_usec;
FD_ZERO(&input_mask);
if (min_usec == INT64_MAX)
{
instr_time now;
+
INSTR_TIME_SET_CURRENT(now);
now_usec = INSTR_TIME_GET_MICROSEC(now);
}
}
FD_SET(sock, &input_mask);
+
if (maxsock < sock)
maxsock = sock;
}
if (min_usec > 0 && maxsock != -1)
{
- int nsocks; /* return from select(2) */
+ int nsocks; /* return from select(2) */
if (min_usec != INT64_MAX)
{
- struct timeval timeout;
+ struct timeval timeout;
+
timeout.tv_sec = min_usec / 1000000;
timeout.tv_usec = min_usec % 1000000;
nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
int prev_ecnt = st->ecnt;
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
- || commands[st->state]->type == META_COMMAND))
+ || commands[st->state]->type == META_COMMAND))
{
- if (!doCustom(st, &result->conn_time))
- remains--; /* I've aborted */
+ if (!doCustom(st, &result->conn_time, logfile))
+ remains--; /* I've aborted */
}
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
result->xacts += state[i].cnt;
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+ if (logfile)
+ fclose(logfile);
return result;
}
typedef struct fork_pthread
{
- pid_t pid;
- int pipes[2];
-} fork_pthread;
+ pid_t pid;
+ int pipes[2];
+} fork_pthread;
static int
pthread_create(pthread_t *thread,
pthread_attr_t *attr,
- void * (*start_routine)(void *),
+ void *(*start_routine) (void *),
void *arg)
{
- fork_pthread *th;
- void *ret;
- instr_time start_time;
+ fork_pthread *th;
+ void *ret;
+ instr_time start_time;
th = (fork_pthread *) malloc(sizeof(fork_pthread));
pipe(th->pipes);
th->pid = fork();
- if (th->pid == -1) /* error */
+ if (th->pid == -1) /* error */
{
free(th);
return errno;
}
- if (th->pid != 0) /* in parent process */
+ if (th->pid != 0) /* in parent process */
{
close(th->pipes[1]);
*thread = th;
setalarm(duration);
/*
- * Set a different random seed in each child process. Otherwise they
- * all inherit the parent's state and generate the same "random"
- * sequence. (In the threaded case, the different threads will obtain
- * subsets of the output of a single random() sequence, which should be
- * okay for our purposes.)
+ * Set a different random seed in each child process. Otherwise they all
+ * inherit the parent's state and generate the same "random" sequence. (In
+ * the threaded case, the different threads will obtain subsets of the
+ * output of a single random() sequence, which should be okay for our
+ * purposes.)
*/
INSTR_TIME_SET_CURRENT(start_time);
srandom(((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)) +
static int
pthread_join(pthread_t th, void **thread_return)
{
- int status;
+ int status;
while (waitpid(th->pid, &status, 0) != th->pid)
{
free(th);
return 0;
}
-
#endif
-
#else /* WIN32 */
static VOID CALLBACK
typedef struct win32_pthread
{
HANDLE handle;
- void *(*routine)(void *);
+ void *(*routine) (void *);
void *arg;
void *result;
-} win32_pthread;
+} win32_pthread;
static unsigned __stdcall
win32_pthread_run(void *arg)
static int
pthread_create(pthread_t *thread,
pthread_attr_t *attr,
- void * (*start_routine)(void *),
+ void *(*start_routine) (void *),
void *arg)
{
- int save_errno;
- win32_pthread *th;
+ int save_errno;
+ win32_pthread *th;
th = (win32_pthread *) malloc(sizeof(win32_pthread));
th->routine = start_routine;