/*
- * $Header: /cvsroot/pgsql/contrib/pgbench/pgbench.c,v 1.12 2001/10/25 05:49:19 momjian Exp $
+ * $Header: /cvsroot/pgsql/contrib/pgbench/pgbench.c,v 1.20 2002/10/07 05:10:02 ishii Exp $
*
* pgbench: a simple TPC-B like benchmark program for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2000 Tatsuo Ishii
+ * Copyright (c) 2000-2002 Tatsuo Ishii
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
/* for getrlimit */
#include <sys/resource.h>
-#endif /* WIN32 */
+#endif /* ! WIN32 */
/********************************************************************
* some configurable parameters */
#define ntellers 10
#define naccounts 100000
-int remains; /* number of remained clients */
+FILE *LOGFILE = NULL;
+
+bool use_log; /* log transaction latencies to a file */
+
+int remains; /* number of remaining clients */
int is_connect; /* establish connection for each
- * transactoin */
+ * transaction */
char *pghost = "";
char *pgport = NULL;
typedef struct
{
PGconn *con; /* connection handle to DB */
+ int id; /* client No. */
int state; /* state No. */
int cnt; /* xacts count */
int ecnt; /* error count */
- int listen; /* none 0 indicates that an async query
- * has been sent */
+ int listen; /* 0 indicates that an async query has
+ * been sent */
int aid; /* account id for this transaction */
int bid; /* branch id for this transaction */
int tid; /* teller id for this transaction */
int delta;
int abalance;
-} CState;
+ struct timeval txn_begin; /* used for measuring latencies */
+} CState;
static void
usage()
{
- fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-U login][-P password][-d][dbname]\n");
+ fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-l][-U login][-P password][-d][dbname]\n");
fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n");
}
/* process a transaction */
static void
-doOne(CState * state, int n, int debug)
+doOne(CState * state, int n, int debug, int ttype)
{
char sql[256];
PGresult *res;
{ /* are we receiver? */
if (debug)
fprintf(stderr, "client %d receiving\n", n);
- while (PQisBusy(st->con) == TRUE)
- {
- if (!PQconsumeInput(st->con))
- { /* there's something wrong */
- fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- return;
- }
+ if (!PQconsumeInput(st->con))
+ { /* there's something wrong */
+ fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
+ remains--; /* I've aborted */
+ PQfinish(st->con);
+ st->con = NULL;
+ return;
}
+ if (PQisBusy(st->con))
+ return; /* don't have the whole result yet */
switch (st->state)
{
discard_response(st);
break;
case 6: /* response to "end" */
+
+ /*
+ * transaction finished: record the time it took in the
+ * log
+ */
+ if (use_log)
+ {
+ long long diff;
+ struct timeval now;
+
+ gettimeofday(&now, 0);
+ diff = (now.tv_sec - st->txn_begin.tv_sec) * 1000000 +
+ (now.tv_usec - st->txn_begin.tv_usec);
+
+ fprintf(LOGFILE, "%d %d %lld\n", st->id, st->cnt, diff);
+ }
+
res = PQgetResult(st->con);
if (check(state, res, n, PGRES_COMMAND_OK))
return;
if (++st->cnt >= nxacts)
{
- remains--; /* I've done */
+ remains--; /* I'm done */
if (st->con != NULL)
{
PQfinish(st->con);
st->bid = getrand(1, nbranches * tps);
st->tid = getrand(1, ntellers * tps);
st->delta = getrand(1, 1000);
+ if (use_log)
+ gettimeofday(&(st->txn_begin), 0);
break;
case 1:
- sprintf(sql, "update accounts set abalance = abalance + %d where aid = %d\n", st->delta, st->aid);
+ snprintf(sql, 256, "update accounts set abalance = abalance + %d where aid = %d\n", st->delta, st->aid);
break;
case 2:
- sprintf(sql, "select abalance from accounts where aid = %d", st->aid);
+ snprintf(sql, 256, "select abalance from accounts where aid = %d", st->aid);
break;
case 3:
- sprintf(sql, "update tellers set tbalance = tbalance + %d where tid = %d\n",
- st->delta, st->tid);
- break;
+ if (ttype == 0)
+ {
+ snprintf(sql, 256, "update tellers set tbalance = tbalance + %d where tid = %d\n",
+ st->delta, st->tid);
+ break;
+ }
case 4:
- sprintf(sql, "update branches set bbalance = bbalance + %d where bid = %d", st->delta, st->bid);
- break;
+ if (ttype == 0)
+ {
+ snprintf(sql, 256, "update branches set bbalance = bbalance + %d where bid = %d", st->delta, st->bid);
+ break;
+ }
case 5:
- sprintf(sql, "insert into history(tid,bid,aid,delta,mtime) values(%d,%d,%d,%d,'now')",
- st->tid, st->bid, st->aid, st->delta);
+ snprintf(sql, 256, "insert into history(tid,bid,aid,delta,mtime) values(%d,%d,%d,%d,'now')",
+ st->tid, st->bid, st->aid, st->delta);
break;
case 6:
strcpy(sql, "end");
}
else
{
- st->listen++; /* flags that should be listned */
+ st->listen++; /* flags that should be listened */
}
}
{ /* are we receiver? */
if (debug)
fprintf(stderr, "client %d receiving\n", n);
- while (PQisBusy(st->con) == TRUE)
- {
- if (!PQconsumeInput(st->con))
- { /* there's something wrong */
- fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- return;
- }
+ if (!PQconsumeInput(st->con))
+ { /* there's something wrong */
+ fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
+ remains--; /* I've aborted */
+ PQfinish(st->con);
+ st->con = NULL;
+ return;
}
+ if (PQisBusy(st->con))
+ return; /* don't have the whole result yet */
switch (st->state)
{
{
case 0:
st->aid = getrand(1, naccounts * tps);
- sprintf(sql, "select abalance from accounts where aid = %d", st->aid);
+ snprintf(sql, 256, "select abalance from accounts where aid = %d", st->aid);
break;
}
}
else
{
- st->listen++; /* flags that should be listned */
+ st->listen++; /* flags that should be listened */
}
}
/* create tables and setup data */
static void
-init()
+init(void)
{
PGconn *con;
PGresult *res;
for (i = 0; i < nbranches * tps; i++)
{
- sprintf(sql, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
+ snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
res = PQexec(con, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
for (i = 0; i < ntellers * tps; i++)
{
- sprintf(sql, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
- ,i + 1, i / ntellers + 1);
+ snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
+ ,i + 1, i / ntellers + 1);
res = PQexec(con, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
PQclear(res);
}
- sprintf(sql, "%d\t%d\t%d\t\n", j, j / naccounts, 0);
+ snprintf(sql, 256, "%d\t%d\t%d\t\n", j, j / naccounts, 0);
if (PQputline(con, sql))
{
fprintf(stderr, "PQputline failed\n");
exit(1);
}
+#ifdef NOT_USED
+
/*
* do a checkpoint to purge the old WAL logs
*/
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
+#endif /* NOT_USED */
}
}
t2;
int i;
int normal_xacts = 0;
+ char *s;
for (i = 0; i < nclients; i++)
normal_xacts += state[i].cnt;
t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
t2 = normal_xacts * 1000000.0 / t2;
- printf("transaction type: %s\n", ttype == 0 ? "TPC-B (sort of)" : "SELECT only");
+ if (ttype == 0)
+ s = "TPC-B (sort of)";
+ else if (ttype == 2)
+ s = "Update only accounts";
+ else
+ s = "SELECT only";
+
+ printf("transaction type: %s\n", s);
printf("scaling factor: %d\n", tps);
printf("number of clients: %d\n", nclients);
printf("number of transactions per client: %d\n", nxacts);
printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
- printf("tps = %f(including connections establishing)\n", t1);
- printf("tps = %f(excluding connections establishing)\n", t2);
+ printf("tps = %f (including connections establishing)\n", t1);
+ printf("tps = %f (excluding connections establishing)\n", t2);
}
int is_full_vacuum = 0; /* do full vacuum before testing? */
int debug = 0; /* debug flag */
int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT
- * only */
+ * only, 2: skip update of branches and
+ * tellers */
- static CState state[MAXCLIENTS]; /* clients status */
+ static CState *state; /* status of clients */
struct timeval tv1; /* start up time */
struct timeval tv2; /* after establishing all connections to
PGconn *con;
PGresult *res;
- while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CS")) != EOF)
+ while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSl")) != -1)
{
switch (c)
{
case 'S':
ttype = 1;
break;
+ case 'N':
+ ttype = 2;
+ break;
case 'c':
nclients = atoi(optarg);
if (nclients <= 0 || nclients > MAXCLIENTS)
{
- fprintf(stderr, "wrong number of clients: %d\n", nclients);
+ fprintf(stderr, "invalid number of clients: %d\n", nclients);
exit(1);
}
#ifndef __CYGWIN__
#else /* but BSD doesn't ... */
if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
{
-#endif /* HAVE_RLIMIT_NOFILE */
+#endif /* HAVE_RLIMIT_NOFILE */
fprintf(stderr, "getrlimit failed. reason: %s\n", strerror(errno));
exit(1);
}
fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
exit(1);
}
-#endif /* #ifndef __CYGWIN__ */
+#endif /* #ifndef __CYGWIN__ */
break;
case 'C':
is_connect = 1;
tps = atoi(optarg);
if (tps <= 0)
{
- fprintf(stderr, "wrong scaling factor: %d\n", tps);
+ fprintf(stderr, "invalid scaling factor: %d\n", tps);
exit(1);
}
break;
nxacts = atoi(optarg);
if (nxacts <= 0)
{
- fprintf(stderr, "wrong number of transactions: %d\n", nxacts);
+ fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
exit(1);
}
break;
case 'P':
pwd = optarg;
break;
+ case 'l':
+ use_log = true;
+ break;
default:
usage();
exit(1);
remains = nclients;
+ state = (CState *) malloc(sizeof(*state) * nclients);
+ memset(state, 0, sizeof(*state));
+
+ if (use_log)
+ {
+ char logpath[64];
+
+ snprintf(logpath, 64, "pgbench_log.%d", getpid());
+ LOGFILE = fopen(logpath, "w");
+
+ if (LOGFILE == NULL)
+ {
+ fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
+ exit(1);
+ }
+ }
+
if (debug)
{
printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
/* make connections to the database */
for (i = 0; i < nclients; i++)
{
+ state[i].id = i;
if ((state[i].con = doConnect()) == NULL)
exit(1);
}
/* time after connections set up */
gettimeofday(&tv2, 0);
- /* send start up quries in async manner */
+ /* send start up queries in async manner */
for (i = 0; i < nclients; i++)
{
- if (ttype == 0)
- doOne(state, i, debug);
+ if (ttype == 0 || ttype == 2)
+ doOne(state, i, debug, ttype);
else if (ttype == 1)
doSelectOnly(state, i, debug);
}
/* get end time */
gettimeofday(&tv3, 0);
printResults(ttype, state, &tv1, &tv2, &tv3);
+ if (LOGFILE)
+ fclose(LOGFILE);
exit(0);
}
if (sock < 0)
{
- fprintf(stderr, "Client %d: PQsock failed\n", i);
+ fprintf(stderr, "Client %d: PQsocket failed\n", i);
disconnect_all(state);
exit(1);
}
{
if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask))
{
- if (ttype == 0)
- doOne(state, i, debug);
+ if (ttype == 0 || ttype == 2)
+ doOne(state, i, debug, ttype);
else if (ttype == 1)
doSelectOnly(state, i, debug);
}