2 * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.34 2004/10/25 02:15:01 tgl Exp $
4 * pgbench: a simple TPC-B like benchmark program for PostgreSQL
5 * written by Tatsuo Ishii
7 * Copyright (c) 2000-2004 Tatsuo Ishii
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
20 #include "postgres_fe.h"
36 #ifdef HAVE_SYS_SELECT_H
37 #include <sys/select.h>
41 #include <sys/resource.h>
52 /********************************************************************
53 * some configurable parameters */
55 #define MAXCLIENTS 1024 /* max number of clients allowed */
57 int nclients = 1; /* default number of simulated clients */
58 int nxacts = 10; /* default number of transactions per
62 * scaling factor. for example, tps = 10 will make 1000000 tuples of
68 * end of configurable parameters
69 *********************************************************************/
73 #define naccounts 100000
77 bool use_log; /* log transaction latencies to a file */
79 int remains; /* number of remaining clients */
81 int is_connect; /* establish connection for each
86 char *pgoptions = NULL;
94 PGconn *con; /* connection handle to DB */
95 int id; /* client No. */
96 int state; /* state No. */
97 int cnt; /* xacts count */
98 int ecnt; /* error count */
99 int listen; /* 0 indicates that an async query has
101 int aid; /* account id for this transaction */
102 int bid; /* branch id for this transaction */
103 int tid; /* teller id for this transaction */
106 struct timeval txn_begin; /* used for measuring latencies */
112 fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-l][-U login][-P password][-d][dbname]\n");
113 fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n");
/*
 * getrand - return a pseudo-random integer in the closed range [min, max].
 *
 * Draws exactly one value from rand() and maps it onto the requested range.
 *
 * The previous expression scaled the offset by "max" instead of by the
 * width of the range, so the result could be as large as min + max - 1.
 * That only equals "max" when min == 1.  Scaling by (max - min + 1) fixes
 * the general case; for min == 1 the arithmetic is the same as before, so
 * such callers get exactly the values they used to get.
 *
 * min: smallest value that may be returned.
 * max: largest value that may be returned.
 *
 * Returns a value v with min <= v <= max.  If max < min the range is empty
 * and min is returned (rand() is still consumed once).
 */
int
getrand(int min, int max)
{
	int			draw = rand();
	double		span;

	if (max < min)
		return min;

	/* computed in double so that max - min + 1 cannot overflow an int */
	span = (double) max - (double) min + 1.0;

	/* draw / (RAND_MAX + 1.0) lies in [0, 1), so the offset is in [0, span) */
	return min + (int) (span * draw / (RAND_MAX + 1.0));
}
123 /* set up a connection to the backend */
130 con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
134 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
135 fprintf(stderr, "Memory allocatin problem?\n");
139 if (PQstatus(con) == CONNECTION_BAD)
141 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
143 if (PQerrorMessage(con))
144 fprintf(stderr, "%s", PQerrorMessage(con));
146 fprintf(stderr, "No explanation from the backend\n");
151 res = PQexec(con, "SET search_path = public");
152 if (PQresultStatus(res) != PGRES_COMMAND_OK)
154 fprintf(stderr, "%s", PQerrorMessage(con));
162 /* throw away response from backend */
164 discard_response(CState * state)
170 res = PQgetResult(state->con);
176 /* check to see if the SQL result was good */
178 check(CState * state, PGresult *res, int n, int good)
180 CState *st = &state[n];
182 if (res && PQresultStatus(res) != good)
184 fprintf(stderr, "Client %d aborted in state %d: %s", n, st->state, PQerrorMessage(st->con));
185 remains--; /* I've aborted */
193 /* process a transaction */
195 doOne(CState * state, int n, int debug, int ttype)
199 CState *st = &state[n];
202 { /* are we receiver? */
204 fprintf(stderr, "client %d receiving\n", n);
205 if (!PQconsumeInput(st->con))
206 { /* there's something wrong */
207 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
208 remains--; /* I've aborted */
213 if (PQisBusy(st->con))
214 return; /* don't have the whole result yet */
218 case 0: /* response to "begin" */
219 res = PQgetResult(st->con);
220 if (check(state, res, n, PGRES_COMMAND_OK))
223 discard_response(st);
225 case 1: /* response to "update accounts..." */
226 res = PQgetResult(st->con);
227 if (check(state, res, n, PGRES_COMMAND_OK))
230 discard_response(st);
232 case 2: /* response to "select abalance ..." */
233 res = PQgetResult(st->con);
234 if (check(state, res, n, PGRES_TUPLES_OK))
237 discard_response(st);
239 case 3: /* response to "update tellers ..." */
240 res = PQgetResult(st->con);
241 if (check(state, res, n, PGRES_COMMAND_OK))
244 discard_response(st);
246 case 4: /* response to "update branches ..." */
247 res = PQgetResult(st->con);
248 if (check(state, res, n, PGRES_COMMAND_OK))
251 discard_response(st);
253 case 5: /* response to "insert into history ..." */
254 res = PQgetResult(st->con);
255 if (check(state, res, n, PGRES_COMMAND_OK))
258 discard_response(st);
260 case 6: /* response to "end" */
263 * transaction finished: record the time it took in the
271 gettimeofday(&now, 0);
272 diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
273 (int) (now.tv_usec - st->txn_begin.tv_usec);
275 fprintf(LOGFILE, "%d %d %.0f\n", st->id, st->cnt, diff);
278 res = PQgetResult(st->con);
279 if (check(state, res, n, PGRES_COMMAND_OK))
282 discard_response(st);
290 if (++st->cnt >= nxacts)
292 remains--; /* I'm done */
303 /* increment state counter */
311 if ((st->con = doConnect()) == NULL)
313 fprintf(stderr, "Client %d aborted in establishing connection.\n",
315 remains--; /* I've aborted */
324 case 0: /* about to start */
325 strcpy(sql, "begin");
326 st->aid = getrand(1, naccounts * tps);
327 st->bid = getrand(1, nbranches * tps);
328 st->tid = getrand(1, ntellers * tps);
329 st->delta = getrand(1, 1000);
331 gettimeofday(&(st->txn_begin), 0);
334 snprintf(sql, 256, "update accounts set abalance = abalance + %d where aid = %d\n", st->delta, st->aid);
337 snprintf(sql, 256, "select abalance from accounts where aid = %d", st->aid);
342 snprintf(sql, 256, "update tellers set tbalance = tbalance + %d where tid = %d\n",
349 snprintf(sql, 256, "update branches set bbalance = bbalance + %d where bid = %d", st->delta, st->bid);
353 snprintf(sql, 256, "insert into history(tid,bid,aid,delta,mtime) values(%d,%d,%d,%d,'now')",
354 st->tid, st->bid, st->aid, st->delta);
362 fprintf(stderr, "client %d sending %s\n", n, sql);
363 if (PQsendQuery(st->con, sql) == 0)
366 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
371 st->listen++; /* flags that should be listened */
375 /* process a select only transaction */
377 doSelectOnly(CState * state, int n, int debug)
381 CState *st = &state[n];
384 { /* are we receiver? */
386 fprintf(stderr, "client %d receiving\n", n);
387 if (!PQconsumeInput(st->con))
388 { /* there's something wrong */
389 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
390 remains--; /* I've aborted */
395 if (PQisBusy(st->con))
396 return; /* don't have the whole result yet */
400 case 0: /* response to "select abalance ..." */
401 res = PQgetResult(st->con);
402 if (check(state, res, n, PGRES_TUPLES_OK))
405 discard_response(st);
413 if (++st->cnt >= nxacts)
415 remains--; /* I've done */
426 /* increment state counter */
434 if ((st->con = doConnect()) == NULL)
436 fprintf(stderr, "Client %d aborted in establishing connection.\n",
438 remains--; /* I've aborted */
448 st->aid = getrand(1, naccounts * tps);
449 snprintf(sql, 256, "select abalance from accounts where aid = %d", st->aid);
454 fprintf(stderr, "client %d sending %s\n", n, sql);
456 if (PQsendQuery(st->con, sql) == 0)
459 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
464 st->listen++; /* flags that should be listened */
468 /* discard connections */
470 disconnect_all(CState * state)
474 for (i = 0; i < nclients; i++)
477 PQfinish(state[i].con);
481 /* create tables and setup data */
487 static char *DDLs[] = {
488 "drop table branches",
489 "create table branches(bid int not null,bbalance int,filler char(88))",
490 "drop table tellers",
491 "create table tellers(tid int not null,bid int,tbalance int,filler char(84))",
492 "drop table accounts",
493 "create table accounts(aid int not null,bid int,abalance int,filler char(84))",
494 "drop table history",
495 "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
496 static char *DDLAFTERs[] = {
497 "alter table branches add primary key (bid)",
498 "alter table tellers add primary key (tid)",
499 "alter table accounts add primary key (aid)"};
506 if ((con = doConnect()) == NULL)
509 for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++)
511 res = PQexec(con, DDLs[i]);
512 if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK)
514 fprintf(stderr, "%s", PQerrorMessage(con));
520 res = PQexec(con, "begin");
521 if (PQresultStatus(res) != PGRES_COMMAND_OK)
523 fprintf(stderr, "%s", PQerrorMessage(con));
528 for (i = 0; i < nbranches * tps; i++)
530 snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
531 res = PQexec(con, sql);
532 if (PQresultStatus(res) != PGRES_COMMAND_OK)
534 fprintf(stderr, "%s", PQerrorMessage(con));
540 for (i = 0; i < ntellers * tps; i++)
542 snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
543 ,i + 1, i / ntellers + 1);
544 res = PQexec(con, sql);
545 if (PQresultStatus(res) != PGRES_COMMAND_OK)
547 fprintf(stderr, "%s", PQerrorMessage(con));
553 res = PQexec(con, "end");
554 if (PQresultStatus(res) != PGRES_COMMAND_OK)
556 fprintf(stderr, "%s", PQerrorMessage(con));
562 * occupy accounts table with some data
564 fprintf(stderr, "creating tables...\n");
565 for (i = 0; i < naccounts * tps; i++)
571 res = PQexec(con, "copy accounts from stdin");
572 if (PQresultStatus(res) != PGRES_COPY_IN)
574 fprintf(stderr, "%s", PQerrorMessage(con));
580 snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0);
581 if (PQputline(con, sql))
583 fprintf(stderr, "PQputline failed\n");
590 * Every 10000 tuples, we commit the copy command. This should
591 * avoid generating too many WAL logs.
593 fprintf(stderr, "%d tuples done.\n", j);
594 if (PQputline(con, "\\.\n"))
596 fprintf(stderr, "very last PQputline failed\n");
602 fprintf(stderr, "PQendcopy failed\n");
609 * do a checkpoint to purge the old WAL logs
611 res = PQexec(con, "checkpoint");
612 if (PQresultStatus(res) != PGRES_COMMAND_OK)
614 fprintf(stderr, "%s", PQerrorMessage(con));
618 #endif /* NOT_USED */
621 fprintf(stderr, "set primary key...\n");
622 for (i = 0; i < (sizeof(DDLAFTERs) / sizeof(char *)); i++)
624 res = PQexec(con, DDLAFTERs[i]);
625 if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK)
627 fprintf(stderr, "%s", PQerrorMessage(con));
634 fprintf(stderr, "vacuum...");
635 res = PQexec(con, "vacuum analyze");
636 if (PQresultStatus(res) != PGRES_COMMAND_OK)
638 fprintf(stderr, "%s", PQerrorMessage(con));
642 fprintf(stderr, "done.\n");
647 /* print out results */
650 int ttype, CState * state,
651 struct timeval * tv1, struct timeval * tv2,
652 struct timeval * tv3)
657 int normal_xacts = 0;
660 for (i = 0; i < nclients; i++)
661 normal_xacts += state[i].cnt;
663 t1 = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec);
664 t1 = normal_xacts * 1000000.0 / t1;
666 t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
667 t2 = normal_xacts * 1000000.0 / t2;
670 s = "TPC-B (sort of)";
672 s = "Update only accounts";
676 printf("transaction type: %s\n", s);
677 printf("scaling factor: %d\n", tps);
678 printf("number of clients: %d\n", nclients);
679 printf("number of transactions per client: %d\n", nxacts);
680 printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
681 printf("tps = %f (including connections establishing)\n", t1);
682 printf("tps = %f (excluding connections establishing)\n", t2);
687 main(int argc, char **argv)
690 int is_init_mode = 0; /* initialize mode? */
691 int is_no_vacuum = 0; /* no vacuum at all before
693 int is_full_vacuum = 0; /* do full vacuum before testing? */
694 int debug = 0; /* debug flag */
695 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT
696 * only, 2: skip update of branches and
699 static CState *state; /* status of clients */
701 struct timeval tv1; /* start up time */
702 struct timeval tv2; /* after establishing all connections to
704 struct timeval tv3; /* end time */
709 int nsocks; /* return from select(2) */
710 int maxsock; /* max socket number to be waited */
712 #if !(defined(__CYGWIN__) || defined(__MINGW32__))
720 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
722 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
724 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
727 while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSl")) != -1)
756 nclients = atoi(optarg);
757 if (nclients <= 0 || nclients > MAXCLIENTS)
759 fprintf(stderr, "invalid number of clients: %d\n", nclients);
762 #if !(defined(__CYGWIN__) || defined(__MINGW32__))
763 #ifdef RLIMIT_NOFILE /* most platform uses RLIMIT_NOFILE */
764 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
766 #else /* but BSD doesn't ... */
767 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
769 #endif /* HAVE_RLIMIT_NOFILE */
770 fprintf(stderr, "getrlimit failed. reason: %s\n", strerror(errno));
773 if (rlim.rlim_cur <= (nclients + 2))
775 fprintf(stderr, "You need at least %d open files resource but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
776 fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
779 #endif /* #if !(defined(__CYGWIN__) || defined(__MINGW32__)) */
788 fprintf(stderr, "invalid scaling factor: %d\n", tps);
793 nxacts = atoi(optarg);
796 fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
817 dbName = argv[optind];
820 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
822 else if (login != NULL && *login != '\0')
836 state = (CState *) malloc(sizeof(*state) * nclients);
837 memset(state, 0, sizeof(*state) * nclients);
843 snprintf(logpath, 64, "pgbench_log.%d", getpid());
844 LOGFILE = fopen(logpath, "w");
848 fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
855 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
856 pghost, pgport, nclients, nxacts, dbName);
859 /* opening connection... */
864 if (PQstatus(con) == CONNECTION_BAD)
866 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
867 fprintf(stderr, "%s", PQerrorMessage(con));
872 * get the scaling factor that should be same as count(*) from
875 res = PQexec(con, "select count(*) from branches");
876 if (PQresultStatus(res) != PGRES_TUPLES_OK)
878 fprintf(stderr, "%s", PQerrorMessage(con));
881 tps = atoi(PQgetvalue(res, 0, 0));
884 fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
891 fprintf(stderr, "starting vacuum...");
892 res = PQexec(con, "vacuum branches");
893 if (PQresultStatus(res) != PGRES_COMMAND_OK)
895 fprintf(stderr, "%s", PQerrorMessage(con));
900 res = PQexec(con, "vacuum tellers");
901 if (PQresultStatus(res) != PGRES_COMMAND_OK)
903 fprintf(stderr, "%s", PQerrorMessage(con));
908 res = PQexec(con, "delete from history");
909 if (PQresultStatus(res) != PGRES_COMMAND_OK)
911 fprintf(stderr, "%s", PQerrorMessage(con));
915 res = PQexec(con, "vacuum history");
916 if (PQresultStatus(res) != PGRES_COMMAND_OK)
918 fprintf(stderr, "%s", PQerrorMessage(con));
923 fprintf(stderr, "end.\n");
927 fprintf(stderr, "starting full vacuum...");
928 res = PQexec(con, "vacuum analyze accounts");
929 if (PQresultStatus(res) != PGRES_COMMAND_OK)
931 fprintf(stderr, "%s", PQerrorMessage(con));
935 fprintf(stderr, "end.\n");
940 /* set random seed */
941 gettimeofday(&tv1, 0);
942 srand((unsigned int) tv1.tv_usec);
944 /* get start up time */
945 gettimeofday(&tv1, 0);
949 /* make connections to the database */
950 for (i = 0; i < nclients; i++)
953 if ((state[i].con = doConnect()) == NULL)
958 /* time after connections set up */
959 gettimeofday(&tv2, 0);
961 /* send start up queries in async manner */
962 for (i = 0; i < nclients; i++)
964 if (ttype == 0 || ttype == 2)
965 doOne(state, i, debug, ttype);
967 doSelectOnly(state, i, debug);
974 disconnect_all(state);
976 gettimeofday(&tv3, 0);
977 printResults(ttype, state, &tv1, &tv2, &tv3);
983 FD_ZERO(&input_mask);
986 for (i = 0; i < nclients; i++)
990 int sock = PQsocket(state[i].con);
994 fprintf(stderr, "Client %d: PQsocket failed\n", i);
995 disconnect_all(state);
998 FD_SET(sock, &input_mask);
1004 if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
1005 (fd_set *) NULL, (struct timeval *) NULL)) < 0)
1009 /* must be something wrong */
1010 disconnect_all(state);
1011 fprintf(stderr, "select failed: %s\n", strerror(errno));
1014 else if (nsocks == 0)
1016 fprintf(stderr, "select timeout\n");
1017 for (i = 0; i < nclients; i++)
1019 fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
1020 i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
1025 /* ok, backend returns reply */
1026 for (i = 0; i < nclients; i++)
1028 if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask))
1030 if (ttype == 0 || ttype == 2)
1031 doOne(state, i, debug, ttype);
1032 else if (ttype == 1)
1033 doSelectOnly(state, i, debug);