]> granicus.if.org Git - postgresql/blob - contrib/pgbench/pgbench.c
1f2e0bf4167e5d27cc5917d8c217ff492f06d9c1
[postgresql] / contrib / pgbench / pgbench.c
1 /*
2  * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.62 2007/03/13 09:06:35 mha Exp $
3  *
4  * pgbench: a simple benchmark program for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2000-2007      Tatsuo Ishii
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  */
20 #include "postgres_fe.h"
21
22 #include "libpq-fe.h"
23
24 #include <ctype.h>
25
26 #ifdef WIN32
27 #include "win32.h"
28 #else
29 #include <sys/time.h>
30 #include <unistd.h>
31
32 #ifdef HAVE_GETOPT_H
33 #include <getopt.h>
34 #endif
35
36 #ifdef HAVE_SYS_SELECT_H
37 #include <sys/select.h>
38 #endif
39
40 #ifdef HAVE_SYS_RESOURCE_H
41 #include <sys/resource.h>               /* for getrlimit */
42 #endif
43 #endif   /* ! WIN32 */
44
45 extern char *optarg;
46 extern int      optind;
47
48 #ifdef WIN32
49 #undef select
50 #endif
51
52
53 /********************************************************************
54  * some configurable parameters */
55
56 #define MAXCLIENTS 1024                 /* max number of clients allowed */
57
58 int                     nclients = 1;           /* default number of simulated clients */
59 int                     nxacts = 10;            /* default number of transactions per clients */
60
61 /*
62  * scaling factor. for example, scale = 10 will make 1000000 tuples of
63  * accounts table.
64  */
65 int                     scale = 1;
66
67 /*
68  * end of configurable parameters
69  *********************************************************************/
70
71 #define nbranches       1
72 #define ntellers        10
73 #define naccounts       100000
74
75 FILE       *LOGFILE = NULL;
76
77 bool            use_log;                        /* log transaction latencies to a file */
78
79 int                     remains;                        /* number of remaining clients */
80
81 int                     is_connect;                     /* establish connection  for each transaction */
82
83 char       *pghost = "";
84 char       *pgport = NULL;
85 char       *pgoptions = NULL;
86 char       *pgtty = NULL;
87 char       *login = NULL;
88 char       *pwd = NULL;
89 char       *dbName;
90
91 /* variable definitions */
92 typedef struct
93 {
94         char       *name;                       /* variable name */
95         char       *value;                      /* its value */
96 }       Variable;
97
98 /*
99  * structures used in custom query mode
100  */
101
102 typedef struct
103 {
104         PGconn     *con;                        /* connection handle to DB */
105         int                     id;                             /* client No. */
106         int                     state;                  /* state No. */
107         int                     cnt;                    /* xacts count */
108         int                     ecnt;                   /* error count */
109         int                     listen;                 /* 0 indicates that an async query has been
110                                                                  * sent */
111         Variable   *variables;          /* array of variable definitions */
112         int                     nvariables;
113         struct timeval txn_begin;       /* used for measuring latencies */
114         int                     use_file;               /* index in sql_files for this client */
115 }       CState;
116
117 /*
118  * queries read from files
119  */
120 #define SQL_COMMAND             1
121 #define META_COMMAND    2
122 #define MAX_ARGS                10
123
124 typedef struct
125 {
126         int                     type;                   /* command type (SQL_COMMAND or META_COMMAND) */
127         int                     argc;                   /* number of commands */
128         char       *argv[MAX_ARGS]; /* command list */
129 }       Command;
130
131 #define MAX_FILES               128             /* max number of SQL script files allowed */
132
133 Command   **sql_files[MAX_FILES];               /* SQL script files */
134 int                     num_files;                      /* its number */
135
136 /* default scenario */
137 static char *tpc_b = {
138         "\\set nbranches :scale\n"
139         "\\set ntellers 10 * :scale\n"
140         "\\set naccounts 100000 * :scale\n"
141         "\\setrandom aid 1 :naccounts\n"
142         "\\setrandom bid 1 :nbranches\n"
143         "\\setrandom tid 1 :ntellers\n"
144         "\\setrandom delta -5000 5000\n"
145         "BEGIN;\n"
146         "UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
147         "SELECT abalance FROM accounts WHERE aid = :aid;\n"
148         "UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
149         "UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
150         "INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
151         "END;\n"
152 };
153
154 /* -N case */
155 static char *simple_update = {
156         "\\set nbranches :scale\n"
157         "\\set ntellers 10 * :scale\n"
158         "\\set naccounts 100000 * :scale\n"
159         "\\setrandom aid 1 :naccounts\n"
160         "\\setrandom bid 1 :nbranches\n"
161         "\\setrandom tid 1 :ntellers\n"
162         "\\setrandom delta -5000 5000\n"
163         "BEGIN;\n"
164         "UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
165         "SELECT abalance FROM accounts WHERE aid = :aid;\n"
166         "INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
167         "END;\n"
168 };
169
170 /* -S case */
171 static char *select_only = {
172         "\\set naccounts 100000 * :scale\n"
173         "\\setrandom aid 1 :naccounts\n"
174         "SELECT abalance FROM accounts WHERE aid = :aid;\n"
175 };
176
177 static void
178 usage(void)
179 {
180         fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-D varname=value][-n][-C][-v][-S][-N][-f filename][-l][-U login][-P password][-d][dbname]\n");
181         fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n");
182 }
183
184 /* random number generator */
185 static int
186 getrand(int min, int max)
187 {
188         return min + (int) (((max - min) * (double) random()) / MAX_RANDOM_VALUE + 0.5);
189 }
190
191 /* set up a connection to the backend */
192 static PGconn *
193 doConnect(void)
194 {
195         PGconn     *con;
196         PGresult   *res;
197
198         con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
199                                            login, pwd);
200         if (con == NULL)
201         {
202                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
203                 fprintf(stderr, "Memory allocatin problem?\n");
204                 return (NULL);
205         }
206
207         if (PQstatus(con) == CONNECTION_BAD)
208         {
209                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
210
211                 if (PQerrorMessage(con))
212                         fprintf(stderr, "%s", PQerrorMessage(con));
213                 else
214                         fprintf(stderr, "No explanation from the backend\n");
215
216                 return (NULL);
217         }
218
219         res = PQexec(con, "SET search_path = public");
220         if (PQresultStatus(res) != PGRES_COMMAND_OK)
221         {
222                 fprintf(stderr, "%s", PQerrorMessage(con));
223                 exit(1);
224         }
225         PQclear(res);
226
227         return (con);
228 }
229
230 /* throw away response from backend */
231 static void
232 discard_response(CState * state)
233 {
234         PGresult   *res;
235
236         do
237         {
238                 res = PQgetResult(state->con);
239                 if (res)
240                         PQclear(res);
241         } while (res);
242 }
243
244 /* check to see if the SQL result was good */
245 static int
246 check(CState *state, PGresult *res, int n)
247 {
248         CState     *st = &state[n];
249
250         switch (PQresultStatus(res))
251         {
252                 case PGRES_COMMAND_OK:
253                 case PGRES_TUPLES_OK:
254                         /* OK */
255                         break;
256                 default:
257                         fprintf(stderr, "Client %d aborted in state %d: %s",
258                                         n, st->state, PQerrorMessage(st->con));
259                         remains--;                              /* I've aborted */
260                         PQfinish(st->con);
261                         st->con = NULL;
262                         return (-1);
263         }
264         return (0);                                     /* OK */
265 }
266
267 static int
268 compareVariables(const void *v1, const void *v2)
269 {
270         return strcmp(((const Variable *) v1)->name,
271                                   ((const Variable *) v2)->name);
272 }
273
274 static char *
275 getVariable(CState * st, char *name)
276 {
277         Variable        key,
278                            *var;
279
280         /* On some versions of Solaris, bsearch of zero items dumps core */
281         if (st->nvariables <= 0)
282                 return NULL;
283
284         key.name = name;
285         var = (Variable *) bsearch((void *) &key,
286                                                            (void *) st->variables,
287                                                            st->nvariables,
288                                                            sizeof(Variable),
289                                                            compareVariables);
290         if (var != NULL)
291                 return var->value;
292         else
293                 return NULL;
294 }
295
296 static int
297 putVariable(CState * st, char *name, char *value)
298 {
299         Variable        key,
300                            *var;
301
302         key.name = name;
303         /* On some versions of Solaris, bsearch of zero items dumps core */
304         if (st->nvariables > 0)
305                 var = (Variable *) bsearch((void *) &key,
306                                                                    (void *) st->variables,
307                                                                    st->nvariables,
308                                                                    sizeof(Variable),
309                                                                    compareVariables);
310         else
311                 var = NULL;
312
313         if (var == NULL)
314         {
315                 Variable   *newvars;
316
317                 if (st->variables)
318                         newvars = (Variable *) realloc(st->variables,
319                                                                         (st->nvariables + 1) * sizeof(Variable));
320                 else
321                         newvars = (Variable *) malloc(sizeof(Variable));
322
323                 if (newvars == NULL)
324                         return false;
325
326                 st->variables = newvars;
327
328                 var = &newvars[st->nvariables];
329
330                 var->name = NULL;
331                 var->value = NULL;
332
333                 if ((var->name = strdup(name)) == NULL
334                         || (var->value = strdup(value)) == NULL)
335                 {
336                         free(var->name);
337                         free(var->value);
338                         return false;
339                 }
340
341                 st->nvariables++;
342
343                 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
344                           compareVariables);
345         }
346         else
347         {
348                 char       *val;
349
350                 if ((val = strdup(value)) == NULL)
351                         return false;
352
353                 free(var->value);
354                 var->value = val;
355         }
356
357         return true;
358 }
359
360 static char *
361 assignVariables(CState * st, char *sql)
362 {
363         int                     i,
364                                 j;
365         char       *p,
366                            *name,
367                            *val;
368         void       *tmp;
369
370         i = 0;
371         while ((p = strchr(&sql[i], ':')) != NULL)
372         {
373                 i = j = p - sql;
374                 do
375                 {
376                         i++;
377                 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
378                 if (i == j + 1)
379                         continue;
380
381                 name = malloc(i - j);
382                 if (name == NULL)
383                         return NULL;
384                 memcpy(name, &sql[j + 1], i - (j + 1));
385                 name[i - (j + 1)] = '\0';
386                 val = getVariable(st, name);
387                 free(name);
388                 if (val == NULL)
389                         continue;
390
391                 if (strlen(val) > i - j)
392                 {
393                         tmp = realloc(sql, strlen(sql) - (i - j) + strlen(val) + 1);
394                         if (tmp == NULL)
395                         {
396                                 free(sql);
397                                 return NULL;
398                         }
399                         sql = tmp;
400                 }
401
402                 if (strlen(val) != i - j)
403                         memmove(&sql[j + strlen(val)], &sql[i], strlen(&sql[i]) + 1);
404
405                 strncpy(&sql[j], val, strlen(val));
406
407                 if (strlen(val) < i - j)
408                 {
409                         tmp = realloc(sql, strlen(sql) + 1);
410                         if (tmp == NULL)
411                         {
412                                 free(sql);
413                                 return NULL;
414                         }
415                         sql = tmp;
416                 }
417
418                 i = j + strlen(val);
419         }
420
421         return sql;
422 }
423
424 static void
425 doCustom(CState * state, int n, int debug)
426 {
427         PGresult   *res;
428         CState     *st = &state[n];
429         Command   **commands;
430
431 top:
432         commands = sql_files[st->use_file];
433
434         if (st->listen)
435         {                                                       /* are we receiver? */
436                 if (commands[st->state]->type == SQL_COMMAND)
437                 {
438                         if (debug)
439                                 fprintf(stderr, "client %d receiving\n", n);
440                         if (!PQconsumeInput(st->con))
441                         {                                       /* there's something wrong */
442                                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
443                                 remains--;              /* I've aborted */
444                                 PQfinish(st->con);
445                                 st->con = NULL;
446                                 return;
447                         }
448                         if (PQisBusy(st->con))
449                                 return;                 /* don't have the whole result yet */
450                 }
451
452                 /*
453                  * transaction finished: record the time it took in the log
454                  */
455                 if (use_log && commands[st->state + 1] == NULL)
456                 {
457                         double          diff;
458                         struct timeval now;
459
460                         gettimeofday(&now, NULL);
461                         diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
462                                 (int) (now.tv_usec - st->txn_begin.tv_usec);
463
464                         fprintf(LOGFILE, "%d %d %.0f\n", st->id, st->cnt, diff);
465                 }
466
467                 if (commands[st->state]->type == SQL_COMMAND)
468                 {
469                         res = PQgetResult(st->con);
470                         if (check(state, res, n))
471                         {
472                                 PQclear(res);
473                                 return;
474                         }
475                         PQclear(res);
476                         discard_response(st);
477                 }
478
479                 if (commands[st->state + 1] == NULL)
480                 {
481                         if (is_connect)
482                         {
483                                 PQfinish(st->con);
484                                 st->con = NULL;
485                         }
486
487                         if (++st->cnt >= nxacts)
488                         {
489                                 remains--;              /* I've done */
490                                 if (st->con != NULL)
491                                 {
492                                         PQfinish(st->con);
493                                         st->con = NULL;
494                                 }
495                                 return;
496                         }
497                 }
498
499                 /* increment state counter */
500                 st->state++;
501                 if (commands[st->state] == NULL)
502                 {
503                         st->state = 0;
504                         st->use_file = getrand(0, num_files - 1);
505                         commands = sql_files[st->use_file];
506                 }
507         }
508
509         if (st->con == NULL)
510         {
511                 if ((st->con = doConnect()) == NULL)
512                 {
513                         fprintf(stderr, "Client %d aborted in establishing connection.\n",
514                                         n);
515                         remains--;                      /* I've aborted */
516                         PQfinish(st->con);
517                         st->con = NULL;
518                         return;
519                 }
520         }
521
522         if (use_log && st->state == 0)
523                 gettimeofday(&(st->txn_begin), NULL);
524
525         if (commands[st->state]->type == SQL_COMMAND)
526         {
527                 char       *sql;
528
529                 if ((sql = strdup(commands[st->state]->argv[0])) == NULL
530                         || (sql = assignVariables(st, sql)) == NULL)
531                 {
532                         fprintf(stderr, "out of memory\n");
533                         st->ecnt++;
534                         return;
535                 }
536
537                 if (debug)
538                         fprintf(stderr, "client %d sending %s\n", n, sql);
539                 if (PQsendQuery(st->con, sql) == 0)
540                 {
541                         if (debug)
542                                 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
543                         st->ecnt++;
544                 }
545                 else
546                 {
547                         st->listen = 1;         /* flags that should be listened */
548                 }
549                 free(sql);
550         }
551         else if (commands[st->state]->type == META_COMMAND)
552         {
553                 int                     argc = commands[st->state]->argc,
554                                         i;
555                 char      **argv = commands[st->state]->argv;
556
557                 if (debug)
558                 {
559                         fprintf(stderr, "client %d executing \\%s", n, argv[0]);
560                         for (i = 1; i < argc; i++)
561                                 fprintf(stderr, " %s", argv[i]);
562                         fprintf(stderr, "\n");
563                 }
564
565                 if (pg_strcasecmp(argv[0], "setrandom") == 0)
566                 {
567                         char       *var;
568                         int                     min,
569                                                 max;
570                         char            res[64];
571
572                         if (*argv[2] == ':')
573                         {
574                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
575                                 {
576                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
577                                         st->ecnt++;
578                                         return;
579                                 }
580                                 min = atoi(var);
581                         }
582                         else
583                                 min = atoi(argv[2]);
584
585 #ifdef NOT_USED
586                         if (min < 0)
587                         {
588                                 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
589                                 st->ecnt++;
590                                 return;
591                         }
592 #endif
593
594                         if (*argv[3] == ':')
595                         {
596                                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
597                                 {
598                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
599                                         st->ecnt++;
600                                         return;
601                                 }
602                                 max = atoi(var);
603                         }
604                         else
605                                 max = atoi(argv[3]);
606
607                         if (max < min || max > MAX_RANDOM_VALUE)
608                         {
609                                 fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
610                                 st->ecnt++;
611                                 return;
612                         }
613
614 #ifdef DEBUG
615                         printf("min: %d max: %d random: %d\n", min, max, getrand(min, max));
616 #endif
617                         snprintf(res, sizeof(res), "%d", getrand(min, max));
618
619                         if (putVariable(st, argv[1], res) == false)
620                         {
621                                 fprintf(stderr, "%s: out of memory\n", argv[0]);
622                                 st->ecnt++;
623                                 return;
624                         }
625
626                         st->listen = 1;
627                 }
628                 else if (pg_strcasecmp(argv[0], "set") == 0)
629                 {
630                         char       *var;
631                         int                     ope1,
632                                                 ope2;
633                         char            res[64];
634
635                         if (*argv[2] == ':')
636                         {
637                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
638                                 {
639                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
640                                         st->ecnt++;
641                                         return;
642                                 }
643                                 ope1 = atoi(var);
644                         }
645                         else
646                                 ope1 = atoi(argv[2]);
647
648                         if (argc < 5)
649                                 snprintf(res, sizeof(res), "%d", ope1);
650                         else
651                         {
652                                 if (*argv[4] == ':')
653                                 {
654                                         if ((var = getVariable(st, argv[4] + 1)) == NULL)
655                                         {
656                                                 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
657                                                 st->ecnt++;
658                                                 return;
659                                         }
660                                         ope2 = atoi(var);
661                                 }
662                                 else
663                                         ope2 = atoi(argv[4]);
664
665                                 if (strcmp(argv[3], "+") == 0)
666                                         snprintf(res, sizeof(res), "%d", ope1 + ope2);
667                                 else if (strcmp(argv[3], "-") == 0)
668                                         snprintf(res, sizeof(res), "%d", ope1 - ope2);
669                                 else if (strcmp(argv[3], "*") == 0)
670                                         snprintf(res, sizeof(res), "%d", ope1 * ope2);
671                                 else if (strcmp(argv[3], "/") == 0)
672                                 {
673                                         if (ope2 == 0)
674                                         {
675                                                 fprintf(stderr, "%s: division by zero\n", argv[0]);
676                                                 st->ecnt++;
677                                                 return;
678                                         }
679                                         snprintf(res, sizeof(res), "%d", ope1 / ope2);
680                                 }
681                                 else
682                                 {
683                                         fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
684                                         st->ecnt++;
685                                         return;
686                                 }
687                         }
688
689                         if (putVariable(st, argv[1], res) == false)
690                         {
691                                 fprintf(stderr, "%s: out of memory\n", argv[0]);
692                                 st->ecnt++;
693                                 return;
694                         }
695
696                         st->listen = 1;
697                 }
698
699                 goto top;
700         }
701 }
702
703 /* discard connections */
704 static void
705 disconnect_all(CState * state)
706 {
707         int                     i;
708
709         for (i = 0; i < nclients; i++)
710         {
711                 if (state[i].con)
712                         PQfinish(state[i].con);
713         }
714 }
715
716 /* create tables and setup data */
717 static void
718 init(void)
719 {
720         PGconn     *con;
721         PGresult   *res;
722         static char *DDLs[] = {
723                 "drop table branches",
724                 "create table branches(bid int not null,bbalance int,filler char(88))",
725                 "drop table tellers",
726                 "create table tellers(tid int not null,bid int,tbalance int,filler char(84))",
727                 "drop table accounts",
728                 "create table accounts(aid int not null,bid int,abalance int,filler char(84))",
729                 "drop table history",
730         "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
731         static char *DDLAFTERs[] = {
732                 "alter table branches add primary key (bid)",
733                 "alter table tellers add primary key (tid)",
734         "alter table accounts add primary key (aid)"};
735
736
737         char            sql[256];
738
739         int                     i;
740
741         if ((con = doConnect()) == NULL)
742                 exit(1);
743
744         for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++)
745         {
746                 res = PQexec(con, DDLs[i]);
747                 if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK)
748                 {
749                         fprintf(stderr, "%s", PQerrorMessage(con));
750                         exit(1);
751                 }
752                 PQclear(res);
753         }
754
755         res = PQexec(con, "begin");
756         if (PQresultStatus(res) != PGRES_COMMAND_OK)
757         {
758                 fprintf(stderr, "%s", PQerrorMessage(con));
759                 exit(1);
760         }
761         PQclear(res);
762
763         for (i = 0; i < nbranches * scale; i++)
764         {
765                 snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
766                 res = PQexec(con, sql);
767                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
768                 {
769                         fprintf(stderr, "%s", PQerrorMessage(con));
770                         exit(1);
771                 }
772                 PQclear(res);
773         }
774
775         for (i = 0; i < ntellers * scale; i++)
776         {
777                 snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
778                                  ,i + 1, i / ntellers + 1);
779                 res = PQexec(con, sql);
780                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
781                 {
782                         fprintf(stderr, "%s", PQerrorMessage(con));
783                         exit(1);
784                 }
785                 PQclear(res);
786         }
787
788         res = PQexec(con, "end");
789         if (PQresultStatus(res) != PGRES_COMMAND_OK)
790         {
791                 fprintf(stderr, "%s", PQerrorMessage(con));
792                 exit(1);
793         }
794         PQclear(res);
795
796         /*
797          * occupy accounts table with some data
798          */
799         fprintf(stderr, "creating tables...\n");
800         for (i = 0; i < naccounts * scale; i++)
801         {
802                 int                     j = i + 1;
803
804                 if (j % 10000 == 1)
805                 {
806                         res = PQexec(con, "copy accounts from stdin");
807                         if (PQresultStatus(res) != PGRES_COPY_IN)
808                         {
809                                 fprintf(stderr, "%s", PQerrorMessage(con));
810                                 exit(1);
811                         }
812                         PQclear(res);
813                 }
814
815                 snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0);
816                 if (PQputline(con, sql))
817                 {
818                         fprintf(stderr, "PQputline failed\n");
819                         exit(1);
820                 }
821
822                 if (j % 10000 == 0)
823                 {
824                         /*
825                          * every 10000 tuples, we commit the copy command. this should
826                          * avoid generating too much WAL logs
827                          */
828                         fprintf(stderr, "%d tuples done.\n", j);
829                         if (PQputline(con, "\\.\n"))
830                         {
831                                 fprintf(stderr, "very last PQputline failed\n");
832                                 exit(1);
833                         }
834
835                         if (PQendcopy(con))
836                         {
837                                 fprintf(stderr, "PQendcopy failed\n");
838                                 exit(1);
839                         }
840
841 #ifdef NOT_USED
842
843                         /*
844                          * do a checkpoint to purge the old WAL logs
845                          */
846                         res = PQexec(con, "checkpoint");
847                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
848                         {
849                                 fprintf(stderr, "%s", PQerrorMessage(con));
850                                 exit(1);
851                         }
852                         PQclear(res);
853 #endif   /* NOT_USED */
854                 }
855         }
856         fprintf(stderr, "set primary key...\n");
857         for (i = 0; i < (sizeof(DDLAFTERs) / sizeof(char *)); i++)
858         {
859                 res = PQexec(con, DDLAFTERs[i]);
860                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
861                 {
862                         fprintf(stderr, "%s", PQerrorMessage(con));
863                         exit(1);
864                 }
865                 PQclear(res);
866         }
867
868         /* vacuum */
869         fprintf(stderr, "vacuum...");
870         res = PQexec(con, "vacuum analyze");
871         if (PQresultStatus(res) != PGRES_COMMAND_OK)
872         {
873                 fprintf(stderr, "%s", PQerrorMessage(con));
874                 exit(1);
875         }
876         PQclear(res);
877         fprintf(stderr, "done.\n");
878
879         PQfinish(con);
880 }
881
882 static Command *
883 process_commands(char *buf)
884 {
885         const char      delim[] = " \f\n\r\t\v";
886
887         Command    *my_commands;
888         int                     j;
889         char       *p,
890                            *tok;
891
892         if ((p = strchr(buf, '\n')) != NULL)
893                 *p = '\0';
894
895         p = buf;
896         while (isspace((unsigned char) *p))
897                 p++;
898
899         if (*p == '\0' || strncmp(p, "--", 2) == 0)
900         {
901                 return NULL;
902         }
903
904         my_commands = (Command *) malloc(sizeof(Command));
905         if (my_commands == NULL)
906         {
907                 return NULL;
908         }
909
910         my_commands->argc = 0;
911
912         if (*p == '\\')
913         {
914                 my_commands->type = META_COMMAND;
915
916                 j = 0;
917                 tok = strtok(++p, delim);
918
919                 while (tok != NULL)
920                 {
921                         if ((my_commands->argv[j] = strdup(tok)) == NULL)
922                                 return NULL;
923
924                         my_commands->argc++;
925
926                         j++;
927                         tok = strtok(NULL, delim);
928                 }
929
930                 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
931                 {
932                         if (my_commands->argc < 4)
933                         {
934                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
935                                 return NULL;
936                         }
937
938                         for (j = 4; j < my_commands->argc; j++)
939                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
940                                                 my_commands->argv[0], my_commands->argv[j]);
941                 }
942                 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
943                 {
944                         if (my_commands->argc < 3)
945                         {
946                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
947                                 return NULL;
948                         }
949
950                         for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
951                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
952                                                 my_commands->argv[0], my_commands->argv[j]);
953                 }
954                 else
955                 {
956                         fprintf(stderr, "invalid command %s\n", my_commands->argv[0]);
957                         return NULL;
958                 }
959         }
960         else
961         {
962                 my_commands->type = SQL_COMMAND;
963
964                 if ((my_commands->argv[0] = strdup(p)) == NULL)
965                         return NULL;
966
967                 my_commands->argc++;
968         }
969
970         return my_commands;
971 }
972
973 static int
974 process_file(char *filename)
975 {
976 #define COMMANDS_ALLOC_NUM 128
977
978         Command   **my_commands;
979         FILE       *fd;
980         int                     lineno;
981         char            buf[BUFSIZ];
982         int                     alloc_num;
983
984         if (num_files >= MAX_FILES)
985         {
986                 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
987                 exit(1);
988         }
989
990         alloc_num = COMMANDS_ALLOC_NUM;
991         my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
992         if (my_commands == NULL)
993                 return false;
994
995         if (strcmp(filename, "-") == 0)
996                 fd = stdin;
997         else if ((fd = fopen(filename, "r")) == NULL)
998         {
999                 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
1000                 return false;
1001         }
1002
1003         lineno = 0;
1004
1005         while (fgets(buf, sizeof(buf), fd) != NULL)
1006         {
1007                 Command    *commands;
1008                 int                     i;
1009
1010                 i = 0;
1011                 while (isspace((unsigned char) buf[i]))
1012                         i++;
1013
1014                 if (buf[i] != '\0' && strncmp(&buf[i], "--", 2) != 0)
1015                 {
1016                         commands = process_commands(&buf[i]);
1017                         if (commands == NULL)
1018                         {
1019                                 fclose(fd);
1020                                 return false;
1021                         }
1022                 }
1023                 else
1024                         continue;
1025
1026                 my_commands[lineno] = commands;
1027                 lineno++;
1028
1029                 if (lineno >= alloc_num)
1030                 {
1031                         alloc_num += COMMANDS_ALLOC_NUM;
1032                         my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
1033                         if (my_commands == NULL)
1034                         {
1035                                 fclose(fd);
1036                                 return false;
1037                         }
1038                 }
1039         }
1040         fclose(fd);
1041
1042         my_commands[lineno] = NULL;
1043
1044         sql_files[num_files++] = my_commands;
1045
1046         return true;
1047 }
1048
1049 static Command **
1050 process_builtin(char *tb)
1051 {
1052 #define COMMANDS_ALLOC_NUM 128
1053
1054         Command   **my_commands;
1055         int                     lineno;
1056         char            buf[BUFSIZ];
1057         int                     alloc_num;
1058
1059         if (*tb == '\0')
1060                 return NULL;
1061
1062         alloc_num = COMMANDS_ALLOC_NUM;
1063         my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
1064         if (my_commands == NULL)
1065                 return NULL;
1066
1067         lineno = 0;
1068
1069         for (;;)
1070         {
1071                 char       *p;
1072                 Command    *commands;
1073
1074                 p = buf;
1075                 while (*tb && *tb != '\n')
1076                         *p++ = *tb++;
1077
1078                 if (*tb == '\0')
1079                         break;
1080
1081                 if (*tb == '\n')
1082                         tb++;
1083
1084                 *p = '\0';
1085
1086                 commands = process_commands(buf);
1087                 if (commands == NULL)
1088                 {
1089                         return NULL;
1090                 }
1091
1092                 my_commands[lineno] = commands;
1093                 lineno++;
1094
1095                 if (lineno >= alloc_num)
1096                 {
1097                         alloc_num += COMMANDS_ALLOC_NUM;
1098                         my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
1099                         if (my_commands == NULL)
1100                         {
1101                                 return NULL;
1102                         }
1103                 }
1104         }
1105
1106         my_commands[lineno] = NULL;
1107
1108         return my_commands;
1109 }
1110
1111 /* print out results */
1112 static void
1113 printResults(
1114                          int ttype, CState * state,
1115                          struct timeval * tv1, struct timeval * tv2,
1116                          struct timeval * tv3)
1117 {
1118         double          t1,
1119                                 t2;
1120         int                     i;
1121         int                     normal_xacts = 0;
1122         char       *s;
1123
1124         for (i = 0; i < nclients; i++)
1125                 normal_xacts += state[i].cnt;
1126
1127         t1 = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec);
1128         t1 = normal_xacts * 1000000.0 / t1;
1129
1130         t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
1131         t2 = normal_xacts * 1000000.0 / t2;
1132
1133         if (ttype == 0)
1134                 s = "TPC-B (sort of)";
1135         else if (ttype == 2)
1136                 s = "Update only accounts";
1137         else if (ttype == 1)
1138                 s = "SELECT only";
1139         else
1140                 s = "Custom query";
1141
1142         printf("transaction type: %s\n", s);
1143         printf("scaling factor: %d\n", scale);
1144         printf("number of clients: %d\n", nclients);
1145         printf("number of transactions per client: %d\n", nxacts);
1146         printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
1147         printf("tps = %f (including connections establishing)\n", t1);
1148         printf("tps = %f (excluding connections establishing)\n", t2);
1149 }
1150
1151
1152 int
1153 main(int argc, char **argv)
1154 {
1155         int                     c;
1156         int                     is_init_mode = 0;               /* initialize mode? */
1157         int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
1158         int                     is_full_vacuum = 0;             /* do full vacuum before testing? */
1159         int                     debug = 0;              /* debug flag */
1160         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
1161                                                                  * 2: skip update of branches and tellers */
1162         char       *filename = NULL;
1163
1164         CState     *state;                      /* status of clients */
1165
1166         struct timeval tv1;                     /* start up time */
1167         struct timeval tv2;                     /* after establishing all connections to the
1168                                                                  * backend */
1169         struct timeval tv3;                     /* end time */
1170
1171         int                     i;
1172
1173         fd_set          input_mask;
1174         int                     nsocks;                 /* return from select(2) */
1175         int                     maxsock;                /* max socket number to be waited */
1176
1177 #ifdef HAVE_GETRLIMIT
1178         struct rlimit rlim;
1179 #endif
1180
1181         PGconn     *con;
1182         PGresult   *res;
1183         char       *env;
1184
1185         char            val[64];
1186
1187 #ifdef WIN32
1188         /* stderr is buffered on Win32. */
1189         setvbuf(stderr, NULL, _IONBF, 0);
1190 #endif
1191
1192         if ((env = getenv("PGHOST")) != NULL && *env != '\0')
1193                 pghost = env;
1194         if ((env = getenv("PGPORT")) != NULL && *env != '\0')
1195                 pgport = env;
1196         else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
1197                 login = env;
1198
1199         state = (CState *) malloc(sizeof(CState));
1200         if (state == NULL)
1201         {
1202                 fprintf(stderr, "Couldn't allocate memory for state\n");
1203                 exit(1);
1204         }
1205
1206         memset(state, 0, sizeof(*state));
1207
1208         while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:D:")) != -1)
1209         {
1210                 switch (c)
1211                 {
1212                         case 'i':
1213                                 is_init_mode++;
1214                                 break;
1215                         case 'h':
1216                                 pghost = optarg;
1217                                 break;
1218                         case 'n':
1219                                 is_no_vacuum++;
1220                                 break;
1221                         case 'v':
1222                                 is_full_vacuum++;
1223                                 break;
1224                         case 'p':
1225                                 pgport = optarg;
1226                                 break;
1227                         case 'd':
1228                                 debug++;
1229                                 break;
1230                         case 'S':
1231                                 ttype = 1;
1232                                 break;
1233                         case 'N':
1234                                 ttype = 2;
1235                                 break;
1236                         case 'c':
1237                                 nclients = atoi(optarg);
1238                                 if (nclients <= 0 || nclients > MAXCLIENTS)
1239                                 {
1240                                         fprintf(stderr, "invalid number of clients: %d\n", nclients);
1241                                         exit(1);
1242                                 }
1243 #ifdef HAVE_GETRLIMIT
1244 #ifdef RLIMIT_NOFILE                    /* most platforms use RLIMIT_NOFILE */
1245                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
1246 #else                                                   /* but BSD doesn't ... */
1247                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
1248 #endif   /* RLIMIT_NOFILE */
1249                                 {
1250                                         fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
1251                                         exit(1);
1252                                 }
1253                                 if (rlim.rlim_cur <= (nclients + 2))
1254                                 {
1255                                         fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
1256                                         fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
1257                                         exit(1);
1258                                 }
1259 #endif /* HAVE_GETRLIMIT */
1260                                 break;
1261                         case 'C':
1262                                 is_connect = 1;
1263                                 break;
1264                         case 's':
1265                                 scale = atoi(optarg);
1266                                 if (scale <= 0)
1267                                 {
1268                                         fprintf(stderr, "invalid scaling factor: %d\n", scale);
1269                                         exit(1);
1270                                 }
1271                                 break;
1272                         case 't':
1273                                 nxacts = atoi(optarg);
1274                                 if (nxacts <= 0)
1275                                 {
1276                                         fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
1277                                         exit(1);
1278                                 }
1279                                 break;
1280                         case 'U':
1281                                 login = optarg;
1282                                 break;
1283                         case 'P':
1284                                 pwd = optarg;
1285                                 break;
1286                         case 'l':
1287                                 use_log = true;
1288                                 break;
1289                         case 'f':
1290                                 ttype = 3;
1291                                 filename = optarg;
1292                                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
1293                                         exit(1);
1294                                 break;
1295                         case 'D':
1296                                 {
1297                                         char       *p;
1298
1299                                         if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
1300                                         {
1301                                                 fprintf(stderr, "invalid variable definition: %s\n", optarg);
1302                                                 exit(1);
1303                                         }
1304
1305                                         *p++ = '\0';
1306                                         if (putVariable(&state[0], optarg, p) == false)
1307                                         {
1308                                                 fprintf(stderr, "Couldn't allocate memory for variable\n");
1309                                                 exit(1);
1310                                         }
1311                                 }
1312                                 break;
1313                         default:
1314                                 usage();
1315                                 exit(1);
1316                                 break;
1317                 }
1318         }
1319
1320         if (argc > optind)
1321                 dbName = argv[optind];
1322         else
1323         {
1324                 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
1325                         dbName = env;
1326                 else if (login != NULL && *login != '\0')
1327                         dbName = login;
1328                 else
1329                         dbName = "";
1330         }
1331
1332         if (is_init_mode)
1333         {
1334                 init();
1335                 exit(0);
1336         }
1337
1338         remains = nclients;
1339
1340         if (getVariable(&state[0], "scale") == NULL)
1341         {
1342                 snprintf(val, sizeof(val), "%d", scale);
1343                 if (putVariable(&state[0], "scale", val) == false)
1344                 {
1345                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1346                         exit(1);
1347                 }
1348         }
1349
1350         if (nclients > 1)
1351         {
1352                 state = (CState *) realloc(state, sizeof(CState) * nclients);
1353                 if (state == NULL)
1354                 {
1355                         fprintf(stderr, "Couldn't allocate memory for state\n");
1356                         exit(1);
1357                 }
1358
1359                 memset(state + 1, 0, sizeof(*state) * (nclients - 1));
1360
1361                 snprintf(val, sizeof(val), "%d", scale);
1362
1363                 for (i = 1; i < nclients; i++)
1364                 {
1365                         int                     j;
1366
1367                         for (j = 0; j < state[0].nvariables; j++)
1368                         {
1369                                 if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
1370                                 {
1371                                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1372                                         exit(1);
1373                                 }
1374                         }
1375
1376                         if (putVariable(&state[i], "scale", val) == false)
1377                         {
1378                                 fprintf(stderr, "Couldn't allocate memory for variable\n");
1379                                 exit(1);
1380                         }
1381                 }
1382         }
1383
1384         if (use_log)
1385         {
1386                 char            logpath[64];
1387
1388                 snprintf(logpath, 64, "pgbench_log.%d", getpid());
1389                 LOGFILE = fopen(logpath, "w");
1390
1391                 if (LOGFILE == NULL)
1392                 {
1393                         fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
1394                         exit(1);
1395                 }
1396         }
1397
1398         if (debug)
1399         {
1400                 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
1401                            pghost, pgport, nclients, nxacts, dbName);
1402         }
1403
1404         /* opening connection... */
1405         con = doConnect();
1406         if (con == NULL)
1407                 exit(1);
1408
1409         if (PQstatus(con) == CONNECTION_BAD)
1410         {
1411                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
1412                 fprintf(stderr, "%s", PQerrorMessage(con));
1413                 exit(1);
1414         }
1415
1416         if (ttype != 3)
1417         {
1418                 /*
1419                  * get the scaling factor that should be same as count(*) from
1420                  * branches if this is not a custom query
1421                  */
1422                 res = PQexec(con, "select count(*) from branches");
1423                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1424                 {
1425                         fprintf(stderr, "%s", PQerrorMessage(con));
1426                         exit(1);
1427                 }
1428                 scale = atoi(PQgetvalue(res, 0, 0));
1429                 if (scale < 0)
1430                 {
1431                         fprintf(stderr, "count(*) from branches invalid (%d)\n", scale);
1432                         exit(1);
1433                 }
1434                 PQclear(res);
1435
1436                 snprintf(val, sizeof(val), "%d", scale);
1437                 if (putVariable(&state[0], "scale", val) == false)
1438                 {
1439                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1440                         exit(1);
1441                 }
1442
1443                 if (nclients > 1)
1444                 {
1445                         for (i = 1; i < nclients; i++)
1446                         {
1447                                 if (putVariable(&state[i], "scale", val) == false)
1448                                 {
1449                                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1450                                         exit(1);
1451                                 }
1452                         }
1453                 }
1454         }
1455
1456         if (!is_no_vacuum)
1457         {
1458                 fprintf(stderr, "starting vacuum...");
1459                 res = PQexec(con, "vacuum branches");
1460                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1461                 {
1462                         fprintf(stderr, "%s", PQerrorMessage(con));
1463                         exit(1);
1464                 }
1465                 PQclear(res);
1466
1467                 res = PQexec(con, "vacuum tellers");
1468                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1469                 {
1470                         fprintf(stderr, "%s", PQerrorMessage(con));
1471                         exit(1);
1472                 }
1473                 PQclear(res);
1474
1475                 res = PQexec(con, "delete from history");
1476                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1477                 {
1478                         fprintf(stderr, "%s", PQerrorMessage(con));
1479                         exit(1);
1480                 }
1481                 PQclear(res);
1482                 res = PQexec(con, "vacuum history");
1483                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1484                 {
1485                         fprintf(stderr, "%s", PQerrorMessage(con));
1486                         exit(1);
1487                 }
1488                 PQclear(res);
1489
1490                 fprintf(stderr, "end.\n");
1491
1492                 if (is_full_vacuum)
1493                 {
1494                         fprintf(stderr, "starting full vacuum...");
1495                         res = PQexec(con, "vacuum analyze accounts");
1496                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1497                         {
1498                                 fprintf(stderr, "%s", PQerrorMessage(con));
1499                                 exit(1);
1500                         }
1501                         PQclear(res);
1502                         fprintf(stderr, "end.\n");
1503                 }
1504         }
1505         PQfinish(con);
1506
1507         /* set random seed */
1508         gettimeofday(&tv1, NULL);
1509         srandom((unsigned int) tv1.tv_usec);
1510
1511         /* get start up time */
1512         gettimeofday(&tv1, NULL);
1513
1514         if (is_connect == 0)
1515         {
1516                 /* make connections to the database */
1517                 for (i = 0; i < nclients; i++)
1518                 {
1519                         state[i].id = i;
1520                         if ((state[i].con = doConnect()) == NULL)
1521                                 exit(1);
1522                 }
1523         }
1524
1525         /* time after connections set up */
1526         gettimeofday(&tv2, NULL);
1527
1528         /* process bultin SQL scripts */
1529         switch (ttype)
1530         {
1531                 case 0:
1532                         sql_files[0] = process_builtin(tpc_b);
1533                         num_files = 1;
1534                         break;
1535
1536                 case 1:
1537                         sql_files[0] = process_builtin(select_only);
1538                         num_files = 1;
1539                         break;
1540
1541                 case 2:
1542                         sql_files[0] = process_builtin(simple_update);
1543                         num_files = 1;
1544                         break;
1545
1546                 default:
1547                         break;
1548         }
1549
1550         /* send start up queries in async manner */
1551         for (i = 0; i < nclients; i++)
1552         {
1553                 Command   **commands = sql_files[state[i].use_file];
1554                 int                     prev_ecnt = state[i].ecnt;
1555
1556                 state[i].use_file = getrand(0, num_files - 1);
1557                 doCustom(state, i, debug);
1558
1559                 if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
1560                 {
1561                         fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
1562                         remains--;                      /* I've aborted */
1563                         PQfinish(state[i].con);
1564                         state[i].con = NULL;
1565                 }
1566         }
1567
1568         for (;;)
1569         {
1570                 if (remains <= 0)
1571                 {                                               /* all done ? */
1572                         disconnect_all(state);
1573                         /* get end time */
1574                         gettimeofday(&tv3, NULL);
1575                         printResults(ttype, state, &tv1, &tv2, &tv3);
1576                         if (LOGFILE)
1577                                 fclose(LOGFILE);
1578                         exit(0);
1579                 }
1580
1581                 FD_ZERO(&input_mask);
1582
1583                 maxsock = -1;
1584                 for (i = 0; i < nclients; i++)
1585                 {
1586                         Command   **commands = sql_files[state[i].use_file];
1587
1588                         if (state[i].con && commands[state[i].state]->type != META_COMMAND)
1589                         {
1590                                 int                     sock = PQsocket(state[i].con);
1591
1592                                 if (sock < 0)
1593                                 {
1594                                         disconnect_all(state);
1595                                         exit(1);
1596                                 }
1597                                 FD_SET(sock, &input_mask);
1598                                 if (maxsock < sock)
1599                                         maxsock = sock;
1600                         }
1601                 }
1602
1603                 if (maxsock != -1)
1604                 {
1605                         if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
1606                                                           (fd_set *) NULL, (struct timeval *) NULL)) < 0)
1607                         {
1608                                 if (errno == EINTR)
1609                                         continue;
1610                                 /* must be something wrong */
1611                                 disconnect_all(state);
1612                                 fprintf(stderr, "select failed: %s\n", strerror(errno));
1613                                 exit(1);
1614                         }
1615                         else if (nsocks == 0)
1616                         {                                       /* timeout */
1617                                 fprintf(stderr, "select timeout\n");
1618                                 for (i = 0; i < nclients; i++)
1619                                 {
1620                                         fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
1621                                                         i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
1622                                 }
1623                                 exit(0);
1624                         }
1625                 }
1626
1627                 /* ok, backend returns reply */
1628                 for (i = 0; i < nclients; i++)
1629                 {
1630                         Command   **commands = sql_files[state[i].use_file];
1631                         int                     prev_ecnt = state[i].ecnt;
1632
1633                         if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
1634                                                   || commands[state[i].state]->type == META_COMMAND))
1635                         {
1636                                 doCustom(state, i, debug);
1637                         }
1638
1639                         if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
1640                         {
1641                                 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
1642                                 remains--;              /* I've aborted */
1643                                 PQfinish(state[i].con);
1644                                 state[i].con = NULL;
1645                         }
1646                 }
1647         }
1648 }