]> granicus.if.org Git - postgresql/blob - contrib/pgbench/pgbench.c
Avoid PQisBusy/PQconsumeInput busy loop in case of PQisBusy returning
[postgresql] / contrib / pgbench / pgbench.c
1 /*
2  * $Header: /cvsroot/pgsql/contrib/pgbench/pgbench.c,v 1.20 2002/10/07 05:10:02 ishii Exp $
3  *
4  * pgbench: a simple TPC-B like benchmark program for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2000-2002      Tatsuo Ishii
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  */
20 #include "postgres_fe.h"
21
22 #include "libpq-fe.h"
23
24 #include <errno.h>
25
26 #ifdef WIN32
27 #include "win32.h"
28 #else
29 #include <sys/time.h>
30 #include <unistd.h>
31
32 #ifdef HAVE_GETOPT_H
33 #include <getopt.h>
34 #endif
35
36 #ifdef HAVE_SYS_SELECT_H
37 #include <sys/select.h>
38 #endif
39
40 /* for getrlimit */
41 #include <sys/resource.h>
42 #endif   /* ! WIN32 */
43
44 /********************************************************************
45  * some configurable parameters */
46
47 #define MAXCLIENTS 1024                 /* max number of clients allowed */
48
49 int                     nclients = 1;           /* default number of simulated clients */
50 int                     nxacts = 10;            /* default number of transactions per
51                                                                  * clients */
52
53 /*
54  * scaling factor. for example, tps = 10 will make 1000000 tuples of
55  * accounts table.
56  */
57 int                     tps = 1;
58
59 /*
60  * end of configurable parameters
61  *********************************************************************/
62
63 #define nbranches       1
64 #define ntellers        10
65 #define naccounts       100000
66
67 FILE       *LOGFILE = NULL;
68
69 bool            use_log;                        /* log transaction latencies to a file */
70
71 int                     remains;                        /* number of remaining clients */
72
73 int                     is_connect;                     /* establish connection  for each
74                                                                  * transaction */
75
76 char       *pghost = "";
77 char       *pgport = NULL;
78 char       *pgoptions = NULL;
79 char       *pgtty = NULL;
80 char       *login = NULL;
81 char       *pwd = NULL;
82 char       *dbName;
83
84 typedef struct
85 {
86         PGconn     *con;                        /* connection handle to DB */
87         int                     id;                             /* client No. */
88         int                     state;                  /* state No. */
89         int                     cnt;                    /* xacts count */
90         int                     ecnt;                   /* error count */
91         int                     listen;                 /* 0 indicates that an async query has
92                                                                  * been sent */
93         int                     aid;                    /* account id for this transaction */
94         int                     bid;                    /* branch id for this transaction */
95         int                     tid;                    /* teller id for this transaction */
96         int                     delta;
97         int                     abalance;
98         struct timeval txn_begin;       /* used for measuring latencies */
99 }       CState;
100
101 static void
102 usage()
103 {
104         fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-l][-U login][-P password][-d][dbname]\n");
105         fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n");
106 }
107
108 /* random number generator */
109 static int
110 getrand(int min, int max)
111 {
112         return (min + (int) (max * 1.0 * rand() / (RAND_MAX + 1.0)));
113 }
114
115 /* set up a connection to the backend */
116 static PGconn *
117 doConnect()
118 {
119         PGconn     *con;
120
121         con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
122                                            login, pwd);
123         if (con == NULL)
124         {
125                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
126                 fprintf(stderr, "Memory allocatin problem?\n");
127                 return (NULL);
128         }
129
130         if (PQstatus(con) == CONNECTION_BAD)
131         {
132                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
133
134                 if (PQerrorMessage(con))
135                         fprintf(stderr, "%s", PQerrorMessage(con));
136                 else
137                         fprintf(stderr, "No explanation from the backend\n");
138
139                 return (NULL);
140         }
141         return (con);
142 }
143
144 /* throw away response from backend */
145 static void
146 discard_response(CState * state)
147 {
148         PGresult   *res;
149
150         do
151         {
152                 res = PQgetResult(state->con);
153                 if (res)
154                         PQclear(res);
155         } while (res);
156 }
157
158 /* check to see if the SQL result was good */
159 static int
160 check(CState * state, PGresult *res, int n, int good)
161 {
162         CState     *st = &state[n];
163
164         if (res && PQresultStatus(res) != good)
165         {
166                 fprintf(stderr, "Client %d aborted in state %d: %s", n, st->state, PQerrorMessage(st->con));
167                 remains--;                              /* I've aborted */
168                 PQfinish(st->con);
169                 st->con = NULL;
170                 return (-1);
171         }
172         return (0);                                     /* OK */
173 }
174
175 /* process a transaction */
176 static void
177 doOne(CState * state, int n, int debug, int ttype)
178 {
179         char            sql[256];
180         PGresult   *res;
181         CState     *st = &state[n];
182
183         if (st->listen)
184         {                                                       /* are we receiver? */
185                 if (debug)
186                         fprintf(stderr, "client %d receiving\n", n);
187                 if (!PQconsumeInput(st->con))
188                 {                                               /* there's something wrong */
189                         fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
190                         remains--;                      /* I've aborted */
191                         PQfinish(st->con);
192                         st->con = NULL;
193                         return;
194                 }
195                 if (PQisBusy(st->con))
196                         return;                         /* don't have the whole result yet */
197
198                 switch (st->state)
199                 {
200                         case 0:                         /* response to "begin" */
201                                 res = PQgetResult(st->con);
202                                 if (check(state, res, n, PGRES_COMMAND_OK))
203                                         return;
204                                 PQclear(res);
205                                 discard_response(st);
206                                 break;
207                         case 1:                         /* response to "update accounts..." */
208                                 res = PQgetResult(st->con);
209                                 if (check(state, res, n, PGRES_COMMAND_OK))
210                                         return;
211                                 PQclear(res);
212                                 discard_response(st);
213                                 break;
214                         case 2:                         /* response to "select abalance ..." */
215                                 res = PQgetResult(st->con);
216                                 if (check(state, res, n, PGRES_TUPLES_OK))
217                                         return;
218                                 PQclear(res);
219                                 discard_response(st);
220                                 break;
221                         case 3:                         /* response to "update tellers ..." */
222                                 res = PQgetResult(st->con);
223                                 if (check(state, res, n, PGRES_COMMAND_OK))
224                                         return;
225                                 PQclear(res);
226                                 discard_response(st);
227                                 break;
228                         case 4:                         /* response to "update branches ..." */
229                                 res = PQgetResult(st->con);
230                                 if (check(state, res, n, PGRES_COMMAND_OK))
231                                         return;
232                                 PQclear(res);
233                                 discard_response(st);
234                                 break;
235                         case 5:                         /* response to "insert into history ..." */
236                                 res = PQgetResult(st->con);
237                                 if (check(state, res, n, PGRES_COMMAND_OK))
238                                         return;
239                                 PQclear(res);
240                                 discard_response(st);
241                                 break;
242                         case 6:                         /* response to "end" */
243
244                                 /*
245                                  * transaction finished: record the time it took in the
246                                  * log
247                                  */
248                                 if (use_log)
249                                 {
250                                         long long       diff;
251                                         struct timeval now;
252
253                                         gettimeofday(&now, 0);
254                                         diff = (now.tv_sec - st->txn_begin.tv_sec) * 1000000 +
255                                                 (now.tv_usec - st->txn_begin.tv_usec);
256
257                                         fprintf(LOGFILE, "%d %d %lld\n", st->id, st->cnt, diff);
258                                 }
259
260                                 res = PQgetResult(st->con);
261                                 if (check(state, res, n, PGRES_COMMAND_OK))
262                                         return;
263                                 PQclear(res);
264                                 discard_response(st);
265
266                                 if (is_connect)
267                                 {
268                                         PQfinish(st->con);
269                                         st->con = NULL;
270                                 }
271
272                                 if (++st->cnt >= nxacts)
273                                 {
274                                         remains--;      /* I'm done */
275                                         if (st->con != NULL)
276                                         {
277                                                 PQfinish(st->con);
278                                                 st->con = NULL;
279                                         }
280                                         return;
281                                 }
282                                 break;
283                 }
284
285                 /* increment state counter */
286                 st->state++;
287                 if (st->state > 6)
288                         st->state = 0;
289         }
290
291         if (st->con == NULL)
292         {
293                 if ((st->con = doConnect()) == NULL)
294                 {
295                         fprintf(stderr, "Client %d aborted in establishing connection.\n",
296                                         n);
297                         remains--;                      /* I've aborted */
298                         PQfinish(st->con);
299                         st->con = NULL;
300                         return;
301                 }
302         }
303
304         switch (st->state)
305         {
306                 case 0:                 /* about to start */
307                         strcpy(sql, "begin");
308                         st->aid = getrand(1, naccounts * tps);
309                         st->bid = getrand(1, nbranches * tps);
310                         st->tid = getrand(1, ntellers * tps);
311                         st->delta = getrand(1, 1000);
312                         if (use_log)
313                                 gettimeofday(&(st->txn_begin), 0);
314                         break;
315                 case 1:
316                         snprintf(sql, 256, "update accounts set abalance = abalance + %d where aid = %d\n", st->delta, st->aid);
317                         break;
318                 case 2:
319                         snprintf(sql, 256, "select abalance from accounts where aid = %d", st->aid);
320                         break;
321                 case 3:
322                         if (ttype == 0)
323                         {
324                                 snprintf(sql, 256, "update tellers set tbalance = tbalance + %d where tid = %d\n",
325                                                  st->delta, st->tid);
326                                 break;
327                         }
328                 case 4:
329                         if (ttype == 0)
330                         {
331                                 snprintf(sql, 256, "update branches set bbalance = bbalance + %d where bid = %d", st->delta, st->bid);
332                                 break;
333                         }
334                 case 5:
335                         snprintf(sql, 256, "insert into history(tid,bid,aid,delta,mtime) values(%d,%d,%d,%d,'now')",
336                                          st->tid, st->bid, st->aid, st->delta);
337                         break;
338                 case 6:
339                         strcpy(sql, "end");
340                         break;
341         }
342
343         if (debug)
344                 fprintf(stderr, "client %d sending %s\n", n, sql);
345         if (PQsendQuery(st->con, sql) == 0)
346         {
347                 if (debug)
348                         fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
349                 st->ecnt++;
350         }
351         else
352         {
353                 st->listen++;                   /* flags that should be listened */
354         }
355 }
356
357 /* process a select only transaction */
358 static void
359 doSelectOnly(CState * state, int n, int debug)
360 {
361         char            sql[256];
362         PGresult   *res;
363         CState     *st = &state[n];
364
365         if (st->listen)
366         {                                                       /* are we receiver? */
367                 if (debug)
368                         fprintf(stderr, "client %d receiving\n", n);
369                 if (!PQconsumeInput(st->con))
370                 {                                               /* there's something wrong */
371                         fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
372                         remains--;                      /* I've aborted */
373                         PQfinish(st->con);
374                         st->con = NULL;
375                         return;
376                 }
377                 if (PQisBusy(st->con))
378                         return;                         /* don't have the whole result yet */
379
380                 switch (st->state)
381                 {
382                         case 0:                         /* response to "select abalance ..." */
383                                 res = PQgetResult(st->con);
384                                 if (check(state, res, n, PGRES_TUPLES_OK))
385                                         return;
386                                 PQclear(res);
387                                 discard_response(st);
388
389                                 if (is_connect)
390                                 {
391                                         PQfinish(st->con);
392                                         st->con = NULL;
393                                 }
394
395                                 if (++st->cnt >= nxacts)
396                                 {
397                                         remains--;      /* I've done */
398                                         if (st->con != NULL)
399                                         {
400                                                 PQfinish(st->con);
401                                                 st->con = NULL;
402                                         }
403                                         return;
404                                 }
405                                 break;
406                 }
407
408                 /* increment state counter */
409                 st->state++;
410                 if (st->state > 0)
411                         st->state = 0;
412         }
413
414         if (st->con == NULL)
415         {
416                 if ((st->con = doConnect()) == NULL)
417                 {
418                         fprintf(stderr, "Client %d aborted in establishing connection.\n",
419                                         n);
420                         remains--;                      /* I've aborted */
421                         PQfinish(st->con);
422                         st->con = NULL;
423                         return;
424                 }
425         }
426
427         switch (st->state)
428         {
429                 case 0:
430                         st->aid = getrand(1, naccounts * tps);
431                         snprintf(sql, 256, "select abalance from accounts where aid = %d", st->aid);
432                         break;
433         }
434
435         if (debug)
436                 fprintf(stderr, "client %d sending %s\n", n, sql);
437
438         if (PQsendQuery(st->con, sql) == 0)
439         {
440                 if (debug)
441                         fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
442                 st->ecnt++;
443         }
444         else
445         {
446                 st->listen++;                   /* flags that should be listened */
447         }
448 }
449
450 /* discard connections */
451 static void
452 disconnect_all(CState * state)
453 {
454         int                     i;
455
456         for (i = 0; i < nclients; i++)
457         {
458                 if (state[i].con)
459                         PQfinish(state[i].con);
460         }
461 }
462
463 /* create tables and setup data */
464 static void
465 init(void)
466 {
467         PGconn     *con;
468         PGresult   *res;
469         static char *DDLs[] = {
470                 "drop table branches",
471                 "create table branches(bid int, primary key(bid),bbalance int,filler char(88))",
472                 "drop table tellers",
473                 "create table tellers(tid int, primary key(tid),bid int,tbalance int,filler char(84))",
474                 "drop table accounts",
475                 "create table accounts(aid int,primary key(aid),bid int,abalance int,filler char(84))",
476                 "drop table history",
477         "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
478         char            sql[256];
479
480         int                     i;
481
482         if ((con = doConnect()) == NULL)
483                 exit(1);
484
485         for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++)
486         {
487                 res = PQexec(con, DDLs[i]);
488                 if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK)
489                 {
490                         fprintf(stderr, "%s", PQerrorMessage(con));
491                         exit(1);
492                 }
493                 PQclear(res);
494         }
495
496         res = PQexec(con, "begin");
497         if (PQresultStatus(res) != PGRES_COMMAND_OK)
498         {
499                 fprintf(stderr, "%s", PQerrorMessage(con));
500                 exit(1);
501         }
502
503         for (i = 0; i < nbranches * tps; i++)
504         {
505                 snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
506                 res = PQexec(con, sql);
507                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
508                 {
509                         fprintf(stderr, "%s", PQerrorMessage(con));
510                         exit(1);
511                 }
512                 PQclear(res);
513         }
514
515         for (i = 0; i < ntellers * tps; i++)
516         {
517                 snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
518                                  ,i + 1, i / ntellers + 1);
519                 res = PQexec(con, sql);
520                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
521                 {
522                         fprintf(stderr, "%s", PQerrorMessage(con));
523                         exit(1);
524                 }
525                 PQclear(res);
526         }
527
528         res = PQexec(con, "end");
529         if (PQresultStatus(res) != PGRES_COMMAND_OK)
530         {
531                 fprintf(stderr, "%s", PQerrorMessage(con));
532                 exit(1);
533         }
534
535
536         /*
537          * occupy accounts table with some data
538          */
539         fprintf(stderr, "creating tables...\n");
540         for (i = 0; i < naccounts * tps; i++)
541         {
542                 int                     j = i + 1;
543
544                 if (j % 10000 == 1)
545                 {
546                         res = PQexec(con, "copy accounts from stdin");
547                         if (PQresultStatus(res) != PGRES_COPY_IN)
548                         {
549                                 fprintf(stderr, "%s", PQerrorMessage(con));
550                                 exit(1);
551                         }
552                         PQclear(res);
553                 }
554
555                 snprintf(sql, 256, "%d\t%d\t%d\t\n", j, j / naccounts, 0);
556                 if (PQputline(con, sql))
557                 {
558                         fprintf(stderr, "PQputline failed\n");
559                         exit(1);
560                 }
561
562                 if (j % 10000 == 0)
563                 {
564                         /*
565                          * every 10000 tuples, we commit the copy command. this should
566                          * avoid generating too much WAL logs
567                          */
568                         fprintf(stderr, "%d tuples done.\n", j);
569                         if (PQputline(con, "\\.\n"))
570                         {
571                                 fprintf(stderr, "very last PQputline failed\n");
572                                 exit(1);
573                         }
574
575                         if (PQendcopy(con))
576                         {
577                                 fprintf(stderr, "PQendcopy failed\n");
578                                 exit(1);
579                         }
580
581 #ifdef NOT_USED
582
583                         /*
584                          * do a checkpoint to purge the old WAL logs
585                          */
586                         res = PQexec(con, "checkpoint");
587                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
588                         {
589                                 fprintf(stderr, "%s", PQerrorMessage(con));
590                                 exit(1);
591                         }
592 #endif   /* NOT_USED */
593                 }
594         }
595
596         /* vacuum */
597         fprintf(stderr, "vacuum...");
598         res = PQexec(con, "vacuum analyze");
599         if (PQresultStatus(res) != PGRES_COMMAND_OK)
600         {
601                 fprintf(stderr, "%s", PQerrorMessage(con));
602                 exit(1);
603         }
604         fprintf(stderr, "done.\n");
605
606         PQfinish(con);
607 }
608
609 /* print out results */
610 static void
611 printResults(
612                          int ttype, CState * state,
613                          struct timeval * tv1, struct timeval * tv2,
614                          struct timeval * tv3)
615 {
616         double          t1,
617                                 t2;
618         int                     i;
619         int                     normal_xacts = 0;
620         char       *s;
621
622         for (i = 0; i < nclients; i++)
623                 normal_xacts += state[i].cnt;
624
625         t1 = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec);
626         t1 = normal_xacts * 1000000.0 / t1;
627
628         t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
629         t2 = normal_xacts * 1000000.0 / t2;
630
631         if (ttype == 0)
632                 s = "TPC-B (sort of)";
633         else if (ttype == 2)
634                 s = "Update only accounts";
635         else
636                 s = "SELECT only";
637
638         printf("transaction type: %s\n", s);
639         printf("scaling factor: %d\n", tps);
640         printf("number of clients: %d\n", nclients);
641         printf("number of transactions per client: %d\n", nxacts);
642         printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
643         printf("tps = %f (including connections establishing)\n", t1);
644         printf("tps = %f (excluding connections establishing)\n", t2);
645 }
646
647
648 int
649 main(int argc, char **argv)
650 {
651         extern char *optarg;
652         extern int      optind,
653                                 opterr,
654                                 optopt;
655         int                     c;
656         int                     is_init_mode = 0;               /* initialize mode? */
657         int                     is_no_vacuum = 0;               /* no vacuum at all before
658                                                                                  * testing? */
659         int                     is_full_vacuum = 0;             /* do full vacuum before testing? */
660         int                     debug = 0;              /* debug flag */
661         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT
662                                                                  * only, 2: skip update of branches and
663                                                                  * tellers */
664
665         static CState *state;           /* status of clients */
666
667         struct timeval tv1;                     /* start up time */
668         struct timeval tv2;                     /* after establishing all connections to
669                                                                  * the backend */
670         struct timeval tv3;                     /* end time */
671
672         int                     i;
673
674         fd_set          input_mask;
675         int                     nsocks;                 /* return from select(2) */
676         int                     maxsock;                /* max socket number to be waited */
677
678 #ifndef __CYGWIN__
679         struct rlimit rlim;
680 #endif
681
682         PGconn     *con;
683         PGresult   *res;
684
685         while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSl")) != -1)
686         {
687                 switch (c)
688                 {
689                         case 'i':
690                                 is_init_mode++;
691                                 break;
692                         case 'h':
693                                 pghost = optarg;
694                                 break;
695                         case 'n':
696                                 is_no_vacuum++;
697                                 break;
698                         case 'v':
699                                 is_full_vacuum++;
700                                 break;
701                         case 'p':
702                                 pgport = optarg;
703                                 break;
704                         case 'd':
705                                 debug++;
706                                 break;
707                         case 'S':
708                                 ttype = 1;
709                                 break;
710                         case 'N':
711                                 ttype = 2;
712                                 break;
713                         case 'c':
714                                 nclients = atoi(optarg);
715                                 if (nclients <= 0 || nclients > MAXCLIENTS)
716                                 {
717                                         fprintf(stderr, "invalid number of clients: %d\n", nclients);
718                                         exit(1);
719                                 }
720 #ifndef __CYGWIN__
721 #ifdef RLIMIT_NOFILE                    /* most platform uses RLIMIT_NOFILE */
722                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
723                                 {
724 #else                                                   /* but BSD doesn't ... */
725                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
726                                 {
727 #endif   /* HAVE_RLIMIT_NOFILE */
728                                         fprintf(stderr, "getrlimit failed. reason: %s\n", strerror(errno));
729                                         exit(1);
730                                 }
731                                 if (rlim.rlim_cur <= (nclients + 2))
732                                 {
733                                         fprintf(stderr, "You need at least %d open files resource but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
734                                         fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
735                                         exit(1);
736                                 }
737 #endif   /* #ifndef __CYGWIN__ */
738                                 break;
739                         case 'C':
740                                 is_connect = 1;
741                                 break;
742                         case 's':
743                                 tps = atoi(optarg);
744                                 if (tps <= 0)
745                                 {
746                                         fprintf(stderr, "invalid scaling factor: %d\n", tps);
747                                         exit(1);
748                                 }
749                                 break;
750                         case 't':
751                                 nxacts = atoi(optarg);
752                                 if (nxacts <= 0)
753                                 {
754                                         fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
755                                         exit(1);
756                                 }
757                                 break;
758                         case 'U':
759                                 login = optarg;
760                                 break;
761                         case 'P':
762                                 pwd = optarg;
763                                 break;
764                         case 'l':
765                                 use_log = true;
766                                 break;
767                         default:
768                                 usage();
769                                 exit(1);
770                                 break;
771                 }
772         }
773
774         if (argc > optind)
775                 dbName = argv[optind];
776         else
777         {
778                 dbName = getenv("USER");
779                 if (dbName == NULL)
780                         dbName = "";
781         }
782
783         if (is_init_mode)
784         {
785                 init();
786                 exit(0);
787         }
788
789         remains = nclients;
790
791         state = (CState *) malloc(sizeof(*state) * nclients);
792         memset(state, 0, sizeof(*state));
793
794         if (use_log)
795         {
796                 char            logpath[64];
797
798                 snprintf(logpath, 64, "pgbench_log.%d", getpid());
799                 LOGFILE = fopen(logpath, "w");
800
801                 if (LOGFILE == NULL)
802                 {
803                         fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
804                         exit(1);
805                 }
806         }
807
808         if (debug)
809         {
810                 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
811                            pghost, pgport, nclients, nxacts, dbName);
812         }
813
814         /* opening connection... */
815         con = doConnect();
816         if (con == NULL)
817                 exit(1);
818
819         if (PQstatus(con) == CONNECTION_BAD)
820         {
821                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
822                 fprintf(stderr, "%s", PQerrorMessage(con));
823                 exit(1);
824         }
825
826         /*
827          * get the scaling factor that should be same as count(*) from
828          * branches...
829          */
830         res = PQexec(con, "select count(*) from branches");
831         if (PQresultStatus(res) != PGRES_TUPLES_OK)
832         {
833                 fprintf(stderr, "%s", PQerrorMessage(con));
834                 exit(1);
835         }
836         tps = atoi(PQgetvalue(res, 0, 0));
837         if (tps < 0)
838         {
839                 fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
840                 exit(1);
841         }
842         PQclear(res);
843
844         if (!is_no_vacuum)
845         {
846                 fprintf(stderr, "starting vacuum...");
847                 res = PQexec(con, "vacuum branches");
848                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
849                 {
850                         fprintf(stderr, "%s", PQerrorMessage(con));
851                         exit(1);
852                 }
853                 PQclear(res);
854
855                 res = PQexec(con, "vacuum tellers");
856                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
857                 {
858                         fprintf(stderr, "%s", PQerrorMessage(con));
859                         exit(1);
860                 }
861                 PQclear(res);
862
863                 res = PQexec(con, "delete from history");
864                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
865                 {
866                         fprintf(stderr, "%s", PQerrorMessage(con));
867                         exit(1);
868                 }
869                 PQclear(res);
870                 res = PQexec(con, "vacuum history");
871                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
872                 {
873                         fprintf(stderr, "%s", PQerrorMessage(con));
874                         exit(1);
875                 }
876                 PQclear(res);
877
878                 fprintf(stderr, "end.\n");
879
880                 if (is_full_vacuum)
881                 {
882                         fprintf(stderr, "starting full vacuum...");
883                         res = PQexec(con, "vacuum analyze accounts");
884                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
885                         {
886                                 fprintf(stderr, "%s", PQerrorMessage(con));
887                                 exit(1);
888                         }
889                         PQclear(res);
890                         fprintf(stderr, "end.\n");
891                 }
892         }
893         PQfinish(con);
894
895         /* set random seed */
896         gettimeofday(&tv1, 0);
897         srand((uint) tv1.tv_usec);
898
899         /* get start up time */
900         gettimeofday(&tv1, 0);
901
902         if (is_connect == 0)
903         {
904                 /* make connections to the database */
905                 for (i = 0; i < nclients; i++)
906                 {
907                         state[i].id = i;
908                         if ((state[i].con = doConnect()) == NULL)
909                                 exit(1);
910                 }
911         }
912
913         /* time after connections set up */
914         gettimeofday(&tv2, 0);
915
916         /* send start up queries in async manner */
917         for (i = 0; i < nclients; i++)
918         {
919                 if (ttype == 0 || ttype == 2)
920                         doOne(state, i, debug, ttype);
921                 else if (ttype == 1)
922                         doSelectOnly(state, i, debug);
923         }
924
925         for (;;)
926         {
927                 if (remains <= 0)
928                 {                                               /* all done ? */
929                         disconnect_all(state);
930                         /* get end time */
931                         gettimeofday(&tv3, 0);
932                         printResults(ttype, state, &tv1, &tv2, &tv3);
933                         if (LOGFILE)
934                                 fclose(LOGFILE);
935                         exit(0);
936                 }
937
938                 FD_ZERO(&input_mask);
939
940                 maxsock = 0;
941                 for (i = 0; i < nclients; i++)
942                 {
943                         if (state[i].con)
944                         {
945                                 int                     sock = PQsocket(state[i].con);
946
947                                 if (sock < 0)
948                                 {
949                                         fprintf(stderr, "Client %d: PQsocket failed\n", i);
950                                         disconnect_all(state);
951                                         exit(1);
952                                 }
953                                 FD_SET(sock, &input_mask);
954                                 if (maxsock < sock)
955                                         maxsock = sock;
956                         }
957                 }
958
959                 if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
960                                                   (fd_set *) NULL, (struct timeval *) NULL)) < 0)
961                 {
962                         if (errno == EINTR)
963                                 continue;
964                         /* must be something wrong */
965                         disconnect_all(state);
966                         fprintf(stderr, "select failed: %s\n", strerror(errno));
967                         exit(1);
968                 }
969                 else if (nsocks == 0)
970                 {                                               /* timeout */
971                         fprintf(stderr, "select timeout\n");
972                         for (i = 0; i < nclients; i++)
973                         {
974                                 fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
975                                                 i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
976                         }
977                         exit(0);
978                 }
979
980                 /* ok, backend returns reply */
981                 for (i = 0; i < nclients; i++)
982                 {
983                         if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask))
984                         {
985                                 if (ttype == 0 || ttype == 2)
986                                         doOne(state, i, debug, ttype);
987                                 else if (ttype == 1)
988                                         doSelectOnly(state, i, debug);
989                         }
990                 }
991         }
992 }