granicus.if.org Git - postgresql/blob - contrib/pgbench/pgbench.c
pgbench: Fix inadvertent inconsistency in help message.
[postgresql] / contrib / pgbench / pgbench.c
1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * contrib/pgbench/pgbench.c
8  * Copyright (c) 2000-2013, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29
30 #ifdef WIN32
31 #define FD_SETSIZE 1024                 /* set before winsock2.h is included */
32 #endif   /* ! WIN32 */
33
34 #include "postgres_fe.h"
35
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "portability/instr_time.h"
39
40 #include <ctype.h>
41 #include <math.h>
42 #include <signal.h>
43
44 #ifndef WIN32
45 #include <sys/time.h>
46 #include <unistd.h>
47 #endif   /* ! WIN32 */
48
49 #ifdef HAVE_SYS_SELECT_H
50 #include <sys/select.h>
51 #endif
52
53 #ifdef HAVE_SYS_RESOURCE_H
54 #include <sys/resource.h>               /* for getrlimit */
55 #endif
56
/* Fallback definition for platforms whose headers do not provide INT64_MAX. */
#ifndef INT64_MAX
#define INT64_MAX	INT64CONST(0x7FFFFFFFFFFFFFFF)
#endif

/*
 * Multi-platform pthread implementations
 *
 * Thread support comes from one of three back ends:
 *	- WIN32: native Win32 threads behind a minimal pthread-like API;
 *	- ENABLE_THREAD_SAFETY: the platform's own <pthread.h>;
 *	- otherwise: emulation with fork(), with the pthread identifiers renamed
 *	  via macros to pg_* names so they cannot clash with system declarations.
 * In the WIN32 and fork() cases, pthread_create() and pthread_join() are
 * declared static below, so their definitions must appear later in this file.
 */

#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */

/* NOTE(review): presumably needed for waiting on forked children -- confirm */
#include <sys/wait.h>

#define pthread_t				pg_pthread_t
#define pthread_attr_t			pg_pthread_attr_t
#define pthread_create			pg_pthread_create
#define pthread_join			pg_pthread_join

typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#endif

/*
 * getopt() state.  NOTE(review): presumably declared here for platforms
 * whose headers do not declare these -- confirm.
 */
extern char *optarg;
extern int	optind;
94
95
/********************************************************************
 * some configurable parameters */

/*
 * Max number of clients allowed.  When FD_SETSIZE is available the limit is
 * kept 10 below it.  NOTE(review): presumably so every client socket fits in
 * an fd_set with headroom for other descriptors -- confirm.
 */
#ifdef FD_SETSIZE
#define MAXCLIENTS	(FD_SETSIZE - 10)
#else
#define MAXCLIENTS	1024
#endif

#define LOG_STEP_SECONDS	5	/* seconds between log messages */
#define DEFAULT_NXACTS	10		/* default nxacts */

int			nxacts = 0;			/* number of transactions per client */
int			duration = 0;		/* duration in seconds */

/*
 * Scaling factor.  For example, scale = 10 will make 1000000 tuples in the
 * pgbench_accounts table (naccounts tuples per unit of scale).
 */
int			scale = 1;

/*
 * Fill factor, in percent.  For example, fillfactor = 90 will use only 90
 * percent of the space during inserts and leave 10 percent free.
 */
int			fillfactor = 100;

/*
 * Create foreign key constraints on the tables?
 */
int			foreign_keys = 0;

/*
 * Use unlogged tables?
 */
int			unlogged_tables = 0;

/*
 * Log sampling rate (1.0 = log everything, 0.0 = option not given)
 */
double		sample_rate = 0.0;

/*
 * Tablespace selection: where to create the tables and the indexes,
 * respectively.  NULL unless set.
 */
char	   *tablespace = NULL;
char	   *index_tablespace = NULL;

/*
 * end of configurable parameters
 *********************************************************************/

/*
 * Number of rows per unit of scale in each standard table.  The built-in
 * scripts multiply these by :scale.
 */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000
153
/*
 * The scale factor at/beyond which account IDs can no longer be stored in
 * 32-bit integers: the largest aid is naccounts * scale, and that exceeds
 * 2^31 - 1 once scale passes 21474.
 *
 * Although the actual threshold is 21474, we use 20000 because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000

bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates instead of individual
								 * transactions (interval length in seconds) */
bool		is_connect;			/* establish connection for each transaction */
bool		is_latencies;		/* report per-command latencies */
int			main_pid;			/* main process id used in log filename */

/*
 * Connection settings.  doConnect() passes them to libpq as the host, port,
 * user and dbname keywords.  progname is also used as the
 * fallback_application_name and in the help text.
 */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
const char *progname;

/*
 * NOTE(review): the portable type for a flag written from a signal handler
 * is volatile sig_atomic_t rather than volatile bool.
 */
volatile bool timer_exceeded = false;	/* flag from signal handler */
178
/*
 * variable definitions
 *
 * Each client keeps an array of these sorted by name, so lookups can use
 * bsearch() (see getVariable() and putVariable()).  Values are kept as
 * strings.
 */
typedef struct
{
	char	   *name;			/* variable name */
	char	   *value;			/* its value */
} Variable;

#define MAX_FILES		128		/* max number of SQL script files allowed */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command,
								 * including the terminating NUL */
188
/*
 * structures used in custom query mode
 */

/* Per-client state. */
typedef struct
{
	PGconn	   *con;			/* connection handle to DB (NULL once
								 * clientDone() has closed it) */
	int			id;				/* client No. */
	int			state;			/* state No.  NOTE(review): presumably the
								 * index of the current command in the
								 * script -- confirm */
	int			cnt;			/* xacts count */
	int			ecnt;			/* error count */
	int			listen;			/* 0 indicates that an async query has been
								 * sent */
	int			sleeping;		/* 1 indicates that the client is napping */
	int64		until;			/* napping until (usec) */
	Variable   *variables;		/* array of variable definitions, sorted by
								 * name */
	int			nvariables;		/* number of entries in variables[] */
	instr_time	txn_begin;		/* used for measuring transaction latencies */
	instr_time	stmt_begin;		/* used for measuring statement latencies */
	int			use_file;		/* index in sql_files for this client */
	bool		prepared[MAX_FILES];	/* NOTE(review): presumably whether the
										 * statements of sql_files[i] have been
										 * prepared on con -- confirm */
} CState;
211
/*
 * Thread state and result
 */
typedef struct
{
	int			tid;			/* thread id */
	pthread_t	thread;			/* thread handle */
	CState	   *state;			/* array of CState */
	int			nstate;			/* length of state[] */
	instr_time	start_time;		/* thread start time */
	instr_time *exec_elapsed;	/* time spent executing cmds (per Command;
								 * presumably indexed by command_num) */
	int		   *exec_count;		/* number of cmd executions (per Command) */
	unsigned short random_state[3];		/* separate randomness for each
										 * thread: pg_erand48() state used by
										 * getrand() */
} TState;

/* Handle value meaning "no thread".  NOTE(review): usage not visible here. */
#define INVALID_THREAD		((pthread_t) 0)

/*
 * Per-thread result.  NOTE(review): presumably time spent connecting and the
 * number of transactions completed -- confirm against threadRun().
 */
typedef struct
{
	instr_time	conn_time;
	int			xacts;
} TResult;
234
/*
 * queries read from files
 */
#define SQL_COMMAND		1		/* a statement sent to the server */
#define META_COMMAND	2		/* presumably a backslash command such as
								 * \set or \setrandom */
#define MAX_ARGS		10		/* max words per command (size of
								 * Command.argv) */

/* How statements are submitted to the server (-M/--protocol). */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;
/* Names of the query modes; keep in the same order as enum QueryMode. */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};

/* One parsed line of a transaction script. */
typedef struct
{
	char	   *line;			/* full text of command line */
	int			command_num;	/* unique index of this Command struct */
	int			type;			/* command type (SQL_COMMAND or META_COMMAND) */
	int			argc;			/* number of command words */
	char	   *argv[MAX_ARGS]; /* command word list */
} Command;
261
/*
 * Aggregated transaction statistics for one logging interval (presumably
 * used when agg_interval is set).
 */
typedef struct
{

	long		start_time;		/* when does the interval start */
	int			cnt;			/* number of transactions */
	double		min_duration;	/* min/max durations */
	double		max_duration;
	double		sum;			/* sum(duration), sum(duration^2) - for
								 * estimates */
	double		sum2;

} AggVals;

static Command **sql_files[MAX_FILES];	/* SQL script files, each an array of
										 * Command pointers */
static int	num_files;			/* number of script files */
static int	num_commands = 0;	/* total number of Command structs */
static int	debug = 0;			/* debug flag */
279
/*
 * default scenario: a TPC-B-like transaction.  Picks a random account,
 * branch, teller and delta, applies the delta to all three balances, reads
 * the account balance back and records the change in pgbench_history.
 */
static char *tpc_b = {
	"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
	"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
	"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
	"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};

/*
 * -N case: the default transaction without the pgbench_tellers and
 * pgbench_branches updates.
 */
static char *simple_update = {
	"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
	"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};

/* -S case: a single lookup of a random account's balance. */
static char *select_only = {
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
};

/* Function prototypes (static; defined later in this file) */
static void setalarm(int seconds);
static void *threadRun(void *arg);
324
325 static void
326 usage(void)
327 {
328         printf("%s is a benchmarking tool for PostgreSQL.\n\n"
329                    "Usage:\n"
330                    "  %s [OPTION]... [DBNAME]\n"
331                    "\nInitialization options:\n"
332                    "  -i, --initialize         invokes initialization mode\n"
333                    "  -F, --fillfactor=NUM     set fill factor\n"
334                    "  -n, --no-vacuum          do not run VACUUM after initialization\n"
335                    "  -q, --quiet              quiet logging (one message each 5 seconds)\n"
336                    "  -s, --scale=NUM          scaling factor\n"
337                    "  --foreign-keys           create foreign key constraints between tables\n"
338                    "  --index-tablespace=TABLESPACE\n"
339                    "                           create indexes in the specified tablespace\n"
340                    "  --tablespace=TABLESPACE  create tables in the specified tablespace\n"
341                    "  --unlogged-tables        create tables as unlogged tables\n"
342                    "\nBenchmarking options:\n"
343                    "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
344                    "  -C, --connect            establish new connection for each transaction\n"
345                    "  -D, --define=VARNAME=VALUE\n"
346                    "                           define variable for use by custom script\n"
347                    "  -f, --file=FILENAME      read transaction script from FILENAME\n"
348                    "  -j, --jobs=NUM           number of threads (default: 1)\n"
349                    "  -l, --log                write transaction times to log file\n"
350                    "  -M, --protocol=simple|extended|prepared\n"
351                    "                           protocol for submitting queries "
352                                                            "(default: simple)\n"
353                    "  -n, --no-vacuum          do not run VACUUM before tests\n"
354                    "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
355                    "  -r, --report-latencies   report average latency per command\n"
356                    "  -s, --scale=NUM          report this scale factor in output\n"
357                    "  -S, --select-only        perform SELECT-only transactions\n"
358                    "  -t, --transactions       number of transactions each client runs "
359                                                              "(default: 10)\n"
360                    "  -T, --time=NUM           duration of benchmark test in seconds\n"
361                    "  -v, --vacuum-all         vacuum all four standard tables before tests\n"
362                    "  --aggregate-interval=NUM aggregate data over NUM seconds\n"
363                    "  --sampling-rate=NUM      fraction of transactions to log (e.g. 0.01 for 1%%)\n"
364                    "\nCommon options:\n"
365                    "  -d, --debug              print debugging output\n"
366                    "  -h, --host=HOSTNAME      database server host or socket directory\n"
367                    "  -p, --port=PORT          database server port number\n"
368                    "  -U, --username=USERNAME  connect as specified database user\n"
369                    "  -V, --version            output version information, then exit\n"
370                    "  -?, --help               show this help, then exit\n"
371                    "\n"
372                    "Report bugs to <pgsql-bugs@postgresql.org>.\n",
373                    progname, progname);
374 }
375
376 /*
377  * strtoint64 -- convert a string to 64-bit integer
378  *
379  * This function is a modified version of scanint8() from
380  * src/backend/utils/adt/int8.c.
381  */
382 static int64
383 strtoint64(const char *str)
384 {
385         const char *ptr = str;
386         int64           result = 0;
387         int                     sign = 1;
388
389         /*
390          * Do our own scan, rather than relying on sscanf which might be broken
391          * for long long.
392          */
393
394         /* skip leading spaces */
395         while (*ptr && isspace((unsigned char) *ptr))
396                 ptr++;
397
398         /* handle sign */
399         if (*ptr == '-')
400         {
401                 ptr++;
402
403                 /*
404                  * Do an explicit check for INT64_MIN.  Ugly though this is, it's
405                  * cleaner than trying to get the loop below to handle it portably.
406                  */
407                 if (strncmp(ptr, "9223372036854775808", 19) == 0)
408                 {
409                         result = -INT64CONST(0x7fffffffffffffff) - 1;
410                         ptr += 19;
411                         goto gotdigits;
412                 }
413                 sign = -1;
414         }
415         else if (*ptr == '+')
416                 ptr++;
417
418         /* require at least one digit */
419         if (!isdigit((unsigned char) *ptr))
420                 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
421
422         /* process digits */
423         while (*ptr && isdigit((unsigned char) *ptr))
424         {
425                 int64           tmp = result * 10 + (*ptr++ - '0');
426
427                 if ((tmp / 10) != result)               /* overflow? */
428                         fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
429                 result = tmp;
430         }
431
432 gotdigits:
433
434         /* allow trailing whitespace, but not other trailing chars */
435         while (*ptr != '\0' && isspace((unsigned char) *ptr))
436                 ptr++;
437
438         if (*ptr != '\0')
439                 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
440
441         return ((sign < 0) ? -result : result);
442 }
443
444 /* random number generator: uniform distribution from min to max inclusive */
445 static int64
446 getrand(TState *thread, int64 min, int64 max)
447 {
448         /*
449          * Odd coding is so that min and max have approximately the same chance of
450          * being selected as do numbers between them.
451          *
452          * pg_erand48() is thread-safe and concurrent, which is why we use it
453          * rather than random(), which in glibc is non-reentrant, and therefore
454          * protected by a mutex, and therefore a bottleneck on machines with many
455          * CPUs.
456          */
457         return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
458 }
459
460 /* call PQexec() and exit() on failure */
461 static void
462 executeStatement(PGconn *con, const char *sql)
463 {
464         PGresult   *res;
465
466         res = PQexec(con, sql);
467         if (PQresultStatus(res) != PGRES_COMMAND_OK)
468         {
469                 fprintf(stderr, "%s", PQerrorMessage(con));
470                 exit(1);
471         }
472         PQclear(res);
473 }
474
475 /* set up a connection to the backend */
476 static PGconn *
477 doConnect(void)
478 {
479         PGconn     *conn;
480         static char *password = NULL;
481         bool            new_pass;
482
483         /*
484          * Start the connection.  Loop until we have a password if requested by
485          * backend.
486          */
487         do
488         {
489 #define PARAMS_ARRAY_SIZE       7
490
491                 const char *keywords[PARAMS_ARRAY_SIZE];
492                 const char *values[PARAMS_ARRAY_SIZE];
493
494                 keywords[0] = "host";
495                 values[0] = pghost;
496                 keywords[1] = "port";
497                 values[1] = pgport;
498                 keywords[2] = "user";
499                 values[2] = login;
500                 keywords[3] = "password";
501                 values[3] = password;
502                 keywords[4] = "dbname";
503                 values[4] = dbName;
504                 keywords[5] = "fallback_application_name";
505                 values[5] = progname;
506                 keywords[6] = NULL;
507                 values[6] = NULL;
508
509                 new_pass = false;
510
511                 conn = PQconnectdbParams(keywords, values, true);
512
513                 if (!conn)
514                 {
515                         fprintf(stderr, "Connection to database \"%s\" failed\n",
516                                         dbName);
517                         return NULL;
518                 }
519
520                 if (PQstatus(conn) == CONNECTION_BAD &&
521                         PQconnectionNeedsPassword(conn) &&
522                         password == NULL)
523                 {
524                         PQfinish(conn);
525                         password = simple_prompt("Password: ", 100, false);
526                         new_pass = true;
527                 }
528         } while (new_pass);
529
530         /* check to see that the backend connection was successfully made */
531         if (PQstatus(conn) == CONNECTION_BAD)
532         {
533                 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
534                                 dbName, PQerrorMessage(conn));
535                 PQfinish(conn);
536                 return NULL;
537         }
538
539         return conn;
540 }
541
542 /* throw away response from backend */
543 static void
544 discard_response(CState *state)
545 {
546         PGresult   *res;
547
548         do
549         {
550                 res = PQgetResult(state->con);
551                 if (res)
552                         PQclear(res);
553         } while (res);
554 }
555
556 static int
557 compareVariables(const void *v1, const void *v2)
558 {
559         return strcmp(((const Variable *) v1)->name,
560                                   ((const Variable *) v2)->name);
561 }
562
563 static char *
564 getVariable(CState *st, char *name)
565 {
566         Variable        key,
567                            *var;
568
569         /* On some versions of Solaris, bsearch of zero items dumps core */
570         if (st->nvariables <= 0)
571                 return NULL;
572
573         key.name = name;
574         var = (Variable *) bsearch((void *) &key,
575                                                            (void *) st->variables,
576                                                            st->nvariables,
577                                                            sizeof(Variable),
578                                                            compareVariables);
579         if (var != NULL)
580                 return var->value;
581         else
582                 return NULL;
583 }
584
/*
 * isLegalVariableName -- check that a variable name is non-empty and consists
 * only of alphanumerics (per isalnum()) and underscores.
 *
 * The empty name is rejected.  A ":name" reference in a script must have at
 * least one name character after the colon (see parseVariable()), so a
 * variable with an empty name could be created but never referenced.
 */
static bool
isLegalVariableName(const char *name)
{
	const unsigned char *p = (const unsigned char *) name;

	if (*p == '\0')
		return false;

	for (; *p != '\0'; p++)
	{
		if (!isalnum(*p) && *p != '_')
			return false;
	}

	return true;
}
599
600 static int
601 putVariable(CState *st, const char *context, char *name, char *value)
602 {
603         Variable        key,
604                            *var;
605
606         key.name = name;
607         /* On some versions of Solaris, bsearch of zero items dumps core */
608         if (st->nvariables > 0)
609                 var = (Variable *) bsearch((void *) &key,
610                                                                    (void *) st->variables,
611                                                                    st->nvariables,
612                                                                    sizeof(Variable),
613                                                                    compareVariables);
614         else
615                 var = NULL;
616
617         if (var == NULL)
618         {
619                 Variable   *newvars;
620
621                 /*
622                  * Check for the name only when declaring a new variable to avoid
623                  * overhead.
624                  */
625                 if (!isLegalVariableName(name))
626                 {
627                         fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
628                         return false;
629                 }
630
631                 if (st->variables)
632                         newvars = (Variable *) pg_realloc(st->variables,
633                                                                         (st->nvariables + 1) * sizeof(Variable));
634                 else
635                         newvars = (Variable *) pg_malloc(sizeof(Variable));
636
637                 st->variables = newvars;
638
639                 var = &newvars[st->nvariables];
640
641                 var->name = pg_strdup(name);
642                 var->value = pg_strdup(value);
643
644                 st->nvariables++;
645
646                 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
647                           compareVariables);
648         }
649         else
650         {
651                 char       *val;
652
653                 /* dup then free, in case value is pointing at this variable */
654                 val = pg_strdup(value);
655
656                 free(var->value);
657                 var->value = val;
658         }
659
660         return true;
661 }
662
/*
 * parseVariable -- read the variable name that follows the character at
 * sql[0] (the ':' in callers).
 *
 * The name is the longest run of alphanumerics/underscores starting at
 * sql[1].  On success, returns a newly allocated copy of the name and sets
 * *eaten to the number of bytes consumed, including sql[0].  Returns NULL,
 * leaving *eaten untouched, if sql[1] cannot start a name.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			namelen = 0;
	char	   *name;

	while (isalnum((unsigned char) sql[namelen + 1]) || sql[namelen + 1] == '_')
		namelen++;

	if (namelen == 0)
		return NULL;

	name = pg_malloc(namelen + 1);
	memcpy(name, sql + 1, namelen);
	name[namelen] = '\0';

	*eaten = namelen + 1;
	return name;
}
683
/*
 * replaceVariable -- overwrite the len bytes starting at param inside *sql
 * with the string "value", shifting the rest of the string as needed.
 *
 * *sql is reallocated when the value is longer than the text it replaces,
 * so callers must use the updated *sql afterwards.  Returns a pointer just
 * past the inserted value, where scanning can resume.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vallen = strlen(value);

	if (vallen > len)
	{
		/* remember param as an offset: the buffer may move on realloc */
		size_t		pos = param - *sql;

		*sql = pg_realloc(*sql, strlen(*sql) - len + vallen + 1);
		param = *sql + pos;
	}

	/* slide the tail, terminator included, to its new position */
	if (vallen != len)
		memmove(param + vallen, param + len, strlen(param + len) + 1);

	/* the value goes in without a terminator; the tail follows directly */
	memcpy(param, value, vallen);

	return param + vallen;
}
703
704 static char *
705 assignVariables(CState *st, char *sql)
706 {
707         char       *p,
708                            *name,
709                            *val;
710
711         p = sql;
712         while ((p = strchr(p, ':')) != NULL)
713         {
714                 int                     eaten;
715
716                 name = parseVariable(p, &eaten);
717                 if (name == NULL)
718                 {
719                         while (*p == ':')
720                         {
721                                 p++;
722                         }
723                         continue;
724                 }
725
726                 val = getVariable(st, name);
727                 free(name);
728                 if (val == NULL)
729                 {
730                         p++;
731                         continue;
732                 }
733
734                 p = replaceVariable(&sql, p, eaten, val);
735         }
736
737         return sql;
738 }
739
740 static void
741 getQueryParams(CState *st, const Command *command, const char **params)
742 {
743         int                     i;
744
745         for (i = 0; i < command->argc - 1; i++)
746                 params[i] = getVariable(st, command->argv[i + 1]);
747 }
748
749 /*
750  * Run a shell command. The result is assigned to the variable if not NULL.
751  * Return true if succeeded, or false on error.
752  */
753 static bool
754 runShellCommand(CState *st, char *variable, char **argv, int argc)
755 {
756         char            command[SHELL_COMMAND_SIZE];
757         int                     i,
758                                 len = 0;
759         FILE       *fp;
760         char            res[64];
761         char       *endptr;
762         int                     retval;
763
764         /*----------
765          * Join arguments with whitespace separators. Arguments starting with
766          * exactly one colon are treated as variables:
767          *      name - append a string "name"
768          *      :var - append a variable named 'var'
769          *      ::name - append a string ":name"
770          *----------
771          */
772         for (i = 0; i < argc; i++)
773         {
774                 char       *arg;
775                 int                     arglen;
776
777                 if (argv[i][0] != ':')
778                 {
779                         arg = argv[i];          /* a string literal */
780                 }
781                 else if (argv[i][1] == ':')
782                 {
783                         arg = argv[i] + 1;      /* a string literal starting with colons */
784                 }
785                 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
786                 {
787                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
788                         return false;
789                 }
790
791                 arglen = strlen(arg);
792                 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
793                 {
794                         fprintf(stderr, "%s: too long shell command\n", argv[0]);
795                         return false;
796                 }
797
798                 if (i > 0)
799                         command[len++] = ' ';
800                 memcpy(command + len, arg, arglen);
801                 len += arglen;
802         }
803
804         command[len] = '\0';
805
806         /* Fast path for non-assignment case */
807         if (variable == NULL)
808         {
809                 if (system(command))
810                 {
811                         if (!timer_exceeded)
812                                 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
813                         return false;
814                 }
815                 return true;
816         }
817
818         /* Execute the command with pipe and read the standard output. */
819         if ((fp = popen(command, "r")) == NULL)
820         {
821                 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
822                 return false;
823         }
824         if (fgets(res, sizeof(res), fp) == NULL)
825         {
826                 if (!timer_exceeded)
827                         fprintf(stderr, "%s: cannot read the result\n", argv[0]);
828                 return false;
829         }
830         if (pclose(fp) < 0)
831         {
832                 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
833                 return false;
834         }
835
836         /* Check whether the result is an integer and assign it to the variable */
837         retval = (int) strtol(res, &endptr, 10);
838         while (*endptr != '\0' && isspace((unsigned char) *endptr))
839                 endptr++;
840         if (*res == '\0' || *endptr != '\0')
841         {
842                 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
843                 return false;
844         }
845         snprintf(res, sizeof(res), "%d", retval);
846         if (!putVariable(st, "setshell", variable, res))
847                 return false;
848
849 #ifdef DEBUG
850         printf("shell parameter name: %s, value: %s\n", argv[1], res);
851 #endif
852         return true;
853 }
854
#define MAX_PREPARE_NAME		32

/*
 * Build the name under which the SQL command at index "state" of script
 * number "file" is prepared, e.g. "P0_3".
 *
 * buffer must have room for at least MAX_PREPARE_NAME bytes.  The longest
 * name two 32-bit ints can produce ("P-2147483648_-2147483648") needs 25
 * bytes, so nothing is truncated in practice; snprintf simply guarantees
 * the write can never run past the buffer.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
861
862 static bool
863 clientDone(CState *st, bool ok)
864 {
865         (void) ok;                                      /* unused */
866
867         if (st->con != NULL)
868         {
869                 PQfinish(st->con);
870                 st->con = NULL;
871         }
872         return false;                           /* always false */
873 }
874
875 static
876 void
877 agg_vals_init(AggVals *aggs, instr_time start)
878 {
879         /* basic counters */
880         aggs->cnt = 0;                          /* number of transactions */
881         aggs->sum = 0;                          /* SUM(duration) */
882         aggs->sum2 = 0;                         /* SUM(duration*duration) */
883
884         /* min and max transaction duration */
885         aggs->min_duration = 0;
886         aggs->max_duration = 0;
887
888         /* start of the current interval */
889         aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
890 }
891
892 /* return false iff client should be disconnected */
893 static bool
894 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
895 {
896         PGresult   *res;
897         Command   **commands;
898
899 top:
900         commands = sql_files[st->use_file];
901
902         if (st->sleeping)
903         {                                                       /* are we sleeping? */
904                 instr_time      now;
905
906                 INSTR_TIME_SET_CURRENT(now);
907                 if (st->until <= INSTR_TIME_GET_MICROSEC(now))
908                         st->sleeping = 0;       /* Done sleeping, go ahead with next command */
909                 else
910                         return true;            /* Still sleeping, nothing to do here */
911         }
912
913         if (st->listen)
914         {                                                       /* are we receiver? */
915                 if (commands[st->state]->type == SQL_COMMAND)
916                 {
917                         if (debug)
918                                 fprintf(stderr, "client %d receiving\n", st->id);
919                         if (!PQconsumeInput(st->con))
920                         {                                       /* there's something wrong */
921                                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
922                                 return clientDone(st, false);
923                         }
924                         if (PQisBusy(st->con))
925                                 return true;    /* don't have the whole result yet */
926                 }
927
928                 /*
929                  * command finished: accumulate per-command execution times in
930                  * thread-local data structure, if per-command latencies are requested
931                  */
932                 if (is_latencies)
933                 {
934                         instr_time      now;
935                         int                     cnum = commands[st->state]->command_num;
936
937                         INSTR_TIME_SET_CURRENT(now);
938                         INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
939                                                                   now, st->stmt_begin);
940                         thread->exec_count[cnum]++;
941                 }
942
943                 /*
944                  * if transaction finished, record the time it took in the log
945                  */
946                 if (logfile && commands[st->state + 1] == NULL)
947                 {
948                         instr_time      now;
949                         instr_time      diff;
950                         double          usec;
951
952                         /*
953                          * write the log entry if this row belongs to the random sample,
954                          * or no sampling rate was given which means log everything.
955                          */
956                         if (sample_rate == 0.0 ||
957                                 pg_erand48(thread->random_state) <= sample_rate)
958                         {
959                                 INSTR_TIME_SET_CURRENT(now);
960                                 diff = now;
961                                 INSTR_TIME_SUBTRACT(diff, st->txn_begin);
962                                 usec = (double) INSTR_TIME_GET_MICROSEC(diff);
963
964                                 /* should we aggregate the results or not? */
965                                 if (agg_interval > 0)
966                                 {
967                                         /*
968                                          * are we still in the same interval? if yes, accumulate
969                                          * the values (print them otherwise)
970                                          */
971                                         if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
972                                         {
973                                                 agg->cnt += 1;
974                                                 agg->sum += usec;
975                                                 agg->sum2 += usec * usec;
976
977                                                 /* first in this aggregation interval */
978                                                 if ((agg->cnt == 1) || (usec < agg->min_duration))
979                                                         agg->min_duration = usec;
980
981                                                 if ((agg->cnt == 1) || (usec > agg->max_duration))
982                                                         agg->max_duration = usec;
983                                         }
984                                         else
985                                         {
986                                                 /*
987                                                  * Loop until we reach the interval of the current
988                                                  * transaction (and print all the empty intervals in
989                                                  * between).
990                                                  */
991                                                 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
992                                                 {
993                                                         /*
994                                                          * This is a non-Windows branch (thanks to the
995                                                          * ifdef in usage), so we don't need to handle
996                                                          * this in a special way (see below).
997                                                          */
998                                                         fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n",
999                                                                         agg->start_time,
1000                                                                         agg->cnt,
1001                                                                         agg->sum,
1002                                                                         agg->sum2,
1003                                                                         agg->min_duration,
1004                                                                         agg->max_duration);
1005
1006                                                         /* move to the next inteval */
1007                                                         agg->start_time = agg->start_time + agg_interval;
1008
1009                                                         /* reset for "no transaction" intervals */
1010                                                         agg->cnt = 0;
1011                                                         agg->min_duration = 0;
1012                                                         agg->max_duration = 0;
1013                                                         agg->sum = 0;
1014                                                         agg->sum2 = 0;
1015                                                 }
1016
1017                                                 /*
1018                                                  * and now update the reset values (include the
1019                                                  * current)
1020                                                  */
1021                                                 agg->cnt = 1;
1022                                                 agg->min_duration = usec;
1023                                                 agg->max_duration = usec;
1024                                                 agg->sum = usec;
1025                                                 agg->sum2 = usec * usec;
1026                                         }
1027                                 }
1028                                 else
1029                                 {
1030                                         /* no, print raw transactions */
1031 #ifndef WIN32
1032
1033                                         /*
1034                                          * This is more than we really ought to know about
1035                                          * instr_time
1036                                          */
1037                                         fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
1038                                                         st->id, st->cnt, usec, st->use_file,
1039                                                         (long) now.tv_sec, (long) now.tv_usec);
1040 #else
1041
1042                                         /*
1043                                          * On Windows, instr_time doesn't provide a timestamp
1044                                          * anyway
1045                                          */
1046                                         fprintf(logfile, "%d %d %.0f %d 0 0\n",
1047                                                         st->id, st->cnt, usec, st->use_file);
1048 #endif
1049                                 }
1050                         }
1051                 }
1052
1053                 if (commands[st->state]->type == SQL_COMMAND)
1054                 {
1055                         /*
1056                          * Read and discard the query result; note this is not included in
1057                          * the statement latency numbers.
1058                          */
1059                         res = PQgetResult(st->con);
1060                         switch (PQresultStatus(res))
1061                         {
1062                                 case PGRES_COMMAND_OK:
1063                                 case PGRES_TUPLES_OK:
1064                                         break;          /* OK */
1065                                 default:
1066                                         fprintf(stderr, "Client %d aborted in state %d: %s",
1067                                                         st->id, st->state, PQerrorMessage(st->con));
1068                                         PQclear(res);
1069                                         return clientDone(st, false);
1070                         }
1071                         PQclear(res);
1072                         discard_response(st);
1073                 }
1074
1075                 if (commands[st->state + 1] == NULL)
1076                 {
1077                         if (is_connect)
1078                         {
1079                                 PQfinish(st->con);
1080                                 st->con = NULL;
1081                         }
1082
1083                         ++st->cnt;
1084                         if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1085                                 return clientDone(st, true);    /* exit success */
1086                 }
1087
1088                 /* increment state counter */
1089                 st->state++;
1090                 if (commands[st->state] == NULL)
1091                 {
1092                         st->state = 0;
1093                         st->use_file = (int) getrand(thread, 0, num_files - 1);
1094                         commands = sql_files[st->use_file];
1095                 }
1096         }
1097
1098         if (st->con == NULL)
1099         {
1100                 instr_time      start,
1101                                         end;
1102
1103                 INSTR_TIME_SET_CURRENT(start);
1104                 if ((st->con = doConnect()) == NULL)
1105                 {
1106                         fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
1107                         return clientDone(st, false);
1108                 }
1109                 INSTR_TIME_SET_CURRENT(end);
1110                 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1111         }
1112
1113         /* Record transaction start time if logging is enabled */
1114         if (logfile && st->state == 0)
1115                 INSTR_TIME_SET_CURRENT(st->txn_begin);
1116
1117         /* Record statement start time if per-command latencies are requested */
1118         if (is_latencies)
1119                 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1120
1121         if (commands[st->state]->type == SQL_COMMAND)
1122         {
1123                 const Command *command = commands[st->state];
1124                 int                     r;
1125
1126                 if (querymode == QUERY_SIMPLE)
1127                 {
1128                         char       *sql;
1129
1130                         sql = pg_strdup(command->argv[0]);
1131                         sql = assignVariables(st, sql);
1132
1133                         if (debug)
1134                                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1135                         r = PQsendQuery(st->con, sql);
1136                         free(sql);
1137                 }
1138                 else if (querymode == QUERY_EXTENDED)
1139                 {
1140                         const char *sql = command->argv[0];
1141                         const char *params[MAX_ARGS];
1142
1143                         getQueryParams(st, command, params);
1144
1145                         if (debug)
1146                                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1147                         r = PQsendQueryParams(st->con, sql, command->argc - 1,
1148                                                                   NULL, params, NULL, NULL, 0);
1149                 }
1150                 else if (querymode == QUERY_PREPARED)
1151                 {
1152                         char            name[MAX_PREPARE_NAME];
1153                         const char *params[MAX_ARGS];
1154
1155                         if (!st->prepared[st->use_file])
1156                         {
1157                                 int                     j;
1158
1159                                 for (j = 0; commands[j] != NULL; j++)
1160                                 {
1161                                         PGresult   *res;
1162                                         char            name[MAX_PREPARE_NAME];
1163
1164                                         if (commands[j]->type != SQL_COMMAND)
1165                                                 continue;
1166                                         preparedStatementName(name, st->use_file, j);
1167                                         res = PQprepare(st->con, name,
1168                                                   commands[j]->argv[0], commands[j]->argc - 1, NULL);
1169                                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1170                                                 fprintf(stderr, "%s", PQerrorMessage(st->con));
1171                                         PQclear(res);
1172                                 }
1173                                 st->prepared[st->use_file] = true;
1174                         }
1175
1176                         getQueryParams(st, command, params);
1177                         preparedStatementName(name, st->use_file, st->state);
1178
1179                         if (debug)
1180                                 fprintf(stderr, "client %d sending %s\n", st->id, name);
1181                         r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1182                                                                         params, NULL, NULL, 0);
1183                 }
1184                 else    /* unknown sql mode */
1185                         r = 0;
1186
1187                 if (r == 0)
1188                 {
1189                         if (debug)
1190                                 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
1191                         st->ecnt++;
1192                 }
1193                 else
1194                         st->listen = 1;         /* flags that should be listened */
1195         }
1196         else if (commands[st->state]->type == META_COMMAND)
1197         {
1198                 int                     argc = commands[st->state]->argc,
1199                                         i;
1200                 char      **argv = commands[st->state]->argv;
1201
1202                 if (debug)
1203                 {
1204                         fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1205                         for (i = 1; i < argc; i++)
1206                                 fprintf(stderr, " %s", argv[i]);
1207                         fprintf(stderr, "\n");
1208                 }
1209
1210                 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1211                 {
1212                         char       *var;
1213                         int64           min,
1214                                                 max;
1215                         char            res[64];
1216
1217                         if (*argv[2] == ':')
1218                         {
1219                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1220                                 {
1221                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1222                                         st->ecnt++;
1223                                         return true;
1224                                 }
1225                                 min = strtoint64(var);
1226                         }
1227                         else
1228                                 min = strtoint64(argv[2]);
1229
1230 #ifdef NOT_USED
1231                         if (min < 0)
1232                         {
1233                                 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
1234                                 st->ecnt++;
1235                                 return;
1236                         }
1237 #endif
1238
1239                         if (*argv[3] == ':')
1240                         {
1241                                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1242                                 {
1243                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
1244                                         st->ecnt++;
1245                                         return true;
1246                                 }
1247                                 max = strtoint64(var);
1248                         }
1249                         else
1250                                 max = strtoint64(argv[3]);
1251
1252                         if (max < min)
1253                         {
1254                                 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
1255                                 st->ecnt++;
1256                                 return true;
1257                         }
1258
1259                         /*
1260                          * getrand() needs to be able to subtract max from min and add one
1261                          * to the result without overflowing.  Since we know max > min, we
1262                          * can detect overflow just by checking for a negative result. But
1263                          * we must check both that the subtraction doesn't overflow, and
1264                          * that adding one to the result doesn't overflow either.
1265                          */
1266                         if (max - min < 0 || (max - min) + 1 < 0)
1267                         {
1268                                 fprintf(stderr, "%s: range too large\n", argv[0]);
1269                                 st->ecnt++;
1270                                 return true;
1271                         }
1272
1273 #ifdef DEBUG
1274                         printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1275 #endif
1276                         snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1277
1278                         if (!putVariable(st, argv[0], argv[1], res))
1279                         {
1280                                 st->ecnt++;
1281                                 return true;
1282                         }
1283
1284                         st->listen = 1;
1285                 }
1286                 else if (pg_strcasecmp(argv[0], "set") == 0)
1287                 {
1288                         char       *var;
1289                         int64           ope1,
1290                                                 ope2;
1291                         char            res[64];
1292
1293                         if (*argv[2] == ':')
1294                         {
1295                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1296                                 {
1297                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1298                                         st->ecnt++;
1299                                         return true;
1300                                 }
1301                                 ope1 = strtoint64(var);
1302                         }
1303                         else
1304                                 ope1 = strtoint64(argv[2]);
1305
1306                         if (argc < 5)
1307                                 snprintf(res, sizeof(res), INT64_FORMAT, ope1);
1308                         else
1309                         {
1310                                 if (*argv[4] == ':')
1311                                 {
1312                                         if ((var = getVariable(st, argv[4] + 1)) == NULL)
1313                                         {
1314                                                 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
1315                                                 st->ecnt++;
1316                                                 return true;
1317                                         }
1318                                         ope2 = strtoint64(var);
1319                                 }
1320                                 else
1321                                         ope2 = strtoint64(argv[4]);
1322
1323                                 if (strcmp(argv[3], "+") == 0)
1324                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
1325                                 else if (strcmp(argv[3], "-") == 0)
1326                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
1327                                 else if (strcmp(argv[3], "*") == 0)
1328                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
1329                                 else if (strcmp(argv[3], "/") == 0)
1330                                 {
1331                                         if (ope2 == 0)
1332                                         {
1333                                                 fprintf(stderr, "%s: division by zero\n", argv[0]);
1334                                                 st->ecnt++;
1335                                                 return true;
1336                                         }
1337                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
1338                                 }
1339                                 else
1340                                 {
1341                                         fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
1342                                         st->ecnt++;
1343                                         return true;
1344                                 }
1345                         }
1346
1347                         if (!putVariable(st, argv[0], argv[1], res))
1348                         {
1349                                 st->ecnt++;
1350                                 return true;
1351                         }
1352
1353                         st->listen = 1;
1354                 }
1355                 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1356                 {
1357                         char       *var;
1358                         int                     usec;
1359                         instr_time      now;
1360
1361                         if (*argv[1] == ':')
1362                         {
1363                                 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1364                                 {
1365                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
1366                                         st->ecnt++;
1367                                         return true;
1368                                 }
1369                                 usec = atoi(var);
1370                         }
1371                         else
1372                                 usec = atoi(argv[1]);
1373
1374                         if (argc > 2)
1375                         {
1376                                 if (pg_strcasecmp(argv[2], "ms") == 0)
1377                                         usec *= 1000;
1378                                 else if (pg_strcasecmp(argv[2], "s") == 0)
1379                                         usec *= 1000000;
1380                         }
1381                         else
1382                                 usec *= 1000000;
1383
1384                         INSTR_TIME_SET_CURRENT(now);
1385                         st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
1386                         st->sleeping = 1;
1387
1388                         st->listen = 1;
1389                 }
1390                 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1391                 {
1392                         bool            ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1393
1394                         if (timer_exceeded) /* timeout */
1395                                 return clientDone(st, true);
1396                         else if (!ret)          /* on error */
1397                         {
1398                                 st->ecnt++;
1399                                 return true;
1400                         }
1401                         else    /* succeeded */
1402                                 st->listen = 1;
1403                 }
1404                 else if (pg_strcasecmp(argv[0], "shell") == 0)
1405                 {
1406                         bool            ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1407
1408                         if (timer_exceeded) /* timeout */
1409                                 return clientDone(st, true);
1410                         else if (!ret)          /* on error */
1411                         {
1412                                 st->ecnt++;
1413                                 return true;
1414                         }
1415                         else    /* succeeded */
1416                                 st->listen = 1;
1417                 }
1418                 goto top;
1419         }
1420
1421         return true;
1422 }
1423
1424 /* discard connections */
1425 static void
1426 disconnect_all(CState *state, int length)
1427 {
1428         int                     i;
1429
1430         for (i = 0; i < length; i++)
1431         {
1432                 if (state[i].con)
1433                 {
1434                         PQfinish(state[i].con);
1435                         state[i].con = NULL;
1436                 }
1437         }
1438 }
1439
1440 /* create tables and setup data */
1441 static void
1442 init(bool is_no_vacuum)
1443 {
1444 /* The scale factor at/beyond which 32bit integers are incapable of storing
1445  * 64bit values.
1446  *
1447  * Although the actual threshold is 21474, we use 20000 because it is easier to
1448  * document and remember, and isn't that far away from the real threshold.
1449  */
1450 #define SCALE_32BIT_THRESHOLD 20000
1451
1452         /*
1453          * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1454          * fields in these table declarations were intended to comply with that.
1455          * But because they default to NULLs, they don't actually take any space.
1456          * We could fix that by giving them non-null default values. However, that
1457          * would completely break comparability of pgbench results with prior
1458          * versions.  Since pgbench has never pretended to be fully TPC-B
1459          * compliant anyway, we stick with the historical behavior.
1460          */
1461         struct ddlinfo
1462         {
1463                 char       *table;
1464                 char       *cols;
1465                 int                     declare_fillfactor;
1466         };
1467         struct ddlinfo DDLs[] = {
1468                 {
1469                         "pgbench_history",
1470                         scale >= SCALE_32BIT_THRESHOLD
1471                         ? "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)"
1472                         : "tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
1473                         0
1474                 },
1475                 {
1476                         "pgbench_tellers",
1477                         "tid int not null,bid int,tbalance int,filler char(84)",
1478                         1
1479                 },
1480                 {
1481                         "pgbench_accounts",
1482                         scale >= SCALE_32BIT_THRESHOLD
1483                         ? "aid bigint not null,bid int,abalance int,filler char(84)"
1484                         : "aid    int not null,bid int,abalance int,filler char(84)",
1485                         1
1486                 },
1487                 {
1488                         "pgbench_branches",
1489                         "bid int not null,bbalance int,filler char(88)",
1490                         1
1491                 }
1492         };
1493         static char *DDLAFTERs[] = {
1494                 "alter table pgbench_branches add primary key (bid)",
1495                 "alter table pgbench_tellers add primary key (tid)",
1496                 "alter table pgbench_accounts add primary key (aid)"
1497         };
1498         static char *DDLKEYs[] = {
1499                 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1500                 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1501                 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1502                 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1503                 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1504         };
1505
1506         PGconn     *con;
1507         PGresult   *res;
1508         char            sql[256];
1509         int                     i;
1510         int64           k;
1511
1512         /* used to track elapsed time and estimate of the remaining time */
1513         instr_time      start,
1514                                 diff;
1515         double          elapsed_sec,
1516                                 remaining_sec;
1517         int                     log_interval = 1;
1518
1519         if ((con = doConnect()) == NULL)
1520                 exit(1);
1521
1522         for (i = 0; i < lengthof(DDLs); i++)
1523         {
1524                 char            opts[256];
1525                 char            buffer[256];
1526                 struct ddlinfo *ddl = &DDLs[i];
1527
1528                 /* Remove old table, if it exists. */
1529                 snprintf(buffer, 256, "drop table if exists %s", ddl->table);
1530                 executeStatement(con, buffer);
1531
1532                 /* Construct new create table statement. */
1533                 opts[0] = '\0';
1534                 if (ddl->declare_fillfactor)
1535                         snprintf(opts + strlen(opts), 256 - strlen(opts),
1536                                          " with (fillfactor=%d)", fillfactor);
1537                 if (tablespace != NULL)
1538                 {
1539                         char       *escape_tablespace;
1540
1541                         escape_tablespace = PQescapeIdentifier(con, tablespace,
1542                                                                                                    strlen(tablespace));
1543                         snprintf(opts + strlen(opts), 256 - strlen(opts),
1544                                          " tablespace %s", escape_tablespace);
1545                         PQfreemem(escape_tablespace);
1546                 }
1547                 snprintf(buffer, 256, "create%s table %s(%s)%s",
1548                                  unlogged_tables ? " unlogged" : "",
1549                                  ddl->table, ddl->cols, opts);
1550
1551                 executeStatement(con, buffer);
1552         }
1553
1554         executeStatement(con, "begin");
1555
1556         for (i = 0; i < nbranches * scale; i++)
1557         {
1558                 snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
1559                 executeStatement(con, sql);
1560         }
1561
1562         for (i = 0; i < ntellers * scale; i++)
1563         {
1564                 snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
1565                                  i + 1, i / ntellers + 1);
1566                 executeStatement(con, sql);
1567         }
1568
1569         executeStatement(con, "commit");
1570
1571         /*
1572          * fill the pgbench_accounts table with some data
1573          */
1574         fprintf(stderr, "creating tables...\n");
1575
1576         executeStatement(con, "begin");
1577         executeStatement(con, "truncate pgbench_accounts");
1578
1579         res = PQexec(con, "copy pgbench_accounts from stdin");
1580         if (PQresultStatus(res) != PGRES_COPY_IN)
1581         {
1582                 fprintf(stderr, "%s", PQerrorMessage(con));
1583                 exit(1);
1584         }
1585         PQclear(res);
1586
1587         INSTR_TIME_SET_CURRENT(start);
1588
1589         for (k = 0; k < (int64) naccounts * scale; k++)
1590         {
1591                 int64           j = k + 1;
1592
1593                 snprintf(sql, 256, INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n", j, k / naccounts + 1, 0);
1594                 if (PQputline(con, sql))
1595                 {
1596                         fprintf(stderr, "PQputline failed\n");
1597                         exit(1);
1598                 }
1599
1600                 /*
1601                  * If we want to stick with the original logging, print a message each
1602                  * 100k inserted rows.
1603                  */
1604                 if ((!use_quiet) && (j % 100000 == 0))
1605                 {
1606                         INSTR_TIME_SET_CURRENT(diff);
1607                         INSTR_TIME_SUBTRACT(diff, start);
1608
1609                         elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1610                         remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
1611
1612                         fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1613                                         j, (int64) naccounts * scale,
1614                                         (int) (((int64) j * 100) / (naccounts * scale)),
1615                                         elapsed_sec, remaining_sec);
1616                 }
1617                 /* let's not call the timing for each row, but only each 100 rows */
1618                 else if (use_quiet && (j % 100 == 0))
1619                 {
1620                         INSTR_TIME_SET_CURRENT(diff);
1621                         INSTR_TIME_SUBTRACT(diff, start);
1622
1623                         elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1624                         remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
1625
1626                         /* have we reached the next interval (or end)? */
1627                         if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
1628                         {
1629                                 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1630                                                 j, (int64) naccounts * scale,
1631                                                 (int) (((int64) j * 100) / (naccounts * scale)), elapsed_sec, remaining_sec);
1632
1633                                 /* skip to the next interval */
1634                                 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
1635                         }
1636                 }
1637
1638         }
1639         if (PQputline(con, "\\.\n"))
1640         {
1641                 fprintf(stderr, "very last PQputline failed\n");
1642                 exit(1);
1643         }
1644         if (PQendcopy(con))
1645         {
1646                 fprintf(stderr, "PQendcopy failed\n");
1647                 exit(1);
1648         }
1649         executeStatement(con, "commit");
1650
1651         /* vacuum */
1652         if (!is_no_vacuum)
1653         {
1654                 fprintf(stderr, "vacuum...\n");
1655                 executeStatement(con, "vacuum analyze pgbench_branches");
1656                 executeStatement(con, "vacuum analyze pgbench_tellers");
1657                 executeStatement(con, "vacuum analyze pgbench_accounts");
1658                 executeStatement(con, "vacuum analyze pgbench_history");
1659         }
1660
1661         /*
1662          * create indexes
1663          */
1664         fprintf(stderr, "set primary keys...\n");
1665         for (i = 0; i < lengthof(DDLAFTERs); i++)
1666         {
1667                 char            buffer[256];
1668
1669                 strncpy(buffer, DDLAFTERs[i], 256);
1670
1671                 if (index_tablespace != NULL)
1672                 {
1673                         char       *escape_tablespace;
1674
1675                         escape_tablespace = PQescapeIdentifier(con, index_tablespace,
1676                                                                                                    strlen(index_tablespace));
1677                         snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
1678                                          " using index tablespace %s", escape_tablespace);
1679                         PQfreemem(escape_tablespace);
1680                 }
1681
1682                 executeStatement(con, buffer);
1683         }
1684
1685         /*
1686          * create foreign keys
1687          */
1688         if (foreign_keys)
1689         {
1690                 fprintf(stderr, "set foreign keys...\n");
1691                 for (i = 0; i < lengthof(DDLKEYs); i++)
1692                 {
1693                         executeStatement(con, DDLKEYs[i]);
1694                 }
1695         }
1696
1697
1698         fprintf(stderr, "done.\n");
1699         PQfinish(con);
1700 }
1701
1702 /*
1703  * Parse the raw sql and replace :param to $n.
1704  */
1705 static bool
1706 parseQuery(Command *cmd, const char *raw_sql)
1707 {
1708         char       *sql,
1709                            *p;
1710
1711         sql = pg_strdup(raw_sql);
1712         cmd->argc = 1;
1713
1714         p = sql;
1715         while ((p = strchr(p, ':')) != NULL)
1716         {
1717                 char            var[12];
1718                 char       *name;
1719                 int                     eaten;
1720
1721                 name = parseVariable(p, &eaten);
1722                 if (name == NULL)
1723                 {
1724                         while (*p == ':')
1725                         {
1726                                 p++;
1727                         }
1728                         continue;
1729                 }
1730
1731                 if (cmd->argc >= MAX_ARGS)
1732                 {
1733                         fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
1734                         return false;
1735                 }
1736
1737                 sprintf(var, "$%d", cmd->argc);
1738                 p = replaceVariable(&sql, p, eaten, var);
1739
1740                 cmd->argv[cmd->argc] = name;
1741                 cmd->argc++;
1742         }
1743
1744         cmd->argv[0] = sql;
1745         return true;
1746 }
1747
1748 /* Parse a command; return a Command struct, or NULL if it's a comment */
1749 static Command *
1750 process_commands(char *buf)
1751 {
1752         const char      delim[] = " \f\n\r\t\v";
1753
1754         Command    *my_commands;
1755         int                     j;
1756         char       *p,
1757                            *tok;
1758
1759         /* Make the string buf end at the next newline */
1760         if ((p = strchr(buf, '\n')) != NULL)
1761                 *p = '\0';
1762
1763         /* Skip leading whitespace */
1764         p = buf;
1765         while (isspace((unsigned char) *p))
1766                 p++;
1767
1768         /* If the line is empty or actually a comment, we're done */
1769         if (*p == '\0' || strncmp(p, "--", 2) == 0)
1770                 return NULL;
1771
1772         /* Allocate and initialize Command structure */
1773         my_commands = (Command *) pg_malloc(sizeof(Command));
1774         my_commands->line = pg_strdup(buf);
1775         my_commands->command_num = num_commands++;
1776         my_commands->type = 0;          /* until set */
1777         my_commands->argc = 0;
1778
1779         if (*p == '\\')
1780         {
1781                 my_commands->type = META_COMMAND;
1782
1783                 j = 0;
1784                 tok = strtok(++p, delim);
1785
1786                 while (tok != NULL)
1787                 {
1788                         my_commands->argv[j++] = pg_strdup(tok);
1789                         my_commands->argc++;
1790                         tok = strtok(NULL, delim);
1791                 }
1792
1793                 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
1794                 {
1795                         if (my_commands->argc < 4)
1796                         {
1797                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1798                                 exit(1);
1799                         }
1800
1801                         for (j = 4; j < my_commands->argc; j++)
1802                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1803                                                 my_commands->argv[0], my_commands->argv[j]);
1804                 }
1805                 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
1806                 {
1807                         if (my_commands->argc < 3)
1808                         {
1809                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1810                                 exit(1);
1811                         }
1812
1813                         for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
1814                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1815                                                 my_commands->argv[0], my_commands->argv[j]);
1816                 }
1817                 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
1818                 {
1819                         if (my_commands->argc < 2)
1820                         {
1821                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1822                                 exit(1);
1823                         }
1824
1825                         /*
1826                          * Split argument into number and unit to allow "sleep 1ms" etc.
1827                          * We don't have to terminate the number argument with null
1828                          * because it will be parsed with atoi, which ignores trailing
1829                          * non-digit characters.
1830                          */
1831                         if (my_commands->argv[1][0] != ':')
1832                         {
1833                                 char       *c = my_commands->argv[1];
1834
1835                                 while (isdigit((unsigned char) *c))
1836                                         c++;
1837                                 if (*c)
1838                                 {
1839                                         my_commands->argv[2] = c;
1840                                         if (my_commands->argc < 3)
1841                                                 my_commands->argc = 3;
1842                                 }
1843                         }
1844
1845                         if (my_commands->argc >= 3)
1846                         {
1847                                 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
1848                                         pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
1849                                         pg_strcasecmp(my_commands->argv[2], "s") != 0)
1850                                 {
1851                                         fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
1852                                                         my_commands->argv[0], my_commands->argv[2]);
1853                                         exit(1);
1854                                 }
1855                         }
1856
1857                         for (j = 3; j < my_commands->argc; j++)
1858                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1859                                                 my_commands->argv[0], my_commands->argv[j]);
1860                 }
1861                 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
1862                 {
1863                         if (my_commands->argc < 3)
1864                         {
1865                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1866                                 exit(1);
1867                         }
1868                 }
1869                 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
1870                 {
1871                         if (my_commands->argc < 1)
1872                         {
1873                                 fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
1874                                 exit(1);
1875                         }
1876                 }
1877                 else
1878                 {
1879                         fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
1880                         exit(1);
1881                 }
1882         }
1883         else
1884         {
1885                 my_commands->type = SQL_COMMAND;
1886
1887                 switch (querymode)
1888                 {
1889                         case QUERY_SIMPLE:
1890                                 my_commands->argv[0] = pg_strdup(p);
1891                                 my_commands->argc++;
1892                                 break;
1893                         case QUERY_EXTENDED:
1894                         case QUERY_PREPARED:
1895                                 if (!parseQuery(my_commands, p))
1896                                         exit(1);
1897                                 break;
1898                         default:
1899                                 exit(1);
1900                 }
1901         }
1902
1903         return my_commands;
1904 }
1905
1906 static int
1907 process_file(char *filename)
1908 {
1909 #define COMMANDS_ALLOC_NUM 128
1910
1911         Command   **my_commands;
1912         FILE       *fd;
1913         int                     lineno;
1914         char            buf[BUFSIZ];
1915         int                     alloc_num;
1916
1917         if (num_files >= MAX_FILES)
1918         {
1919                 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
1920                 exit(1);
1921         }
1922
1923         alloc_num = COMMANDS_ALLOC_NUM;
1924         my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
1925
1926         if (strcmp(filename, "-") == 0)
1927                 fd = stdin;
1928         else if ((fd = fopen(filename, "r")) == NULL)
1929         {
1930                 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
1931                 return false;
1932         }
1933
1934         lineno = 0;
1935
1936         while (fgets(buf, sizeof(buf), fd) != NULL)
1937         {
1938                 Command    *command;
1939
1940                 command = process_commands(buf);
1941                 if (command == NULL)
1942                         continue;
1943
1944                 my_commands[lineno] = command;
1945                 lineno++;
1946
1947                 if (lineno >= alloc_num)
1948                 {
1949                         alloc_num += COMMANDS_ALLOC_NUM;
1950                         my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
1951                 }
1952         }
1953         fclose(fd);
1954
1955         my_commands[lineno] = NULL;
1956
1957         sql_files[num_files++] = my_commands;
1958
1959         return true;
1960 }
1961
1962 static Command **
1963 process_builtin(char *tb)
1964 {
1965 #define COMMANDS_ALLOC_NUM 128
1966
1967         Command   **my_commands;
1968         int                     lineno;
1969         char            buf[BUFSIZ];
1970         int                     alloc_num;
1971
1972         alloc_num = COMMANDS_ALLOC_NUM;
1973         my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
1974
1975         lineno = 0;
1976
1977         for (;;)
1978         {
1979                 char       *p;
1980                 Command    *command;
1981
1982                 p = buf;
1983                 while (*tb && *tb != '\n')
1984                         *p++ = *tb++;
1985
1986                 if (*tb == '\0')
1987                         break;
1988
1989                 if (*tb == '\n')
1990                         tb++;
1991
1992                 *p = '\0';
1993
1994                 command = process_commands(buf);
1995                 if (command == NULL)
1996                         continue;
1997
1998                 my_commands[lineno] = command;
1999                 lineno++;
2000
2001                 if (lineno >= alloc_num)
2002                 {
2003                         alloc_num += COMMANDS_ALLOC_NUM;
2004                         my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2005                 }
2006         }
2007
2008         my_commands[lineno] = NULL;
2009
2010         return my_commands;
2011 }
2012
2013 /* print out results */
2014 static void
2015 printResults(int ttype, int normal_xacts, int nclients,
2016                          TState *threads, int nthreads,
2017                          instr_time total_time, instr_time conn_total_time)
2018 {
2019         double          time_include,
2020                                 tps_include,
2021                                 tps_exclude;
2022         char       *s;
2023
2024         time_include = INSTR_TIME_GET_DOUBLE(total_time);
2025         tps_include = normal_xacts / time_include;
2026         tps_exclude = normal_xacts / (time_include -
2027                                                 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
2028
2029         if (ttype == 0)
2030                 s = "TPC-B (sort of)";
2031         else if (ttype == 2)
2032                 s = "Update only pgbench_accounts";
2033         else if (ttype == 1)
2034                 s = "SELECT only";
2035         else
2036                 s = "Custom query";
2037
2038         printf("transaction type: %s\n", s);
2039         printf("scaling factor: %d\n", scale);
2040         printf("query mode: %s\n", QUERYMODE[querymode]);
2041         printf("number of clients: %d\n", nclients);
2042         printf("number of threads: %d\n", nthreads);
2043         if (duration <= 0)
2044         {
2045                 printf("number of transactions per client: %d\n", nxacts);
2046                 printf("number of transactions actually processed: %d/%d\n",
2047                            normal_xacts, nxacts * nclients);
2048         }
2049         else
2050         {
2051                 printf("duration: %d s\n", duration);
2052                 printf("number of transactions actually processed: %d\n",
2053                            normal_xacts);
2054         }
2055         printf("tps = %f (including connections establishing)\n", tps_include);
2056         printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2057
2058         /* Report per-command latencies */
2059         if (is_latencies)
2060         {
2061                 int                     i;
2062
2063                 for (i = 0; i < num_files; i++)
2064                 {
2065                         Command   **commands;
2066
2067                         if (num_files > 1)
2068                                 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2069                         else
2070                                 printf("statement latencies in milliseconds:\n");
2071
2072                         for (commands = sql_files[i]; *commands != NULL; commands++)
2073                         {
2074                                 Command    *command = *commands;
2075                                 int                     cnum = command->command_num;
2076                                 double          total_time;
2077                                 instr_time      total_exec_elapsed;
2078                                 int                     total_exec_count;
2079                                 int                     t;
2080
2081                                 /* Accumulate per-thread data for command */
2082                                 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2083                                 total_exec_count = 0;
2084                                 for (t = 0; t < nthreads; t++)
2085                                 {
2086                                         TState     *thread = &threads[t];
2087
2088                                         INSTR_TIME_ADD(total_exec_elapsed,
2089                                                                    thread->exec_elapsed[cnum]);
2090                                         total_exec_count += thread->exec_count[cnum];
2091                                 }
2092
2093                                 if (total_exec_count > 0)
2094                                         total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2095                                 else
2096                                         total_time = 0.0;
2097
2098                                 printf("\t%f\t%s\n", total_time, command->line);
2099                         }
2100                 }
2101         }
2102 }
2103
2104
2105 int
2106 main(int argc, char **argv)
2107 {
2108         static struct option long_options[] = {
2109                 /* systematic long/short named options*/
2110                 {"client", required_argument, NULL, 'c'},
2111                 {"connect", no_argument, NULL, 'C'},
2112                 {"debug", no_argument, NULL, 'd'},
2113                 {"define", required_argument, NULL, 'D'},
2114                 {"file", required_argument, NULL, 'f'},
2115                 {"fillfactor", required_argument, NULL, 'F'},
2116                 {"host", required_argument, NULL, 'h'},
2117                 {"initialize", no_argument, NULL, 'i'},
2118                 {"jobs", required_argument, NULL, 'j'},
2119                 {"log", no_argument, NULL, 'l'},
2120                 {"no-vacuum", no_argument, NULL, 'n'},
2121                 {"port", required_argument, NULL, 'p'},
2122                 {"protocol", required_argument, NULL, 'M'},
2123                 {"quiet", no_argument, NULL, 'q'},
2124                 {"report-latencies", no_argument, NULL, 'r'},
2125                 {"scale", required_argument, NULL, 's'},
2126                 {"select-only", no_argument, NULL, 'S'},
2127                 {"skip-some-updates", no_argument, NULL, 'N'},
2128                 {"time", required_argument, NULL, 'T'},
2129                 {"transactions", required_argument, NULL, 't'},
2130                 {"username", required_argument, NULL, 'U'},
2131                 {"vacuum-all", no_argument, NULL, 'v'},
2132                 /* long-named only options */
2133                 {"foreign-keys", no_argument, &foreign_keys, 1},
2134                 {"index-tablespace", required_argument, NULL, 3},
2135                 {"tablespace", required_argument, NULL, 2},
2136                 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2137                 {"sampling-rate", required_argument, NULL, 4},
2138                 {"aggregate-interval", required_argument, NULL, 5},
2139                 {NULL, 0, NULL, 0}
2140         };
2141
2142         int                     c;
2143         int                     nclients = 1;   /* default number of simulated clients */
2144         int                     nthreads = 1;   /* default number of threads */
2145         int                     is_init_mode = 0;               /* initialize mode? */
2146         int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
2147         int                     do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2148         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
2149                                                                  * 2: skip update of branches and tellers */
2150         int                     optindex;
2151         char       *filename = NULL;
2152         bool            scale_given = false;
2153
2154         CState     *state;                      /* status of clients */
2155         TState     *threads;            /* array of thread */
2156
2157         instr_time      start_time;             /* start up time */
2158         instr_time      total_time;
2159         instr_time      conn_total_time;
2160         int                     total_xacts;
2161
2162         int                     i;
2163
2164 #ifdef HAVE_GETRLIMIT
2165         struct rlimit rlim;
2166 #endif
2167
2168         PGconn     *con;
2169         PGresult   *res;
2170         char       *env;
2171
2172         char            val[64];
2173
2174         progname = get_progname(argv[0]);
2175
2176         if (argc > 1)
2177         {
2178                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2179                 {
2180                         usage();
2181                         exit(0);
2182                 }
2183                 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2184                 {
2185                         puts("pgbench (PostgreSQL) " PG_VERSION);
2186                         exit(0);
2187                 }
2188         }
2189
2190 #ifdef WIN32
2191         /* stderr is buffered on Win32. */
2192         setvbuf(stderr, NULL, _IONBF, 0);
2193 #endif
2194
2195         if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2196                 pghost = env;
2197         if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2198                 pgport = env;
2199         else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2200                 login = env;
2201
2202         state = (CState *) pg_malloc(sizeof(CState));
2203         memset(state, 0, sizeof(CState));
2204
2205         while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
2206         {
2207                 switch (c)
2208                 {
2209                         case 'i':
2210                                 is_init_mode++;
2211                                 break;
2212                         case 'h':
2213                                 pghost = pg_strdup(optarg);
2214                                 break;
2215                         case 'n':
2216                                 is_no_vacuum++;
2217                                 break;
2218                         case 'v':
2219                                 do_vacuum_accounts++;
2220                                 break;
2221                         case 'p':
2222                                 pgport = pg_strdup(optarg);
2223                                 break;
2224                         case 'd':
2225                                 debug++;
2226                                 break;
2227                         case 'S':
2228                                 ttype = 1;
2229                                 break;
2230                         case 'N':
2231                                 ttype = 2;
2232                                 break;
2233                         case 'c':
2234                                 nclients = atoi(optarg);
2235                                 if (nclients <= 0 || nclients > MAXCLIENTS)
2236                                 {
2237                                         fprintf(stderr, "invalid number of clients: %d\n", nclients);
2238                                         exit(1);
2239                                 }
2240 #ifdef HAVE_GETRLIMIT
2241 #ifdef RLIMIT_NOFILE                    /* most platforms use RLIMIT_NOFILE */
2242                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2243 #else                                                   /* but BSD doesn't ... */
2244                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2245 #endif   /* RLIMIT_NOFILE */
2246                                 {
2247                                         fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2248                                         exit(1);
2249                                 }
2250                                 if (rlim.rlim_cur <= (nclients + 2))
2251                                 {
2252                                         fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
2253                                         fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
2254                                         exit(1);
2255                                 }
2256 #endif   /* HAVE_GETRLIMIT */
2257                                 break;
2258                         case 'j':                       /* jobs */
2259                                 nthreads = atoi(optarg);
2260                                 if (nthreads <= 0)
2261                                 {
2262                                         fprintf(stderr, "invalid number of threads: %d\n", nthreads);
2263                                         exit(1);
2264                                 }
2265                                 break;
2266                         case 'C':
2267                                 is_connect = true;
2268                                 break;
2269                         case 'r':
2270                                 is_latencies = true;
2271                                 break;
2272                         case 's':
2273                                 scale_given = true;
2274                                 scale = atoi(optarg);
2275                                 if (scale <= 0)
2276                                 {
2277                                         fprintf(stderr, "invalid scaling factor: %d\n", scale);
2278                                         exit(1);
2279                                 }
2280                                 break;
2281                         case 't':
2282                                 if (duration > 0)
2283                                 {
2284                                         fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2285                                         exit(1);
2286                                 }
2287                                 nxacts = atoi(optarg);
2288                                 if (nxacts <= 0)
2289                                 {
2290                                         fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
2291                                         exit(1);
2292                                 }
2293                                 break;
2294                         case 'T':
2295                                 if (nxacts > 0)
2296                                 {
2297                                         fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2298                                         exit(1);
2299                                 }
2300                                 duration = atoi(optarg);
2301                                 if (duration <= 0)
2302                                 {
2303                                         fprintf(stderr, "invalid duration: %d\n", duration);
2304                                         exit(1);
2305                                 }
2306                                 break;
2307                         case 'U':
2308                                 login = pg_strdup(optarg);
2309                                 break;
2310                         case 'l':
2311                                 use_log = true;
2312                                 break;
2313                         case 'q':
2314                                 use_quiet = true;
2315                                 break;
2316                         case 'f':
2317                                 ttype = 3;
2318                                 filename = pg_strdup(optarg);
2319                                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2320                                         exit(1);
2321                                 break;
2322                         case 'D':
2323                                 {
2324                                         char       *p;
2325
2326                                         if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2327                                         {
2328                                                 fprintf(stderr, "invalid variable definition: %s\n", optarg);
2329                                                 exit(1);
2330                                         }
2331
2332                                         *p++ = '\0';
2333                                         if (!putVariable(&state[0], "option", optarg, p))
2334                                                 exit(1);
2335                                 }
2336                                 break;
2337                         case 'F':
2338                                 fillfactor = atoi(optarg);
2339                                 if ((fillfactor < 10) || (fillfactor > 100))
2340                                 {
2341                                         fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
2342                                         exit(1);
2343                                 }
2344                                 break;
2345                         case 'M':
2346                                 if (num_files > 0)
2347                                 {
2348                                         fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
2349                                         exit(1);
2350                                 }
2351                                 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
2352                                         if (strcmp(optarg, QUERYMODE[querymode]) == 0)
2353                                                 break;
2354                                 if (querymode >= NUM_QUERYMODE)
2355                                 {
2356                                         fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
2357                                         exit(1);
2358                                 }
2359                                 break;
2360                         case 0:
2361                                 /* This covers long options which take no argument. */
2362                                 break;
2363                         case 2:                         /* tablespace */
2364                                 tablespace = pg_strdup(optarg);
2365                                 break;
2366                         case 3:                         /* index-tablespace */
2367                                 index_tablespace = pg_strdup(optarg);
2368                                 break;
2369                         case 4:
2370                                 sample_rate = atof(optarg);
2371                                 if (sample_rate <= 0.0 || sample_rate > 1.0)
2372                                 {
2373                                         fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
2374                                         exit(1);
2375                                 }
2376                                 break;
2377                         case 5:
2378 #ifdef WIN32
2379                                 fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
2380                                 exit(1);
2381 #else
2382                                 agg_interval = atoi(optarg);
2383                                 if (agg_interval <= 0)
2384                                 {
2385                                         fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
2386                                         exit(1);
2387                                 }
2388 #endif
2389                                 break;
2390                         default:
2391                                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
2392                                 exit(1);
2393                                 break;
2394                 }
2395         }
2396
2397         if (argc > optind)
2398                 dbName = argv[optind];
2399         else
2400         {
2401                 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
2402                         dbName = env;
2403                 else if (login != NULL && *login != '\0')
2404                         dbName = login;
2405                 else
2406                         dbName = "";
2407         }
2408
2409         if (is_init_mode)
2410         {
2411                 init(is_no_vacuum);
2412                 exit(0);
2413         }
2414
2415         /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
2416         if (nxacts <= 0 && duration <= 0)
2417                 nxacts = DEFAULT_NXACTS;
2418
2419         if (nclients % nthreads != 0)
2420         {
2421                 fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
2422                 exit(1);
2423         }
2424
2425         /* --sampling-rate may be used only with -l */
2426         if (sample_rate > 0.0 && !use_log)
2427         {
2428                 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
2429                 exit(1);
2430         }
2431
2432         /* -q may be used only with -i */
2433         if (use_quiet && !is_init_mode)
2434         {
2435                 fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
2436                 exit(1);
2437         }
2438
2439         /* --sampling-rate may must not be used with --aggregate-interval */
2440         if (sample_rate > 0.0 && agg_interval > 0)
2441         {
2442                 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
2443                 exit(1);
2444         }
2445
2446         if (agg_interval > 0 && (!use_log))
2447         {
2448                 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
2449                 exit(1);
2450         }
2451
2452         if ((duration > 0) && (agg_interval > duration))
2453         {
2454                 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
2455                 exit(1);
2456         }
2457
2458         if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0))
2459         {
2460                 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
2461                 exit(1);
2462         }
2463
2464         /*
2465          * is_latencies only works with multiple threads in thread-based
2466          * implementations, not fork-based ones, because it supposes that the
2467          * parent can see changes made to the per-thread execution stats by child
2468          * threads.  It seems useful enough to accept despite this limitation, but
2469          * perhaps we should FIXME someday (by passing the stats data back up
2470          * through the parent-to-child pipes).
2471          */
2472 #ifndef ENABLE_THREAD_SAFETY
2473         if (is_latencies && nthreads > 1)
2474         {
2475                 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
2476                 exit(1);
2477         }
2478 #endif
2479
2480         /*
2481          * save main process id in the global variable because process id will be
2482          * changed after fork.
2483          */
2484         main_pid = (int) getpid();
2485
2486         if (nclients > 1)
2487         {
2488                 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
2489                 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
2490
2491                 /* copy any -D switch values to all clients */
2492                 for (i = 1; i < nclients; i++)
2493                 {
2494                         int                     j;
2495
2496                         state[i].id = i;
2497                         for (j = 0; j < state[0].nvariables; j++)
2498                         {
2499                                 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
2500                                         exit(1);
2501                         }
2502                 }
2503         }
2504
2505         if (debug)
2506         {
2507                 if (duration <= 0)
2508                         printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
2509                                    pghost, pgport, nclients, nxacts, dbName);
2510                 else
2511                         printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
2512                                    pghost, pgport, nclients, duration, dbName);
2513         }
2514
2515         /* opening connection... */
2516         con = doConnect();
2517         if (con == NULL)
2518                 exit(1);
2519
2520         if (PQstatus(con) == CONNECTION_BAD)
2521         {
2522                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
2523                 fprintf(stderr, "%s", PQerrorMessage(con));
2524                 exit(1);
2525         }
2526
2527         if (ttype != 3)
2528         {
2529                 /*
2530                  * get the scaling factor that should be same as count(*) from
2531                  * pgbench_branches if this is not a custom query
2532                  */
2533                 res = PQexec(con, "select count(*) from pgbench_branches");
2534                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2535                 {
2536                         fprintf(stderr, "%s", PQerrorMessage(con));
2537                         exit(1);
2538                 }
2539                 scale = atoi(PQgetvalue(res, 0, 0));
2540                 if (scale < 0)
2541                 {
2542                         fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
2543                         exit(1);
2544                 }
2545                 PQclear(res);
2546
2547                 /* warn if we override user-given -s switch */
2548                 if (scale_given)
2549                         fprintf(stderr,
2550                         "Scale option ignored, using pgbench_branches table count = %d\n",
2551                                         scale);
2552         }
2553
2554         /*
2555          * :scale variables normally get -s or database scale, but don't override
2556          * an explicit -D switch
2557          */
2558         if (getVariable(&state[0], "scale") == NULL)
2559         {
2560                 snprintf(val, sizeof(val), "%d", scale);
2561                 for (i = 0; i < nclients; i++)
2562                 {
2563                         if (!putVariable(&state[i], "startup", "scale", val))
2564                                 exit(1);
2565                 }
2566         }
2567
2568         /*
2569          * Define a :client_id variable that is unique per connection. But don't
2570          * override an explicit -D switch.
2571          */
2572         if (getVariable(&state[0], "client_id") == NULL)
2573         {
2574                 for (i = 0; i < nclients; i++)
2575                 {
2576                         snprintf(val, sizeof(val), "%d", i);
2577                         if (!putVariable(&state[i], "startup", "client_id", val))
2578                                 exit(1);
2579                 }
2580         }
2581
2582         if (!is_no_vacuum)
2583         {
2584                 fprintf(stderr, "starting vacuum...");
2585                 executeStatement(con, "vacuum pgbench_branches");
2586                 executeStatement(con, "vacuum pgbench_tellers");
2587                 executeStatement(con, "truncate pgbench_history");
2588                 fprintf(stderr, "end.\n");
2589
2590                 if (do_vacuum_accounts)
2591                 {
2592                         fprintf(stderr, "starting vacuum pgbench_accounts...");
2593                         executeStatement(con, "vacuum analyze pgbench_accounts");
2594                         fprintf(stderr, "end.\n");
2595                 }
2596         }
2597         PQfinish(con);
2598
2599         /* set random seed */
2600         INSTR_TIME_SET_CURRENT(start_time);
2601         srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
2602
2603         /* process builtin SQL scripts */
2604         switch (ttype)
2605         {
2606                 case 0:
2607                         sql_files[0] = process_builtin(tpc_b);
2608                         num_files = 1;
2609                         break;
2610
2611                 case 1:
2612                         sql_files[0] = process_builtin(select_only);
2613                         num_files = 1;
2614                         break;
2615
2616                 case 2:
2617                         sql_files[0] = process_builtin(simple_update);
2618                         num_files = 1;
2619                         break;
2620
2621                 default:
2622                         break;
2623         }
2624
2625         /* set up thread data structures */
2626         threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
2627         for (i = 0; i < nthreads; i++)
2628         {
2629                 TState     *thread = &threads[i];
2630
2631                 thread->tid = i;
2632                 thread->state = &state[nclients / nthreads * i];
2633                 thread->nstate = nclients / nthreads;
2634                 thread->random_state[0] = random();
2635                 thread->random_state[1] = random();
2636                 thread->random_state[2] = random();
2637
2638                 if (is_latencies)
2639                 {
2640                         /* Reserve memory for the thread to store per-command latencies */
2641                         int                     t;
2642
2643                         thread->exec_elapsed = (instr_time *)
2644                                 pg_malloc(sizeof(instr_time) * num_commands);
2645                         thread->exec_count = (int *)
2646                                 pg_malloc(sizeof(int) * num_commands);
2647
2648                         for (t = 0; t < num_commands; t++)
2649                         {
2650                                 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
2651                                 thread->exec_count[t] = 0;
2652                         }
2653                 }
2654                 else
2655                 {
2656                         thread->exec_elapsed = NULL;
2657                         thread->exec_count = NULL;
2658                 }
2659         }
2660
2661         /* get start up time */
2662         INSTR_TIME_SET_CURRENT(start_time);
2663
2664         /* set alarm if duration is specified. */
2665         if (duration > 0)
2666                 setalarm(duration);
2667
2668         /* start threads */
2669         for (i = 0; i < nthreads; i++)
2670         {
2671                 TState     *thread = &threads[i];
2672
2673                 INSTR_TIME_SET_CURRENT(thread->start_time);
2674
2675                 /* the first thread (i = 0) is executed by main thread */
2676                 if (i > 0)
2677                 {
2678                         int                     err = pthread_create(&thread->thread, NULL, threadRun, thread);
2679
2680                         if (err != 0 || thread->thread == INVALID_THREAD)
2681                         {
2682                                 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
2683                                 exit(1);
2684                         }
2685                 }
2686                 else
2687                 {
2688                         thread->thread = INVALID_THREAD;
2689                 }
2690         }
2691
2692         /* wait for threads and accumulate results */
2693         total_xacts = 0;
2694         INSTR_TIME_SET_ZERO(conn_total_time);
2695         for (i = 0; i < nthreads; i++)
2696         {
2697                 void       *ret = NULL;
2698
2699                 if (threads[i].thread == INVALID_THREAD)
2700                         ret = threadRun(&threads[i]);
2701                 else
2702                         pthread_join(threads[i].thread, &ret);
2703
2704                 if (ret != NULL)
2705                 {
2706                         TResult    *r = (TResult *) ret;
2707
2708                         total_xacts += r->xacts;
2709                         INSTR_TIME_ADD(conn_total_time, r->conn_time);
2710                         free(ret);
2711                 }
2712         }
2713         disconnect_all(state, nclients);
2714
2715         /* get end time */
2716         INSTR_TIME_SET_CURRENT(total_time);
2717         INSTR_TIME_SUBTRACT(total_time, start_time);
2718         printResults(ttype, total_xacts, nclients, threads, nthreads,
2719                                  total_time, conn_total_time);
2720
2721         return 0;
2722 }
2723
2724 static void *
2725 threadRun(void *arg)
2726 {
2727         TState     *thread = (TState *) arg;
2728         CState     *state = thread->state;
2729         TResult    *result;
2730         FILE       *logfile = NULL; /* per-thread log file */
2731         instr_time      start,
2732                                 end;
2733         int                     nstate = thread->nstate;
2734         int                     remains = nstate;               /* number of remaining clients */
2735         int                     i;
2736
2737         AggVals         aggs;
2738
2739         result = pg_malloc(sizeof(TResult));
2740
2741         INSTR_TIME_SET_ZERO(result->conn_time);
2742
2743         /* open log file if requested */
2744         if (use_log)
2745         {
2746                 char            logpath[64];
2747
2748                 if (thread->tid == 0)
2749                         snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
2750                 else
2751                         snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
2752                 logfile = fopen(logpath, "w");
2753
2754                 if (logfile == NULL)
2755                 {
2756                         fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
2757                         goto done;
2758                 }
2759         }
2760
2761         if (!is_connect)
2762         {
2763                 /* make connections to the database */
2764                 for (i = 0; i < nstate; i++)
2765                 {
2766                         if ((state[i].con = doConnect()) == NULL)
2767                                 goto done;
2768                 }
2769         }
2770
2771         /* time after thread and connections set up */
2772         INSTR_TIME_SET_CURRENT(result->conn_time);
2773         INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
2774
2775         agg_vals_init(&aggs, thread->start_time);
2776
2777         /* send start up queries in async manner */
2778         for (i = 0; i < nstate; i++)
2779         {
2780                 CState     *st = &state[i];
2781                 Command   **commands = sql_files[st->use_file];
2782                 int                     prev_ecnt = st->ecnt;
2783
2784                 st->use_file = getrand(thread, 0, num_files - 1);
2785                 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
2786                         remains--;                      /* I've aborted */
2787
2788                 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
2789                 {
2790                         fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
2791                         remains--;                      /* I've aborted */
2792                         PQfinish(st->con);
2793                         st->con = NULL;
2794                 }
2795         }
2796
2797         while (remains > 0)
2798         {
2799                 fd_set          input_mask;
2800                 int                     maxsock;        /* max socket number to be waited */
2801                 int64           now_usec = 0;
2802                 int64           min_usec;
2803
2804                 FD_ZERO(&input_mask);
2805
2806                 maxsock = -1;
2807                 min_usec = INT64_MAX;
2808                 for (i = 0; i < nstate; i++)
2809                 {
2810                         CState     *st = &state[i];
2811                         Command   **commands = sql_files[st->use_file];
2812                         int                     sock;
2813
2814                         if (st->sleeping)
2815                         {
2816                                 int                     this_usec;
2817
2818                                 if (min_usec == INT64_MAX)
2819                                 {
2820                                         instr_time      now;
2821
2822                                         INSTR_TIME_SET_CURRENT(now);
2823                                         now_usec = INSTR_TIME_GET_MICROSEC(now);
2824                                 }
2825
2826                                 this_usec = st->until - now_usec;
2827                                 if (min_usec > this_usec)
2828                                         min_usec = this_usec;
2829                         }
2830                         else if (st->con == NULL)
2831                         {
2832                                 continue;
2833                         }
2834                         else if (commands[st->state]->type == META_COMMAND)
2835                         {
2836                                 min_usec = 0;   /* the connection is ready to run */
2837                                 break;
2838                         }
2839
2840                         sock = PQsocket(st->con);
2841                         if (sock < 0)
2842                         {
2843                                 fprintf(stderr, "bad socket: %s\n", strerror(errno));
2844                                 goto done;
2845                         }
2846
2847                         FD_SET(sock, &input_mask);
2848
2849                         if (maxsock < sock)
2850                                 maxsock = sock;
2851                 }
2852
2853                 if (min_usec > 0 && maxsock != -1)
2854                 {
2855                         int                     nsocks; /* return from select(2) */
2856
2857                         if (min_usec != INT64_MAX)
2858                         {
2859                                 struct timeval timeout;
2860
2861                                 timeout.tv_sec = min_usec / 1000000;
2862                                 timeout.tv_usec = min_usec % 1000000;
2863                                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
2864                         }
2865                         else
2866                                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
2867                         if (nsocks < 0)
2868                         {
2869                                 if (errno == EINTR)
2870                                         continue;
2871                                 /* must be something wrong */
2872                                 fprintf(stderr, "select failed: %s\n", strerror(errno));
2873                                 goto done;
2874                         }
2875                 }
2876
2877                 /* ok, backend returns reply */
2878                 for (i = 0; i < nstate; i++)
2879                 {
2880                         CState     *st = &state[i];
2881                         Command   **commands = sql_files[st->use_file];
2882                         int                     prev_ecnt = st->ecnt;
2883
2884                         if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
2885                                                         || commands[st->state]->type == META_COMMAND))
2886                         {
2887                                 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
2888                                         remains--;      /* I've aborted */
2889                         }
2890
2891                         if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
2892                         {
2893                                 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
2894                                 remains--;              /* I've aborted */
2895                                 PQfinish(st->con);
2896                                 st->con = NULL;
2897                         }
2898                 }
2899         }
2900
2901 done:
2902         INSTR_TIME_SET_CURRENT(start);
2903         disconnect_all(state, nstate);
2904         result->xacts = 0;
2905         for (i = 0; i < nstate; i++)
2906                 result->xacts += state[i].cnt;
2907         INSTR_TIME_SET_CURRENT(end);
2908         INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
2909         if (logfile)
2910                 fclose(logfile);
2911         return result;
2912 }
2913
2914 /*
2915  * Support for duration option: set timer_exceeded after so many seconds.
2916  */
2917
2918 #ifndef WIN32
2919
2920 static void
2921 handle_sig_alarm(SIGNAL_ARGS)
2922 {
2923         timer_exceeded = true;
2924 }
2925
2926 static void
2927 setalarm(int seconds)
2928 {
2929         pqsignal(SIGALRM, handle_sig_alarm);
2930         alarm(seconds);
2931 }
2932
2933 #ifndef ENABLE_THREAD_SAFETY
2934
2935 /*
 * Emulate pthread_create()/pthread_join() with fork() and a pipe, for
 * builds where ENABLE_THREAD_SAFETY is not defined.
2937  */
2938
/*
 * Handle for one emulated "thread": the forked child's process id and the
 * pipe over which the child sends its result back to the parent.
 */
struct fork_pthread
{
	pid_t		pid;			/* child process id */
	int			pipes[2];		/* [0] read end (parent), [1] write end (child) */
};
typedef struct fork_pthread fork_pthread;
2944
2945 static int
2946 pthread_create(pthread_t *thread,
2947                            pthread_attr_t *attr,
2948                            void *(*start_routine) (void *),
2949                            void *arg)
2950 {
2951         fork_pthread *th;
2952         void       *ret;
2953
2954         th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
2955         if (pipe(th->pipes) < 0)
2956         {
2957                 free(th);
2958                 return errno;
2959         }
2960
2961         th->pid = fork();
2962         if (th->pid == -1)                      /* error */
2963         {
2964                 free(th);
2965                 return errno;
2966         }
2967         if (th->pid != 0)                       /* in parent process */
2968         {
2969                 close(th->pipes[1]);
2970                 *thread = th;
2971                 return 0;
2972         }
2973
2974         /* in child process */
2975         close(th->pipes[0]);
2976
2977         /* set alarm again because the child does not inherit timers */
2978         if (duration > 0)
2979                 setalarm(duration);
2980
2981         ret = start_routine(arg);
2982         write(th->pipes[1], ret, sizeof(TResult));
2983         close(th->pipes[1]);
2984         free(th);
2985         exit(0);
2986 }
2987
2988 static int
2989 pthread_join(pthread_t th, void **thread_return)
2990 {
2991         int                     status;
2992
2993         while (waitpid(th->pid, &status, 0) != th->pid)
2994         {
2995                 if (errno != EINTR)
2996                         return errno;
2997         }
2998
2999         if (thread_return != NULL)
3000         {
3001                 /* assume result is TResult */
3002                 *thread_return = pg_malloc(sizeof(TResult));
3003                 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
3004                 {
3005                         free(*thread_return);
3006                         *thread_return = NULL;
3007                 }
3008         }
3009         close(th->pipes[0]);
3010
3011         free(th);
3012         return 0;
3013 }
3014 #endif
3015 #else                                                   /* WIN32 */
3016
/*
 * Timer-queue callback registered by the Windows setalarm() below.  It only
 * raises timer_exceeded; both parameters are unused.
 */
static VOID CALLBACK
win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
	timer_exceeded = true;
}
3022
3023 static void
3024 setalarm(int seconds)
3025 {
3026         HANDLE          queue;
3027         HANDLE          timer;
3028
3029         /* This function will be called at most once, so we can cheat a bit. */
3030         queue = CreateTimerQueue();
3031         if (seconds > ((DWORD) -1) / 1000 ||
3032                 !CreateTimerQueueTimer(&timer, queue,
3033                                                            win32_timer_callback, NULL, seconds * 1000, 0,
3034                                                            WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3035         {
3036                 fprintf(stderr, "Failed to set timer\n");
3037                 exit(1);
3038         }
3039 }
3040
3041 /* partial pthread implementation for Windows */
3042
/*
 * Bookkeeping for one emulated thread on Windows.  win32_pthread_run() fills
 * in "result", which pthread_join() hands back to the caller.
 */
typedef struct win32_pthread
{
	HANDLE		handle;			/* thread handle from _beginthreadex() */
	void	   *(*routine) (void *);	/* caller's start routine */
	void	   *arg;			/* argument passed to routine */
	void	   *result;			/* routine's return value; NULL until it finishes */
} win32_pthread;
3050
3051 static unsigned __stdcall
3052 win32_pthread_run(void *arg)
3053 {
3054         win32_pthread *th = (win32_pthread *) arg;
3055
3056         th->result = th->routine(th->arg);
3057
3058         return 0;
3059 }
3060
3061 static int
3062 pthread_create(pthread_t *thread,
3063                            pthread_attr_t *attr,
3064                            void *(*start_routine) (void *),
3065                            void *arg)
3066 {
3067         int                     save_errno;
3068         win32_pthread *th;
3069
3070         th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3071         th->routine = start_routine;
3072         th->arg = arg;
3073         th->result = NULL;
3074
3075         th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3076         if (th->handle == NULL)
3077         {
3078                 save_errno = errno;
3079                 free(th);
3080                 return save_errno;
3081         }
3082
3083         *thread = th;
3084         return 0;
3085 }
3086
3087 static int
3088 pthread_join(pthread_t th, void **thread_return)
3089 {
3090         if (th == NULL || th->handle == NULL)
3091                 return errno = EINVAL;
3092
3093         if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3094         {
3095                 _dosmaperr(GetLastError());
3096                 return errno;
3097         }
3098
3099         if (thread_return)
3100                 *thread_return = th->result;
3101
3102         CloseHandle(th->handle);
3103         free(th);
3104         return 0;
3105 }
3106
3107 #endif   /* WIN32 */