]> granicus.if.org Git - postgresql/blob - contrib/pgbench/pgbench.c
pgindent run for 8.2.
[postgresql] / contrib / pgbench / pgbench.c
1 /*
2  * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.56 2006/10/04 00:29:45 momjian Exp $
3  *
4  * pgbench: a simple benchmark program for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2000-2006      Tatsuo Ishii
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  */
20 #include "postgres_fe.h"
21
22 #include "libpq-fe.h"
23
24 #include <ctype.h>
25
26 #ifdef WIN32
27 #include "win32.h"
28 #else
29 #include <sys/time.h>
30 #include <unistd.h>
31
32 #ifdef HAVE_GETOPT_H
33 #include <getopt.h>
34 #endif
35
36 #ifdef HAVE_SYS_SELECT_H
37 #include <sys/select.h>
38 #endif
39
40 /* for getrlimit */
41 #include <sys/resource.h>
42 #endif   /* ! WIN32 */
43
44 extern char *optarg;
45 extern int      optind;
46
47 #ifdef WIN32
48 #undef select
49 #endif
50
51
/*
 * ------------------------------------------------------------------
 * Configurable parameters
 * ------------------------------------------------------------------
 */

/* upper bound on the number of simulated clients */
#define MAXCLIENTS 1024

/* number of simulated clients (default: 1) */
int			nclients = 1;

/* number of transactions each client runs (default: 10) */
int			nxacts = 10;

/*
 * Scaling factor.  With 100000 account rows per unit of scale, scale = 10
 * yields 1000000 tuples in the accounts table.
 */
int			scale = 1;

/*
 * ------------------------------------------------------------------
 * End of configurable parameters
 * ------------------------------------------------------------------
 */
69
/* base table sizes for scale = 1; init() multiplies each of them by scale */
#define nbranches	1
#define ntellers	10
#define naccounts	100000

/* stream the per-transaction latency records are written to */
FILE	   *LOGFILE = NULL;

/* when set, transaction latencies are logged to LOGFILE */
bool		use_log;

/* number of clients that have neither finished nor aborted yet */
int			remains;

/* when nonzero, a fresh connection is established for each transaction */
int			is_connect;

/* connection parameters, handed to PQsetdbLogin() by doConnect() */
char	   *pghost = "";
char	   *pgport = NULL;
char	   *pgoptions = NULL;
char	   *pgtty = NULL;
char	   *login = NULL;
char	   *pwd = NULL;
char	   *dbName;
89
/*
 * One script variable: a name/value pair.  Both strings are heap copies
 * owned by the entry (see putVariable).
 */
typedef struct
{
	/* variable name, used as the sort/search key */
	char	   *name;
	/* current value, kept as text */
	char	   *value;
}	Variable;
96
97 /*
98  * structures used in custom query mode
99  */
100
101 typedef struct
102 {
103         PGconn     *con;                        /* connection handle to DB */
104         int                     id;                             /* client No. */
105         int                     state;                  /* state No. */
106         int                     cnt;                    /* xacts count */
107         int                     ecnt;                   /* error count */
108         int                     listen;                 /* 0 indicates that an async query has been
109                                                                  * sent */
110         Variable   *variables;          /* array of variable definitions */
111         int                     nvariables;
112         struct timeval txn_begin;       /* used for measuring latencies */
113         int                     use_file;               /* index in sql_files for this client */
114 }       CState;
115
/*
 * Commands read from script files
 */

/* values for Command.type */
#define SQL_COMMAND		1
#define META_COMMAND	2

/* capacity of Command.argv */
#define MAX_ARGS		10

typedef struct
{
	/* command type: SQL_COMMAND or META_COMMAND */
	int			type;
	/* number of entries used in argv */
	int			argc;
	/*
	 * argv[0] is the SQL text (SQL_COMMAND) or the meta command name
	 * (META_COMMAND); any further entries are the meta command's arguments
	 */
	char	   *argv[MAX_ARGS];
}	Command;

/* maximum number of SQL script files allowed */
#define MAX_FILES		128

/* the loaded scripts, each a NULL-terminated array of Command pointers */
Command   **sql_files[MAX_FILES];

/* number of entries used in sql_files */
int			num_files;
134
/*
 * Built-in default script: a TPC-B-like transaction that updates one
 * account, one teller and one branch and records the change in history.
 */
static char *tpc_b =
"\\set nbranches :scale\n"
"\\set ntellers 10 * :scale\n"
"\\set naccounts 100000 * :scale\n"
"\\setrandom aid 1 :naccounts\n"
"\\setrandom bid 1 :nbranches\n"
"\\setrandom tid 1 :ntellers\n"
"\\setrandom delta -5000 5000\n"
"BEGIN;\n"
"UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
"SELECT abalance FROM accounts WHERE aid = :aid;\n"
"UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
"UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
"INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n";
152
/*
 * Built-in script for the -N case: same as the default script but without
 * the updates of tellers and branches.
 */
static char *simple_update =
"\\set nbranches :scale\n"
"\\set ntellers 10 * :scale\n"
"\\set naccounts 100000 * :scale\n"
"\\setrandom aid 1 :naccounts\n"
"\\setrandom bid 1 :nbranches\n"
"\\setrandom tid 1 :ntellers\n"
"\\setrandom delta -5000 5000\n"
"BEGIN;\n"
"UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
"SELECT abalance FROM accounts WHERE aid = :aid;\n"
"INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n";
168
/* Built-in script for the -S case: a single-row SELECT on accounts */
static char *select_only =
"\\set naccounts 100000 * :scale\n"
"\\setrandom aid 1 :naccounts\n"
"SELECT abalance FROM accounts WHERE aid = :aid;\n";
175
/*
 * Print the command-line synopsis to stderr: one line for benchmark mode
 * and one line for initialize (-i) mode.
 */
static void
usage(void)
{
	fputs("usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-D varname=value][-n][-C][-v][-S][-N][-f filename][-l][-U login][-P password][-d][dbname]\n", stderr);
	fputs("(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n", stderr);
}
182
183 /* random number generator */
184 static int
185 getrand(int min, int max)
186 {
187         return min + (int) (((max - min) * (double) random()) / MAX_RANDOM_VALUE + 0.5);
188 }
189
190 /* set up a connection to the backend */
191 static PGconn *
192 doConnect(void)
193 {
194         PGconn     *con;
195         PGresult   *res;
196
197         con = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
198                                            login, pwd);
199         if (con == NULL)
200         {
201                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
202                 fprintf(stderr, "Memory allocatin problem?\n");
203                 return (NULL);
204         }
205
206         if (PQstatus(con) == CONNECTION_BAD)
207         {
208                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
209
210                 if (PQerrorMessage(con))
211                         fprintf(stderr, "%s", PQerrorMessage(con));
212                 else
213                         fprintf(stderr, "No explanation from the backend\n");
214
215                 return (NULL);
216         }
217
218         res = PQexec(con, "SET search_path = public");
219         if (PQresultStatus(res) != PGRES_COMMAND_OK)
220         {
221                 fprintf(stderr, "%s", PQerrorMessage(con));
222                 exit(1);
223         }
224         PQclear(res);
225
226         return (con);
227 }
228
229 /* throw away response from backend */
230 static void
231 discard_response(CState * state)
232 {
233         PGresult   *res;
234
235         do
236         {
237                 res = PQgetResult(state->con);
238                 if (res)
239                         PQclear(res);
240         } while (res);
241 }
242
243 /* check to see if the SQL result was good */
244 static int
245 check(CState * state, PGresult *res, int n, int good)
246 {
247         CState     *st = &state[n];
248
249         if (res && PQresultStatus(res) != good)
250         {
251                 fprintf(stderr, "Client %d aborted in state %d: %s", n, st->state, PQerrorMessage(st->con));
252                 remains--;                              /* I've aborted */
253                 PQfinish(st->con);
254                 st->con = NULL;
255                 return (-1);
256         }
257         return (0);                                     /* OK */
258 }
259
260 static int
261 compareVariables(const void *v1, const void *v2)
262 {
263         return strcmp(((const Variable *) v1)->name,
264                                   ((const Variable *) v2)->name);
265 }
266
267 static char *
268 getVariable(CState * st, char *name)
269 {
270         Variable        key,
271                            *var;
272
273         /* On some versions of Solaris, bsearch of zero items dumps core */
274         if (st->nvariables <= 0)
275                 return NULL;
276
277         key.name = name;
278         var = (Variable *) bsearch((void *) &key,
279                                                            (void *) st->variables,
280                                                            st->nvariables,
281                                                            sizeof(Variable),
282                                                            compareVariables);
283         if (var != NULL)
284                 return var->value;
285         else
286                 return NULL;
287 }
288
289 static int
290 putVariable(CState * st, char *name, char *value)
291 {
292         Variable        key,
293                            *var;
294
295         key.name = name;
296         /* On some versions of Solaris, bsearch of zero items dumps core */
297         if (st->nvariables > 0)
298                 var = (Variable *) bsearch((void *) &key,
299                                                                    (void *) st->variables,
300                                                                    st->nvariables,
301                                                                    sizeof(Variable),
302                                                                    compareVariables);
303         else
304                 var = NULL;
305
306         if (var == NULL)
307         {
308                 Variable   *newvars;
309
310                 if (st->variables)
311                         newvars = (Variable *) realloc(st->variables,
312                                                                         (st->nvariables + 1) * sizeof(Variable));
313                 else
314                         newvars = (Variable *) malloc(sizeof(Variable));
315
316                 if (newvars == NULL)
317                         return false;
318
319                 st->variables = newvars;
320
321                 var = &newvars[st->nvariables];
322
323                 var->name = NULL;
324                 var->value = NULL;
325
326                 if ((var->name = strdup(name)) == NULL
327                         || (var->value = strdup(value)) == NULL)
328                 {
329                         free(var->name);
330                         free(var->value);
331                         return false;
332                 }
333
334                 st->nvariables++;
335
336                 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
337                           compareVariables);
338         }
339         else
340         {
341                 char       *val;
342
343                 if ((val = strdup(value)) == NULL)
344                         return false;
345
346                 free(var->value);
347                 var->value = val;
348         }
349
350         return true;
351 }
352
353 static char *
354 assignVariables(CState * st, char *sql)
355 {
356         int                     i,
357                                 j;
358         char       *p,
359                            *name,
360                            *val;
361         void       *tmp;
362
363         i = 0;
364         while ((p = strchr(&sql[i], ':')) != NULL)
365         {
366                 i = j = p - sql;
367                 do
368                 {
369                         i++;
370                 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
371                 if (i == j + 1)
372                         continue;
373
374                 name = malloc(i - j);
375                 if (name == NULL)
376                         return NULL;
377                 memcpy(name, &sql[j + 1], i - (j + 1));
378                 name[i - (j + 1)] = '\0';
379                 val = getVariable(st, name);
380                 free(name);
381                 if (val == NULL)
382                         continue;
383
384                 if (strlen(val) > i - j)
385                 {
386                         tmp = realloc(sql, strlen(sql) - (i - j) + strlen(val) + 1);
387                         if (tmp == NULL)
388                         {
389                                 free(sql);
390                                 return NULL;
391                         }
392                         sql = tmp;
393                 }
394
395                 if (strlen(val) != i - j)
396                         memmove(&sql[j + strlen(val)], &sql[i], strlen(&sql[i]) + 1);
397
398                 strncpy(&sql[j], val, strlen(val));
399
400                 if (strlen(val) < i - j)
401                 {
402                         tmp = realloc(sql, strlen(sql) + 1);
403                         if (tmp == NULL)
404                         {
405                                 free(sql);
406                                 return NULL;
407                         }
408                         sql = tmp;
409                 }
410
411                 i = j + strlen(val);
412         }
413
414         return sql;
415 }
416
417 static void
418 doCustom(CState * state, int n, int debug)
419 {
420         PGresult   *res;
421         CState     *st = &state[n];
422         Command   **commands;
423
424 top:
425         commands = sql_files[st->use_file];
426
427         if (st->listen)
428         {                                                       /* are we receiver? */
429                 if (commands[st->state]->type == SQL_COMMAND)
430                 {
431                         if (debug)
432                                 fprintf(stderr, "client %d receiving\n", n);
433                         if (!PQconsumeInput(st->con))
434                         {                                       /* there's something wrong */
435                                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
436                                 remains--;              /* I've aborted */
437                                 PQfinish(st->con);
438                                 st->con = NULL;
439                                 return;
440                         }
441                         if (PQisBusy(st->con))
442                                 return;                 /* don't have the whole result yet */
443                 }
444
445                 /*
446                  * transaction finished: record the time it took in the log
447                  */
448                 if (use_log && commands[st->state + 1] == NULL)
449                 {
450                         double          diff;
451                         struct timeval now;
452
453                         gettimeofday(&now, NULL);
454                         diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
455                                 (int) (now.tv_usec - st->txn_begin.tv_usec);
456
457                         fprintf(LOGFILE, "%d %d %.0f\n", st->id, st->cnt, diff);
458                 }
459
460                 if (commands[st->state]->type == SQL_COMMAND)
461                 {
462                         res = PQgetResult(st->con);
463                         if (pg_strncasecmp(commands[st->state]->argv[0], "select", 6) != 0)
464                         {
465                                 if (check(state, res, n, PGRES_COMMAND_OK))
466                                         return;
467                         }
468                         else
469                         {
470                                 if (check(state, res, n, PGRES_TUPLES_OK))
471                                         return;
472                         }
473                         PQclear(res);
474                         discard_response(st);
475                 }
476
477                 if (commands[st->state + 1] == NULL)
478                 {
479                         if (is_connect)
480                         {
481                                 PQfinish(st->con);
482                                 st->con = NULL;
483                         }
484
485                         if (++st->cnt >= nxacts)
486                         {
487                                 remains--;              /* I've done */
488                                 if (st->con != NULL)
489                                 {
490                                         PQfinish(st->con);
491                                         st->con = NULL;
492                                 }
493                                 return;
494                         }
495                 }
496
497                 /* increment state counter */
498                 st->state++;
499                 if (commands[st->state] == NULL)
500                 {
501                         st->state = 0;
502                         st->use_file = getrand(0, num_files - 1);
503                         commands = sql_files[st->use_file];
504                 }
505         }
506
507         if (st->con == NULL)
508         {
509                 if ((st->con = doConnect()) == NULL)
510                 {
511                         fprintf(stderr, "Client %d aborted in establishing connection.\n",
512                                         n);
513                         remains--;                      /* I've aborted */
514                         PQfinish(st->con);
515                         st->con = NULL;
516                         return;
517                 }
518         }
519
520         if (use_log && st->state == 0)
521                 gettimeofday(&(st->txn_begin), NULL);
522
523         if (commands[st->state]->type == SQL_COMMAND)
524         {
525                 char       *sql;
526
527                 if ((sql = strdup(commands[st->state]->argv[0])) == NULL
528                         || (sql = assignVariables(st, sql)) == NULL)
529                 {
530                         fprintf(stderr, "out of memory\n");
531                         st->ecnt++;
532                         return;
533                 }
534
535                 if (debug)
536                         fprintf(stderr, "client %d sending %s\n", n, sql);
537                 if (PQsendQuery(st->con, sql) == 0)
538                 {
539                         if (debug)
540                                 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
541                         st->ecnt++;
542                 }
543                 else
544                 {
545                         st->listen = 1;         /* flags that should be listened */
546                 }
547                 free(sql);
548         }
549         else if (commands[st->state]->type == META_COMMAND)
550         {
551                 int                     argc = commands[st->state]->argc,
552                                         i;
553                 char      **argv = commands[st->state]->argv;
554
555                 if (debug)
556                 {
557                         fprintf(stderr, "client %d executing \\%s", n, argv[0]);
558                         for (i = 1; i < argc; i++)
559                                 fprintf(stderr, " %s", argv[i]);
560                         fprintf(stderr, "\n");
561                 }
562
563                 if (pg_strcasecmp(argv[0], "setrandom") == 0)
564                 {
565                         char       *var;
566                         int                     min,
567                                                 max;
568                         char            res[64];
569
570                         if (*argv[2] == ':')
571                         {
572                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
573                                 {
574                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
575                                         st->ecnt++;
576                                         return;
577                                 }
578                                 min = atoi(var);
579                         }
580                         else
581                                 min = atoi(argv[2]);
582
583 #ifdef NOT_USED
584                         if (min < 0)
585                         {
586                                 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
587                                 st->ecnt++;
588                                 return;
589                         }
590 #endif
591
592                         if (*argv[3] == ':')
593                         {
594                                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
595                                 {
596                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
597                                         st->ecnt++;
598                                         return;
599                                 }
600                                 max = atoi(var);
601                         }
602                         else
603                                 max = atoi(argv[3]);
604
605                         if (max < min || max > MAX_RANDOM_VALUE)
606                         {
607                                 fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
608                                 st->ecnt++;
609                                 return;
610                         }
611
612 #ifdef DEBUG
613                         printf("min: %d max: %d random: %d\n", min, max, getrand(min, max));
614 #endif
615                         snprintf(res, sizeof(res), "%d", getrand(min, max));
616
617                         if (putVariable(st, argv[1], res) == false)
618                         {
619                                 fprintf(stderr, "%s: out of memory\n", argv[0]);
620                                 st->ecnt++;
621                                 return;
622                         }
623
624                         st->listen = 1;
625                 }
626                 else if (pg_strcasecmp(argv[0], "set") == 0)
627                 {
628                         char       *var;
629                         int                     ope1,
630                                                 ope2;
631                         char            res[64];
632
633                         if (*argv[2] == ':')
634                         {
635                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
636                                 {
637                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
638                                         st->ecnt++;
639                                         return;
640                                 }
641                                 ope1 = atoi(var);
642                         }
643                         else
644                                 ope1 = atoi(argv[2]);
645
646                         if (argc < 5)
647                                 snprintf(res, sizeof(res), "%d", ope1);
648                         else
649                         {
650                                 if (*argv[4] == ':')
651                                 {
652                                         if ((var = getVariable(st, argv[4] + 1)) == NULL)
653                                         {
654                                                 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
655                                                 st->ecnt++;
656                                                 return;
657                                         }
658                                         ope2 = atoi(var);
659                                 }
660                                 else
661                                         ope2 = atoi(argv[4]);
662
663                                 if (strcmp(argv[3], "+") == 0)
664                                         snprintf(res, sizeof(res), "%d", ope1 + ope2);
665                                 else if (strcmp(argv[3], "-") == 0)
666                                         snprintf(res, sizeof(res), "%d", ope1 - ope2);
667                                 else if (strcmp(argv[3], "*") == 0)
668                                         snprintf(res, sizeof(res), "%d", ope1 * ope2);
669                                 else if (strcmp(argv[3], "/") == 0)
670                                 {
671                                         if (ope2 == 0)
672                                         {
673                                                 fprintf(stderr, "%s: division by zero\n", argv[0]);
674                                                 st->ecnt++;
675                                                 return;
676                                         }
677                                         snprintf(res, sizeof(res), "%d", ope1 / ope2);
678                                 }
679                                 else
680                                 {
681                                         fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
682                                         st->ecnt++;
683                                         return;
684                                 }
685                         }
686
687                         if (putVariable(st, argv[1], res) == false)
688                         {
689                                 fprintf(stderr, "%s: out of memory\n", argv[0]);
690                                 st->ecnt++;
691                                 return;
692                         }
693
694                         st->listen = 1;
695                 }
696
697                 goto top;
698         }
699 }
700
701 /* discard connections */
702 static void
703 disconnect_all(CState * state)
704 {
705         int                     i;
706
707         for (i = 0; i < nclients; i++)
708         {
709                 if (state[i].con)
710                         PQfinish(state[i].con);
711         }
712 }
713
714 /* create tables and setup data */
715 static void
716 init(void)
717 {
718         PGconn     *con;
719         PGresult   *res;
720         static char *DDLs[] = {
721                 "drop table branches",
722                 "create table branches(bid int not null,bbalance int,filler char(88))",
723                 "drop table tellers",
724                 "create table tellers(tid int not null,bid int,tbalance int,filler char(84))",
725                 "drop table accounts",
726                 "create table accounts(aid int not null,bid int,abalance int,filler char(84))",
727                 "drop table history",
728         "create table history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22))"};
729         static char *DDLAFTERs[] = {
730                 "alter table branches add primary key (bid)",
731                 "alter table tellers add primary key (tid)",
732         "alter table accounts add primary key (aid)"};
733
734
735         char            sql[256];
736
737         int                     i;
738
739         if ((con = doConnect()) == NULL)
740                 exit(1);
741
742         for (i = 0; i < (sizeof(DDLs) / sizeof(char *)); i++)
743         {
744                 res = PQexec(con, DDLs[i]);
745                 if (strncmp(DDLs[i], "drop", 4) && PQresultStatus(res) != PGRES_COMMAND_OK)
746                 {
747                         fprintf(stderr, "%s", PQerrorMessage(con));
748                         exit(1);
749                 }
750                 PQclear(res);
751         }
752
753         res = PQexec(con, "begin");
754         if (PQresultStatus(res) != PGRES_COMMAND_OK)
755         {
756                 fprintf(stderr, "%s", PQerrorMessage(con));
757                 exit(1);
758         }
759         PQclear(res);
760
761         for (i = 0; i < nbranches * scale; i++)
762         {
763                 snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
764                 res = PQexec(con, sql);
765                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
766                 {
767                         fprintf(stderr, "%s", PQerrorMessage(con));
768                         exit(1);
769                 }
770                 PQclear(res);
771         }
772
773         for (i = 0; i < ntellers * scale; i++)
774         {
775                 snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
776                                  ,i + 1, i / ntellers + 1);
777                 res = PQexec(con, sql);
778                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
779                 {
780                         fprintf(stderr, "%s", PQerrorMessage(con));
781                         exit(1);
782                 }
783                 PQclear(res);
784         }
785
786         res = PQexec(con, "end");
787         if (PQresultStatus(res) != PGRES_COMMAND_OK)
788         {
789                 fprintf(stderr, "%s", PQerrorMessage(con));
790                 exit(1);
791         }
792         PQclear(res);
793
794         /*
795          * occupy accounts table with some data
796          */
797         fprintf(stderr, "creating tables...\n");
798         for (i = 0; i < naccounts * scale; i++)
799         {
800                 int                     j = i + 1;
801
802                 if (j % 10000 == 1)
803                 {
804                         res = PQexec(con, "copy accounts from stdin");
805                         if (PQresultStatus(res) != PGRES_COPY_IN)
806                         {
807                                 fprintf(stderr, "%s", PQerrorMessage(con));
808                                 exit(1);
809                         }
810                         PQclear(res);
811                 }
812
813                 snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0);
814                 if (PQputline(con, sql))
815                 {
816                         fprintf(stderr, "PQputline failed\n");
817                         exit(1);
818                 }
819
820                 if (j % 10000 == 0)
821                 {
822                         /*
823                          * every 10000 tuples, we commit the copy command. this should
824                          * avoid generating too much WAL logs
825                          */
826                         fprintf(stderr, "%d tuples done.\n", j);
827                         if (PQputline(con, "\\.\n"))
828                         {
829                                 fprintf(stderr, "very last PQputline failed\n");
830                                 exit(1);
831                         }
832
833                         if (PQendcopy(con))
834                         {
835                                 fprintf(stderr, "PQendcopy failed\n");
836                                 exit(1);
837                         }
838
839 #ifdef NOT_USED
840
841                         /*
842                          * do a checkpoint to purge the old WAL logs
843                          */
844                         res = PQexec(con, "checkpoint");
845                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
846                         {
847                                 fprintf(stderr, "%s", PQerrorMessage(con));
848                                 exit(1);
849                         }
850                         PQclear(res);
851 #endif   /* NOT_USED */
852                 }
853         }
854         fprintf(stderr, "set primary key...\n");
855         for (i = 0; i < (sizeof(DDLAFTERs) / sizeof(char *)); i++)
856         {
857                 res = PQexec(con, DDLAFTERs[i]);
858                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
859                 {
860                         fprintf(stderr, "%s", PQerrorMessage(con));
861                         exit(1);
862                 }
863                 PQclear(res);
864         }
865
866         /* vacuum */
867         fprintf(stderr, "vacuum...");
868         res = PQexec(con, "vacuum analyze");
869         if (PQresultStatus(res) != PGRES_COMMAND_OK)
870         {
871                 fprintf(stderr, "%s", PQerrorMessage(con));
872                 exit(1);
873         }
874         PQclear(res);
875         fprintf(stderr, "done.\n");
876
877         PQfinish(con);
878 }
879
880 static Command *
881 process_commands(char *buf)
882 {
883         const char      delim[] = " \f\n\r\t\v";
884
885         Command    *my_commands;
886         int                     j;
887         char       *p,
888                            *tok;
889
890         if ((p = strchr(buf, '\n')) != NULL)
891                 *p = '\0';
892
893         p = buf;
894         while (isspace((unsigned char) *p))
895                 p++;
896
897         if (*p == '\0' || strncmp(p, "--", 2) == 0)
898         {
899                 return NULL;
900         }
901
902         my_commands = (Command *) malloc(sizeof(Command));
903         if (my_commands == NULL)
904         {
905                 return NULL;
906         }
907
908         my_commands->argc = 0;
909
910         if (*p == '\\')
911         {
912                 my_commands->type = META_COMMAND;
913
914                 j = 0;
915                 tok = strtok(++p, delim);
916
917                 while (tok != NULL)
918                 {
919                         if ((my_commands->argv[j] = strdup(tok)) == NULL)
920                                 return NULL;
921
922                         my_commands->argc++;
923
924                         j++;
925                         tok = strtok(NULL, delim);
926                 }
927
928                 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
929                 {
930                         if (my_commands->argc < 4)
931                         {
932                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
933                                 return NULL;
934                         }
935
936                         for (j = 4; j < my_commands->argc; j++)
937                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
938                                                 my_commands->argv[0], my_commands->argv[j]);
939                 }
940                 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
941                 {
942                         if (my_commands->argc < 3)
943                         {
944                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
945                                 return NULL;
946                         }
947
948                         for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
949                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
950                                                 my_commands->argv[0], my_commands->argv[j]);
951                 }
952                 else
953                 {
954                         fprintf(stderr, "invalid command %s\n", my_commands->argv[0]);
955                         return NULL;
956                 }
957         }
958         else
959         {
960                 my_commands->type = SQL_COMMAND;
961
962                 if ((my_commands->argv[0] = strdup(p)) == NULL)
963                         return NULL;
964
965                 my_commands->argc++;
966         }
967
968         return my_commands;
969 }
970
971 static int
972 process_file(char *filename)
973 {
974 #define COMMANDS_ALLOC_NUM 128
975
976         Command   **my_commands;
977         FILE       *fd;
978         int                     lineno;
979         char            buf[BUFSIZ];
980         int                     alloc_num;
981
982         if (num_files >= MAX_FILES)
983         {
984                 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
985                 exit(1);
986         }
987
988         alloc_num = COMMANDS_ALLOC_NUM;
989         my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
990         if (my_commands == NULL)
991                 return false;
992
993         if (strcmp(filename, "-") == 0)
994                 fd = stdin;
995         else if ((fd = fopen(filename, "r")) == NULL)
996         {
997                 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
998                 return false;
999         }
1000
1001         lineno = 0;
1002
1003         while (fgets(buf, sizeof(buf), fd) != NULL)
1004         {
1005                 Command    *commands;
1006                 int                     i;
1007
1008                 i = 0;
1009                 while (isspace((unsigned char) buf[i]))
1010                         i++;
1011
1012                 if (buf[i] != '\0' && strncmp(&buf[i], "--", 2) != 0)
1013                 {
1014                         commands = process_commands(&buf[i]);
1015                         if (commands == NULL)
1016                         {
1017                                 fclose(fd);
1018                                 return false;
1019                         }
1020                 }
1021                 else
1022                         continue;
1023
1024                 my_commands[lineno] = commands;
1025                 lineno++;
1026
1027                 if (lineno >= alloc_num)
1028                 {
1029                         alloc_num += COMMANDS_ALLOC_NUM;
1030                         my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
1031                         if (my_commands == NULL)
1032                         {
1033                                 fclose(fd);
1034                                 return false;
1035                         }
1036                 }
1037         }
1038         fclose(fd);
1039
1040         my_commands[lineno] = NULL;
1041
1042         sql_files[num_files++] = my_commands;
1043
1044         return true;
1045 }
1046
1047 static Command **
1048 process_builtin(char *tb)
1049 {
1050 #define COMMANDS_ALLOC_NUM 128
1051
1052         Command   **my_commands;
1053         int                     lineno;
1054         char            buf[BUFSIZ];
1055         int                     alloc_num;
1056
1057         if (*tb == '\0')
1058                 return NULL;
1059
1060         alloc_num = COMMANDS_ALLOC_NUM;
1061         my_commands = (Command **) malloc(sizeof(Command *) * alloc_num);
1062         if (my_commands == NULL)
1063                 return NULL;
1064
1065         lineno = 0;
1066
1067         for (;;)
1068         {
1069                 char       *p;
1070                 Command    *commands;
1071
1072                 p = buf;
1073                 while (*tb && *tb != '\n')
1074                         *p++ = *tb++;
1075
1076                 if (*tb == '\0')
1077                         break;
1078
1079                 if (*tb == '\n')
1080                         tb++;
1081
1082                 *p = '\0';
1083
1084                 commands = process_commands(buf);
1085                 if (commands == NULL)
1086                 {
1087                         return NULL;
1088                 }
1089
1090                 my_commands[lineno] = commands;
1091                 lineno++;
1092
1093                 if (lineno >= alloc_num)
1094                 {
1095                         alloc_num += COMMANDS_ALLOC_NUM;
1096                         my_commands = realloc(my_commands, sizeof(Command *) * alloc_num);
1097                         if (my_commands == NULL)
1098                         {
1099                                 return NULL;
1100                         }
1101                 }
1102         }
1103
1104         my_commands[lineno] = NULL;
1105
1106         return my_commands;
1107 }
1108
1109 /* print out results */
1110 static void
1111 printResults(
1112                          int ttype, CState * state,
1113                          struct timeval * tv1, struct timeval * tv2,
1114                          struct timeval * tv3)
1115 {
1116         double          t1,
1117                                 t2;
1118         int                     i;
1119         int                     normal_xacts = 0;
1120         char       *s;
1121
1122         for (i = 0; i < nclients; i++)
1123                 normal_xacts += state[i].cnt;
1124
1125         t1 = (tv3->tv_sec - tv1->tv_sec) * 1000000.0 + (tv3->tv_usec - tv1->tv_usec);
1126         t1 = normal_xacts * 1000000.0 / t1;
1127
1128         t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
1129         t2 = normal_xacts * 1000000.0 / t2;
1130
1131         if (ttype == 0)
1132                 s = "TPC-B (sort of)";
1133         else if (ttype == 2)
1134                 s = "Update only accounts";
1135         else if (ttype == 1)
1136                 s = "SELECT only";
1137         else
1138                 s = "Custom query";
1139
1140         printf("transaction type: %s\n", s);
1141         printf("scaling factor: %d\n", scale);
1142         printf("number of clients: %d\n", nclients);
1143         printf("number of transactions per client: %d\n", nxacts);
1144         printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
1145         printf("tps = %f (including connections establishing)\n", t1);
1146         printf("tps = %f (excluding connections establishing)\n", t2);
1147 }
1148
1149
1150 int
1151 main(int argc, char **argv)
1152 {
1153         int                     c;
1154         int                     is_init_mode = 0;               /* initialize mode? */
1155         int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
1156         int                     is_full_vacuum = 0;             /* do full vacuum before testing? */
1157         int                     debug = 0;              /* debug flag */
1158         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
1159                                                                  * 2: skip update of branches and tellers */
1160         char       *filename = NULL;
1161
1162         CState     *state;                      /* status of clients */
1163
1164         struct timeval tv1;                     /* start up time */
1165         struct timeval tv2;                     /* after establishing all connections to the
1166                                                                  * backend */
1167         struct timeval tv3;                     /* end time */
1168
1169         int                     i;
1170
1171         fd_set          input_mask;
1172         int                     nsocks;                 /* return from select(2) */
1173         int                     maxsock;                /* max socket number to be waited */
1174
1175 #if !(defined(__CYGWIN__) || defined(__MINGW32__))
1176         struct rlimit rlim;
1177 #endif
1178
1179         PGconn     *con;
1180         PGresult   *res;
1181         char       *env;
1182
1183         char            val[64];
1184
1185         if ((env = getenv("PGHOST")) != NULL && *env != '\0')
1186                 pghost = env;
1187         if ((env = getenv("PGPORT")) != NULL && *env != '\0')
1188                 pgport = env;
1189         else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
1190                 login = env;
1191
1192         state = (CState *) malloc(sizeof(CState));
1193         if (state == NULL)
1194         {
1195                 fprintf(stderr, "Couldn't allocate memory for state\n");
1196                 exit(1);
1197         }
1198
1199         memset(state, 0, sizeof(*state));
1200
1201         while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:D:")) != -1)
1202         {
1203                 switch (c)
1204                 {
1205                         case 'i':
1206                                 is_init_mode++;
1207                                 break;
1208                         case 'h':
1209                                 pghost = optarg;
1210                                 break;
1211                         case 'n':
1212                                 is_no_vacuum++;
1213                                 break;
1214                         case 'v':
1215                                 is_full_vacuum++;
1216                                 break;
1217                         case 'p':
1218                                 pgport = optarg;
1219                                 break;
1220                         case 'd':
1221                                 debug++;
1222                                 break;
1223                         case 'S':
1224                                 ttype = 1;
1225                                 break;
1226                         case 'N':
1227                                 ttype = 2;
1228                                 break;
1229                         case 'c':
1230                                 nclients = atoi(optarg);
1231                                 if (nclients <= 0 || nclients > MAXCLIENTS)
1232                                 {
1233                                         fprintf(stderr, "invalid number of clients: %d\n", nclients);
1234                                         exit(1);
1235                                 }
1236 #if !(defined(__CYGWIN__) || defined(__MINGW32__))
1237 #ifdef RLIMIT_NOFILE                    /* most platform uses RLIMIT_NOFILE */
1238                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
1239 #else                                                   /* but BSD doesn't ... */
1240                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
1241 #endif   /* RLIMIT_NOFILE */
1242                                 {
1243                                         fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
1244                                         exit(1);
1245                                 }
1246                                 if (rlim.rlim_cur <= (nclients + 2))
1247                                 {
1248                                         fprintf(stderr, "You need at least %d open files resource but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
1249                                         fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
1250                                         exit(1);
1251                                 }
1252 #endif
1253                                 break;
1254                         case 'C':
1255                                 is_connect = 1;
1256                                 break;
1257                         case 's':
1258                                 scale = atoi(optarg);
1259                                 if (scale <= 0)
1260                                 {
1261                                         fprintf(stderr, "invalid scaling factor: %d\n", scale);
1262                                         exit(1);
1263                                 }
1264                                 break;
1265                         case 't':
1266                                 nxacts = atoi(optarg);
1267                                 if (nxacts <= 0)
1268                                 {
1269                                         fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
1270                                         exit(1);
1271                                 }
1272                                 break;
1273                         case 'U':
1274                                 login = optarg;
1275                                 break;
1276                         case 'P':
1277                                 pwd = optarg;
1278                                 break;
1279                         case 'l':
1280                                 use_log = true;
1281                                 break;
1282                         case 'f':
1283                                 ttype = 3;
1284                                 filename = optarg;
1285                                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
1286                                         exit(1);
1287                                 break;
1288                         case 'D':
1289                                 {
1290                                         char       *p;
1291
1292                                         if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
1293                                         {
1294                                                 fprintf(stderr, "invalid variable definition: %s\n", optarg);
1295                                                 exit(1);
1296                                         }
1297
1298                                         *p++ = '\0';
1299                                         if (putVariable(&state[0], optarg, p) == false)
1300                                         {
1301                                                 fprintf(stderr, "Couldn't allocate memory for variable\n");
1302                                                 exit(1);
1303                                         }
1304                                 }
1305                                 break;
1306                         default:
1307                                 usage();
1308                                 exit(1);
1309                                 break;
1310                 }
1311         }
1312
1313         if (argc > optind)
1314                 dbName = argv[optind];
1315         else
1316         {
1317                 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
1318                         dbName = env;
1319                 else if (login != NULL && *login != '\0')
1320                         dbName = login;
1321                 else
1322                         dbName = "";
1323         }
1324
1325         if (is_init_mode)
1326         {
1327                 init();
1328                 exit(0);
1329         }
1330
1331         remains = nclients;
1332
1333         if (getVariable(&state[0], "scale") == NULL)
1334         {
1335                 snprintf(val, sizeof(val), "%d", scale);
1336                 if (putVariable(&state[0], "scale", val) == false)
1337                 {
1338                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1339                         exit(1);
1340                 }
1341         }
1342
1343         if (nclients > 1)
1344         {
1345                 state = (CState *) realloc(state, sizeof(CState) * nclients);
1346                 if (state == NULL)
1347                 {
1348                         fprintf(stderr, "Couldn't allocate memory for state\n");
1349                         exit(1);
1350                 }
1351
1352                 memset(state + 1, 0, sizeof(*state) * (nclients - 1));
1353
1354                 for (i = 1; i < nclients; i++)
1355                 {
1356                         int                     j;
1357
1358                         for (j = 0; j < state[0].nvariables; j++)
1359                         {
1360                                 if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
1361                                 {
1362                                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1363                                         exit(1);
1364                                 }
1365                         }
1366                 }
1367         }
1368
1369         if (use_log)
1370         {
1371                 char            logpath[64];
1372
1373                 snprintf(logpath, 64, "pgbench_log.%d", getpid());
1374                 LOGFILE = fopen(logpath, "w");
1375
1376                 if (LOGFILE == NULL)
1377                 {
1378                         fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
1379                         exit(1);
1380                 }
1381         }
1382
1383         if (debug)
1384         {
1385                 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
1386                            pghost, pgport, nclients, nxacts, dbName);
1387         }
1388
1389         /* opening connection... */
1390         con = doConnect();
1391         if (con == NULL)
1392                 exit(1);
1393
1394         if (PQstatus(con) == CONNECTION_BAD)
1395         {
1396                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
1397                 fprintf(stderr, "%s", PQerrorMessage(con));
1398                 exit(1);
1399         }
1400
1401         if (ttype != 3)
1402         {
1403                 /*
1404                  * get the scaling factor that should be same as count(*) from
1405                  * branches if this is not a custom query
1406                  */
1407                 res = PQexec(con, "select count(*) from branches");
1408                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1409                 {
1410                         fprintf(stderr, "%s", PQerrorMessage(con));
1411                         exit(1);
1412                 }
1413                 scale = atoi(PQgetvalue(res, 0, 0));
1414                 if (scale < 0)
1415                 {
1416                         fprintf(stderr, "count(*) from branches invalid (%d)\n", scale);
1417                         exit(1);
1418                 }
1419                 PQclear(res);
1420
1421                 snprintf(val, sizeof(val), "%d", scale);
1422                 if (putVariable(&state[0], "scale", val) == false)
1423                 {
1424                         fprintf(stderr, "Couldn't allocate memory for variable\n");
1425                         exit(1);
1426                 }
1427         }
1428
1429         if (!is_no_vacuum)
1430         {
1431                 fprintf(stderr, "starting vacuum...");
1432                 res = PQexec(con, "vacuum branches");
1433                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1434                 {
1435                         fprintf(stderr, "%s", PQerrorMessage(con));
1436                         exit(1);
1437                 }
1438                 PQclear(res);
1439
1440                 res = PQexec(con, "vacuum tellers");
1441                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1442                 {
1443                         fprintf(stderr, "%s", PQerrorMessage(con));
1444                         exit(1);
1445                 }
1446                 PQclear(res);
1447
1448                 res = PQexec(con, "delete from history");
1449                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1450                 {
1451                         fprintf(stderr, "%s", PQerrorMessage(con));
1452                         exit(1);
1453                 }
1454                 PQclear(res);
1455                 res = PQexec(con, "vacuum history");
1456                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1457                 {
1458                         fprintf(stderr, "%s", PQerrorMessage(con));
1459                         exit(1);
1460                 }
1461                 PQclear(res);
1462
1463                 fprintf(stderr, "end.\n");
1464
1465                 if (is_full_vacuum)
1466                 {
1467                         fprintf(stderr, "starting full vacuum...");
1468                         res = PQexec(con, "vacuum analyze accounts");
1469                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1470                         {
1471                                 fprintf(stderr, "%s", PQerrorMessage(con));
1472                                 exit(1);
1473                         }
1474                         PQclear(res);
1475                         fprintf(stderr, "end.\n");
1476                 }
1477         }
1478         PQfinish(con);
1479
1480         /* set random seed */
1481         gettimeofday(&tv1, NULL);
1482         srand((unsigned int) tv1.tv_usec);
1483
1484         /* get start up time */
1485         gettimeofday(&tv1, NULL);
1486
1487         if (is_connect == 0)
1488         {
1489                 /* make connections to the database */
1490                 for (i = 0; i < nclients; i++)
1491                 {
1492                         state[i].id = i;
1493                         if ((state[i].con = doConnect()) == NULL)
1494                                 exit(1);
1495                 }
1496         }
1497
1498         /* time after connections set up */
1499         gettimeofday(&tv2, NULL);
1500
1501         /* process bultin SQL scripts */
1502         switch (ttype)
1503         {
1504                 case 0:
1505                         sql_files[0] = process_builtin(tpc_b);
1506                         num_files = 1;
1507                         break;
1508
1509                 case 1:
1510                         sql_files[0] = process_builtin(select_only);
1511                         num_files = 1;
1512                         break;
1513
1514                 case 2:
1515                         sql_files[0] = process_builtin(simple_update);
1516                         num_files = 1;
1517                         break;
1518
1519                 default:
1520                         break;
1521         }
1522
1523         /* send start up queries in async manner */
1524         for (i = 0; i < nclients; i++)
1525         {
1526                 Command   **commands = sql_files[state[i].use_file];
1527                 int                     prev_ecnt = state[i].ecnt;
1528
1529                 state[i].use_file = getrand(0, num_files - 1);
1530                 doCustom(state, i, debug);
1531
1532                 if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
1533                 {
1534                         fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
1535                         remains--;                      /* I've aborted */
1536                         PQfinish(state[i].con);
1537                         state[i].con = NULL;
1538                 }
1539         }
1540
1541         for (;;)
1542         {
1543                 if (remains <= 0)
1544                 {                                               /* all done ? */
1545                         disconnect_all(state);
1546                         /* get end time */
1547                         gettimeofday(&tv3, NULL);
1548                         printResults(ttype, state, &tv1, &tv2, &tv3);
1549                         if (LOGFILE)
1550                                 fclose(LOGFILE);
1551                         exit(0);
1552                 }
1553
1554                 FD_ZERO(&input_mask);
1555
1556                 maxsock = -1;
1557                 for (i = 0; i < nclients; i++)
1558                 {
1559                         Command   **commands = sql_files[state[i].use_file];
1560
1561                         if (state[i].con && commands[state[i].state]->type != META_COMMAND)
1562                         {
1563                                 int                     sock = PQsocket(state[i].con);
1564
1565                                 if (sock < 0)
1566                                 {
1567                                         disconnect_all(state);
1568                                         exit(1);
1569                                 }
1570                                 FD_SET(sock, &input_mask);
1571                                 if (maxsock < sock)
1572                                         maxsock = sock;
1573                         }
1574                 }
1575
1576                 if (maxsock != -1)
1577                 {
1578                         if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
1579                                                           (fd_set *) NULL, (struct timeval *) NULL)) < 0)
1580                         {
1581                                 if (errno == EINTR)
1582                                         continue;
1583                                 /* must be something wrong */
1584                                 disconnect_all(state);
1585                                 fprintf(stderr, "select failed: %s\n", strerror(errno));
1586                                 exit(1);
1587                         }
1588                         else if (nsocks == 0)
1589                         {                                       /* timeout */
1590                                 fprintf(stderr, "select timeout\n");
1591                                 for (i = 0; i < nclients; i++)
1592                                 {
1593                                         fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
1594                                                         i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
1595                                 }
1596                                 exit(0);
1597                         }
1598                 }
1599
1600                 /* ok, backend returns reply */
1601                 for (i = 0; i < nclients; i++)
1602                 {
1603                         Command   **commands = sql_files[state[i].use_file];
1604                         int                     prev_ecnt = state[i].ecnt;
1605
1606                         if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
1607                                                   || commands[state[i].state]->type == META_COMMAND))
1608                         {
1609                                 doCustom(state, i, debug);
1610                         }
1611
1612                         if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
1613                         {
1614                                 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
1615                                 remains--;              /* I've aborted */
1616                                 PQfinish(state[i].con);
1617                                 state[i].con = NULL;
1618                         }
1619                 }
1620         }
1621 }