]> granicus.if.org Git - postgresql/blob - contrib/pgbench/pgbench.c
New features contributed by Tomoaki Sato.
[postgresql] / contrib / pgbench / pgbench.c
1 /*
2  * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.50 2006/07/26 07:24:50 ishii Exp $
3  *
4  * pgbench: a simple benchmark program for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2000-2005      Tatsuo Ishii
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  */
20 #include "postgres_fe.h"
21
22 #include "libpq-fe.h"
23
24 #include <ctype.h>
25
26 #ifdef WIN32
27 #include "win32.h"
28 #else
29 #include <sys/time.h>
30 #include <unistd.h>
31
32 #ifdef HAVE_GETOPT_H
33 #include <getopt.h>
34 #endif
35
36 #ifdef HAVE_SYS_SELECT_H
37 #include <sys/select.h>
38 #endif
39
40 /* for getrlimit */
41 #include <sys/resource.h>
42 #endif   /* ! WIN32 */
43
44 extern char *optarg;
45 extern int      optind;
46
47 #ifdef WIN32
48 #undef select
49 #endif
50
51
52 /********************************************************************
53  * some configurable parameters */
54
55 #define MAXCLIENTS 1024                 /* max number of clients allowed */
56
57 int                     nclients = 1;           /* default number of simulated clients */
58 int                     nxacts = 10;            /* default number of transactions each client runs */
59
60 /*
61  * scaling factor (despite the name, this is not "transactions per
62  * second"). init() creates nbranches * tps branches, ntellers * tps
 * tellers and naccounts * tps accounts, so for example tps = 10 will
 * make 1000000 tuples of accounts table.
63  */
64 int                     tps = 1;
65
66 /*
67  * end of configurable parameters
68  *********************************************************************/
69
/* per-scale-unit row counts used by init() when populating the tables */
70 #define nbranches       1
71 #define ntellers        10
72 #define naccounts       100000
73
/* stream that doCustom() writes per-transaction latency lines to when use_log is set */
74 FILE       *LOGFILE = NULL;
75
76 bool            use_log;                        /* log transaction latencies to a file */
77
78 int                     remains;                        /* number of remaining clients; decremented
                                                         * whenever a client finishes or aborts */
79
80 int                     is_connect;                     /* establish connection for each transaction */
81
/* connection parameters, passed verbatim to PQsetdbLogin() by doConnect() */
82 char       *pghost = "";
83 char       *pgport = NULL;
84 char       *pgoptions = NULL;
85 char       *pgtty = NULL;
86 char       *login = NULL;
87 char       *pwd = NULL;
88 char       *dbName;
89
90 /* variable definitions: one name/value pair, both held as C strings */
91 typedef struct
92 {
93         char       *name;                       /* variable name (heap copy made by putVariable) */
94         char       *value;                      /* its value (heap copy, replaced on reassignment) */
95 }       Variable;
96
97 /*
98  * structures used in custom query mode
 *
 * CState holds everything doCustom() needs to drive one simulated client.
99  */
100
101 typedef struct
102 {
103         PGconn     *con;                        /* connection handle to DB; NULL when not connected */
104         int                     id;                             /* client No. */
105         int                     state;                  /* index of the current command in the script */
106         int                     cnt;                    /* xacts count (scripts completed so far) */
107         int                     ecnt;                   /* error count */
108         int                     listen;                 /* non-zero once a command has been issued and
109                                                                  * its completion still has to be processed */
110         Variable   *variables;          /* array of variable definitions, kept sorted by name */
111         int                     nvariables;     /* number of entries in variables */
112         struct timeval txn_begin;       /* used for measuring latencies; set when use_log is on */
113         int                     use_file;               /* index in sql_files for this client */
114 }       CState;
115
116 /*
117  * queries read from files
 *
 * Each script line becomes one Command. For an SQL_COMMAND, doCustom()
 * sends argv[0] as the SQL text. For a META_COMMAND, argv[0] is the
 * command name (without the backslash) and argv[1..argc-1] are its
 * arguments.
118  */
119 #define SQL_COMMAND             1
120 #define META_COMMAND    2
121 #define MAX_ARGS                10
122
123 typedef struct
124 {
125         int                     type;                   /* command type (SQL_COMMAND or META_COMMAND) */
126         int                     argc;                   /* number of entries used in argv */
127         char       *argv[MAX_ARGS]; /* words of the command, see above */
128 }       Command;
129
130 #define MAX_FILES               128             /* max number of SQL script files allowed */
131
/*
 * Each entry is one script: an array of Command pointers terminated by a
 * NULL pointer (doCustom() detects the end of a script by that NULL).
 */
132 Command   **sql_files[MAX_FILES];               /* SQL script files */
133 int                     num_files;                      /* number of scripts stored in sql_files */
134
135 /*
 * default scenario
 *
 * The \setrandom upper bounds are literals equal to naccounts (100000),
 * nbranches (1) and ntellers (10), i.e. they are not multiplied by the
 * scaling factor here.
 */
136 static char *tpc_b = {
137         "\\setrandom aid 1 100000\n"
138         "\\setrandom bid 1 1\n"
139         "\\setrandom tid 1 10\n"
140         "\\setrandom delta 1 10000\n"
141         "BEGIN;\n"
142         "UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
143         "SELECT abalance FROM accounts WHERE aid = :aid;\n"
144         "UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
145         "UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
146         "INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
147         "END;\n"
148 };
149
150 /*
 * -N case: same statements as the default scenario except that the
 * UPDATEs of tellers and branches are left out.
 */
151 static char *simple_update = {
152         "\\setrandom aid 1 100000\n"
153         "\\setrandom bid 1 1\n"
154         "\\setrandom tid 1 10\n"
155         "\\setrandom delta 1 10000\n"
156         "BEGIN;\n"
157         "UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
158         "SELECT abalance FROM accounts WHERE aid = :aid;\n"
159         "INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
160         "END;\n"
161 };
162
163 /* -S case: a single SELECT on a random account, with no explicit BEGIN/END */
164 static char *select_only = {
165         "\\setrandom aid 1 100000\n"
166         "SELECT abalance FROM accounts WHERE aid = :aid;\n"
167 };
168
/*
 * Print the command-line synopsis to stderr: first the benchmark-mode
 * form, then the initialize-mode (-i) form.
 */
static void
usage(void)
{
        static const char *const synopsis[] = {
                "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-D varname=value][-n][-C][-v][-S][-N][-f filename][-l][-U login][-P password][-d][dbname]\n",
                "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n"
        };
        size_t          k;

        /* the texts contain no conversion specifications, so fputs is exact */
        for (k = 0; k < sizeof(synopsis) / sizeof(synopsis[0]); k++)
                fputs(synopsis[k], stderr);
}
175
176 /* random number generator */
177 static int
178 getrand(int min, int max)
179 {
180         return min + (int) (((max - min) * (double) random()) / MAX_RANDOM_VALUE + 0.5);
181 }
182
183 /* set up a connection to the backend */
184 static PGconn *
185 doConnect(void)
186 {
187         PGconn     *con;
188         PGresult   *res;
189
190         con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
191                                            login, pwd);
192         if (con == NULL)
193         {
194                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
195                 fprintf(stderr, "Memory allocatin problem?\n");
196                 return (NULL);
197         }
198
199         if (PQstatus(con) == CONNECTION_BAD)
200         {
201                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
202
203                 if (PQerrorMessage(con))
204                         fprintf(stderr, "%s", PQerrorMessage(con));
205                 else
206                         fprintf(stderr, "No explanation from the backend\n");
207
208                 return (NULL);
209         }
210
211         res = PQexec(con, "SET search_path = public");
212         if (PQresultStatus(res) != PGRES_COMMAND_OK)
213         {
214                 fprintf(stderr, "%s", PQerrorMessage(con));
215                 exit(1);
216         }
217         PQclear(res);
218
219         return (con);
220 }
221
222 /* throw away response from backend */
223 static void
224 discard_response(CState * state)
225 {
226         PGresult   *res;
227
228         do
229         {
230                 res = PQgetResult(state->con);
231                 if (res)
232                         PQclear(res);
233         } while (res);
234 }
235
236 /* check to see if the SQL result was good */
237 static int
238 check(CState * state, PGresult *res, int n, int good)
239 {
240         CState     *st = &state[n];
241
242         if (res && PQresultStatus(res) != good)
243         {
244                 fprintf(stderr, "Client %d aborted in state %d: %s", n, st->state, PQerrorMessage(st->con));
245                 remains--;                              /* I've aborted */
246                 PQfinish(st->con);
247                 st->con = NULL;
248                 return (-1);
249         }
250         return (0);                                     /* OK */
251 }
252
253 static int
254 compareVariables(const void *v1, const void *v2)
255 {
256         return strcmp(((const Variable *) v1)->name,
257                                   ((const Variable *) v2)->name);
258 }
259
260 static char *
261 getVariable(CState * st, char *name)
262 {
263         Variable        key,
264                            *var;
265
266         /* On some versions of Solaris, bsearch of zero items dumps core */
267         if (st->nvariables <= 0)
268                 return NULL;
269
270         key.name = name;
271         var = (Variable *) bsearch((void *) &key,
272                                                            (void *) st->variables,
273                                                            st->nvariables,
274                                                            sizeof(Variable),
275                                                            compareVariables);
276         if (var != NULL)
277                 return var->value;
278         else
279                 return NULL;
280 }
281
282 static int
283 putVariable(CState * st, char *name, char *value)
284 {
285         Variable        key,
286                            *var;
287
288         key.name = name;
289         /* On some versions of Solaris, bsearch of zero items dumps core */
290         if (st->nvariables > 0)
291                 var = (Variable *) bsearch((void *) &key,
292                                                                    (void *) st->variables,
293                                                                    st->nvariables,
294                                                                    sizeof(Variable),
295                                                                    compareVariables);
296         else
297                 var = NULL;
298
299         if (var == NULL)
300         {
301                 Variable   *newvars;
302
303                 if (st->variables)
304                         newvars = (Variable *) realloc(st->variables,
305                                                                         (st->nvariables + 1) * sizeof(Variable));
306                 else
307                         newvars = (Variable *) malloc(sizeof(Variable));
308
309                 if (newvars == NULL)
310                         return false;
311
312                 st->variables = newvars;
313
314                 var = &newvars[st->nvariables];
315
316                 var->name = NULL;
317                 var->value = NULL;
318
319                 if ((var->name = strdup(name)) == NULL
320                         || (var->value = strdup(value)) == NULL)
321                 {
322                         free(var->name);
323                         free(var->value);
324                         return false;
325                 }
326
327                 st->nvariables++;
328
329                 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
330                           compareVariables);
331         }
332         else
333         {
334                 if ((value = strdup(value)) == NULL)
335                         return false;
336                 free(var->value);
337                 var->value = value;
338         }
339
340         return true;
341 }
342
343 static char *
344 assignVariables(CState * st, char *sql)
345 {
346         int                     i,
347                                 j;
348         char       *p,
349                            *name,
350                            *val;
351         void       *tmp;
352
353         i = 0;
354         while ((p = strchr(&sql[i], ':')) != NULL)
355         {
356                 i = j = p - sql;
357                 do
358                 {
359                         i++;
360                 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
361                 if (i == j + 1)
362                         continue;
363
364                 name = malloc(i - j);
365                 if (name == NULL)
366                         return NULL;
367                 memcpy(name, &sql[j + 1], i - (j + 1));
368                 name[i - (j + 1)] = '\0';
369                 val = getVariable(st, name);
370                 free(name);
371                 if (val == NULL)
372                         continue;
373
374                 if (strlen(val) > i - j)
375                 {
376                         tmp = realloc(sql, strlen(sql) - (i - j) + strlen(val) + 1);
377                         if (tmp == NULL)
378                         {
379                                 free(sql);
380                                 return NULL;
381                         }
382                         sql = tmp;
383                 }
384
385                 if (strlen(val) != i - j)
386                         memmove(&sql[j + strlen(val)], &sql[i], strlen(&sql[i]) + 1);
387
388                 strncpy(&sql[j], val, strlen(val));
389
390                 if (strlen(val) < i - j)
391                 {
392                         tmp = realloc(sql, strlen(sql) + 1);
393                         if (tmp == NULL)
394                         {
395                                 free(sql);
396                                 return NULL;
397                         }
398                         sql = tmp;
399                 }
400
401                 i = j + strlen(val);
402         }
403
404         return sql;
405 }
406
407 static void
408 doCustom(CState * state, int n, int debug)
409 {
410         PGresult   *res;
411         CState     *st = &state[n];
412         Command   **commands;
413
414 top:
415         commands = sql_files[st->use_file];
416
417         if (st->listen)
418         {                                                       /* are we receiver? */
419                 if (commands[st->state]->type == SQL_COMMAND)
420                 {
421                         if (debug)
422                                 fprintf(stderr, "client %d receiving\n", n);
423                         if (!PQconsumeInput(st->con))
424                         {                                       /* there's something wrong */
425                                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
426                                 remains--;              /* I've aborted */
427                                 PQfinish(st->con);
428                                 st->con = NULL;
429                                 return;
430                         }
431                         if (PQisBusy(st->con))
432                                 return;                 /* don't have the whole result yet */
433                 }
434
435                 /*
436                  * transaction finished: record the time it took in the log
437                  */
438                 if (use_log && commands[st->state + 1] == NULL)
439                 {
440                         double          diff;
441                         struct timeval now;
442
443                         gettimeofday(&now, NULL);
444                         diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
445                                 (int) (now.tv_usec - st->txn_begin.tv_usec);
446
447                         fprintf(LOGFILE, "%d %d %.0f\n", st->id, st->cnt, diff);
448                 }
449
450                 if (commands[st->state]->type == SQL_COMMAND)
451                 {
452                         res = PQgetResult(st->con);
453                         if (strncasecmp(commands[st->state]->argv[0], "select", 6) != 0)
454                         {
455                                 if (check(state, res, n, PGRES_COMMAND_OK))
456                                         return;
457                         }
458                         else
459                         {
460                                 if (check(state, res, n, PGRES_TUPLES_OK))
461                                         return;
462                         }
463                         PQclear(res);
464                         discard_response(st);
465                 }
466
467                 if (commands[st->state + 1] == NULL)
468                 {
469                         if (is_connect)
470                         {
471                                 PQfinish(st->con);
472                                 st->con = NULL;
473                         }
474
475                         if (++st->cnt >= nxacts)
476                         {
477                                 remains--;              /* I've done */
478                                 if (st->con != NULL)
479                                 {
480                                         PQfinish(st->con);
481                                         st->con = NULL;
482                                 }
483                                 return;
484                         }
485                 }
486
487                 /* increment state counter */
488                 st->state++;
489                 if (commands[st->state] == NULL)
490                 {
491                         st->state = 0;
492                         st->use_file = getrand(0, num_files - 1);
493                         commands = sql_files[st->use_file];
494                 }
495         }
496
497         if (st->con == NULL)
498         {
499                 if ((st->con = doConnect()) == NULL)
500                 {
501                         fprintf(stderr, "Client %d aborted in establishing connection.\n",
502                                         n);
503                         remains--;                      /* I've aborted */
504                         PQfinish(st->con);
505                         st->con = NULL;
506                         return;
507                 }
508         }
509
510         if (use_log && st->state == 0)
511                 gettimeofday(&(st->txn_begin), NULL);
512
513         if (commands[st->state]->type == SQL_COMMAND)
514         {
515                 char       *sql;
516
517                 if ((sql = strdup(commands[st->state]->argv[0])) == NULL
518                         || (sql = assignVariables(st, sql)) == NULL)
519                 {
520                         fprintf(stderr, "out of memory\n");
521                         st->ecnt++;
522                         return;
523                 }
524
525                 if (debug)
526                         fprintf(stderr, "client %d sending %s\n", n, sql);
527                 if (PQsendQuery(st->con, sql) == 0)
528                 {
529                         if (debug)
530                                 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
531                         st->ecnt++;
532                 }
533                 else
534                 {
535                         st->listen = 1;         /* flags that should be listened */
536                 }
537                 free(sql);
538         }
539         else if (commands[st->state]->type == META_COMMAND)
540         {
541                 int                     argc = commands[st->state]->argc,
542                                         i;
543                 char      **argv = commands[st->state]->argv;
544
545                 if (debug)
546                 {
547                         fprintf(stderr, "client %d executing \\%s", n, argv[0]);
548                         for (i = 1; i < argc; i++)
549                                 fprintf(stderr, " %s", argv[i]);
550                         fprintf(stderr, "\n");
551                 }
552
553                 if (strcasecmp(argv[0], "setrandom") == 0)
554                 {
555                         char       *var;
556                         int                     min,
557                                                 max;
558                         char            res[64];
559
560                         if (*argv[2] == ':')
561                         {
562                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
563                                 {
564                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
565                                         st->ecnt++;
566                                         return;
567                                 }
568                                 min = atoi(var);
569                         }
570                         else
571                                 min = atoi(argv[2]);
572
573                         if (min < 0)
574                         {
575                                 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
576                                 st->ecnt++;
577                                 return;
578                         }
579
580                         if (*argv[3] == ':')
581                         {
582                                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
583                                 {
584                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
585                                         st->ecnt++;
586                                         return;
587                                 }
588                                 max = atoi(var);
589                         }
590                         else
591                                 max = atoi(argv[3]);
592
593                         if (max < min || max > MAX_RANDOM_VALUE)
594                         {
595                                 fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
596                                 st->ecnt++;
597                                 return;
598                         }
599
600                         snprintf(res, sizeof(res), "%d", getrand(min, max));
601
602                         if (putVariable(st, argv[1], res) == false)
603                         {
604                                 fprintf(stderr, "%s: out of memory\n", argv[0]);
605                                 st->ecnt++;
606                                 return;
607                         }
608
609                         st->listen = 1;
610                 }
611                 else if (strcasecmp(argv[0], "set") == 0)
612                 {
613                         char       *var;
614                         int                     ope1,
615                                                 ope2;
616                         char            res[64];
617
618                         if (*argv[2] == ':')
619                         {
620                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
621                                 {
622                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
623                                         st->ecnt++;
624                                         return;
625                                 }
626                                 ope1 = atoi(var);
627                         }
628                         else
629                                 ope1 = atoi(argv[2]);
630
631                         if (argc < 5)
632                                 snprintf(res, sizeof(res), "%d", ope1);
633                         else
634                         {
635                                 if (*argv[4] == ':')
636                                 {
637                                         if ((var = getVariable(st, argv[4] + 1)) == NULL)
638                                         {
639                                                 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
640                                                 st->ecnt++;
641                                                 return;
642                                         }
643                                         ope2 = atoi(var);
644                                 }
645                                 else
646                                         ope2 = atoi(argv[4]);
647
648                                 if (strcmp(argv[3], "+") == 0)
649                                         snprintf(res, sizeof(res), "%d", ope1 + ope2);
650                                 else if (strcmp(argv[3], "-") == 0)
651                                         snprintf(res, sizeof(res), "%d", ope1 - ope2);
652                                 else if (strcmp(argv[3], "*") == 0)
653                                         snprintf(res, sizeof(res), "%d", ope1 * ope2);
654                                 else if (strcmp(argv[3], "/") == 0)
655                                 {
656                                         if (ope2 == 0)
657                                         {
658                                                 fprintf(stderr, "%s: division by zero\n", argv[0]);
659                                                 st->ecnt++;
660                                                 return;
661                                         }
662                                         snprintf(res, sizeof(res), "%d", ope1 / ope2);
663                                 }
664                                 else
665                                 {
666                                         fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
667                                         st->ecnt++;
668                                         return;
669                                 }
670                         }
671
672                         if (putVariable(st, argv[1], res) == false)
673                         {
674                                 fprintf(stderr, "%s: out of memory\n", argv[0]);
675                                 st->ecnt++;
676                                 return;
677                         }
678
679                         st->listen = 1;
680                 }
681
682                 goto top;
683         }
684 }
685
686 /* discard connections */
687 static void
688 disconnect_all(CState * state)
689 {
690         int                     i;
691
692         for (i = 0; i < nclients; i++)
693         {
694                 if (state[i].con)
695                         PQfinish(state[i].con);
696         }
697 }
698
699 /* create tables and setup data */
700 static void
701 init(void)
702 {
703         PGconn     *con;
704         PGresult   *res;
705         static char *DDLs[] = {
706                 "drop table branches",
707                 "create table branches(bid int not null,bbalance int,filler char(88))",
708                 "drop table tellers",
709                 "create table tellers(tid int not null,bid int,tbalance int,filler char(84))",
710                 "drop table accounts",
711                 "create table accounts(aid int not null,bid int,abalance int,filler char(84))",
712                 "drop table history",
713         "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
714         static char *DDLAFTERs[] = {
715                 "alter table branches add primary key (bid)",
716                 "alter table tellers add primary key (tid)",
717         "alter table accounts add primary key (aid)"};
718
719
720         char            sql[256];
721
722         int                     i;
723
724         if ((con = doConnect()) == NULL)
725                 exit(1);
726
727         for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++)
728         {
729                 res = PQexec(con, DDLs[i]);
730                 if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK)
731                 {
732                         fprintf(stderr, "%s", PQerrorMessage(con));
733                         exit(1);
734                 }
735                 PQclear(res);
736         }
737
738         res = PQexec(con, "begin");
739         if (PQresultStatus(res) != PGRES_COMMAND_OK)
740         {
741                 fprintf(stderr, "%s", PQerrorMessage(con));
742                 exit(1);
743         }
744         PQclear(res);
745
746         for (i = 0; i < nbranches * tps; i++)
747         {
748                 snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
749                 res = PQexec(con, sql);
750                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
751                 {
752                         fprintf(stderr, "%s", PQerrorMessage(con));
753                         exit(1);
754                 }
755                 PQclear(res);
756         }
757
758         for (i = 0; i < ntellers * tps; i++)
759         {
760                 snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
761                                  ,i + 1, i / ntellers + 1);
762                 res = PQexec(con, sql);
763                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
764                 {
765                         fprintf(stderr, "%s", PQerrorMessage(con));
766                         exit(1);
767                 }
768                 PQclear(res);
769         }
770
771         res = PQexec(con, "end");
772         if (PQresultStatus(res) != PGRES_COMMAND_OK)
773         {
774                 fprintf(stderr, "%s", PQerrorMessage(con));
775                 exit(1);
776         }
777         PQclear(res);
778
779         /*
780          * occupy accounts table with some data
781          */
782         fprintf(stderr, "creating tables...\n");
783         for (i = 0; i < naccounts * tps; i++)
784         {
785                 int                     j = i + 1;
786
787                 if (j % 10000 == 1)
788                 {
789                         res = PQexec(con, "copy accounts from stdin");
790                         if (PQresultStatus(res) != PGRES_COPY_IN)
791                         {
792                                 fprintf(stderr, "%s", PQerrorMessage(con));
793                                 exit(1);
794                         }
795                         PQclear(res);
796                 }
797
798                 snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0);
799                 if (PQputline(con, sql))
800                 {
801                         fprintf(stderr, "PQputline failed\n");
802                         exit(1);
803                 }
804
805                 if (j % 10000 == 0)
806                 {
807                         /*
808                          * every 10000 tuples, we commit the copy command. this should
809                          * avoid generating too much WAL logs
810                          */
811                         fprintf(stderr, "%d tuples done.\n", j);
812                         if (PQputline(con, "\\.\n"))
813                         {
814                                 fprintf(stderr, "very last PQputline failed\n");
815                                 exit(1);
816                         }
817
818                         if (PQendcopy(con))
819                         {
820                                 fprintf(stderr, "PQendcopy failed\n");
821                                 exit(1);
822                         }
823
824 #ifdef NOT_USED
825
826                         /*
827                          * do a checkpoint to purge the old WAL logs
828                          */
829                         res = PQexec(con, "checkpoint");
830                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
831                         {
832                                 fprintf(stderr, "%s", PQerrorMessage(con));
833                                 exit(1);
834                         }
835                         PQclear(res);
836 #endif   /* NOT_USED */
837                 }
838         }
839         fprintf(stderr, "set primary key...\n");
840         for (i = 0; i < (sizeof(DDLAFTERs) / sizeof(char *)); i++)
841         {
842                 res = PQexec(con, DDLAFTERs[i]);
843                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
844                 {
845                         fprintf(stderr, "%s", PQerrorMessage(con));
846                         exit(1);
847                 }
848                 PQclear(res);
849         }
850
851         /* vacuum */
852         fprintf(stderr, "vacuum...");
853         res = PQexec(con, "vacuum analyze");
854         if (PQresultStatus(res) != PGRES_COMMAND_OK)
855         {
856                 fprintf(stderr, "%s", PQerrorMessage(con));
857                 exit(1);
858         }
859         PQclear(res);
860         fprintf(stderr, "done.\n");
861
862         PQfinish(con);
863 }
864
865 static Command *
866 process_commands(char *buf)
867 {
868         const char      delim[] = " \f\n\r\t\v";
869
870         Command    *my_commands;
871         int                     j;
872         char       *p,
873                            *tok;
874
875         if ((p = strchr(buf, '\n')) != NULL)
876                 *p = '\0';
877
878         p = buf;
879         while (isspace((unsigned char) *p))
880                 p++;
881
882         if (*p == '\0' || strncmp(p, "--", 2) == 0)
883         {
884                 return NULL;
885         }
886
887         my_commands = (Command *) malloc(sizeof(Command));
888         if (my_commands == NULL)
889         {
890                 return NULL;
891         }
892
893         my_commands->argc = 0;
894
895         if (*p == '\\')
896         {
897                 my_commands->type = META_COMMAND;
898
899                 j = 0;
900                 tok = strtok(++p, delim);
901
902                 while (tok != NULL)
903                 {
904                         if ((my_commands->argv[j] = strdup(tok)) == NULL)
905                                 return NULL;
906
907                         my_commands->argc++;
908
909                         j++;
910                         tok = strtok(NULL, delim);
911                 }
912
913                 if (strcasecmp(my_commands->argv[0], "setrandom") == 0)
914                 {
915                         if (my_commands->argc < 4)
916                         {
917                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
918                                 return NULL;
919                         }
920
921                         for (j = 4; j < my_commands->argc; j++)
922                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
923                                                 my_commands->argv[0], my_commands->argv[j]);
924                 }
925                 else if (strcasecmp(my_commands->argv[0], "set") == 0)
926                 {
927                         if (my_commands->argc < 3)
928                         {
929                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
930                                 return NULL;
931                         }
932
933                         for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
934                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
935                                                 my_commands->argv[0], my_commands->argv[j]);
936                 }
937                 else
938                 {
939                         fprintf(stderr, "invalid command %s\n", my_commands->argv[0]);
940                         return NULL;
941                 }
942         }
943         else
944         {
945                 my_commands->type = SQL_COMMAND;
946
947                 if ((my_commands->argv[0] = strdup(p)) == NULL)
948                         return NULL;
949
950                 my_commands->argc++;
951         }
952
953         return my_commands;
954 }
955
956 static int
957 process_file(char *filename)
958 {
959 #define COMMANDS_ALLOC_NUM 128
960
961         Command   **my_commands;
962         FILE       *fd;
963         int                     lineno;
964         char            buf[BUFSIZ];
965         int                     alloc_num;
966
967         if (num_files >= MAX_FILES)
968         {
969                 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
970                 exit(1);
971         }
972
973         alloc_num = COMMANDS_ALLOC_NUM;
974         my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
975         if (my_commands == NULL)
976                 return false;
977
978         if (strcmp(filename, "-") == 0)
979                 fd = stdin;
980         else if ((fd = fopen(filename, "r")) == NULL)
981         {
982                 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
983                 return false;
984         }
985
986         lineno = 0;
987
988         while (fgets(buf, sizeof(buf), fd) != NULL)
989         {
990                 Command    *commands;
991                 int                     i;
992
993                 i = 0;
994                 while (isspace((unsigned char) buf[i]))
995                         i++;
996
997                 if (strncmp(&buf[i], "\n", 1) != 0 && strncmp(&buf[i], "--", 2) != 0) {
998                         commands = process_commands(&buf[i]);
999                         if (commands == NULL)
1000                         {
1001                                 fclose(fd);
1002                                 return false;
1003                         }
1004                 } else
1005                         continue;
1006
1007                 my_commands[lineno] = commands;
1008                 lineno++;
1009
1010                 if (lineno >= alloc_num)
1011                 {
1012                         alloc_num += COMMANDS_ALLOC_NUM;
1013                         my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
1014                         if (my_commands == NULL)
1015                         {
1016                                 fclose(fd);
1017                                 return false;
1018                         }
1019                 }
1020         }
1021         fclose(fd);
1022
1023         my_commands[lineno] = NULL;
1024
1025         sql_files[num_files++] = my_commands;
1026
1027         return true;
1028 }
1029
1030 static Command **
1031 process_builtin(char *tb)
1032 {
1033 #define COMMANDS_ALLOC_NUM 128
1034
1035         Command   **my_commands;
1036         int                     lineno;
1037         char            buf[BUFSIZ];
1038         int                     alloc_num;
1039
1040         if (*tb == '\0')
1041                 return NULL;
1042
1043         alloc_num = COMMANDS_ALLOC_NUM;
1044         my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
1045         if (my_commands == NULL)
1046                 return NULL;
1047
1048         lineno = 0;
1049
1050         for (;;)
1051         {
1052                 char       *p;
1053                 Command    *commands;
1054
1055                 p = buf;
1056                 while (*tb && *tb != '\n')
1057                         *p++ = *tb++;
1058
1059                 if (*tb == '\0')
1060                         break;
1061
1062                 if (*tb == '\n')
1063                         tb++;
1064
1065                 *p = '\0';
1066
1067                 commands = process_commands(buf);
1068                 if (commands == NULL)
1069                 {
1070                         return NULL;
1071                 }
1072
1073                 my_commands[lineno] = commands;
1074                 lineno++;
1075
1076                 if (lineno >= alloc_num)
1077                 {
1078                         alloc_num += COMMANDS_ALLOC_NUM;
1079                         my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
1080                         if (my_commands == NULL)
1081                         {
1082                                 return NULL;
1083                         }
1084                 }
1085         }
1086
1087         my_commands[lineno] = NULL;
1088
1089         return my_commands;
1090 }
1091
1092 /* print out results */
1093 static void
1094 printResults(
1095                          int ttype, CState * state,
1096                          struct timeval * tv1, struct timeval * tv2,
1097                          struct timeval * tv3)
1098 {
1099         double          t1,
1100                                 t2;
1101         int                     i;
1102         int                     normal_xacts = 0;
1103         char       *s;
1104
1105         for (i = 0; i < nclients; i++)
1106                 normal_xacts += state[i].cnt;
1107
1108         t1 = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec);
1109         t1 = normal_xacts * 1000000.0 / t1;
1110
1111         t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
1112         t2 = normal_xacts * 1000000.0 / t2;
1113
1114         if (ttype == 0)
1115                 s = "TPC-B (sort of)";
1116         else if (ttype == 2)
1117                 s = "Update only accounts";
1118         else if (ttype == 1)
1119                 s = "SELECT only";
1120         else
1121                 s = "Custom query";
1122
1123         printf("transaction type: %s\n", s);
1124         printf("scaling factor: %d\n", tps);
1125         printf("number of clients: %d\n", nclients);
1126         printf("number of transactions per client: %d\n", nxacts);
1127         printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
1128         printf("tps = %f (including connections establishing)\n", t1);
1129         printf("tps = %f (excluding connections establishing)\n", t2);
1130 }
1131
1132
1133 int
1134 main(int argc, char **argv)
1135 {
1136         int                     c;
1137         int                     is_init_mode = 0;               /* initialize mode? */
1138         int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
1139         int                     is_full_vacuum = 0;             /* do full vacuum before testing? */
1140         int                     debug = 0;              /* debug flag */
1141         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
1142                                                                  * 2: skip update of branches and tellers */
1143         char       *filename = NULL;
1144
1145         CState     *state;                      /* status of clients */
1146
1147         struct timeval tv1;                     /* start up time */
1148         struct timeval tv2;                     /* after establishing all connections to the
1149                                                                  * backend */
1150         struct timeval tv3;                     /* end time */
1151
1152         int                     i;
1153
1154         fd_set          input_mask;
1155         int                     nsocks;                 /* return from select(2) */
1156         int                     maxsock;                /* max socket number to be waited */
1157
1158 #if !(defined(__CYGWIN__) || defined(__MINGW32__))
1159         struct rlimit rlim;
1160 #endif
1161
1162         PGconn     *con;
1163         PGresult   *res;
1164         char       *env;
1165
1166         if ((env = getenv("PGHOST")) != NULL && *env != '\0')
1167                 pghost = env;
1168         if ((env = getenv("PGPORT")) != NULL && *env != '\0')
1169                 pgport = env;
1170         else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
1171                 login = env;
1172
1173         state = (CState *) malloc(sizeof(CState));
1174         if (state == NULL)
1175         {
1176                 fprintf(stderr, "Couldn't allocate memory for state\n");
1177                 exit(1);
1178         }
1179
1180         memset(state, 0, sizeof(*state));
1181
1182         while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:D:")) != -1)
1183         {
1184                 switch (c)
1185                 {
1186                         case 'i':
1187                                 is_init_mode++;
1188                                 break;
1189                         case 'h':
1190                                 pghost = optarg;
1191                                 break;
1192                         case 'n':
1193                                 is_no_vacuum++;
1194                                 break;
1195                         case 'v':
1196                                 is_full_vacuum++;
1197                                 break;
1198                         case 'p':
1199                                 pgport = optarg;
1200                                 break;
1201                         case 'd':
1202                                 debug++;
1203                                 break;
1204                         case 'S':
1205                                 ttype = 1;
1206                                 break;
1207                         case 'N':
1208                                 ttype = 2;
1209                                 break;
1210                         case 'c':
1211                                 nclients = atoi(optarg);
1212                                 if (nclients <= 0 || nclients > MAXCLIENTS)
1213                                 {
1214                                         fprintf(stderr, "invalid number of clients: %d\n", nclients);
1215                                         exit(1);
1216                                 }
1217 #if !(defined(__CYGWIN__) || defined(__MINGW32__))
1218 #ifdef RLIMIT_NOFILE                    /* most platform uses RLIMIT_NOFILE */
1219                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
1220 #else                                                   /* but BSD doesn't ... */
1221                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
1222 #endif   /* RLIMIT_NOFILE */
1223                                 {
1224                                         fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
1225                                         exit(1);
1226                                 }
1227                                 if (rlim.rlim_cur <= (nclients + 2))
1228                                 {
1229                                         fprintf(stderr, "You need at least %d open files resource but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
1230                                         fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
1231                                         exit(1);
1232                                 }
1233 #endif
1234                                 break;
1235                         case 'C':
1236                                 is_connect = 1;
1237                                 break;
1238                         case 's':
1239                                 tps = atoi(optarg);
1240                                 if (tps <= 0)
1241                                 {
1242                                         fprintf(stderr, "invalid scaling factor: %d\n", tps);
1243                                         exit(1);
1244                                 }
1245                                 break;
1246                         case 't':
1247                                 nxacts = atoi(optarg);
1248                                 if (nxacts <= 0)
1249                                 {
1250                                         fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
1251                                         exit(1);
1252                                 }
1253                                 break;
1254                         case 'U':
1255                                 login = optarg;
1256                                 break;
1257                         case 'P':
1258                                 pwd = optarg;
1259                                 break;
1260                         case 'l':
1261                                 use_log = true;
1262                                 break;
1263                         case 'f':
1264                                 ttype = 3;
1265                                 filename = optarg;
1266                                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
1267                                         exit(1);
1268                                 break;
1269                         case 'D':
1270                                 {
1271                                         char       *p;
1272
1273                                         if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
1274                                         {
1275                                                 fprintf(stderr, "invalid variable definition: %s\n", optarg);
1276                                                 exit(1);
1277                                         }
1278
1279                                         *p++ = '\0';
1280                                         if (putVariable(&state[0], optarg, p) == false)
1281                                         {
1282                                                 fprintf(stderr, "Couldn't allocate memory for variable\n");
1283                                                 exit(1);
1284                                         }
1285                                 }
1286                                 break;
1287                         default:
1288                                 usage();
1289                                 exit(1);
1290                                 break;
1291                 }
1292         }
1293
1294         if (argc > optind)
1295                 dbName = argv[optind];
1296         else
1297         {
1298                 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
1299                         dbName = env;
1300                 else if (login != NULL && *login != '\0')
1301                         dbName = login;
1302                 else
1303                         dbName = "";
1304         }
1305
1306         if (is_init_mode)
1307         {
1308                 init();
1309                 exit(0);
1310         }
1311
1312         remains = nclients;
1313
1314         if (getVariable(&state[0], "tps") == NULL)
1315         {
1316                 char            val[64];
1317
1318                 snprintf(val, sizeof(val), "%d", tps);
1319                 if (putVariable(&state[0], "tps", val) == false)
1320                 {
1321                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1322                         exit(1);
1323                 }
1324         }
1325
1326         if (nclients > 1)
1327         {
1328                 state = (CState *) realloc(state, sizeof(CState) * nclients);
1329                 if (state == NULL)
1330                 {
1331                         fprintf(stderr, "Couldn't allocate memory for state\n");
1332                         exit(1);
1333                 }
1334
1335                 memset(state + sizeof(*state), 0, sizeof(*state) * (nclients - 1));
1336
1337                 for (i = 1; i < nclients; i++)
1338                 {
1339                         int                     j;
1340
1341                         for (j = 0; j < state[0].nvariables; j++)
1342                         {
1343                                 if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
1344                                 {
1345                                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1346                                         exit(1);
1347                                 }
1348                         }
1349                 }
1350         }
1351
1352         if (use_log)
1353         {
1354                 char            logpath[64];
1355
1356                 snprintf(logpath, 64, "pgbench_log.%d", getpid());
1357                 LOGFILE = fopen(logpath, "w");
1358
1359                 if (LOGFILE == NULL)
1360                 {
1361                         fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
1362                         exit(1);
1363                 }
1364         }
1365
1366         if (debug)
1367         {
1368                 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
1369                            pghost, pgport, nclients, nxacts, dbName);
1370         }
1371
1372         /* opening connection... */
1373         con = doConnect();
1374         if (con == NULL)
1375                 exit(1);
1376
1377         if (PQstatus(con) == CONNECTION_BAD)
1378         {
1379                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
1380                 fprintf(stderr, "%s", PQerrorMessage(con));
1381                 exit(1);
1382         }
1383
1384         if (ttype != 3)
1385         {
1386                 /*
1387                  * get the scaling factor that should be same as count(*) from
1388                  * branches if this is not a custom query
1389                  */
1390                 res = PQexec(con, "select count(*) from branches");
1391                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1392                 {
1393                         fprintf(stderr, "%s", PQerrorMessage(con));
1394                         exit(1);
1395                 }
1396                 tps = atoi(PQgetvalue(res, 0, 0));
1397                 if (tps < 0)
1398                 {
1399                         fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
1400                         exit(1);
1401                 }
1402                 PQclear(res);
1403         }
1404
1405         if (!is_no_vacuum)
1406         {
1407                 fprintf(stderr, "starting vacuum...");
1408                 res = PQexec(con, "vacuum branches");
1409                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1410                 {
1411                         fprintf(stderr, "%s", PQerrorMessage(con));
1412                         exit(1);
1413                 }
1414                 PQclear(res);
1415
1416                 res = PQexec(con, "vacuum tellers");
1417                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1418                 {
1419                         fprintf(stderr, "%s", PQerrorMessage(con));
1420                         exit(1);
1421                 }
1422                 PQclear(res);
1423
1424                 res = PQexec(con, "delete from history");
1425                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1426                 {
1427                         fprintf(stderr, "%s", PQerrorMessage(con));
1428                         exit(1);
1429                 }
1430                 PQclear(res);
1431                 res = PQexec(con, "vacuum history");
1432                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1433                 {
1434                         fprintf(stderr, "%s", PQerrorMessage(con));
1435                         exit(1);
1436                 }
1437                 PQclear(res);
1438
1439                 fprintf(stderr, "end.\n");
1440
1441                 if (is_full_vacuum)
1442                 {
1443                         fprintf(stderr, "starting full vacuum...");
1444                         res = PQexec(con, "vacuum analyze accounts");
1445                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1446                         {
1447                                 fprintf(stderr, "%s", PQerrorMessage(con));
1448                                 exit(1);
1449                         }
1450                         PQclear(res);
1451                         fprintf(stderr, "end.\n");
1452                 }
1453         }
1454         PQfinish(con);
1455
1456         /* set random seed */
1457         gettimeofday(&tv1, NULL);
1458         srand((unsigned int) tv1.tv_usec);
1459
1460         /* get start up time */
1461         gettimeofday(&tv1, NULL);
1462
1463         if (is_connect == 0)
1464         {
1465                 /* make connections to the database */
1466                 for (i = 0; i < nclients; i++)
1467                 {
1468                         state[i].id = i;
1469                         if ((state[i].con = doConnect()) == NULL)
1470                                 exit(1);
1471                 }
1472         }
1473
1474         /* time after connections set up */
1475         gettimeofday(&tv2, NULL);
1476
1477         /* process bultin SQL scripts */
1478         switch (ttype)
1479         {
1480                         char            buf[128];
1481
1482                 case 0:
1483                         sql_files[0] = process_builtin(tpc_b);
1484                         snprintf(buf, sizeof(buf), "%d", 100000 * tps);
1485                         sql_files[0][0]->argv[3] = strdup(buf);
1486                         snprintf(buf, sizeof(buf), "%d", 1 * tps);
1487                         sql_files[0][1]->argv[3] = strdup(buf);
1488                         snprintf(buf, sizeof(buf), "%d", 10 * tps);
1489                         sql_files[0][2]->argv[3] = strdup(buf);
1490                         snprintf(buf, sizeof(buf), "%d", 10000 * tps);
1491                         sql_files[0][3]->argv[3] = strdup(buf);
1492                         num_files = 1;
1493                         break;
1494                 case 1:
1495                         sql_files[0] = process_builtin(select_only);
1496                         snprintf(buf, sizeof(buf), "%d", 100000 * tps);
1497                         sql_files[0][0]->argv[3] = strdup(buf);
1498                         num_files = 1;
1499                         break;
1500                 case 2:
1501                         sql_files[0] = process_builtin(simple_update);
1502                         snprintf(buf, sizeof(buf), "%d", 100000 * tps);
1503                         sql_files[0][0]->argv[3] = strdup(buf);
1504                         snprintf(buf, sizeof(buf), "%d", 1 * tps);
1505                         sql_files[0][1]->argv[3] = strdup(buf);
1506                         snprintf(buf, sizeof(buf), "%d", 10 * tps);
1507                         sql_files[0][2]->argv[3] = strdup(buf);
1508                         snprintf(buf, sizeof(buf), "%d", 10000 * tps);
1509                         sql_files[0][3]->argv[3] = strdup(buf);
1510                         num_files = 1;
1511                         break;
1512                 default:
1513                         break;
1514         }
1515
1516         /* send start up queries in async manner */
1517         for (i = 0; i < nclients; i++)
1518         {
1519                 Command   **commands = sql_files[state[i].use_file];
1520                 int                     prev_ecnt = state[i].ecnt;
1521
1522                 state[i].use_file = getrand(0, num_files - 1);
1523                 doCustom(state, i, debug);
1524
1525                 if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
1526                 {
1527                         fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
1528                         remains--;                              /* I've aborted */
1529                         PQfinish(state[i].con);
1530                         state[i].con = NULL;
1531                 }
1532         }
1533
1534         for (;;)
1535         {
1536                 if (remains <= 0)
1537                 {                                               /* all done ? */
1538                         disconnect_all(state);
1539                         /* get end time */
1540                         gettimeofday(&tv3, NULL);
1541                         printResults(ttype, state, &tv1, &tv2, &tv3);
1542                         if (LOGFILE)
1543                                 fclose(LOGFILE);
1544                         exit(0);
1545                 }
1546
1547                 FD_ZERO(&input_mask);
1548
1549                 maxsock = -1;
1550                 for (i = 0; i < nclients; i++)
1551                 {
1552                         Command   **commands = sql_files[state[i].use_file];
1553
1554                         if (state[i].con && commands[state[i].state]->type != META_COMMAND)
1555                         {
1556                                 int                     sock = PQsocket(state[i].con);
1557
1558                                 if (sock < 0)
1559                                 {
1560                                         disconnect_all(state);
1561                                         exit(1);
1562                                 }
1563                                 FD_SET(sock, &input_mask);
1564                                 if (maxsock < sock)
1565                                         maxsock = sock;
1566                         }
1567                 }
1568
1569                 if (maxsock != -1)
1570                 {
1571                         if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
1572                                                           (fd_set *) NULL, (struct timeval *) NULL)) < 0)
1573                         {
1574                                 if (errno == EINTR)
1575                                         continue;
1576                                 /* must be something wrong */
1577                                 disconnect_all(state);
1578                                 fprintf(stderr, "select failed: %s\n", strerror(errno));
1579                                 exit(1);
1580                         }
1581                         else if (nsocks == 0)
1582                         {                                       /* timeout */
1583                                 fprintf(stderr, "select timeout\n");
1584                                 for (i = 0; i < nclients; i++)
1585                                 {
1586                                         fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
1587                                                         i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
1588                                 }
1589                                 exit(0);
1590                         }
1591                 }
1592
1593                 /* ok, backend returns reply */
1594                 for (i = 0; i < nclients; i++)
1595                 {
1596                         Command   **commands = sql_files[state[i].use_file];
1597                         int                     prev_ecnt = state[i].ecnt;
1598
1599                         if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
1600                                                   || commands[state[i].state]->type == META_COMMAND))
1601                         {
1602                                 doCustom(state, i, debug);
1603                         }
1604
1605                         if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
1606                         {
1607                                 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
1608                                 remains--;                              /* I've aborted */
1609                                 PQfinish(state[i].con);
1610                                 state[i].con = NULL;
1611                         }
1612                 }
1613         }
1614 }