]> granicus.if.org Git - postgresql/blob - contrib/pgbench/pgbench.c
Enhance pgbench -l option to add timestamp. Patch contributed by Greg
[postgresql] / contrib / pgbench / pgbench.c
1 /*
2  * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.64 2007/04/06 09:16:16 ishii Exp $
3  *
4  * pgbench: a simple benchmark program for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2000-2007      Tatsuo Ishii
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  */
20 #include "postgres_fe.h"
21
22 #include "libpq-fe.h"
23
24 #include <ctype.h>
25
26 #ifdef WIN32
27 #include "win32.h"
28 #else
29 #include <sys/time.h>
30 #include <unistd.h>
31
32 #ifdef HAVE_GETOPT_H
33 #include <getopt.h>
34 #endif
35
36 #ifdef HAVE_SYS_SELECT_H
37 #include <sys/select.h>
38 #endif
39
40 #ifdef HAVE_SYS_RESOURCE_H
41 #include <sys/resource.h>               /* for getrlimit */
42 #endif
43 #endif   /* ! WIN32 */
44
45 extern char *optarg;
46 extern int      optind;
47
48 #ifdef WIN32
49 #undef select
50 #endif
51
52
53 /********************************************************************
54  * some configurable parameters */
55
#define MAXCLIENTS 1024			/* max number of clients allowed */

int			nclients = 1;		/* default number of simulated clients */
int			nxacts = 10;		/* default number of transactions per client;
								 * doCustom() retires a client once its
								 * transaction count reaches this value */

/*
 * scaling factor. for example, scale = 10 will make 1000000 tuples of
 * accounts table.
 */
int			scale = 1;

/*
 * end of configurable parameters
 *********************************************************************/

/*
 * Rows created per unit of scale: init() inserts nbranches * scale rows
 * into branches, ntellers * scale rows into tellers and naccounts * scale
 * rows into accounts.
 */
#define nbranches	1
#define ntellers	10
#define naccounts	100000
74
FILE	   *LOGFILE = NULL;		/* stream doCustom() writes one line per
								 * finished transaction to when use_log is set */

bool		use_log;			/* log transaction latencies to a file */

int			remains;			/* number of remaining clients; decremented
								 * whenever a client finishes or aborts */

int			is_connect;			/* establish connection for each transaction:
								 * when set, doCustom() closes the connection
								 * at the end of every transaction */

/*
 * Connection parameters. doConnect() hands these unchanged to
 * PQsetdbLogin() in the order host, port, options, tty, database name,
 * login, password.
 */
char	   *pghost = "";
char	   *pgport = NULL;
char	   *pgoptions = NULL;
char	   *pgtty = NULL;
char	   *login = NULL;
char	   *pwd = NULL;
char	   *dbName;
90
/*
 * variable definitions
 *
 * One name/value pair of a client's script variables. Both strings are
 * heap copies made with strdup() by putVariable(). Each client keeps its
 * variables in an array sorted by name so they can be found with bsearch().
 */
typedef struct
{
	char	   *name;			/* variable name */
	char	   *value;			/* its value */
}	Variable;
97
98 /*
99  * structures used in custom query mode
100  */
101
/* Per-client state driven by doCustom(). */
typedef struct
{
	PGconn	   *con;			/* connection handle to DB; NULL when the
								 * client has no open connection */
	int			id;				/* client No. */
	int			state;			/* state No.: index of the current command
								 * within the client's script */
	int			cnt;			/* xacts count */
	int			ecnt;			/* error count */
	int			listen;			/* set to 1 once a command has been issued
								 * (query sent or meta command executed);
								 * doCustom() then handles its completion
								 * before moving to the next command */
	Variable   *variables;		/* array of variable definitions, kept sorted
								 * by name */
	int			nvariables;		/* number of entries in variables */
	struct timeval txn_begin;	/* used for measuring latencies */
	int			use_file;		/* index in sql_files for this client */
}	CState;
116
117 /*
118  * queries read from files
119  */
#define SQL_COMMAND		1		/* command is SQL text sent to the server */
#define META_COMMAND	2		/* command is a backslash command run by
								 * pgbench itself */
#define MAX_ARGS		10		/* capacity of Command.argv */

/* One parsed script line. */
typedef struct
{
	int			type;			/* command type (SQL_COMMAND or META_COMMAND) */
	int			argc;			/* number of entries used in argv */
	char	   *argv[MAX_ARGS]; /* for SQL_COMMAND, argv[0] is the SQL text;
								 * for META_COMMAND, argv[0] is the command
								 * name and the rest are its arguments */
}	Command;
130
#define MAX_FILES		128		/* max number of SQL script files allowed */

/*
 * SQL script files. Each entry is an array of Command pointers; doCustom()
 * treats a NULL element as the end of the script.
 */
Command   **sql_files[MAX_FILES];
int			num_files;			/* number of entries used in sql_files */
135
/*
 * default scenario
 *
 * Built-in script: derives the table sizes from :scale, picks a random
 * account, branch, teller and delta, then in one transaction updates the
 * account, teller and branch balances, reads back the account balance and
 * inserts a history row.
 */
static char *tpc_b = {
	"\\set nbranches :scale\n"
	"\\set ntellers 10 * :scale\n"
	"\\set naccounts 100000 * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM accounts WHERE aid = :aid;\n"
	"UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
	"UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
	"INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};
153
/*
 * -N case
 *
 * Same as the default scenario except that the UPDATEs of the tellers and
 * branches tables are left out; the account update, the balance read-back
 * and the history insert remain.
 */
static char *simple_update = {
	"\\set nbranches :scale\n"
	"\\set ntellers 10 * :scale\n"
	"\\set naccounts 100000 * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM accounts WHERE aid = :aid;\n"
	"INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};
169
/*
 * -S case
 *
 * Read-only script: a single SELECT of the balance of one randomly chosen
 * account, with no explicit BEGIN/END around it.
 */
static char *select_only = {
	"\\set naccounts 100000 * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"SELECT abalance FROM accounts WHERE aid = :aid;\n"
};
176
/*
 * Write the command-line synopsis to stderr: first the benchmark-mode
 * form, then the initialize-mode (-i) form.
 */
static void
usage(void)
{
	/* Neither string contains a conversion, so fputs() prints them verbatim. */
	fputs("usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-D varname=value][-n][-C][-v][-S][-N][-f filename][-l][-U login][-P password][-d][dbname]\n", stderr);
	fputs("(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n", stderr);
}
183
184 /* random number generator */
185 static int
186 getrand(int min, int max)
187 {
188         return min + (int) (((max - min) * (double) random()) / MAX_RANDOM_VALUE + 0.5);
189 }
190
191 /* call PQexec() and exit() on failure */
192 static void
193 executeStatement(PGconn *con, const char* sql)
194 {
195         PGresult   *res;
196
197         res = PQexec(con, sql);
198         if (PQresultStatus(res) != PGRES_COMMAND_OK)
199         {
200                 fprintf(stderr, "%s", PQerrorMessage(con));
201                 exit(1);
202         }
203         PQclear(res);
204 }
205
206 /* set up a connection to the backend */
207 static PGconn *
208 doConnect(void)
209 {
210         PGconn     *con;
211
212         con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
213                                            login, pwd);
214         if (con == NULL)
215         {
216                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
217                 fprintf(stderr, "Memory allocatin problem?\n");
218                 return (NULL);
219         }
220
221         if (PQstatus(con) == CONNECTION_BAD)
222         {
223                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
224
225                 if (PQerrorMessage(con))
226                         fprintf(stderr, "%s", PQerrorMessage(con));
227                 else
228                         fprintf(stderr, "No explanation from the backend\n");
229
230                 return (NULL);
231         }
232
233         executeStatement(con, "SET search_path = public");
234
235         return (con);
236 }
237
238 /* throw away response from backend */
239 static void
240 discard_response(CState * state)
241 {
242         PGresult   *res;
243
244         do
245         {
246                 res = PQgetResult(state->con);
247                 if (res)
248                         PQclear(res);
249         } while (res);
250 }
251
252 /* check to see if the SQL result was good */
253 static int
254 check(CState *state, PGresult *res, int n)
255 {
256         CState     *st = &state[n];
257
258         switch (PQresultStatus(res))
259         {
260                 case PGRES_COMMAND_OK:
261                 case PGRES_TUPLES_OK:
262                         /* OK */
263                         break;
264                 default:
265                         fprintf(stderr, "Client %d aborted in state %d: %s",
266                                         n, st->state, PQerrorMessage(st->con));
267                         remains--;                              /* I've aborted */
268                         PQfinish(st->con);
269                         st->con = NULL;
270                         return (-1);
271         }
272         return (0);                                     /* OK */
273 }
274
275 static int
276 compareVariables(const void *v1, const void *v2)
277 {
278         return strcmp(((const Variable *) v1)->name,
279                                   ((const Variable *) v2)->name);
280 }
281
282 static char *
283 getVariable(CState * st, char *name)
284 {
285         Variable        key,
286                            *var;
287
288         /* On some versions of Solaris, bsearch of zero items dumps core */
289         if (st->nvariables <= 0)
290                 return NULL;
291
292         key.name = name;
293         var = (Variable *) bsearch((void *) &key,
294                                                            (void *) st->variables,
295                                                            st->nvariables,
296                                                            sizeof(Variable),
297                                                            compareVariables);
298         if (var != NULL)
299                 return var->value;
300         else
301                 return NULL;
302 }
303
304 static int
305 putVariable(CState * st, char *name, char *value)
306 {
307         Variable        key,
308                            *var;
309
310         key.name = name;
311         /* On some versions of Solaris, bsearch of zero items dumps core */
312         if (st->nvariables > 0)
313                 var = (Variable *) bsearch((void *) &key,
314                                                                    (void *) st->variables,
315                                                                    st->nvariables,
316                                                                    sizeof(Variable),
317                                                                    compareVariables);
318         else
319                 var = NULL;
320
321         if (var == NULL)
322         {
323                 Variable   *newvars;
324
325                 if (st->variables)
326                         newvars = (Variable *) realloc(st->variables,
327                                                                         (st->nvariables + 1) * sizeof(Variable));
328                 else
329                         newvars = (Variable *) malloc(sizeof(Variable));
330
331                 if (newvars == NULL)
332                         return false;
333
334                 st->variables = newvars;
335
336                 var = &newvars[st->nvariables];
337
338                 var->name = NULL;
339                 var->value = NULL;
340
341                 if ((var->name = strdup(name)) == NULL
342                         || (var->value = strdup(value)) == NULL)
343                 {
344                         free(var->name);
345                         free(var->value);
346                         return false;
347                 }
348
349                 st->nvariables++;
350
351                 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
352                           compareVariables);
353         }
354         else
355         {
356                 char       *val;
357
358                 if ((val = strdup(value)) == NULL)
359                         return false;
360
361                 free(var->value);
362                 var->value = val;
363         }
364
365         return true;
366 }
367
368 static char *
369 assignVariables(CState * st, char *sql)
370 {
371         int                     i,
372                                 j;
373         char       *p,
374                            *name,
375                            *val;
376         void       *tmp;
377
378         i = 0;
379         while ((p = strchr(&sql[i], ':')) != NULL)
380         {
381                 i = j = p - sql;
382                 do
383                 {
384                         i++;
385                 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
386                 if (i == j + 1)
387                         continue;
388
389                 name = malloc(i - j);
390                 if (name == NULL)
391                         return NULL;
392                 memcpy(name, &sql[j + 1], i - (j + 1));
393                 name[i - (j + 1)] = '\0';
394                 val = getVariable(st, name);
395                 free(name);
396                 if (val == NULL)
397                         continue;
398
399                 if (strlen(val) > i - j)
400                 {
401                         tmp = realloc(sql, strlen(sql) - (i - j) + strlen(val) + 1);
402                         if (tmp == NULL)
403                         {
404                                 free(sql);
405                                 return NULL;
406                         }
407                         sql = tmp;
408                 }
409
410                 if (strlen(val) != i - j)
411                         memmove(&sql[j + strlen(val)], &sql[i], strlen(&sql[i]) + 1);
412
413                 strncpy(&sql[j], val, strlen(val));
414
415                 if (strlen(val) < i - j)
416                 {
417                         tmp = realloc(sql, strlen(sql) + 1);
418                         if (tmp == NULL)
419                         {
420                                 free(sql);
421                                 return NULL;
422                         }
423                         sql = tmp;
424                 }
425
426                 i = j + strlen(val);
427         }
428
429         return sql;
430 }
431
432 static void
433 doCustom(CState * state, int n, int debug)
434 {
435         PGresult   *res;
436         CState     *st = &state[n];
437         Command   **commands;
438
439 top:
440         commands = sql_files[st->use_file];
441
442         if (st->listen)
443         {                                                       /* are we receiver? */
444                 if (commands[st->state]->type == SQL_COMMAND)
445                 {
446                         if (debug)
447                                 fprintf(stderr, "client %d receiving\n", n);
448                         if (!PQconsumeInput(st->con))
449                         {                                       /* there's something wrong */
450                                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
451                                 remains--;              /* I've aborted */
452                                 PQfinish(st->con);
453                                 st->con = NULL;
454                                 return;
455                         }
456                         if (PQisBusy(st->con))
457                                 return;                 /* don't have the whole result yet */
458                 }
459
460                 /*
461                  * transaction finished: record the time it took in the log
462                  */
463                 if (use_log && commands[st->state + 1] == NULL)
464                 {
465                         double          diff;
466                         struct timeval now;
467
468                         gettimeofday(&now, NULL);
469                         diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
470                                 (int) (now.tv_usec - st->txn_begin.tv_usec);
471
472                         fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n",
473                                 st->id, st->cnt, diff, st->use_file, now.tv_sec,now.tv_usec);
474                 }
475
476                 if (commands[st->state]->type == SQL_COMMAND)
477                 {
478                         res = PQgetResult(st->con);
479                         if (check(state, res, n))
480                         {
481                                 PQclear(res);
482                                 return;
483                         }
484                         PQclear(res);
485                         discard_response(st);
486                 }
487
488                 if (commands[st->state + 1] == NULL)
489                 {
490                         if (is_connect)
491                         {
492                                 PQfinish(st->con);
493                                 st->con = NULL;
494                         }
495
496                         if (++st->cnt >= nxacts)
497                         {
498                                 remains--;              /* I've done */
499                                 if (st->con != NULL)
500                                 {
501                                         PQfinish(st->con);
502                                         st->con = NULL;
503                                 }
504                                 return;
505                         }
506                 }
507
508                 /* increment state counter */
509                 st->state++;
510                 if (commands[st->state] == NULL)
511                 {
512                         st->state = 0;
513                         st->use_file = getrand(0, num_files - 1);
514                         commands = sql_files[st->use_file];
515                 }
516         }
517
518         if (st->con == NULL)
519         {
520                 if ((st->con = doConnect()) == NULL)
521                 {
522                         fprintf(stderr, "Client %d aborted in establishing connection.\n",
523                                         n);
524                         remains--;                      /* I've aborted */
525                         PQfinish(st->con);
526                         st->con = NULL;
527                         return;
528                 }
529         }
530
531         if (use_log && st->state == 0)
532                 gettimeofday(&(st->txn_begin), NULL);
533
534         if (commands[st->state]->type == SQL_COMMAND)
535         {
536                 char       *sql;
537
538                 if ((sql = strdup(commands[st->state]->argv[0])) == NULL
539                         || (sql = assignVariables(st, sql)) == NULL)
540                 {
541                         fprintf(stderr, "out of memory\n");
542                         st->ecnt++;
543                         return;
544                 }
545
546                 if (debug)
547                         fprintf(stderr, "client %d sending %s\n", n, sql);
548                 if (PQsendQuery(st->con, sql) == 0)
549                 {
550                         if (debug)
551                                 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
552                         st->ecnt++;
553                 }
554                 else
555                 {
556                         st->listen = 1;         /* flags that should be listened */
557                 }
558                 free(sql);
559         }
560         else if (commands[st->state]->type == META_COMMAND)
561         {
562                 int                     argc = commands[st->state]->argc,
563                                         i;
564                 char      **argv = commands[st->state]->argv;
565
566                 if (debug)
567                 {
568                         fprintf(stderr, "client %d executing \\%s", n, argv[0]);
569                         for (i = 1; i < argc; i++)
570                                 fprintf(stderr, " %s", argv[i]);
571                         fprintf(stderr, "\n");
572                 }
573
574                 if (pg_strcasecmp(argv[0], "setrandom") == 0)
575                 {
576                         char       *var;
577                         int                     min,
578                                                 max;
579                         char            res[64];
580
581                         if (*argv[2] == ':')
582                         {
583                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
584                                 {
585                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
586                                         st->ecnt++;
587                                         return;
588                                 }
589                                 min = atoi(var);
590                         }
591                         else
592                                 min = atoi(argv[2]);
593
594 #ifdef NOT_USED
595                         if (min < 0)
596                         {
597                                 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
598                                 st->ecnt++;
599                                 return;
600                         }
601 #endif
602
603                         if (*argv[3] == ':')
604                         {
605                                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
606                                 {
607                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
608                                         st->ecnt++;
609                                         return;
610                                 }
611                                 max = atoi(var);
612                         }
613                         else
614                                 max = atoi(argv[3]);
615
616                         if (max < min || max > MAX_RANDOM_VALUE)
617                         {
618                                 fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
619                                 st->ecnt++;
620                                 return;
621                         }
622
623 #ifdef DEBUG
624                         printf("min: %d max: %d random: %d\n", min, max, getrand(min, max));
625 #endif
626                         snprintf(res, sizeof(res), "%d", getrand(min, max));
627
628                         if (putVariable(st, argv[1], res) == false)
629                         {
630                                 fprintf(stderr, "%s: out of memory\n", argv[0]);
631                                 st->ecnt++;
632                                 return;
633                         }
634
635                         st->listen = 1;
636                 }
637                 else if (pg_strcasecmp(argv[0], "set") == 0)
638                 {
639                         char       *var;
640                         int                     ope1,
641                                                 ope2;
642                         char            res[64];
643
644                         if (*argv[2] == ':')
645                         {
646                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
647                                 {
648                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
649                                         st->ecnt++;
650                                         return;
651                                 }
652                                 ope1 = atoi(var);
653                         }
654                         else
655                                 ope1 = atoi(argv[2]);
656
657                         if (argc < 5)
658                                 snprintf(res, sizeof(res), "%d", ope1);
659                         else
660                         {
661                                 if (*argv[4] == ':')
662                                 {
663                                         if ((var = getVariable(st, argv[4] + 1)) == NULL)
664                                         {
665                                                 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
666                                                 st->ecnt++;
667                                                 return;
668                                         }
669                                         ope2 = atoi(var);
670                                 }
671                                 else
672                                         ope2 = atoi(argv[4]);
673
674                                 if (strcmp(argv[3], "+") == 0)
675                                         snprintf(res, sizeof(res), "%d", ope1 + ope2);
676                                 else if (strcmp(argv[3], "-") == 0)
677                                         snprintf(res, sizeof(res), "%d", ope1 - ope2);
678                                 else if (strcmp(argv[3], "*") == 0)
679                                         snprintf(res, sizeof(res), "%d", ope1 * ope2);
680                                 else if (strcmp(argv[3], "/") == 0)
681                                 {
682                                         if (ope2 == 0)
683                                         {
684                                                 fprintf(stderr, "%s: division by zero\n", argv[0]);
685                                                 st->ecnt++;
686                                                 return;
687                                         }
688                                         snprintf(res, sizeof(res), "%d", ope1 / ope2);
689                                 }
690                                 else
691                                 {
692                                         fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
693                                         st->ecnt++;
694                                         return;
695                                 }
696                         }
697
698                         if (putVariable(st, argv[1], res) == false)
699                         {
700                                 fprintf(stderr, "%s: out of memory\n", argv[0]);
701                                 st->ecnt++;
702                                 return;
703                         }
704
705                         st->listen = 1;
706                 }
707
708                 goto top;
709         }
710 }
711
712 /* discard connections */
713 static void
714 disconnect_all(CState * state)
715 {
716         int                     i;
717
718         for (i = 0; i < nclients; i++)
719         {
720                 if (state[i].con)
721                         PQfinish(state[i].con);
722         }
723 }
724
725 /* create tables and setup data */
726 static void
727 init(void)
728 {
729         PGconn     *con;
730         PGresult   *res;
731         static char *DDLs[] = {
732                 "drop table if exists branches",
733                 "create table branches(bid int not null,bbalance int,filler char(88))",
734                 "drop table if exists tellers",
735                 "create table tellers(tid int not null,bid int,tbalance int,filler char(84))",
736                 "drop table if exists accounts",
737                 "create table accounts(aid int not null,bid int,abalance int,filler char(84))",
738                 "drop table if exists history",
739                 "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
740         static char *DDLAFTERs[] = {
741                 "alter table branches add primary key (bid)",
742                 "alter table tellers add primary key (tid)",
743                 "alter table accounts add primary key (aid)"};
744
745
746         char            sql[256];
747
748         int                     i;
749
750         if ((con = doConnect()) == NULL)
751                 exit(1);
752
753         for (i = 0; i < lengthof(DDLs); i++)
754                 executeStatement(con, DDLs[i]);
755
756         executeStatement(con, "begin");
757
758         for (i = 0; i < nbranches * scale; i++)
759         {
760                 snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
761                 executeStatement(con, sql);
762         }
763
764         for (i = 0; i < ntellers * scale; i++)
765         {
766                 snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
767                                  ,i + 1, i / ntellers + 1);
768                 executeStatement(con, sql);
769         }
770
771         executeStatement(con, "commit");
772
773         /*
774          * fill the accounts table with some data
775          */
776         fprintf(stderr, "creating tables...\n");
777
778         executeStatement(con, "begin");
779         executeStatement(con, "truncate accounts");
780
781         res = PQexec(con, "copy accounts from stdin");
782         if (PQresultStatus(res) != PGRES_COPY_IN)
783         {
784                 fprintf(stderr, "%s", PQerrorMessage(con));
785                 exit(1);
786         }
787         PQclear(res);
788
789         for (i = 0; i < naccounts * scale; i++)
790         {
791                 int                     j = i + 1;
792
793                 snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0);
794                 if (PQputline(con, sql))
795                 {
796                         fprintf(stderr, "PQputline failed\n");
797                         exit(1);
798                 }
799
800                 if (j % 10000 == 0)
801                         fprintf(stderr, "%d tuples done.\n", j);
802         }
803         if (PQputline(con, "\\.\n"))
804         {
805                 fprintf(stderr, "very last PQputline failed\n");
806                 exit(1);
807         }
808         if (PQendcopy(con))
809         {
810                 fprintf(stderr, "PQendcopy failed\n");
811                 exit(1);
812         }
813         executeStatement(con, "commit");
814
815         /*
816          * create indexes
817          */
818         fprintf(stderr, "set primary key...\n");
819         for (i = 0; i < lengthof(DDLAFTERs); i++)
820                 executeStatement(con, DDLAFTERs[i]);
821
822         /* vacuum */
823         fprintf(stderr, "vacuum...");
824         executeStatement(con, "vacuum analyze");
825
826         fprintf(stderr, "done.\n");
827         PQfinish(con);
828 }
829
830 static Command *
831 process_commands(char *buf)
832 {
833         const char      delim[] = " \f\n\r\t\v";
834
835         Command    *my_commands;
836         int                     j;
837         char       *p,
838                            *tok;
839
840         if ((p = strchr(buf, '\n')) != NULL)
841                 *p = '\0';
842
843         p = buf;
844         while (isspace((unsigned char) *p))
845                 p++;
846
847         if (*p == '\0' || strncmp(p, "--", 2) == 0)
848         {
849                 return NULL;
850         }
851
852         my_commands = (Command *) malloc(sizeof(Command));
853         if (my_commands == NULL)
854         {
855                 return NULL;
856         }
857
858         my_commands->argc = 0;
859
860         if (*p == '\\')
861         {
862                 my_commands->type = META_COMMAND;
863
864                 j = 0;
865                 tok = strtok(++p, delim);
866
867                 while (tok != NULL)
868                 {
869                         if ((my_commands->argv[j] = strdup(tok)) == NULL)
870                                 return NULL;
871
872                         my_commands->argc++;
873
874                         j++;
875                         tok = strtok(NULL, delim);
876                 }
877
878                 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
879                 {
880                         if (my_commands->argc < 4)
881                         {
882                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
883                                 return NULL;
884                         }
885
886                         for (j = 4; j < my_commands->argc; j++)
887                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
888                                                 my_commands->argv[0], my_commands->argv[j]);
889                 }
890                 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
891                 {
892                         if (my_commands->argc < 3)
893                         {
894                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
895                                 return NULL;
896                         }
897
898                         for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
899                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
900                                                 my_commands->argv[0], my_commands->argv[j]);
901                 }
902                 else
903                 {
904                         fprintf(stderr, "invalid command %s\n", my_commands->argv[0]);
905                         return NULL;
906                 }
907         }
908         else
909         {
910                 my_commands->type = SQL_COMMAND;
911
912                 if ((my_commands->argv[0] = strdup(p)) == NULL)
913                         return NULL;
914
915                 my_commands->argc++;
916         }
917
918         return my_commands;
919 }
920
921 static int
922 process_file(char *filename)
923 {
924 #define COMMANDS_ALLOC_NUM 128
925
926         Command   **my_commands;
927         FILE       *fd;
928         int                     lineno;
929         char            buf[BUFSIZ];
930         int                     alloc_num;
931
932         if (num_files >= MAX_FILES)
933         {
934                 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
935                 exit(1);
936         }
937
938         alloc_num = COMMANDS_ALLOC_NUM;
939         my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
940         if (my_commands == NULL)
941                 return false;
942
943         if (strcmp(filename, "-") == 0)
944                 fd = stdin;
945         else if ((fd = fopen(filename, "r")) == NULL)
946         {
947                 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
948                 return false;
949         }
950
951         lineno = 0;
952
953         while (fgets(buf, sizeof(buf), fd) != NULL)
954         {
955                 Command    *commands;
956                 int                     i;
957
958                 i = 0;
959                 while (isspace((unsigned char) buf[i]))
960                         i++;
961
962                 if (buf[i] != '\0' && strncmp(&buf[i], "--", 2) != 0)
963                 {
964                         commands = process_commands(&buf[i]);
965                         if (commands == NULL)
966                         {
967                                 fclose(fd);
968                                 return false;
969                         }
970                 }
971                 else
972                         continue;
973
974                 my_commands[lineno] = commands;
975                 lineno++;
976
977                 if (lineno >= alloc_num)
978                 {
979                         alloc_num += COMMANDS_ALLOC_NUM;
980                         my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
981                         if (my_commands == NULL)
982                         {
983                                 fclose(fd);
984                                 return false;
985                         }
986                 }
987         }
988         fclose(fd);
989
990         my_commands[lineno] = NULL;
991
992         sql_files[num_files++] = my_commands;
993
994         return true;
995 }
996
997 static Command **
998 process_builtin(char *tb)
999 {
1000 #define COMMANDS_ALLOC_NUM 128
1001
1002         Command   **my_commands;
1003         int                     lineno;
1004         char            buf[BUFSIZ];
1005         int                     alloc_num;
1006
1007         if (*tb == '\0')
1008                 return NULL;
1009
1010         alloc_num = COMMANDS_ALLOC_NUM;
1011         my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
1012         if (my_commands == NULL)
1013                 return NULL;
1014
1015         lineno = 0;
1016
1017         for (;;)
1018         {
1019                 char       *p;
1020                 Command    *commands;
1021
1022                 p = buf;
1023                 while (*tb && *tb != '\n')
1024                         *p++ = *tb++;
1025
1026                 if (*tb == '\0')
1027                         break;
1028
1029                 if (*tb == '\n')
1030                         tb++;
1031
1032                 *p = '\0';
1033
1034                 commands = process_commands(buf);
1035                 if (commands == NULL)
1036                 {
1037                         return NULL;
1038                 }
1039
1040                 my_commands[lineno] = commands;
1041                 lineno++;
1042
1043                 if (lineno >= alloc_num)
1044                 {
1045                         alloc_num += COMMANDS_ALLOC_NUM;
1046                         my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
1047                         if (my_commands == NULL)
1048                         {
1049                                 return NULL;
1050                         }
1051                 }
1052         }
1053
1054         my_commands[lineno] = NULL;
1055
1056         return my_commands;
1057 }
1058
1059 /* print out results */
1060 static void
1061 printResults(
1062                          int ttype, CState * state,
1063                          struct timeval * tv1, struct timeval * tv2,
1064                          struct timeval * tv3)
1065 {
1066         double          t1,
1067                                 t2;
1068         int                     i;
1069         int                     normal_xacts = 0;
1070         char       *s;
1071
1072         for (i = 0; i < nclients; i++)
1073                 normal_xacts += state[i].cnt;
1074
1075         t1 = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec);
1076         t1 = normal_xacts * 1000000.0 / t1;
1077
1078         t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
1079         t2 = normal_xacts * 1000000.0 / t2;
1080
1081         if (ttype == 0)
1082                 s = "TPC-B (sort of)";
1083         else if (ttype == 2)
1084                 s = "Update only accounts";
1085         else if (ttype == 1)
1086                 s = "SELECT only";
1087         else
1088                 s = "Custom query";
1089
1090         printf("transaction type: %s\n", s);
1091         printf("scaling factor: %d\n", scale);
1092         printf("number of clients: %d\n", nclients);
1093         printf("number of transactions per client: %d\n", nxacts);
1094         printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
1095         printf("tps = %f (including connections establishing)\n", t1);
1096         printf("tps = %f (excluding connections establishing)\n", t2);
1097 }
1098
1099
1100 int
1101 main(int argc, char **argv)
1102 {
1103         int                     c;
1104         int                     is_init_mode = 0;               /* initialize mode? */
1105         int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
1106         int                     do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
1107         int                     debug = 0;              /* debug flag */
1108         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
1109                                                                  * 2: skip update of branches and tellers */
1110         char       *filename = NULL;
1111
1112         CState     *state;                      /* status of clients */
1113
1114         struct timeval tv1;                     /* start up time */
1115         struct timeval tv2;                     /* after establishing all connections to the
1116                                                                  * backend */
1117         struct timeval tv3;                     /* end time */
1118
1119         int                     i;
1120
1121         fd_set          input_mask;
1122         int                     nsocks;                 /* return from select(2) */
1123         int                     maxsock;                /* max socket number to be waited */
1124
1125 #ifdef HAVE_GETRLIMIT
1126         struct rlimit rlim;
1127 #endif
1128
1129         PGconn     *con;
1130         PGresult   *res;
1131         char       *env;
1132
1133         char            val[64];
1134
1135 #ifdef WIN32
1136         /* stderr is buffered on Win32. */
1137         setvbuf(stderr, NULL, _IONBF, 0);
1138 #endif
1139
1140         if ((env = getenv("PGHOST")) != NULL && *env != '\0')
1141                 pghost = env;
1142         if ((env = getenv("PGPORT")) != NULL && *env != '\0')
1143                 pgport = env;
1144         else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
1145                 login = env;
1146
1147         state = (CState *) malloc(sizeof(CState));
1148         if (state == NULL)
1149         {
1150                 fprintf(stderr, "Couldn't allocate memory for state\n");
1151                 exit(1);
1152         }
1153
1154         memset(state, 0, sizeof(*state));
1155
1156         while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:D:")) != -1)
1157         {
1158                 switch (c)
1159                 {
1160                         case 'i':
1161                                 is_init_mode++;
1162                                 break;
1163                         case 'h':
1164                                 pghost = optarg;
1165                                 break;
1166                         case 'n':
1167                                 is_no_vacuum++;
1168                                 break;
1169                         case 'v':
1170                                 do_vacuum_accounts++;
1171                                 break;
1172                         case 'p':
1173                                 pgport = optarg;
1174                                 break;
1175                         case 'd':
1176                                 debug++;
1177                                 break;
1178                         case 'S':
1179                                 ttype = 1;
1180                                 break;
1181                         case 'N':
1182                                 ttype = 2;
1183                                 break;
1184                         case 'c':
1185                                 nclients = atoi(optarg);
1186                                 if (nclients <= 0 || nclients > MAXCLIENTS)
1187                                 {
1188                                         fprintf(stderr, "invalid number of clients: %d\n", nclients);
1189                                         exit(1);
1190                                 }
1191 #ifdef HAVE_GETRLIMIT
1192 #ifdef RLIMIT_NOFILE                    /* most platforms use RLIMIT_NOFILE */
1193                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
1194 #else                                                   /* but BSD doesn't ... */
1195                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
1196 #endif   /* RLIMIT_NOFILE */
1197                                 {
1198                                         fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
1199                                         exit(1);
1200                                 }
1201                                 if (rlim.rlim_cur <= (nclients + 2))
1202                                 {
1203                                         fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
1204                                         fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
1205                                         exit(1);
1206                                 }
1207 #endif /* HAVE_GETRLIMIT */
1208                                 break;
1209                         case 'C':
1210                                 is_connect = 1;
1211                                 break;
1212                         case 's':
1213                                 scale = atoi(optarg);
1214                                 if (scale <= 0)
1215                                 {
1216                                         fprintf(stderr, "invalid scaling factor: %d\n", scale);
1217                                         exit(1);
1218                                 }
1219                                 break;
1220                         case 't':
1221                                 nxacts = atoi(optarg);
1222                                 if (nxacts <= 0)
1223                                 {
1224                                         fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
1225                                         exit(1);
1226                                 }
1227                                 break;
1228                         case 'U':
1229                                 login = optarg;
1230                                 break;
1231                         case 'P':
1232                                 pwd = optarg;
1233                                 break;
1234                         case 'l':
1235                                 use_log = true;
1236                                 break;
1237                         case 'f':
1238                                 ttype = 3;
1239                                 filename = optarg;
1240                                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
1241                                         exit(1);
1242                                 break;
1243                         case 'D':
1244                                 {
1245                                         char       *p;
1246
1247                                         if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
1248                                         {
1249                                                 fprintf(stderr, "invalid variable definition: %s\n", optarg);
1250                                                 exit(1);
1251                                         }
1252
1253                                         *p++ = '\0';
1254                                         if (putVariable(&state[0], optarg, p) == false)
1255                                         {
1256                                                 fprintf(stderr, "Couldn't allocate memory for variable\n");
1257                                                 exit(1);
1258                                         }
1259                                 }
1260                                 break;
1261                         default:
1262                                 usage();
1263                                 exit(1);
1264                                 break;
1265                 }
1266         }
1267
1268         if (argc > optind)
1269                 dbName = argv[optind];
1270         else
1271         {
1272                 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
1273                         dbName = env;
1274                 else if (login != NULL && *login != '\0')
1275                         dbName = login;
1276                 else
1277                         dbName = "";
1278         }
1279
1280         if (is_init_mode)
1281         {
1282                 init();
1283                 exit(0);
1284         }
1285
1286         remains = nclients;
1287
1288         if (getVariable(&state[0], "scale") == NULL)
1289         {
1290                 snprintf(val, sizeof(val), "%d", scale);
1291                 if (putVariable(&state[0], "scale", val) == false)
1292                 {
1293                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1294                         exit(1);
1295                 }
1296         }
1297
1298         if (nclients > 1)
1299         {
1300                 state = (CState *) realloc(state, sizeof(CState) * nclients);
1301                 if (state == NULL)
1302                 {
1303                         fprintf(stderr, "Couldn't allocate memory for state\n");
1304                         exit(1);
1305                 }
1306
1307                 memset(state + 1, 0, sizeof(*state) * (nclients - 1));
1308
1309                 snprintf(val, sizeof(val), "%d", scale);
1310
1311                 for (i = 1; i < nclients; i++)
1312                 {
1313                         int                     j;
1314
1315                         for (j = 0; j < state[0].nvariables; j++)
1316                         {
1317                                 if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
1318                                 {
1319                                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1320                                         exit(1);
1321                                 }
1322                         }
1323
1324                         if (putVariable(&state[i], "scale", val) == false)
1325                         {
1326                                 fprintf(stderr, "Couldn't allocate memory for variable\n");
1327                                 exit(1);
1328                         }
1329                 }
1330         }
1331
1332         if (use_log)
1333         {
1334                 char            logpath[64];
1335
1336                 snprintf(logpath, 64, "pgbench_log.%d", getpid());
1337                 LOGFILE = fopen(logpath, "w");
1338
1339                 if (LOGFILE == NULL)
1340                 {
1341                         fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
1342                         exit(1);
1343                 }
1344         }
1345
1346         if (debug)
1347         {
1348                 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
1349                            pghost, pgport, nclients, nxacts, dbName);
1350         }
1351
1352         /* opening connection... */
1353         con = doConnect();
1354         if (con == NULL)
1355                 exit(1);
1356
1357         if (PQstatus(con) == CONNECTION_BAD)
1358         {
1359                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
1360                 fprintf(stderr, "%s", PQerrorMessage(con));
1361                 exit(1);
1362         }
1363
1364         if (ttype != 3)
1365         {
1366                 /*
1367                  * get the scaling factor that should be same as count(*) from
1368                  * branches if this is not a custom query
1369                  */
1370                 res = PQexec(con, "select count(*) from branches");
1371                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1372                 {
1373                         fprintf(stderr, "%s", PQerrorMessage(con));
1374                         exit(1);
1375                 }
1376                 scale = atoi(PQgetvalue(res, 0, 0));
1377                 if (scale < 0)
1378                 {
1379                         fprintf(stderr, "count(*) from branches invalid (%d)\n", scale);
1380                         exit(1);
1381                 }
1382                 PQclear(res);
1383
1384                 snprintf(val, sizeof(val), "%d", scale);
1385                 if (putVariable(&state[0], "scale", val) == false)
1386                 {
1387                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1388                         exit(1);
1389                 }
1390
1391                 if (nclients > 1)
1392                 {
1393                         for (i = 1; i < nclients; i++)
1394                         {
1395                                 if (putVariable(&state[i], "scale", val) == false)
1396                                 {
1397                                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1398                                         exit(1);
1399                                 }
1400                         }
1401                 }
1402         }
1403
1404         if (!is_no_vacuum)
1405         {
1406                 fprintf(stderr, "starting vacuum...");
1407                 executeStatement(con, "vacuum branches");
1408                 executeStatement(con, "vacuum tellers");
1409                 executeStatement(con, "delete from history");
1410                 executeStatement(con, "vacuum history");
1411                 fprintf(stderr, "end.\n");
1412
1413                 if (do_vacuum_accounts)
1414                 {
1415                         fprintf(stderr, "starting vacuum accounts...");
1416                         executeStatement(con, "vacuum analyze accounts");
1417                         fprintf(stderr, "end.\n");
1418                 }
1419         }
1420         PQfinish(con);
1421
1422         /* set random seed */
1423         gettimeofday(&tv1, NULL);
1424         srandom((unsigned int) tv1.tv_usec);
1425
1426         /* get start up time */
1427         gettimeofday(&tv1, NULL);
1428
1429         if (is_connect == 0)
1430         {
1431                 /* make connections to the database */
1432                 for (i = 0; i < nclients; i++)
1433                 {
1434                         state[i].id = i;
1435                         if ((state[i].con = doConnect()) == NULL)
1436                                 exit(1);
1437                 }
1438         }
1439
1440         /* time after connections set up */
1441         gettimeofday(&tv2, NULL);
1442
1443         /* process bultin SQL scripts */
1444         switch (ttype)
1445         {
1446                 case 0:
1447                         sql_files[0] = process_builtin(tpc_b);
1448                         num_files = 1;
1449                         break;
1450
1451                 case 1:
1452                         sql_files[0] = process_builtin(select_only);
1453                         num_files = 1;
1454                         break;
1455
1456                 case 2:
1457                         sql_files[0] = process_builtin(simple_update);
1458                         num_files = 1;
1459                         break;
1460
1461                 default:
1462                         break;
1463         }
1464
1465         /* send start up queries in async manner */
1466         for (i = 0; i < nclients; i++)
1467         {
1468                 Command   **commands = sql_files[state[i].use_file];
1469                 int                     prev_ecnt = state[i].ecnt;
1470
1471                 state[i].use_file = getrand(0, num_files - 1);
1472                 doCustom(state, i, debug);
1473
1474                 if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
1475                 {
1476                         fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
1477                         remains--;                      /* I've aborted */
1478                         PQfinish(state[i].con);
1479                         state[i].con = NULL;
1480                 }
1481         }
1482
1483         for (;;)
1484         {
1485                 if (remains <= 0)
1486                 {                                               /* all done ? */
1487                         disconnect_all(state);
1488                         /* get end time */
1489                         gettimeofday(&tv3, NULL);
1490                         printResults(ttype, state, &tv1, &tv2, &tv3);
1491                         if (LOGFILE)
1492                                 fclose(LOGFILE);
1493                         exit(0);
1494                 }
1495
1496                 FD_ZERO(&input_mask);
1497
1498                 maxsock = -1;
1499                 for (i = 0; i < nclients; i++)
1500                 {
1501                         Command   **commands = sql_files[state[i].use_file];
1502
1503                         if (state[i].con && commands[state[i].state]->type != META_COMMAND)
1504                         {
1505                                 int                     sock = PQsocket(state[i].con);
1506
1507                                 if (sock < 0)
1508                                 {
1509                                         disconnect_all(state);
1510                                         exit(1);
1511                                 }
1512                                 FD_SET(sock, &input_mask);
1513                                 if (maxsock < sock)
1514                                         maxsock = sock;
1515                         }
1516                 }
1517
1518                 if (maxsock != -1)
1519                 {
1520                         if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
1521                                                           (fd_set *) NULL, (struct timeval *) NULL)) < 0)
1522                         {
1523                                 if (errno == EINTR)
1524                                         continue;
1525                                 /* must be something wrong */
1526                                 disconnect_all(state);
1527                                 fprintf(stderr, "select failed: %s\n", strerror(errno));
1528                                 exit(1);
1529                         }
1530                         else if (nsocks == 0)
1531                         {                                       /* timeout */
1532                                 fprintf(stderr, "select timeout\n");
1533                                 for (i = 0; i < nclients; i++)
1534                                 {
1535                                         fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
1536                                                         i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
1537                                 }
1538                                 exit(0);
1539                         }
1540                 }
1541
1542                 /* ok, backend returns reply */
1543                 for (i = 0; i < nclients; i++)
1544                 {
1545                         Command   **commands = sql_files[state[i].use_file];
1546                         int                     prev_ecnt = state[i].ecnt;
1547
1548                         if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
1549                                                   || commands[state[i].state]->type == META_COMMAND))
1550                         {
1551                                 doCustom(state, i, debug);
1552                         }
1553
1554                         if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
1555                         {
1556                                 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
1557                                 remains--;              /* I've aborted */
1558                                 PQfinish(state[i].con);
1559                                 state[i].con = NULL;
1560                         }
1561                 }
1562         }
1563 }