2 * $Header: /cvsroot/pgsql/contrib/pgbench/pgbench.c,v 1.6 2000/09/29 13:53:29 petere Exp $
4 * pgbench: a simple TPC-B like benchmark program for PostgreSQL
5 * written by Tatsuo Ishii
7 * Copyright (c) 2000 Tatsuo Ishii
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
39 #ifdef HAVE_SYS_SELECT_H
40 #include <sys/select.h>
44 #include <sys/resource.h>
48 /********************************************************************
49 * some configurable parameters */
51 #define MAXCLIENTS 1024 /* max number of clients allowed; also the size of the client state array in main */
53 int nclients = 1; /* default number of simulated clients (overridden by -c) */
54 int nxacts = 10; /* default number of transactions per
58 * scaling factor. for example, tps = 10 will make 1000000 tuples of
64 * end of configurable parameters
65 *********************************************************************/
/* number of accounts rows loaded per unit of scaling factor (init copies naccounts * tps rows) */
69 #define naccounts 100000
71 int remains; /* number of clients still running; decremented when a client finishes or aborts */
75 PGconn *con; /* connection handle to DB */
76 int state; /* state No. (reported in abort messages) */
77 int cnt; /* number of transactions completed by this client */
78 int ecnt; /* error count */
79 int listen; /* non-zero indicates that an async query
81 int aid; /* account id for this transaction */
82 int bid; /* branch id for this transaction */
83 int tid; /* teller id for this transaction */
/* print the command line synopsis to stderr: one line for benchmark runs, one for initialize mode (-i) */
91 fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-v][-S][-d][dbname]\n");
92 fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-d][dbname]\n");
/*
 * random number generator
 *
 * Returns a pseudo-random integer drawn from the closed interval
 * [min, max], using rand() as the source.
 *
 * The offset added to min is scaled by the width of the interval
 * (max - min + 1).  Scaling by max alone is only correct when min is 1;
 * for any other min the result could run past max.
 *
 * The width is computed in double so that it cannot overflow int, and
 * RAND_MAX + 1.0 keeps the quotient strictly below 1 so that max + 1 is
 * never produced.
 */
int
getrand(int min, int max)
{
	double		width = (double) max - (double) min + 1.0;

	return min + (int) (width * rand() / (RAND_MAX + 1.0));
}
102 /* throw away response from backend: results are fetched from state->con with PQgetResult() */
104 discard_response(CState * state)
110 res = PQgetResult(state->con);
/*
 * Compare the status of res with the expected status good for client n.
 * On a mismatch the connection error message of that client is written to
 * stderr and remains is decremented.  A NULL res is not treated as a
 * mismatch.  Callers test the return value in an if condition.
 */
117 check(CState * state, PGresult *res, int n, int good)
119 CState *st = &state[n];
121 if (res && PQresultStatus(res) != good)
123 fprintf(stderr, "Client %d aborted in state %d: %s", n, st->state, PQerrorMessage(st->con));
124 remains--; /* this client has aborted */
132 /* process a transaction: advance client n by one step of the TPC-B like
 * sequence.  Replies are handled per state number (0 begin, 1 update
 * accounts, 2 select abalance, 3 update tellers, 4 update branches,
 * 5 insert into history, 6 end); the next statement is then built in sql
 * and sent with PQsendQuery(), and st->listen is incremented so that the
 * reply is awaited. */
134 doOne(CState * state, int n, int debug)
138 CState *st = &state[n];
141 { /* are we receiver? */
143 fprintf(stderr, "client %d receiving\n", n);
/* keep reading from the socket while libpq reports the result as incomplete */
144 while (PQisBusy(st->con) == TRUE)
146 if (!PQconsumeInput(st->con))
147 { /* there's something wrong */
148 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
149 remains--; /* this client has aborted */
158 case 0: /* response to "begin" */
159 res = PQgetResult(st->con);
160 if (check(state, res, n, PGRES_COMMAND_OK))
163 discard_response(st);
165 case 1: /* response to "update accounts..." */
166 res = PQgetResult(st->con);
167 if (check(state, res, n, PGRES_COMMAND_OK))
170 discard_response(st);
172 case 2: /* response to "select abalance ..." */
173 res = PQgetResult(st->con);
174 if (check(state, res, n, PGRES_TUPLES_OK))
177 discard_response(st);
179 case 3: /* response to "update tellers ..." */
180 res = PQgetResult(st->con);
181 if (check(state, res, n, PGRES_COMMAND_OK))
184 discard_response(st);
186 case 4: /* response to "update branches ..." */
187 res = PQgetResult(st->con);
188 if (check(state, res, n, PGRES_COMMAND_OK))
191 discard_response(st);
193 case 5: /* response to "insert into history ..." */
194 res = PQgetResult(st->con);
195 if (check(state, res, n, PGRES_COMMAND_OK))
198 discard_response(st);
200 case 6: /* response to "end" */
201 res = PQgetResult(st->con);
202 if (check(state, res, n, PGRES_COMMAND_OK))
205 discard_response(st);
/* one more transaction completed; the client is finished once it reaches nxacts */
207 if (++st->cnt >= nxacts)
209 remains--; /* this client is done */
217 /* increment state counter */
225 case 0: /* about to start */
226 strcpy(sql, "begin");
/* pick the account, branch, teller and delta used by the whole transaction */
227 st->aid = getrand(1, naccounts * tps);
228 st->bid = getrand(1, nbranches * tps);
229 st->tid = getrand(1, ntellers * tps);
230 st->delta = getrand(1, 1000);
233 sprintf(sql, "update accounts set abalance = abalance + %d where aid = %d\n", st->delta, st->aid);
236 sprintf(sql, "select abalance from accounts where aid = %d", st->aid);
239 sprintf(sql, "update tellers set tbalance = tbalance + %d where tid = %d\n",
243 sprintf(sql, "update branches set bbalance = bbalance + %d where bid = %d", st->delta, st->bid);
246 sprintf(sql, "insert into history(tid,bid,aid,delta,time) values(%d,%d,%d,%d,'now')",
247 st->tid, st->bid, st->aid, st->delta);
255 fprintf(stderr, "client %d sending %s\n", n, sql);
256 if (PQsendQuery(st->con, sql) == 0)
259 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
264 st->listen++; /* flags that the reply should be listened for */
268 /* process a select only transaction: for client n, handle the reply to the
 * previous select (state 0), count the finished transaction, then send a
 * new select on a randomly chosen account with PQsendQuery() and increment
 * st->listen so that the reply is awaited. */
270 doSelectOnly(CState * state, int n, int debug)
274 CState *st = &state[n];
277 { /* are we receiver? */
279 fprintf(stderr, "client %d receiving\n", n);
/* keep reading from the socket while libpq reports the result as incomplete */
280 while (PQisBusy(st->con) == TRUE)
282 if (!PQconsumeInput(st->con))
283 { /* there's something wrong */
284 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
285 remains--; /* this client has aborted */
294 case 0: /* response to "select abalance ..." */
295 res = PQgetResult(st->con);
296 if (check(state, res, n, PGRES_TUPLES_OK))
299 discard_response(st);
/* one more transaction completed; the client is finished once it reaches nxacts */
301 if (++st->cnt >= nxacts)
303 remains--; /* this client is done */
311 /* increment state counter */
320 st->aid = getrand(1, naccounts * tps);
321 sprintf(sql, "select abalance from accounts where aid = %d", st->aid);
326 fprintf(stderr, "client %d sending %s\n", n, sql);
328 if (PQsendQuery(st->con, sql) == 0)
331 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
336 st->listen++; /* flags that the reply should be listened for */
340 /* discard connections: walks state[0 .. nclients - 1] and closes connections with PQfinish() */
342 disconnect_all(CState * state)
346 for (i = 0; i < nclients; i++)
349 PQfinish(state[i].con);
353 /* create tables and setup data: connects to dbName on pghost/pgport, runs
 * the drop/create statements in DDLs, inserts nbranches * tps branches and
 * ntellers * tps tellers, loads naccounts * tps accounts through
 * copy accounts from stdin, then runs vacuum analyze.  Progress and errors
 * are written to stderr. */
355 init(char *pghost, char *pgport, char *dbName)
359 static char *DDLs[] = {
360 "drop table branches",
361 "create table branches(bid int, primary key(bid),bbalance int,filler char(88))",
362 "drop table tellers",
363 "create table tellers(tid int, primary key(tid),bid int,tbalance int,filler char(84))",
364 "drop table accounts",
365 "create table accounts(aid int,primary key(aid),bid int,abalance int,filler char(84))",
366 "drop table history",
367 "create table history(tid int,bid int,aid int,delta int,time timestamp,filler char(22))"};
372 con = PQsetdb(pghost, pgport, NULL, NULL, dbName);
373 if (PQstatus(con) == CONNECTION_BAD)
375 fprintf(stderr, "Connection to database '%s' on %s failed.\n", dbName, pghost);
376 fprintf(stderr, "%s", PQerrorMessage(con));
380 for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++)
382 res = PQexec(con, DDLs[i]);
/* strncmp() is 0 for the drop statements, so a failed drop is tolerated; only other statements are checked */
383 if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK)
385 fprintf(stderr, "%s", PQerrorMessage(con));
391 res = PQexec(con, "begin");
392 if (PQresultStatus(res) != PGRES_COMMAND_OK)
394 fprintf(stderr, "%s", PQerrorMessage(con));
/* branches: bid runs from 1 to nbranches * tps, each with a zero balance */
398 for (i = 0; i < nbranches * tps; i++)
400 sprintf(sql, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
401 res = PQexec(con, sql);
402 if (PQresultStatus(res) != PGRES_COMMAND_OK)
404 fprintf(stderr, "%s", PQerrorMessage(con));
/* tellers: tid runs from 1 to ntellers * tps; each group of ntellers consecutive tellers shares one bid */
410 for (i = 0; i < ntellers * tps; i++)
412 sprintf(sql, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
413 ,i + 1, i / ntellers + 1);
414 res = PQexec(con, sql);
415 if (PQresultStatus(res) != PGRES_COMMAND_OK)
417 fprintf(stderr, "%s", PQerrorMessage(con));
423 res = PQexec(con, "copy accounts from stdin");
424 if (PQresultStatus(res) != PGRES_COPY_IN)
426 fprintf(stderr, "%s", PQerrorMessage(con));
431 fprintf(stderr, "creating tables...\n");
432 for (i = 0; i < naccounts * tps; i++)
/* NOTE(review): branches are inserted above with bid starting at 1, but (i + 1) / naccounts is 0 for the first naccounts - 1 rows -- confirm the intended bid */
436 sprintf(sql, "%d\t%d\t%d\t\n", i + 1, (i + 1) / naccounts, 0);
437 if (PQputline(con, sql))
439 fprintf(stderr, "PQputline failed\n");
443 fprintf(stderr, "%d tuples done.\n", j);
/* a line holding only backslash-period terminates the copy data */
445 if (PQputline(con, "\\.\n"))
447 fprintf(stderr, "very last PQputline failed\n");
453 fprintf(stderr, "PQendcopy failed\n");
457 res = PQexec(con, "end");
458 if (PQresultStatus(res) != PGRES_COMMAND_OK)
460 fprintf(stderr, "%s", PQerrorMessage(con));
465 fprintf(stderr, "vacuum...");
466 res = PQexec(con, "vacuum analyze");
467 if (PQresultStatus(res) != PGRES_COMMAND_OK)
469 fprintf(stderr, "%s", PQerrorMessage(con));
472 fprintf(stderr, "done.\n");
477 /* print out results: sums the per-client transaction counts and prints the
 * run parameters together with two transactions-per-second figures, one
 * measured from tv1 and one measured from tv2, both up to tv3. */
480 int ttype, CState * state,
481 struct timeval * tv1, struct timeval * tv2,
482 struct timeval * tv3)
487 int normal_xacts = 0;
489 for (i = 0; i < nclients; i++)
490 normal_xacts += state[i].cnt;
/* elapsed microseconds from tv1 to tv3, then converted to transactions per second */
492 t1 = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec);
493 t1 = normal_xacts * 1000000.0 / t1;
/* same computation measured from tv2 */
495 t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
496 t2 = normal_xacts * 1000000.0 / t2;
498 printf("transaction type: %s\n", ttype == 0 ? "TPC-B (sort of)" : "SELECT only");
499 printf("scaling factor: %d\n", tps);
500 printf("number of clients: %d\n", nclients);
501 printf("number of transactions per client: %d\n", nxacts);
502 printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
503 printf("tps = %f(including connections establishing)\n", t1);
504 printf("tps = %f(excluding connections establishing)\n", t2);
/*
 * Program entry: parses the command line with getopt(), either initializes
 * the database through init() or runs the benchmark.  The benchmark path
 * reads the scaling factor from the branches table, vacuums, opens one
 * connection per client, sends the first query for every client and then
 * waits on the client sockets with select(), dispatching each readable
 * client to doOne() or doSelectOnly().  Results are reported through
 * printResults().
 */
508 main(int argc, char **argv)
518 int is_init_mode = 0; /* initialize mode? */
519 int is_no_vacuum = 0; /* no vacuum at all before
521 int is_full_vacuum = 0; /* do full vacuum before testing? */
522 int debug = 0; /* debug flag */
523 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT
526 static CState state[MAXCLIENTS]; /* clients status */
528 struct timeval tv1; /* start up time */
529 struct timeval tv2; /* after establishing all connections to
531 struct timeval tv3; /* end time */
536 int nsocks; /* return from select(2) */
537 int maxsock; /* highest socket number to wait on */
547 while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:S")) != EOF)
573 nclients = atoi(optarg);
/* the client count has to fit in the static state[MAXCLIENTS] array */
574 if (nclients <= 0 || nclients > MAXCLIENTS)
576 fprintf(stderr, "wrong number of clients: %d\n", nclients);
580 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
581 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
583 #else /* but BSD doesn't ... */
584 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
586 #endif /* RLIMIT_NOFILE */
587 fprintf(stderr, "getrlimit failed. reason: %s\n", strerror(errno));
/* the open file limit must exceed nclients + 2 */
590 if (rlim.rlim_cur <= (nclients + 2))
592 fprintf(stderr, "You need at least %d open files resource but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
/* NOTE(review): ulimt in the message below looks like a typo for ulimit */
593 fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
596 #endif /* #ifndef __CYGWIN__ */
602 fprintf(stderr, "wrong scaling factor: %d\n", tps);
607 nxacts = atoi(optarg);
610 fprintf(stderr, "wrong number of transactions: %d\n", nxacts);
622 dbName = argv[optind];
625 dbName = getenv("USER");
632 init(pghost, pgport, dbName);
640 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
641 pghost, pgport, nclients, nxacts, dbName);
644 /* opening connection... */
645 con = PQsetdb(pghost, pgport, NULL, NULL, dbName);
646 if (PQstatus(con) == CONNECTION_BAD)
648 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
649 fprintf(stderr, "%s", PQerrorMessage(con));
654 * get the scaling factor that should be same as count(*) from
657 res = PQexec(con, "select count(*) from branches");
658 if (PQresultStatus(res) != PGRES_TUPLES_OK)
660 fprintf(stderr, "%s", PQerrorMessage(con));
/* the scaling factor is taken from the row count of branches */
663 tps = atoi(PQgetvalue(res, 0, 0));
666 fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
673 fprintf(stderr, "starting vacuum...");
674 res = PQexec(con, "vacuum branches");
675 if (PQresultStatus(res) != PGRES_COMMAND_OK)
677 fprintf(stderr, "%s", PQerrorMessage(con));
682 res = PQexec(con, "vacuum tellers");
683 if (PQresultStatus(res) != PGRES_COMMAND_OK)
685 fprintf(stderr, "%s", PQerrorMessage(con));
/* history is emptied before it is vacuumed */
690 res = PQexec(con, "delete from history");
691 if (PQresultStatus(res) != PGRES_COMMAND_OK)
693 fprintf(stderr, "%s", PQerrorMessage(con));
697 res = PQexec(con, "vacuum history");
698 if (PQresultStatus(res) != PGRES_COMMAND_OK)
700 fprintf(stderr, "%s", PQerrorMessage(con));
705 fprintf(stderr, "end.\n");
709 fprintf(stderr, "starting full vacuum...");
710 res = PQexec(con, "vacuum analyze accounts");
711 if (PQresultStatus(res) != PGRES_COMMAND_OK)
713 fprintf(stderr, "%s", PQerrorMessage(con));
717 fprintf(stderr, "end.\n");
722 /* set random seed from the microsecond part of the current time */
723 gettimeofday(&tv1, 0);
724 srand((uint) tv1.tv_usec);
726 /* get start up time */
727 gettimeofday(&tv1, 0);
729 /* make connections to the database */
730 for (i = 0; i < nclients; i++)
732 state[i].con = PQsetdb(pghost, pgport, NULL, NULL, dbName);
733 if (PQstatus(state[i].con) == CONNECTION_BAD)
735 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
736 fprintf(stderr, "%s", PQerrorMessage(state[i].con));
741 /* time after connections set up */
742 gettimeofday(&tv2, 0);
744 /* send start up queries in async manner */
745 for (i = 0; i < nclients; i++)
748 doOne(state, i, debug);
750 doSelectOnly(state, i, debug);
757 disconnect_all(state);
759 gettimeofday(&tv3, 0);
760 printResults(ttype, state, &tv1, &tv2, &tv3);
764 FD_ZERO(&input_mask);
/* register the socket of each client in the read set passed to select() */
767 for (i = 0; i < nclients; i++)
771 int sock = PQsocket(state[i].con);
775 fprintf(stderr, "Client %d: PQsock failed\n", i);
776 disconnect_all(state);
779 FD_SET(sock, &input_mask);
/* a NULL timeout is passed, so select() waits without a time limit */
785 if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
786 (fd_set *) NULL, (struct timeval *) NULL)) < 0)
790 /* must be something wrong */
791 disconnect_all(state);
792 fprintf(stderr, "select failed: %s\n", strerror(errno));
795 else if (nsocks == 0)
797 fprintf(stderr, "select timeout\n");
798 for (i = 0; i < nclients; i++)
800 fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
801 i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
806 /* ok, backend returns reply */
807 for (i = 0; i < nclients; i++)
/* only clients that still have a connection and whose socket is readable are advanced */
809 if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask))
812 doOne(state, i, debug);
814 doSelectOnly(state, i, debug);