]> granicus.if.org Git - postgresql/blob - contrib/pgbench/pgbench.c
Allow total number of transactions in pgbench to exceed INT_MAX.
[postgresql] / contrib / pgbench / pgbench.c
1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * contrib/pgbench/pgbench.c
8  * Copyright (c) 2000-2014, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29
30 #ifdef WIN32
31 #define FD_SETSIZE 1024                 /* set before winsock2.h is included */
32 #endif   /* ! WIN32 */
33
34 #include "postgres_fe.h"
35
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "portability/instr_time.h"
39
40 #include <ctype.h>
41 #include <math.h>
42 #include <signal.h>
43 #include <sys/time.h>
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
46 #endif
47
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h>               /* for getrlimit */
50 #endif
51
52 #ifndef INT64_MAX
53 #define INT64_MAX       INT64CONST(0x7FFFFFFFFFFFFFFF)
54 #endif
55
56 /*
57  * Multi-platform pthread implementations
58  */
59
60 #ifdef WIN32
61 /* Use native win32 threads on Windows */
62 typedef struct win32_pthread *pthread_t;
63 typedef int pthread_attr_t;
64
65 static int      pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
66 static int      pthread_join(pthread_t th, void **thread_return);
67 #elif defined(ENABLE_THREAD_SAFETY)
68 /* Use platform-dependent pthread capability */
69 #include <pthread.h>
70 #else
71 /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
72 #define PTHREAD_FORK_EMULATION
73 #include <sys/wait.h>
74
75 #define pthread_t                               pg_pthread_t
76 #define pthread_attr_t                  pg_pthread_attr_t
77 #define pthread_create                  pg_pthread_create
78 #define pthread_join                    pg_pthread_join
79
80 typedef struct fork_pthread *pthread_t;
81 typedef int pthread_attr_t;
82
83 static int      pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
84 static int      pthread_join(pthread_t th, void **thread_return);
85 #endif
86
87
88 /********************************************************************
89  * some configurable parameters */
90
91 /* max number of clients allowed */
92 #ifdef FD_SETSIZE
93 #define MAXCLIENTS      (FD_SETSIZE - 10)
94 #else
95 #define MAXCLIENTS      1024
96 #endif
97
98 #define LOG_STEP_SECONDS        5       /* seconds between log messages */
99 #define DEFAULT_NXACTS  10              /* default nxacts */
100
101 int                     nxacts = 0;                     /* number of transactions per client */
102 int                     duration = 0;           /* duration in seconds */
103
104 /*
105  * scaling factor. for example, scale = 10 will make 1000000 tuples in
106  * pgbench_accounts table.
107  */
108 int                     scale = 1;
109
110 /*
111  * fillfactor. for example, fillfactor = 90 will use only 90 percent
112  * space during inserts and leave 10 percent free.
113  */
114 int                     fillfactor = 100;
115
116 /*
117  * create foreign key constraints on the tables?
118  */
119 int                     foreign_keys = 0;
120
121 /*
122  * use unlogged tables?
123  */
124 int                     unlogged_tables = 0;
125
126 /*
127  * log sampling rate (1.0 = log everything, 0.0 = option not given)
128  */
129 double          sample_rate = 0.0;
130
131 /*
132  * When threads are throttled to a given rate limit, this is the target delay
133  * to reach that rate in usec.  0 is the default and means no throttling.
134  */
135 int64           throttle_delay = 0;
136
137 /*
138  * tablespace selection
139  */
140 char       *tablespace = NULL;
141 char       *index_tablespace = NULL;
142
143 /*
144  * end of configurable parameters
145  *********************************************************************/
146
/* Makes little sense to change nbranches.  Change -s instead. */
#define nbranches	1
#define ntellers	10
#define naccounts	100000

/*
 * Scale factor at or beyond which 32-bit integers can no longer hold the
 * values generated for the benchmark tables, so 64-bit values are needed.
 *
 * The exact threshold is 21474; 20000 is used instead because it is easier
 * to document and remember, and is not far from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000

bool		use_log;			/* write transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates rather than individual
								 * transactions */
int			progress = 0;		/* emit a thread progress report every this
								 * many seconds */
int			progress_nclients = 0;	/* client count used by the progress
									 * report */
int			progress_nthreads = 0;	/* thread count used by the progress
									 * report */
bool		is_connect;			/* open a new connection per transaction */
bool		is_latencies;		/* report latencies per command */
int			main_pid;			/* main process id, used in the log file name */

/* Connection settings and program identity */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
const char *progname;

/* set from the signal handler */
volatile bool timer_exceeded = false;
181
/*
 * One script variable: a name and its current value, both stored as
 * strings.
 */
typedef struct
{
	char	   *name;			/* name of the variable */
	char	   *value;			/* value currently assigned to it */
} Variable;
188
189 #define MAX_FILES               128             /* max number of SQL script files allowed */
190 #define SHELL_COMMAND_SIZE      256 /* maximum size allowed for shell command */
191
192 /*
193  * structures used in custom query mode
194  */
195
196 typedef struct
197 {
198         PGconn     *con;                        /* connection handle to DB */
199         int                     id;                             /* client No. */
200         int                     state;                  /* state No. */
201         int                     cnt;                    /* xacts count */
202         int                     ecnt;                   /* error count */
203         int                     listen;                 /* 0 indicates that an async query has been
204                                                                  * sent */
205         int                     sleeping;               /* 1 indicates that the client is napping */
206         bool            throttling;             /* whether nap is for throttling */
207         int64           until;                  /* napping until (usec) */
208         Variable   *variables;          /* array of variable definitions */
209         int                     nvariables;
210         instr_time      txn_begin;              /* used for measuring transaction latencies */
211         instr_time      stmt_begin;             /* used for measuring statement latencies */
212         int64           txn_latencies;  /* cumulated latencies */
213         int64           txn_sqlats;             /* cumulated square latencies */
214         bool            is_throttled;   /* whether transaction throttling is done */
215         int                     use_file;               /* index in sql_files for this client */
216         bool            prepared[MAX_FILES];
217 } CState;
218
219 /*
220  * Thread state and result
221  */
222 typedef struct
223 {
224         int                     tid;                    /* thread id */
225         pthread_t       thread;                 /* thread handle */
226         CState     *state;                      /* array of CState */
227         int                     nstate;                 /* length of state[] */
228         instr_time      start_time;             /* thread start time */
229         instr_time *exec_elapsed;       /* time spent executing cmds (per Command) */
230         int                *exec_count;         /* number of cmd executions (per Command) */
231         unsigned short random_state[3];         /* separate randomness for each thread */
232         int64           throttle_trigger;               /* previous/next throttling (us) */
233         int64           throttle_lag;   /* total transaction lag behind throttling */
234         int64           throttle_lag_max;               /* max transaction lag */
235 } TState;
236
237 #define INVALID_THREAD          ((pthread_t) 0)
238
239 typedef struct
240 {
241         instr_time      conn_time;
242         int64           xacts;
243         int64           latencies;
244         int64           sqlats;
245         int64           throttle_lag;
246         int64           throttle_lag_max;
247 } TResult;
248
/*
 * queries read from files
 */
#define SQL_COMMAND		1
#define META_COMMAND	2
#define MAX_ARGS		10

/* How queries are submitted to the server */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query using prepared statements */
	NUM_QUERYMODE
} QueryMode;

/* Mode currently selected */
static QueryMode querymode = QUERY_SIMPLE;

/* Mode names, in the same order as the QueryMode enumerators */
static const char *QUERYMODE[] = {
	"simple",
	"extended",
	"prepared"
};
266
267 typedef struct
268 {
269         char       *line;                       /* full text of command line */
270         int                     command_num;    /* unique index of this Command struct */
271         int                     type;                   /* command type (SQL_COMMAND or META_COMMAND) */
272         int                     argc;                   /* number of command words */
273         char       *argv[MAX_ARGS]; /* command word list */
274 } Command;
275
/* Values accumulated over one aggregation interval */
typedef struct
{
	long		start_time;		/* start of the interval */
	int			cnt;			/* transaction count */
	double		min_duration;	/* shortest duration */
	double		max_duration;	/* longest duration */
	double		sum;			/* sum(duration), kept for estimates */
	double		sum2;			/* sum(duration^2), kept for estimates */
} AggVals;
288
289 static Command **sql_files[MAX_FILES];  /* SQL script files */
290 static int      num_files;                      /* number of script files */
291 static int      num_commands = 0;       /* total number of Command structs */
292 static int      debug = 0;                      /* debug flag */
293
294 /* default scenario */
295 static char *tpc_b = {
296         "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
297         "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
298         "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
299         "\\setrandom aid 1 :naccounts\n"
300         "\\setrandom bid 1 :nbranches\n"
301         "\\setrandom tid 1 :ntellers\n"
302         "\\setrandom delta -5000 5000\n"
303         "BEGIN;\n"
304         "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
305         "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
306         "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
307         "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
308         "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
309         "END;\n"
310 };
311
312 /* -N case */
313 static char *simple_update = {
314         "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
315         "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
316         "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
317         "\\setrandom aid 1 :naccounts\n"
318         "\\setrandom bid 1 :nbranches\n"
319         "\\setrandom tid 1 :ntellers\n"
320         "\\setrandom delta -5000 5000\n"
321         "BEGIN;\n"
322         "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
323         "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
324         "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
325         "END;\n"
326 };
327
328 /* -S case */
329 static char *select_only = {
330         "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
331         "\\setrandom aid 1 :naccounts\n"
332         "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
333 };
334
335 /* Function prototypes */
336 static void setalarm(int seconds);
337 static void *threadRun(void *arg);
338
339 static void
340 usage(void)
341 {
342         printf("%s is a benchmarking tool for PostgreSQL.\n\n"
343                    "Usage:\n"
344                    "  %s [OPTION]... [DBNAME]\n"
345                    "\nInitialization options:\n"
346                    "  -i, --initialize         invokes initialization mode\n"
347                    "  -F, --fillfactor=NUM     set fill factor\n"
348                 "  -n, --no-vacuum          do not run VACUUM after initialization\n"
349         "  -q, --quiet              quiet logging (one message each 5 seconds)\n"
350                    "  -s, --scale=NUM          scaling factor\n"
351                    "  --foreign-keys           create foreign key constraints between tables\n"
352                    "  --index-tablespace=TABLESPACE\n"
353         "                           create indexes in the specified tablespace\n"
354          "  --tablespace=TABLESPACE  create tables in the specified tablespace\n"
355                    "  --unlogged-tables        create tables as unlogged tables\n"
356                    "\nBenchmarking options:\n"
357                    "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
358                    "  -C, --connect            establish new connection for each transaction\n"
359                    "  -D, --define=VARNAME=VALUE\n"
360           "                           define variable for use by custom script\n"
361                  "  -f, --file=FILENAME      read transaction script from FILENAME\n"
362                    "  -j, --jobs=NUM           number of threads (default: 1)\n"
363                    "  -l, --log                write transaction times to log file\n"
364                    "  -M, --protocol=simple|extended|prepared\n"
365                    "                           protocol for submitting queries (default: simple)\n"
366                    "  -n, --no-vacuum          do not run VACUUM before tests\n"
367                    "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
368                    "  -P, --progress=NUM       show thread progress report every NUM seconds\n"
369                    "  -r, --report-latencies   report average latency per command\n"
370                 "  -R, --rate=NUM           target rate in transactions per second\n"
371                    "  -s, --scale=NUM          report this scale factor in output\n"
372                    "  -S, --select-only        perform SELECT-only transactions\n"
373                    "  -t, --transactions=NUM   number of transactions each client runs (default: 10)\n"
374                  "  -T, --time=NUM           duration of benchmark test in seconds\n"
375                    "  -v, --vacuum-all         vacuum all four standard tables before tests\n"
376                    "  --aggregate-interval=NUM aggregate data over NUM seconds\n"
377                    "  --sampling-rate=NUM      fraction of transactions to log (e.g. 0.01 for 1%%)\n"
378                    "\nCommon options:\n"
379                    "  -d, --debug              print debugging output\n"
380           "  -h, --host=HOSTNAME      database server host or socket directory\n"
381                    "  -p, --port=PORT          database server port number\n"
382                    "  -U, --username=USERNAME  connect as specified database user\n"
383                  "  -V, --version            output version information, then exit\n"
384                    "  -?, --help               show this help, then exit\n"
385                    "\n"
386                    "Report bugs to <pgsql-bugs@postgresql.org>.\n",
387                    progname, progname);
388 }
389
/*
 * strtoint64 -- convert a string to 64-bit integer
 *
 * This function is a modified version of scanint8() from
 * src/backend/utils/adt/int8.c.
 *
 * Accepted input is optional leading whitespace, an optional '+' or '-'
 * sign, one or more decimal digits, and optional trailing whitespace.
 *
 * Errors are reported on stderr but do not abort the conversion:
 *	- if no digit is found, or junk follows the digits, an "invalid input
 *	  syntax" message is printed and whatever was parsed so far is returned;
 *	- if the magnitude does not fit in 64 bits, one "out of range" message is
 *	  printed, the remaining digits are skipped, and the result saturates at
 *	  INT64_MAX (or -INT64_MAX for negative input).
 *
 * The range check is made before multiplying, so the accumulator never
 * overflows (signed overflow would be undefined behavior).
 */
static int64
strtoint64(const char *str)
{
	const char *ptr = str;
	int64		result = 0;
	int			sign = 1;

	/*
	 * Do our own scan, rather than relying on sscanf which might be broken
	 * for long long.
	 */

	/* skip leading spaces */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* handle sign */
	if (*ptr == '-')
	{
		ptr++;

		/*
		 * INT64_MIN has no positive counterpart, so it cannot be built as
		 * "-(magnitude)" by the loop below; recognize its digits explicitly.
		 */
		if (strncmp(ptr, "9223372036854775808", 19) == 0)
		{
			result = -INT64_MAX - 1;
			ptr += 19;
			goto gotdigits;
		}
		sign = -1;
	}
	else if (*ptr == '+')
		ptr++;

	/* require at least one digit */
	if (!isdigit((unsigned char) *ptr))
		fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);

	/* process digits */
	while (*ptr && isdigit((unsigned char) *ptr))
	{
		int			digit = *ptr++ - '0';

		/* would result * 10 + digit exceed INT64_MAX? */
		if (result > (INT64_MAX - digit) / 10)
		{
			fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
			result = INT64_MAX;

			/* swallow the rest of the number so it is not reported as junk */
			while (isdigit((unsigned char) *ptr))
				ptr++;
			break;
		}
		result = result * 10 + digit;
	}

gotdigits:

	/* allow trailing whitespace, but not other trailing chars */
	while (*ptr != '\0' && isspace((unsigned char) *ptr))
		ptr++;

	if (*ptr != '\0')
		fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);

	return ((sign < 0) ? -result : result);
}
457
458 /* random number generator: uniform distribution from min to max inclusive */
459 static int64
460 getrand(TState *thread, int64 min, int64 max)
461 {
462         /*
463          * Odd coding is so that min and max have approximately the same chance of
464          * being selected as do numbers between them.
465          *
466          * pg_erand48() is thread-safe and concurrent, which is why we use it
467          * rather than random(), which in glibc is non-reentrant, and therefore
468          * protected by a mutex, and therefore a bottleneck on machines with many
469          * CPUs.
470          */
471         return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
472 }
473
474 /* call PQexec() and exit() on failure */
475 static void
476 executeStatement(PGconn *con, const char *sql)
477 {
478         PGresult   *res;
479
480         res = PQexec(con, sql);
481         if (PQresultStatus(res) != PGRES_COMMAND_OK)
482         {
483                 fprintf(stderr, "%s", PQerrorMessage(con));
484                 exit(1);
485         }
486         PQclear(res);
487 }
488
489 /* set up a connection to the backend */
490 static PGconn *
491 doConnect(void)
492 {
493         PGconn     *conn;
494         static char *password = NULL;
495         bool            new_pass;
496
497         /*
498          * Start the connection.  Loop until we have a password if requested by
499          * backend.
500          */
501         do
502         {
503 #define PARAMS_ARRAY_SIZE       7
504
505                 const char *keywords[PARAMS_ARRAY_SIZE];
506                 const char *values[PARAMS_ARRAY_SIZE];
507
508                 keywords[0] = "host";
509                 values[0] = pghost;
510                 keywords[1] = "port";
511                 values[1] = pgport;
512                 keywords[2] = "user";
513                 values[2] = login;
514                 keywords[3] = "password";
515                 values[3] = password;
516                 keywords[4] = "dbname";
517                 values[4] = dbName;
518                 keywords[5] = "fallback_application_name";
519                 values[5] = progname;
520                 keywords[6] = NULL;
521                 values[6] = NULL;
522
523                 new_pass = false;
524
525                 conn = PQconnectdbParams(keywords, values, true);
526
527                 if (!conn)
528                 {
529                         fprintf(stderr, "Connection to database \"%s\" failed\n",
530                                         dbName);
531                         return NULL;
532                 }
533
534                 if (PQstatus(conn) == CONNECTION_BAD &&
535                         PQconnectionNeedsPassword(conn) &&
536                         password == NULL)
537                 {
538                         PQfinish(conn);
539                         password = simple_prompt("Password: ", 100, false);
540                         new_pass = true;
541                 }
542         } while (new_pass);
543
544         /* check to see that the backend connection was successfully made */
545         if (PQstatus(conn) == CONNECTION_BAD)
546         {
547                 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
548                                 dbName, PQerrorMessage(conn));
549                 PQfinish(conn);
550                 return NULL;
551         }
552
553         return conn;
554 }
555
556 /* throw away response from backend */
557 static void
558 discard_response(CState *state)
559 {
560         PGresult   *res;
561
562         do
563         {
564                 res = PQgetResult(state->con);
565                 if (res)
566                         PQclear(res);
567         } while (res);
568 }
569
570 static int
571 compareVariables(const void *v1, const void *v2)
572 {
573         return strcmp(((const Variable *) v1)->name,
574                                   ((const Variable *) v2)->name);
575 }
576
577 static char *
578 getVariable(CState *st, char *name)
579 {
580         Variable        key,
581                            *var;
582
583         /* On some versions of Solaris, bsearch of zero items dumps core */
584         if (st->nvariables <= 0)
585                 return NULL;
586
587         key.name = name;
588         var = (Variable *) bsearch((void *) &key,
589                                                            (void *) st->variables,
590                                                            st->nvariables,
591                                                            sizeof(Variable),
592                                                            compareVariables);
593         if (var != NULL)
594                 return var->value;
595         else
596                 return NULL;
597 }
598
/*
 * isLegalVariableName -- check whether name is usable as a variable name.
 *
 * A legal name is non-empty and consists only of letters, digits and
 * underscores.  The empty string is rejected explicitly; without that
 * check the scan below would accept it, since it would find no offending
 * character.
 *
 * Returns true if the name is legal, false otherwise.
 */
static bool
isLegalVariableName(const char *name)
{
	const unsigned char *p = (const unsigned char *) name;

	/* a zero-length name is never legal */
	if (*p == '\0')
		return false;

	for (; *p != '\0'; p++)
	{
		if (!isalnum(*p) && *p != '_')
			return false;
	}

	return true;
}
613
614 static int
615 putVariable(CState *st, const char *context, char *name, char *value)
616 {
617         Variable        key,
618                            *var;
619
620         key.name = name;
621         /* On some versions of Solaris, bsearch of zero items dumps core */
622         if (st->nvariables > 0)
623                 var = (Variable *) bsearch((void *) &key,
624                                                                    (void *) st->variables,
625                                                                    st->nvariables,
626                                                                    sizeof(Variable),
627                                                                    compareVariables);
628         else
629                 var = NULL;
630
631         if (var == NULL)
632         {
633                 Variable   *newvars;
634
635                 /*
636                  * Check for the name only when declaring a new variable to avoid
637                  * overhead.
638                  */
639                 if (!isLegalVariableName(name))
640                 {
641                         fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
642                         return false;
643                 }
644
645                 if (st->variables)
646                         newvars = (Variable *) pg_realloc(st->variables,
647                                                                         (st->nvariables + 1) * sizeof(Variable));
648                 else
649                         newvars = (Variable *) pg_malloc(sizeof(Variable));
650
651                 st->variables = newvars;
652
653                 var = &newvars[st->nvariables];
654
655                 var->name = pg_strdup(name);
656                 var->value = pg_strdup(value);
657
658                 st->nvariables++;
659
660                 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
661                           compareVariables);
662         }
663         else
664         {
665                 char       *val;
666
667                 /* dup then free, in case value is pointing at this variable */
668                 val = pg_strdup(value);
669
670                 free(var->value);
671                 var->value = val;
672         }
673
674         return true;
675 }
676
/*
 * parseVariable -- extract the variable name that follows the first
 * character of sql (assignVariables calls this with sql pointing at ':').
 *
 * The name is the run of letters, digits and underscores starting at
 * sql[1].  If that run is empty, NULL is returned and *eaten is left
 * untouched.  Otherwise a freshly allocated copy of the name is returned
 * and *eaten is set to the number of characters consumed, including the
 * leading one.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	const char *start = sql + 1;
	int			namelen = 0;
	char	   *name;

	while (isalnum((unsigned char) start[namelen]) || start[namelen] == '_')
		namelen++;

	if (namelen == 0)
		return NULL;

	name = pg_malloc(namelen + 1);
	memcpy(name, start, namelen);
	name[namelen] = '\0';

	*eaten = namelen + 1;
	return name;
}
697
/*
 * replaceVariable -- overwrite the len characters at param, which points
 * into the string *sql, with value.
 *
 * When value is longer than the text it replaces, *sql is enlarged with
 * pg_realloc() first, so *sql may change and param is re-derived from its
 * offset.  The tail of the string is shifted when the lengths differ.
 *
 * Returns a pointer just past the inserted value inside the (possibly
 * reallocated) string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			valueln = strlen(value);

	/* Grow the buffer if the replacement does not fit in place */
	if (valueln > len)
	{
		size_t		where = param - *sql;
		size_t		newsize = strlen(*sql) - len + valueln + 1;

		*sql = pg_realloc(*sql, newsize);
		param = *sql + where;
	}

	/* Move the remainder (with its terminator) to its new position */
	if (valueln != len)
	{
		char	   *tail = param + len;

		memmove(param + valueln, tail, strlen(tail) + 1);
	}

	/* Drop the value in; no terminator is written here */
	memcpy(param, value, valueln);

	return param + valueln;
}
717
718 static char *
719 assignVariables(CState *st, char *sql)
720 {
721         char       *p,
722                            *name,
723                            *val;
724
725         p = sql;
726         while ((p = strchr(p, ':')) != NULL)
727         {
728                 int                     eaten;
729
730                 name = parseVariable(p, &eaten);
731                 if (name == NULL)
732                 {
733                         while (*p == ':')
734                         {
735                                 p++;
736                         }
737                         continue;
738                 }
739
740                 val = getVariable(st, name);
741                 free(name);
742                 if (val == NULL)
743                 {
744                         p++;
745                         continue;
746                 }
747
748                 p = replaceVariable(&sql, p, eaten, val);
749         }
750
751         return sql;
752 }
753
754 static void
755 getQueryParams(CState *st, const Command *command, const char **params)
756 {
757         int                     i;
758
759         for (i = 0; i < command->argc - 1; i++)
760                 params[i] = getVariable(st, command->argv[i + 1]);
761 }
762
763 /*
764  * Run a shell command. The result is assigned to the variable if not NULL.
765  * Return true if succeeded, or false on error.
766  */
767 static bool
768 runShellCommand(CState *st, char *variable, char **argv, int argc)
769 {
770         char            command[SHELL_COMMAND_SIZE];
771         int                     i,
772                                 len = 0;
773         FILE       *fp;
774         char            res[64];
775         char       *endptr;
776         int                     retval;
777
778         /*----------
779          * Join arguments with whitespace separators. Arguments starting with
780          * exactly one colon are treated as variables:
781          *      name - append a string "name"
782          *      :var - append a variable named 'var'
783          *      ::name - append a string ":name"
784          *----------
785          */
786         for (i = 0; i < argc; i++)
787         {
788                 char       *arg;
789                 int                     arglen;
790
791                 if (argv[i][0] != ':')
792                 {
793                         arg = argv[i];          /* a string literal */
794                 }
795                 else if (argv[i][1] == ':')
796                 {
797                         arg = argv[i] + 1;      /* a string literal starting with colons */
798                 }
799                 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
800                 {
801                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
802                         return false;
803                 }
804
805                 arglen = strlen(arg);
806                 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
807                 {
808                         fprintf(stderr, "%s: too long shell command\n", argv[0]);
809                         return false;
810                 }
811
812                 if (i > 0)
813                         command[len++] = ' ';
814                 memcpy(command + len, arg, arglen);
815                 len += arglen;
816         }
817
818         command[len] = '\0';
819
820         /* Fast path for non-assignment case */
821         if (variable == NULL)
822         {
823                 if (system(command))
824                 {
825                         if (!timer_exceeded)
826                                 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
827                         return false;
828                 }
829                 return true;
830         }
831
832         /* Execute the command with pipe and read the standard output. */
833         if ((fp = popen(command, "r")) == NULL)
834         {
835                 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
836                 return false;
837         }
838         if (fgets(res, sizeof(res), fp) == NULL)
839         {
840                 if (!timer_exceeded)
841                         fprintf(stderr, "%s: cannot read the result\n", argv[0]);
842                 return false;
843         }
844         if (pclose(fp) < 0)
845         {
846                 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
847                 return false;
848         }
849
850         /* Check whether the result is an integer and assign it to the variable */
851         retval = (int) strtol(res, &endptr, 10);
852         while (*endptr != '\0' && isspace((unsigned char) *endptr))
853                 endptr++;
854         if (*res == '\0' || *endptr != '\0')
855         {
856                 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
857                 return false;
858         }
859         snprintf(res, sizeof(res), "%d", retval);
860         if (!putVariable(st, "setshell", variable, res))
861                 return false;
862
863 #ifdef DEBUG
864         printf("shell parameter name: %s, value: %s\n", argv[1], res);
865 #endif
866         return true;
867 }
868
/* Size of the buffer callers must provide to preparedStatementName() */
#define MAX_PREPARE_NAME		32

/*
 * Build the name of a prepared statement.
 *
 * Writes "P<file>_<state>" into buffer, which must have room for at least
 * MAX_PREPARE_NAME bytes.  The write is bounded by that size, so the result
 * is always NUL-terminated and can never run past the caller's buffer.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
875
876 static bool
877 clientDone(CState *st, bool ok)
878 {
879         (void) ok;                                      /* unused */
880
881         if (st->con != NULL)
882         {
883                 PQfinish(st->con);
884                 st->con = NULL;
885         }
886         return false;                           /* always false */
887 }
888
889 static
890 void
891 agg_vals_init(AggVals *aggs, instr_time start)
892 {
893         /* basic counters */
894         aggs->cnt = 0;                          /* number of transactions */
895         aggs->sum = 0;                          /* SUM(duration) */
896         aggs->sum2 = 0;                         /* SUM(duration*duration) */
897
898         /* min and max transaction duration */
899         aggs->min_duration = 0;
900         aggs->max_duration = 0;
901
902         /* start of the current interval */
903         aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
904 }
905
906 /* return false iff client should be disconnected */
907 static bool
908 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
909 {
910         PGresult   *res;
911         Command   **commands;
912         bool            trans_needs_throttle = false;
913
914 top:
915         commands = sql_files[st->use_file];
916
917         /*
918          * Handle throttling once per transaction by sleeping.  It is simpler to
919          * do this here rather than at the end, because so much complicated logic
920          * happens below when statements finish.
921          */
922         if (throttle_delay && !st->is_throttled)
923         {
924                 /*
925                  * Use inverse transform sampling to randomly generate a delay, such
926                  * that the series of delays will approximate a Poisson distribution
927                  * centered on the throttle_delay time.
928                  *
929                  * 10000 implies a 9.2 (-log(1/10000)) to 0.0 (log 1) delay
930                  * multiplier, and results in a 0.055 % target underestimation bias:
931                  *
932                  * SELECT 1.0/AVG(-LN(i/10000.0)) FROM generate_series(1,10000) AS i;
933                  * = 1.000552717032611116335474
934                  *
935                  * If transactions are too slow or a given wait is shorter than a
936                  * transaction, the next transaction will start right away.
937                  */
938                 int64           wait = (int64) (throttle_delay *
939                                   1.00055271703 * -log(getrand(thread, 1, 10000) / 10000.0));
940
941                 thread->throttle_trigger += wait;
942
943                 st->until = thread->throttle_trigger;
944                 st->sleeping = 1;
945                 st->throttling = true;
946                 st->is_throttled = true;
947                 if (debug)
948                         fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
949                                         st->id, wait);
950         }
951
952         if (st->sleeping)
953         {                                                       /* are we sleeping? */
954                 instr_time      now;
955                 int64           now_us;
956
957                 INSTR_TIME_SET_CURRENT(now);
958                 now_us = INSTR_TIME_GET_MICROSEC(now);
959                 if (st->until <= now_us)
960                 {
961                         st->sleeping = 0;       /* Done sleeping, go ahead with next command */
962                         if (st->throttling)
963                         {
964                                 /* Measure lag of throttled transaction relative to target */
965                                 int64           lag = now_us - st->until;
966
967                                 thread->throttle_lag += lag;
968                                 if (lag > thread->throttle_lag_max)
969                                         thread->throttle_lag_max = lag;
970                                 st->throttling = false;
971                         }
972                 }
973                 else
974                         return true;            /* Still sleeping, nothing to do here */
975         }
976
977         if (st->listen)
978         {                                                       /* are we receiver? */
979                 if (commands[st->state]->type == SQL_COMMAND)
980                 {
981                         if (debug)
982                                 fprintf(stderr, "client %d receiving\n", st->id);
983                         if (!PQconsumeInput(st->con))
984                         {                                       /* there's something wrong */
985                                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
986                                 return clientDone(st, false);
987                         }
988                         if (PQisBusy(st->con))
989                                 return true;    /* don't have the whole result yet */
990                 }
991
992                 /*
993                  * command finished: accumulate per-command execution times in
994                  * thread-local data structure, if per-command latencies are requested
995                  */
996                 if (is_latencies)
997                 {
998                         instr_time      now;
999                         int                     cnum = commands[st->state]->command_num;
1000
1001                         INSTR_TIME_SET_CURRENT(now);
1002                         INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1003                                                                   now, st->stmt_begin);
1004                         thread->exec_count[cnum]++;
1005                 }
1006
1007                 /* transaction finished: record latency under progress or throttling */
1008                 if ((progress || throttle_delay) && commands[st->state + 1] == NULL)
1009                 {
1010                         instr_time      diff;
1011                         int64           latency;
1012
1013                         INSTR_TIME_SET_CURRENT(diff);
1014                         INSTR_TIME_SUBTRACT(diff, st->txn_begin);
1015                         latency = INSTR_TIME_GET_MICROSEC(diff);
1016                         st->txn_latencies += latency;
1017
1018                         /*
1019                          * XXX In a long benchmark run of high-latency transactions, this
1020                          * int64 addition eventually overflows.  For example, 100 threads
1021                          * running 10s transactions will overflow it in 2.56 hours.  With
1022                          * a more-typical OLTP workload of .1s transactions, overflow
1023                          * would take 256 hours.
1024                          */
1025                         st->txn_sqlats += latency * latency;
1026                 }
1027
1028                 /*
1029                  * if transaction finished, record the time it took in the log
1030                  */
1031                 if (logfile && commands[st->state + 1] == NULL)
1032                 {
1033                         instr_time      now;
1034                         instr_time      diff;
1035                         double          usec;
1036
1037                         /*
1038                          * write the log entry if this row belongs to the random sample,
1039                          * or no sampling rate was given which means log everything.
1040                          */
1041                         if (sample_rate == 0.0 ||
1042                                 pg_erand48(thread->random_state) <= sample_rate)
1043                         {
1044                                 INSTR_TIME_SET_CURRENT(now);
1045                                 diff = now;
1046                                 INSTR_TIME_SUBTRACT(diff, st->txn_begin);
1047                                 usec = (double) INSTR_TIME_GET_MICROSEC(diff);
1048
1049                                 /* should we aggregate the results or not? */
1050                                 if (agg_interval > 0)
1051                                 {
1052                                         /*
1053                                          * are we still in the same interval? if yes, accumulate
1054                                          * the values (print them otherwise)
1055                                          */
1056                                         if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
1057                                         {
1058                                                 agg->cnt += 1;
1059                                                 agg->sum += usec;
1060                                                 agg->sum2 += usec * usec;
1061
1062                                                 /* first in this aggregation interval */
1063                                                 if ((agg->cnt == 1) || (usec < agg->min_duration))
1064                                                         agg->min_duration = usec;
1065
1066                                                 if ((agg->cnt == 1) || (usec > agg->max_duration))
1067                                                         agg->max_duration = usec;
1068                                         }
1069                                         else
1070                                         {
1071                                                 /*
1072                                                  * Loop until we reach the interval of the current
1073                                                  * transaction (and print all the empty intervals in
1074                                                  * between).
1075                                                  */
1076                                                 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
1077                                                 {
1078                                                         /*
1079                                                          * This is a non-Windows branch (thanks to the
1080                                                          * ifdef in usage), so we don't need to handle
1081                                                          * this in a special way (see below).
1082                                                          */
1083                                                         fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n",
1084                                                                         agg->start_time,
1085                                                                         agg->cnt,
1086                                                                         agg->sum,
1087                                                                         agg->sum2,
1088                                                                         agg->min_duration,
1089                                                                         agg->max_duration);
1090
1091                                                         /* move to the next interval */
1092                                                         agg->start_time = agg->start_time + agg_interval;
1093
1094                                                         /* reset for "no transaction" intervals */
1095                                                         agg->cnt = 0;
1096                                                         agg->min_duration = 0;
1097                                                         agg->max_duration = 0;
1098                                                         agg->sum = 0;
1099                                                         agg->sum2 = 0;
1100                                                 }
1101
1102                                                 /*
1103                                                  * and now update the reset values (include the
1104                                                  * current)
1105                                                  */
1106                                                 agg->cnt = 1;
1107                                                 agg->min_duration = usec;
1108                                                 agg->max_duration = usec;
1109                                                 agg->sum = usec;
1110                                                 agg->sum2 = usec * usec;
1111                                         }
1112                                 }
1113                                 else
1114                                 {
1115                                         /* no, print raw transactions */
1116 #ifndef WIN32
1117
1118                                         /*
1119                                          * This is more than we really ought to know about
1120                                          * instr_time
1121                                          */
1122                                         fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
1123                                                         st->id, st->cnt, usec, st->use_file,
1124                                                         (long) now.tv_sec, (long) now.tv_usec);
1125 #else
1126
1127                                         /*
1128                                          * On Windows, instr_time doesn't provide a timestamp
1129                                          * anyway
1130                                          */
1131                                         fprintf(logfile, "%d %d %.0f %d 0 0\n",
1132                                                         st->id, st->cnt, usec, st->use_file);
1133 #endif
1134                                 }
1135                         }
1136                 }
1137
1138                 if (commands[st->state]->type == SQL_COMMAND)
1139                 {
1140                         /*
1141                          * Read and discard the query result; note this is not included in
1142                          * the statement latency numbers.
1143                          */
1144                         res = PQgetResult(st->con);
1145                         switch (PQresultStatus(res))
1146                         {
1147                                 case PGRES_COMMAND_OK:
1148                                 case PGRES_TUPLES_OK:
1149                                         break;          /* OK */
1150                                 default:
1151                                         fprintf(stderr, "Client %d aborted in state %d: %s",
1152                                                         st->id, st->state, PQerrorMessage(st->con));
1153                                         PQclear(res);
1154                                         return clientDone(st, false);
1155                         }
1156                         PQclear(res);
1157                         discard_response(st);
1158                 }
1159
1160                 if (commands[st->state + 1] == NULL)
1161                 {
1162                         if (is_connect)
1163                         {
1164                                 PQfinish(st->con);
1165                                 st->con = NULL;
1166                         }
1167
1168                         ++st->cnt;
1169                         if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1170                                 return clientDone(st, true);    /* exit success */
1171                 }
1172
1173                 /* increment state counter */
1174                 st->state++;
1175                 if (commands[st->state] == NULL)
1176                 {
1177                         st->state = 0;
1178                         st->use_file = (int) getrand(thread, 0, num_files - 1);
1179                         commands = sql_files[st->use_file];
1180                         st->is_throttled = false;
1181
1182                         /*
1183                          * No transaction is underway anymore, which means there is
1184                          * nothing to listen to right now.  When throttling rate limits
1185                          * are active, a sleep will happen next, as the next transaction
1186                          * starts.  And then in any case the next SQL command will set
1187                          * listen back to 1.
1188                          */
1189                         st->listen = 0;
1190                         trans_needs_throttle = (throttle_delay > 0);
1191                 }
1192         }
1193
1194         if (st->con == NULL)
1195         {
1196                 instr_time      start,
1197                                         end;
1198
1199                 INSTR_TIME_SET_CURRENT(start);
1200                 if ((st->con = doConnect()) == NULL)
1201                 {
1202                         fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
1203                         return clientDone(st, false);
1204                 }
1205                 INSTR_TIME_SET_CURRENT(end);
1206                 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1207         }
1208
1209         /*
1210          * This ensures that a throttling delay is inserted before proceeding with
1211          * sql commands, after the first transaction. The first transaction
1212          * throttling is performed when first entering doCustom.
1213          */
1214         if (trans_needs_throttle)
1215         {
1216                 trans_needs_throttle = false;
1217                 goto top;
1218         }
1219
1220         /* Record transaction start time under logging, progress or throttling */
1221         if ((logfile || progress || throttle_delay) && st->state == 0)
1222                 INSTR_TIME_SET_CURRENT(st->txn_begin);
1223
1224         /* Record statement start time if per-command latencies are requested */
1225         if (is_latencies)
1226                 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1227
1228         if (commands[st->state]->type == SQL_COMMAND)
1229         {
1230                 const Command *command = commands[st->state];
1231                 int                     r;
1232
1233                 if (querymode == QUERY_SIMPLE)
1234                 {
1235                         char       *sql;
1236
1237                         sql = pg_strdup(command->argv[0]);
1238                         sql = assignVariables(st, sql);
1239
1240                         if (debug)
1241                                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1242                         r = PQsendQuery(st->con, sql);
1243                         free(sql);
1244                 }
1245                 else if (querymode == QUERY_EXTENDED)
1246                 {
1247                         const char *sql = command->argv[0];
1248                         const char *params[MAX_ARGS];
1249
1250                         getQueryParams(st, command, params);
1251
1252                         if (debug)
1253                                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1254                         r = PQsendQueryParams(st->con, sql, command->argc - 1,
1255                                                                   NULL, params, NULL, NULL, 0);
1256                 }
1257                 else if (querymode == QUERY_PREPARED)
1258                 {
1259                         char            name[MAX_PREPARE_NAME];
1260                         const char *params[MAX_ARGS];
1261
1262                         if (!st->prepared[st->use_file])
1263                         {
1264                                 int                     j;
1265
1266                                 for (j = 0; commands[j] != NULL; j++)
1267                                 {
1268                                         PGresult   *res;
1269                                         char            name[MAX_PREPARE_NAME];
1270
1271                                         if (commands[j]->type != SQL_COMMAND)
1272                                                 continue;
1273                                         preparedStatementName(name, st->use_file, j);
1274                                         res = PQprepare(st->con, name,
1275                                                   commands[j]->argv[0], commands[j]->argc - 1, NULL);
1276                                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1277                                                 fprintf(stderr, "%s", PQerrorMessage(st->con));
1278                                         PQclear(res);
1279                                 }
1280                                 st->prepared[st->use_file] = true;
1281                         }
1282
1283                         getQueryParams(st, command, params);
1284                         preparedStatementName(name, st->use_file, st->state);
1285
1286                         if (debug)
1287                                 fprintf(stderr, "client %d sending %s\n", st->id, name);
1288                         r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1289                                                                         params, NULL, NULL, 0);
1290                 }
1291                 else    /* unknown sql mode */
1292                         r = 0;
1293
1294                 if (r == 0)
1295                 {
1296                         if (debug)
1297                                 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
1298                         st->ecnt++;
1299                 }
1300                 else
1301                         st->listen = 1;         /* flags that should be listened */
1302         }
1303         else if (commands[st->state]->type == META_COMMAND)
1304         {
1305                 int                     argc = commands[st->state]->argc,
1306                                         i;
1307                 char      **argv = commands[st->state]->argv;
1308
1309                 if (debug)
1310                 {
1311                         fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1312                         for (i = 1; i < argc; i++)
1313                                 fprintf(stderr, " %s", argv[i]);
1314                         fprintf(stderr, "\n");
1315                 }
1316
1317                 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1318                 {
1319                         char       *var;
1320                         int64           min,
1321                                                 max;
1322                         char            res[64];
1323
1324                         if (*argv[2] == ':')
1325                         {
1326                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1327                                 {
1328                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1329                                         st->ecnt++;
1330                                         return true;
1331                                 }
1332                                 min = strtoint64(var);
1333                         }
1334                         else
1335                                 min = strtoint64(argv[2]);
1336
1337 #ifdef NOT_USED
1338                         if (min < 0)
1339                         {
1340                                 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
1341                                 st->ecnt++;
1342                                 return;
1343                         }
1344 #endif
1345
1346                         if (*argv[3] == ':')
1347                         {
1348                                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1349                                 {
1350                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
1351                                         st->ecnt++;
1352                                         return true;
1353                                 }
1354                                 max = strtoint64(var);
1355                         }
1356                         else
1357                                 max = strtoint64(argv[3]);
1358
1359                         if (max < min)
1360                         {
1361                                 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
1362                                 st->ecnt++;
1363                                 return true;
1364                         }
1365
1366                         /*
1367                          * getrand() needs to be able to subtract min from max and add one
1368                          * to the result without overflowing.  Since we know max >= min, we
1369                          * can detect overflow just by checking for a negative result. But
1370                          * we must check both that the subtraction doesn't overflow, and
1371                          * that adding one to the result doesn't overflow either.
1372                          */
1373                         if (max - min < 0 || (max - min) + 1 < 0)
1374                         {
1375                                 fprintf(stderr, "%s: range too large\n", argv[0]);
1376                                 st->ecnt++;
1377                                 return true;
1378                         }
1379
1380 #ifdef DEBUG
1381                         printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1382 #endif
1383                         snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1384
1385                         if (!putVariable(st, argv[0], argv[1], res))
1386                         {
1387                                 st->ecnt++;
1388                                 return true;
1389                         }
1390
1391                         st->listen = 1;
1392                 }
1393                 else if (pg_strcasecmp(argv[0], "set") == 0)
1394                 {
1395                         char       *var;
1396                         int64           ope1,
1397                                                 ope2;
1398                         char            res[64];
1399
1400                         if (*argv[2] == ':')
1401                         {
1402                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1403                                 {
1404                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1405                                         st->ecnt++;
1406                                         return true;
1407                                 }
1408                                 ope1 = strtoint64(var);
1409                         }
1410                         else
1411                                 ope1 = strtoint64(argv[2]);
1412
1413                         if (argc < 5)
1414                                 snprintf(res, sizeof(res), INT64_FORMAT, ope1);
1415                         else
1416                         {
1417                                 if (*argv[4] == ':')
1418                                 {
1419                                         if ((var = getVariable(st, argv[4] + 1)) == NULL)
1420                                         {
1421                                                 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
1422                                                 st->ecnt++;
1423                                                 return true;
1424                                         }
1425                                         ope2 = strtoint64(var);
1426                                 }
1427                                 else
1428                                         ope2 = strtoint64(argv[4]);
1429
1430                                 if (strcmp(argv[3], "+") == 0)
1431                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
1432                                 else if (strcmp(argv[3], "-") == 0)
1433                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
1434                                 else if (strcmp(argv[3], "*") == 0)
1435                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
1436                                 else if (strcmp(argv[3], "/") == 0)
1437                                 {
1438                                         if (ope2 == 0)
1439                                         {
1440                                                 fprintf(stderr, "%s: division by zero\n", argv[0]);
1441                                                 st->ecnt++;
1442                                                 return true;
1443                                         }
1444                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
1445                                 }
1446                                 else
1447                                 {
1448                                         fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
1449                                         st->ecnt++;
1450                                         return true;
1451                                 }
1452                         }
1453
1454                         if (!putVariable(st, argv[0], argv[1], res))
1455                         {
1456                                 st->ecnt++;
1457                                 return true;
1458                         }
1459
1460                         st->listen = 1;
1461                 }
1462                 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1463                 {
1464                         char       *var;
1465                         int                     usec;
1466                         instr_time      now;
1467
1468                         if (*argv[1] == ':')
1469                         {
1470                                 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1471                                 {
1472                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
1473                                         st->ecnt++;
1474                                         return true;
1475                                 }
1476                                 usec = atoi(var);
1477                         }
1478                         else
1479                                 usec = atoi(argv[1]);
1480
1481                         if (argc > 2)
1482                         {
1483                                 if (pg_strcasecmp(argv[2], "ms") == 0)
1484                                         usec *= 1000;
1485                                 else if (pg_strcasecmp(argv[2], "s") == 0)
1486                                         usec *= 1000000;
1487                         }
1488                         else
1489                                 usec *= 1000000;
1490
1491                         INSTR_TIME_SET_CURRENT(now);
1492                         st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
1493                         st->sleeping = 1;
1494
1495                         st->listen = 1;
1496                 }
1497                 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1498                 {
1499                         bool            ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1500
1501                         if (timer_exceeded) /* timeout */
1502                                 return clientDone(st, true);
1503                         else if (!ret)          /* on error */
1504                         {
1505                                 st->ecnt++;
1506                                 return true;
1507                         }
1508                         else    /* succeeded */
1509                                 st->listen = 1;
1510                 }
1511                 else if (pg_strcasecmp(argv[0], "shell") == 0)
1512                 {
1513                         bool            ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1514
1515                         if (timer_exceeded) /* timeout */
1516                                 return clientDone(st, true);
1517                         else if (!ret)          /* on error */
1518                         {
1519                                 st->ecnt++;
1520                                 return true;
1521                         }
1522                         else    /* succeeded */
1523                                 st->listen = 1;
1524                 }
1525                 goto top;
1526         }
1527
1528         return true;
1529 }
1530
1531 /* discard connections */
1532 static void
1533 disconnect_all(CState *state, int length)
1534 {
1535         int                     i;
1536
1537         for (i = 0; i < length; i++)
1538         {
1539                 if (state[i].con)
1540                 {
1541                         PQfinish(state[i].con);
1542                         state[i].con = NULL;
1543                 }
1544         }
1545 }
1546
1547 /* create tables and setup data */
1548 static void
1549 init(bool is_no_vacuum)
1550 {
1551 /*
1552  * The scale factor at/beyond which 32-bit integers are insufficient for
1553  * storing TPC-B account IDs.
1554  *
1555  * Although the actual threshold is 21474, we use 20000 because it is easier to
1556  * document and remember, and isn't that far away from the real threshold.
1557  */
1558 #define SCALE_32BIT_THRESHOLD 20000
1559
1560         /*
1561          * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1562          * fields in these table declarations were intended to comply with that.
1563          * The pgbench_accounts table complies with that because the "filler"
1564          * column is set to blank-padded empty string. But for all other tables
1565          * the columns default to NULL and so don't actually take any space.  We
1566          * could fix that by giving them non-null default values.  However, that
1567          * would completely break comparability of pgbench results with prior
1568          * versions. Since pgbench has never pretended to be fully TPC-B compliant
1569          * anyway, we stick with the historical behavior.
1570          */
1571         struct ddlinfo
1572         {
1573                 const char *table;              /* table name */
1574                 const char *smcols;             /* column decls if accountIDs are 32 bits */
1575                 const char *bigcols;    /* column decls if accountIDs are 64 bits */
1576                 int                     declare_fillfactor;
1577         };
1578         static const struct ddlinfo DDLs[] = {
1579                 {
1580                         "pgbench_history",
1581                         "tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
1582                         "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1583                         0
1584                 },
1585                 {
1586                         "pgbench_tellers",
1587                         "tid int not null,bid int,tbalance int,filler char(84)",
1588                         "tid int not null,bid int,tbalance int,filler char(84)",
1589                         1
1590                 },
1591                 {
1592                         "pgbench_accounts",
1593                         "aid    int not null,bid int,abalance int,filler char(84)",
1594                         "aid bigint not null,bid int,abalance int,filler char(84)",
1595                         1
1596                 },
1597                 {
1598                         "pgbench_branches",
1599                         "bid int not null,bbalance int,filler char(88)",
1600                         "bid int not null,bbalance int,filler char(88)",
1601                         1
1602                 }
1603         };
1604         static const char *const DDLINDEXes[] = {
1605                 "alter table pgbench_branches add primary key (bid)",
1606                 "alter table pgbench_tellers add primary key (tid)",
1607                 "alter table pgbench_accounts add primary key (aid)"
1608         };
1609         static const char *const DDLKEYs[] = {
1610                 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1611                 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1612                 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1613                 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1614                 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1615         };
1616
1617         PGconn     *con;
1618         PGresult   *res;
1619         char            sql[256];
1620         int                     i;
1621         int64           k;
1622
1623         /* used to track elapsed time and estimate of the remaining time */
1624         instr_time      start,
1625                                 diff;
1626         double          elapsed_sec,
1627                                 remaining_sec;
1628         int                     log_interval = 1;
1629
1630         if ((con = doConnect()) == NULL)
1631                 exit(1);
1632
1633         for (i = 0; i < lengthof(DDLs); i++)
1634         {
1635                 char            opts[256];
1636                 char            buffer[256];
1637                 const struct ddlinfo *ddl = &DDLs[i];
1638                 const char *cols;
1639
1640                 /* Remove old table, if it exists. */
1641                 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1642                 executeStatement(con, buffer);
1643
1644                 /* Construct new create table statement. */
1645                 opts[0] = '\0';
1646                 if (ddl->declare_fillfactor)
1647                         snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1648                                          " with (fillfactor=%d)", fillfactor);
1649                 if (tablespace != NULL)
1650                 {
1651                         char       *escape_tablespace;
1652
1653                         escape_tablespace = PQescapeIdentifier(con, tablespace,
1654                                                                                                    strlen(tablespace));
1655                         snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1656                                          " tablespace %s", escape_tablespace);
1657                         PQfreemem(escape_tablespace);
1658                 }
1659
1660                 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
1661
1662                 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
1663                                  unlogged_tables ? " unlogged" : "",
1664                                  ddl->table, cols, opts);
1665
1666                 executeStatement(con, buffer);
1667         }
1668
1669         executeStatement(con, "begin");
1670
1671         for (i = 0; i < nbranches * scale; i++)
1672         {
1673                 /* "filler" column defaults to NULL */
1674                 snprintf(sql, sizeof(sql),
1675                                  "insert into pgbench_branches(bid,bbalance) values(%d,0)",
1676                                  i + 1);
1677                 executeStatement(con, sql);
1678         }
1679
1680         for (i = 0; i < ntellers * scale; i++)
1681         {
1682                 /* "filler" column defaults to NULL */
1683                 snprintf(sql, sizeof(sql),
1684                         "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
1685                                  i + 1, i / ntellers + 1);
1686                 executeStatement(con, sql);
1687         }
1688
1689         executeStatement(con, "commit");
1690
1691         /*
1692          * fill the pgbench_accounts table with some data
1693          */
1694         fprintf(stderr, "creating tables...\n");
1695
1696         executeStatement(con, "begin");
1697         executeStatement(con, "truncate pgbench_accounts");
1698
1699         res = PQexec(con, "copy pgbench_accounts from stdin");
1700         if (PQresultStatus(res) != PGRES_COPY_IN)
1701         {
1702                 fprintf(stderr, "%s", PQerrorMessage(con));
1703                 exit(1);
1704         }
1705         PQclear(res);
1706
1707         INSTR_TIME_SET_CURRENT(start);
1708
1709         for (k = 0; k < (int64) naccounts * scale; k++)
1710         {
1711                 int64           j = k + 1;
1712
1713                 /* "filler" column defaults to blank padded empty string */
1714                 snprintf(sql, sizeof(sql),
1715                                  INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
1716                                  j, k / naccounts + 1, 0);
1717                 if (PQputline(con, sql))
1718                 {
1719                         fprintf(stderr, "PQputline failed\n");
1720                         exit(1);
1721                 }
1722
1723                 /*
1724                  * If we want to stick with the original logging, print a message each
1725                  * 100k inserted rows.
1726                  */
1727                 if ((!use_quiet) && (j % 100000 == 0))
1728                 {
1729                         INSTR_TIME_SET_CURRENT(diff);
1730                         INSTR_TIME_SUBTRACT(diff, start);
1731
1732                         elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1733                         remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
1734
1735                         fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1736                                         j, (int64) naccounts * scale,
1737                                         (int) (((int64) j * 100) / (naccounts * (int64) scale)),
1738                                         elapsed_sec, remaining_sec);
1739                 }
1740                 /* let's not call the timing for each row, but only each 100 rows */
1741                 else if (use_quiet && (j % 100 == 0))
1742                 {
1743                         INSTR_TIME_SET_CURRENT(diff);
1744                         INSTR_TIME_SUBTRACT(diff, start);
1745
1746                         elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1747                         remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
1748
1749                         /* have we reached the next interval (or end)? */
1750                         if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
1751                         {
1752                                 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1753                                                 j, (int64) naccounts * scale,
1754                                                 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
1755
1756                                 /* skip to the next interval */
1757                                 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
1758                         }
1759                 }
1760
1761         }
1762         if (PQputline(con, "\\.\n"))
1763         {
1764                 fprintf(stderr, "very last PQputline failed\n");
1765                 exit(1);
1766         }
1767         if (PQendcopy(con))
1768         {
1769                 fprintf(stderr, "PQendcopy failed\n");
1770                 exit(1);
1771         }
1772         executeStatement(con, "commit");
1773
1774         /* vacuum */
1775         if (!is_no_vacuum)
1776         {
1777                 fprintf(stderr, "vacuum...\n");
1778                 executeStatement(con, "vacuum analyze pgbench_branches");
1779                 executeStatement(con, "vacuum analyze pgbench_tellers");
1780                 executeStatement(con, "vacuum analyze pgbench_accounts");
1781                 executeStatement(con, "vacuum analyze pgbench_history");
1782         }
1783
1784         /*
1785          * create indexes
1786          */
1787         fprintf(stderr, "set primary keys...\n");
1788         for (i = 0; i < lengthof(DDLINDEXes); i++)
1789         {
1790                 char            buffer[256];
1791
1792                 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
1793
1794                 if (index_tablespace != NULL)
1795                 {
1796                         char       *escape_tablespace;
1797
1798                         escape_tablespace = PQescapeIdentifier(con, index_tablespace,
1799                                                                                                    strlen(index_tablespace));
1800                         snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
1801                                          " using index tablespace %s", escape_tablespace);
1802                         PQfreemem(escape_tablespace);
1803                 }
1804
1805                 executeStatement(con, buffer);
1806         }
1807
1808         /*
1809          * create foreign keys
1810          */
1811         if (foreign_keys)
1812         {
1813                 fprintf(stderr, "set foreign keys...\n");
1814                 for (i = 0; i < lengthof(DDLKEYs); i++)
1815                 {
1816                         executeStatement(con, DDLKEYs[i]);
1817                 }
1818         }
1819
1820         fprintf(stderr, "done.\n");
1821         PQfinish(con);
1822 }
1823
1824 /*
1825  * Parse the raw sql and replace :param to $n.
1826  */
1827 static bool
1828 parseQuery(Command *cmd, const char *raw_sql)
1829 {
1830         char       *sql,
1831                            *p;
1832
1833         sql = pg_strdup(raw_sql);
1834         cmd->argc = 1;
1835
1836         p = sql;
1837         while ((p = strchr(p, ':')) != NULL)
1838         {
1839                 char            var[12];
1840                 char       *name;
1841                 int                     eaten;
1842
1843                 name = parseVariable(p, &eaten);
1844                 if (name == NULL)
1845                 {
1846                         while (*p == ':')
1847                         {
1848                                 p++;
1849                         }
1850                         continue;
1851                 }
1852
1853                 if (cmd->argc >= MAX_ARGS)
1854                 {
1855                         fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
1856                         return false;
1857                 }
1858
1859                 sprintf(var, "$%d", cmd->argc);
1860                 p = replaceVariable(&sql, p, eaten, var);
1861
1862                 cmd->argv[cmd->argc] = name;
1863                 cmd->argc++;
1864         }
1865
1866         cmd->argv[0] = sql;
1867         return true;
1868 }
1869
1870 /* Parse a command; return a Command struct, or NULL if it's a comment */
1871 static Command *
1872 process_commands(char *buf)
1873 {
1874         const char      delim[] = " \f\n\r\t\v";
1875
1876         Command    *my_commands;
1877         int                     j;
1878         char       *p,
1879                            *tok;
1880
1881         /* Make the string buf end at the next newline */
1882         if ((p = strchr(buf, '\n')) != NULL)
1883                 *p = '\0';
1884
1885         /* Skip leading whitespace */
1886         p = buf;
1887         while (isspace((unsigned char) *p))
1888                 p++;
1889
1890         /* If the line is empty or actually a comment, we're done */
1891         if (*p == '\0' || strncmp(p, "--", 2) == 0)
1892                 return NULL;
1893
1894         /* Allocate and initialize Command structure */
1895         my_commands = (Command *) pg_malloc(sizeof(Command));
1896         my_commands->line = pg_strdup(buf);
1897         my_commands->command_num = num_commands++;
1898         my_commands->type = 0;          /* until set */
1899         my_commands->argc = 0;
1900
1901         if (*p == '\\')
1902         {
1903                 my_commands->type = META_COMMAND;
1904
1905                 j = 0;
1906                 tok = strtok(++p, delim);
1907
1908                 while (tok != NULL)
1909                 {
1910                         my_commands->argv[j++] = pg_strdup(tok);
1911                         my_commands->argc++;
1912                         tok = strtok(NULL, delim);
1913                 }
1914
1915                 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
1916                 {
1917                         if (my_commands->argc < 4)
1918                         {
1919                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1920                                 exit(1);
1921                         }
1922
1923                         for (j = 4; j < my_commands->argc; j++)
1924                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1925                                                 my_commands->argv[0], my_commands->argv[j]);
1926                 }
1927                 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
1928                 {
1929                         if (my_commands->argc < 3)
1930                         {
1931                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1932                                 exit(1);
1933                         }
1934
1935                         for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
1936                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1937                                                 my_commands->argv[0], my_commands->argv[j]);
1938                 }
1939                 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
1940                 {
1941                         if (my_commands->argc < 2)
1942                         {
1943                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1944                                 exit(1);
1945                         }
1946
1947                         /*
1948                          * Split argument into number and unit to allow "sleep 1ms" etc.
1949                          * We don't have to terminate the number argument with null
1950                          * because it will be parsed with atoi, which ignores trailing
1951                          * non-digit characters.
1952                          */
1953                         if (my_commands->argv[1][0] != ':')
1954                         {
1955                                 char       *c = my_commands->argv[1];
1956
1957                                 while (isdigit((unsigned char) *c))
1958                                         c++;
1959                                 if (*c)
1960                                 {
1961                                         my_commands->argv[2] = c;
1962                                         if (my_commands->argc < 3)
1963                                                 my_commands->argc = 3;
1964                                 }
1965                         }
1966
1967                         if (my_commands->argc >= 3)
1968                         {
1969                                 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
1970                                         pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
1971                                         pg_strcasecmp(my_commands->argv[2], "s") != 0)
1972                                 {
1973                                         fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
1974                                                         my_commands->argv[0], my_commands->argv[2]);
1975                                         exit(1);
1976                                 }
1977                         }
1978
1979                         for (j = 3; j < my_commands->argc; j++)
1980                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1981                                                 my_commands->argv[0], my_commands->argv[j]);
1982                 }
1983                 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
1984                 {
1985                         if (my_commands->argc < 3)
1986                         {
1987                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1988                                 exit(1);
1989                         }
1990                 }
1991                 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
1992                 {
1993                         if (my_commands->argc < 1)
1994                         {
1995                                 fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
1996                                 exit(1);
1997                         }
1998                 }
1999                 else
2000                 {
2001                         fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
2002                         exit(1);
2003                 }
2004         }
2005         else
2006         {
2007                 my_commands->type = SQL_COMMAND;
2008
2009                 switch (querymode)
2010                 {
2011                         case QUERY_SIMPLE:
2012                                 my_commands->argv[0] = pg_strdup(p);
2013                                 my_commands->argc++;
2014                                 break;
2015                         case QUERY_EXTENDED:
2016                         case QUERY_PREPARED:
2017                                 if (!parseQuery(my_commands, p))
2018                                         exit(1);
2019                                 break;
2020                         default:
2021                                 exit(1);
2022                 }
2023         }
2024
2025         return my_commands;
2026 }
2027
/*
 * Fetch the next line of fd into a freshly allocated buffer.
 *
 * Returns the line (including its trailing newline when one was read), or
 * NULL once nothing more can be read.
 *
 * Input is gathered in BUFSIZ-sized pieces, so the result is usually
 * over-allocated; callers release it right after parsing, so the slack
 * is of no concern.
 */
static char *
read_line_from_file(FILE *fd)
{
	char		chunk[BUFSIZ];
	size_t		capacity = BUFSIZ;
	size_t		filled = 0;
	char	   *line;

	line = (char *) palloc(capacity);
	line[0] = '\0';

	for (;;)
	{
		size_t		piece;

		if (fgets(chunk, BUFSIZ, fd) == NULL)
			break;

		/* copy the piece, terminator included, behind what we have */
		piece = strlen(chunk);
		memcpy(line + filled, chunk, piece + 1);
		filled += piece;

		/* a newline at the end of the piece completes the line */
		if (piece > 0 && chunk[piece - 1] == '\n')
			break;

		/* otherwise make room for one more full piece */
		capacity += BUFSIZ;
		line = (char *) pg_realloc(line, capacity);
	}

	if (filled == 0)
	{
		/* nothing was read at all: end of input */
		free(line);
		return NULL;
	}

	return line;
}
2070
2071 static int
2072 process_file(char *filename)
2073 {
2074 #define COMMANDS_ALLOC_NUM 128
2075
2076         Command   **my_commands;
2077         FILE       *fd;
2078         int                     lineno;
2079         char       *buf;
2080         int                     alloc_num;
2081
2082         if (num_files >= MAX_FILES)
2083         {
2084                 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
2085                 exit(1);
2086         }
2087
2088         alloc_num = COMMANDS_ALLOC_NUM;
2089         my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2090
2091         if (strcmp(filename, "-") == 0)
2092                 fd = stdin;
2093         else if ((fd = fopen(filename, "r")) == NULL)
2094         {
2095                 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
2096                 return false;
2097         }
2098
2099         lineno = 0;
2100
2101         while ((buf = read_line_from_file(fd)) != NULL)
2102         {
2103                 Command    *command;
2104
2105                 command = process_commands(buf);
2106
2107                 free(buf);
2108
2109                 if (command == NULL)
2110                         continue;
2111
2112                 my_commands[lineno] = command;
2113                 lineno++;
2114
2115                 if (lineno >= alloc_num)
2116                 {
2117                         alloc_num += COMMANDS_ALLOC_NUM;
2118                         my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2119                 }
2120         }
2121         fclose(fd);
2122
2123         my_commands[lineno] = NULL;
2124
2125         sql_files[num_files++] = my_commands;
2126
2127         return true;
2128 }
2129
2130 static Command **
2131 process_builtin(char *tb)
2132 {
2133 #define COMMANDS_ALLOC_NUM 128
2134
2135         Command   **my_commands;
2136         int                     lineno;
2137         char            buf[BUFSIZ];
2138         int                     alloc_num;
2139
2140         alloc_num = COMMANDS_ALLOC_NUM;
2141         my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2142
2143         lineno = 0;
2144
2145         for (;;)
2146         {
2147                 char       *p;
2148                 Command    *command;
2149
2150                 p = buf;
2151                 while (*tb && *tb != '\n')
2152                         *p++ = *tb++;
2153
2154                 if (*tb == '\0')
2155                         break;
2156
2157                 if (*tb == '\n')
2158                         tb++;
2159
2160                 *p = '\0';
2161
2162                 command = process_commands(buf);
2163                 if (command == NULL)
2164                         continue;
2165
2166                 my_commands[lineno] = command;
2167                 lineno++;
2168
2169                 if (lineno >= alloc_num)
2170                 {
2171                         alloc_num += COMMANDS_ALLOC_NUM;
2172                         my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2173                 }
2174         }
2175
2176         my_commands[lineno] = NULL;
2177
2178         return my_commands;
2179 }
2180
2181 /* print out results */
2182 static void
2183 printResults(int ttype, int64 normal_xacts, int nclients,
2184                          TState *threads, int nthreads,
2185                          instr_time total_time, instr_time conn_total_time,
2186                          int64 total_latencies, int64 total_sqlats,
2187                          int64 throttle_lag, int64 throttle_lag_max)
2188 {
2189         double          time_include,
2190                                 tps_include,
2191                                 tps_exclude;
2192         char       *s;
2193
2194         time_include = INSTR_TIME_GET_DOUBLE(total_time);
2195         tps_include = normal_xacts / time_include;
2196         tps_exclude = normal_xacts / (time_include -
2197                                                 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
2198
2199         if (ttype == 0)
2200                 s = "TPC-B (sort of)";
2201         else if (ttype == 2)
2202                 s = "Update only pgbench_accounts";
2203         else if (ttype == 1)
2204                 s = "SELECT only";
2205         else
2206                 s = "Custom query";
2207
2208         printf("transaction type: %s\n", s);
2209         printf("scaling factor: %d\n", scale);
2210         printf("query mode: %s\n", QUERYMODE[querymode]);
2211         printf("number of clients: %d\n", nclients);
2212         printf("number of threads: %d\n", nthreads);
2213         if (duration <= 0)
2214         {
2215                 printf("number of transactions per client: %d\n", nxacts);
2216                 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2217                            normal_xacts, (int64) nxacts * nclients);
2218         }
2219         else
2220         {
2221                 printf("duration: %d s\n", duration);
2222                 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2223                            normal_xacts);
2224         }
2225
2226         if (throttle_delay || progress)
2227         {
2228                 /* compute and show latency average and standard deviation */
2229                 double          latency = 0.001 * total_latencies / normal_xacts;
2230                 double          sqlat = (double) total_sqlats / normal_xacts;
2231
2232                 printf("latency average: %.3f ms\n"
2233                            "latency stddev: %.3f ms\n",
2234                            latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2235         }
2236         else
2237         {
2238                 /* only an average latency computed from the duration is available */
2239                 printf("latency average: %.3f ms\n",
2240                            1000.0 * duration * nclients / normal_xacts);
2241         }
2242
2243         if (throttle_delay)
2244         {
2245                 /*
2246                  * Report average transaction lag under rate limit throttling.  This
2247                  * is the delay between scheduled and actual start times for the
2248                  * transaction.  The measured lag may be caused by thread/client load,
2249                  * the database load, or the Poisson throttling process.
2250                  */
2251                 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2252                            0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2253         }
2254
2255         printf("tps = %f (including connections establishing)\n", tps_include);
2256         printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2257
2258         /* Report per-command latencies */
2259         if (is_latencies)
2260         {
2261                 int                     i;
2262
2263                 for (i = 0; i < num_files; i++)
2264                 {
2265                         Command   **commands;
2266
2267                         if (num_files > 1)
2268                                 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2269                         else
2270                                 printf("statement latencies in milliseconds:\n");
2271
2272                         for (commands = sql_files[i]; *commands != NULL; commands++)
2273                         {
2274                                 Command    *command = *commands;
2275                                 int                     cnum = command->command_num;
2276                                 double          total_time;
2277                                 instr_time      total_exec_elapsed;
2278                                 int                     total_exec_count;
2279                                 int                     t;
2280
2281                                 /* Accumulate per-thread data for command */
2282                                 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2283                                 total_exec_count = 0;
2284                                 for (t = 0; t < nthreads; t++)
2285                                 {
2286                                         TState     *thread = &threads[t];
2287
2288                                         INSTR_TIME_ADD(total_exec_elapsed,
2289                                                                    thread->exec_elapsed[cnum]);
2290                                         total_exec_count += thread->exec_count[cnum];
2291                                 }
2292
2293                                 if (total_exec_count > 0)
2294                                         total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2295                                 else
2296                                         total_time = 0.0;
2297
2298                                 printf("\t%f\t%s\n", total_time, command->line);
2299                         }
2300                 }
2301         }
2302 }
2303
2304
2305 int
2306 main(int argc, char **argv)
2307 {
2308         static struct option long_options[] = {
2309                 /* systematic long/short named options */
2310                 {"client", required_argument, NULL, 'c'},
2311                 {"connect", no_argument, NULL, 'C'},
2312                 {"debug", no_argument, NULL, 'd'},
2313                 {"define", required_argument, NULL, 'D'},
2314                 {"file", required_argument, NULL, 'f'},
2315                 {"fillfactor", required_argument, NULL, 'F'},
2316                 {"host", required_argument, NULL, 'h'},
2317                 {"initialize", no_argument, NULL, 'i'},
2318                 {"jobs", required_argument, NULL, 'j'},
2319                 {"log", no_argument, NULL, 'l'},
2320                 {"no-vacuum", no_argument, NULL, 'n'},
2321                 {"port", required_argument, NULL, 'p'},
2322                 {"progress", required_argument, NULL, 'P'},
2323                 {"protocol", required_argument, NULL, 'M'},
2324                 {"quiet", no_argument, NULL, 'q'},
2325                 {"report-latencies", no_argument, NULL, 'r'},
2326                 {"scale", required_argument, NULL, 's'},
2327                 {"select-only", no_argument, NULL, 'S'},
2328                 {"skip-some-updates", no_argument, NULL, 'N'},
2329                 {"time", required_argument, NULL, 'T'},
2330                 {"transactions", required_argument, NULL, 't'},
2331                 {"username", required_argument, NULL, 'U'},
2332                 {"vacuum-all", no_argument, NULL, 'v'},
2333                 /* long-named only options */
2334                 {"foreign-keys", no_argument, &foreign_keys, 1},
2335                 {"index-tablespace", required_argument, NULL, 3},
2336                 {"tablespace", required_argument, NULL, 2},
2337                 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2338                 {"sampling-rate", required_argument, NULL, 4},
2339                 {"aggregate-interval", required_argument, NULL, 5},
2340                 {"rate", required_argument, NULL, 'R'},
2341                 {NULL, 0, NULL, 0}
2342         };
2343
2344         int                     c;
2345         int                     nclients = 1;   /* default number of simulated clients */
2346         int                     nthreads = 1;   /* default number of threads */
2347         int                     is_init_mode = 0;               /* initialize mode? */
2348         int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
2349         int                     do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2350         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
2351                                                                  * 2: skip update of branches and tellers */
2352         int                     optindex;
2353         char       *filename = NULL;
2354         bool            scale_given = false;
2355
2356         CState     *state;                      /* status of clients */
2357         TState     *threads;            /* array of thread */
2358
2359         instr_time      start_time;             /* start up time */
2360         instr_time      total_time;
2361         instr_time      conn_total_time;
2362         int64           total_xacts = 0;
2363         int64           total_latencies = 0;
2364         int64           total_sqlats = 0;
2365         int64           throttle_lag = 0;
2366         int64           throttle_lag_max = 0;
2367
2368         int                     i;
2369
2370 #ifdef HAVE_GETRLIMIT
2371         struct rlimit rlim;
2372 #endif
2373
2374         PGconn     *con;
2375         PGresult   *res;
2376         char       *env;
2377
2378         char            val[64];
2379
2380         progname = get_progname(argv[0]);
2381
2382         if (argc > 1)
2383         {
2384                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2385                 {
2386                         usage();
2387                         exit(0);
2388                 }
2389                 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2390                 {
2391                         puts("pgbench (PostgreSQL) " PG_VERSION);
2392                         exit(0);
2393                 }
2394         }
2395
2396 #ifdef WIN32
2397         /* stderr is buffered on Win32. */
2398         setvbuf(stderr, NULL, _IONBF, 0);
2399 #endif
2400
2401         if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2402                 pghost = env;
2403         if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2404                 pgport = env;
2405         else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2406                 login = env;
2407
2408         state = (CState *) pg_malloc(sizeof(CState));
2409         memset(state, 0, sizeof(CState));
2410
2411         while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
2412         {
2413                 switch (c)
2414                 {
2415                         case 'i':
2416                                 is_init_mode++;
2417                                 break;
2418                         case 'h':
2419                                 pghost = pg_strdup(optarg);
2420                                 break;
2421                         case 'n':
2422                                 is_no_vacuum++;
2423                                 break;
2424                         case 'v':
2425                                 do_vacuum_accounts++;
2426                                 break;
2427                         case 'p':
2428                                 pgport = pg_strdup(optarg);
2429                                 break;
2430                         case 'd':
2431                                 debug++;
2432                                 break;
2433                         case 'S':
2434                                 ttype = 1;
2435                                 break;
2436                         case 'N':
2437                                 ttype = 2;
2438                                 break;
2439                         case 'c':
2440                                 nclients = atoi(optarg);
2441                                 if (nclients <= 0 || nclients > MAXCLIENTS)
2442                                 {
2443                                         fprintf(stderr, "invalid number of clients: %d\n", nclients);
2444                                         exit(1);
2445                                 }
2446 #ifdef HAVE_GETRLIMIT
2447 #ifdef RLIMIT_NOFILE                    /* most platforms use RLIMIT_NOFILE */
2448                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2449 #else                                                   /* but BSD doesn't ... */
2450                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2451 #endif   /* RLIMIT_NOFILE */
2452                                 {
2453                                         fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2454                                         exit(1);
2455                                 }
2456                                 if (rlim.rlim_cur <= (nclients + 2))
2457                                 {
2458                                         fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
2459                                         fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
2460                                         exit(1);
2461                                 }
2462 #endif   /* HAVE_GETRLIMIT */
2463                                 break;
2464                         case 'j':                       /* jobs */
2465                                 nthreads = atoi(optarg);
2466                                 if (nthreads <= 0)
2467                                 {
2468                                         fprintf(stderr, "invalid number of threads: %d\n", nthreads);
2469                                         exit(1);
2470                                 }
2471                                 break;
2472                         case 'C':
2473                                 is_connect = true;
2474                                 break;
2475                         case 'r':
2476                                 is_latencies = true;
2477                                 break;
2478                         case 's':
2479                                 scale_given = true;
2480                                 scale = atoi(optarg);
2481                                 if (scale <= 0)
2482                                 {
2483                                         fprintf(stderr, "invalid scaling factor: %d\n", scale);
2484                                         exit(1);
2485                                 }
2486                                 break;
2487                         case 't':
2488                                 if (duration > 0)
2489                                 {
2490                                         fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2491                                         exit(1);
2492                                 }
2493                                 nxacts = atoi(optarg);
2494                                 if (nxacts <= 0)
2495                                 {
2496                                         fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
2497                                         exit(1);
2498                                 }
2499                                 break;
2500                         case 'T':
2501                                 if (nxacts > 0)
2502                                 {
2503                                         fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2504                                         exit(1);
2505                                 }
2506                                 duration = atoi(optarg);
2507                                 if (duration <= 0)
2508                                 {
2509                                         fprintf(stderr, "invalid duration: %d\n", duration);
2510                                         exit(1);
2511                                 }
2512                                 break;
2513                         case 'U':
2514                                 login = pg_strdup(optarg);
2515                                 break;
2516                         case 'l':
2517                                 use_log = true;
2518                                 break;
2519                         case 'q':
2520                                 use_quiet = true;
2521                                 break;
2522                         case 'f':
2523                                 ttype = 3;
2524                                 filename = pg_strdup(optarg);
2525                                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2526                                         exit(1);
2527                                 break;
2528                         case 'D':
2529                                 {
2530                                         char       *p;
2531
2532                                         if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2533                                         {
2534                                                 fprintf(stderr, "invalid variable definition: %s\n", optarg);
2535                                                 exit(1);
2536                                         }
2537
2538                                         *p++ = '\0';
2539                                         if (!putVariable(&state[0], "option", optarg, p))
2540                                                 exit(1);
2541                                 }
2542                                 break;
2543                         case 'F':
2544                                 fillfactor = atoi(optarg);
2545                                 if ((fillfactor < 10) || (fillfactor > 100))
2546                                 {
2547                                         fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
2548                                         exit(1);
2549                                 }
2550                                 break;
2551                         case 'M':
2552                                 if (num_files > 0)
2553                                 {
2554                                         fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
2555                                         exit(1);
2556                                 }
2557                                 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
2558                                         if (strcmp(optarg, QUERYMODE[querymode]) == 0)
2559                                                 break;
2560                                 if (querymode >= NUM_QUERYMODE)
2561                                 {
2562                                         fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
2563                                         exit(1);
2564                                 }
2565                                 break;
2566                         case 'P':
2567                                 progress = atoi(optarg);
2568                                 if (progress <= 0)
2569                                 {
2570                                         fprintf(stderr,
2571                                                 "thread progress delay (-P) must be positive (%s)\n",
2572                                                         optarg);
2573                                         exit(1);
2574                                 }
2575                                 break;
2576                         case 'R':
2577                                 {
2578                                         /* get a double from the beginning of option value */
2579                                         double          throttle_value = atof(optarg);
2580
2581                                         if (throttle_value <= 0.0)
2582                                         {
2583                                                 fprintf(stderr, "invalid rate limit: %s\n", optarg);
2584                                                 exit(1);
2585                                         }
2586                                         /* Invert rate limit into a time offset */
2587                                         throttle_delay = (int64) (1000000.0 / throttle_value);
2588                                 }
2589                                 break;
2590                         case 0:
2591                                 /* This covers long options which take no argument. */
2592                                 break;
2593                         case 2:                         /* tablespace */
2594                                 tablespace = pg_strdup(optarg);
2595                                 break;
2596                         case 3:                         /* index-tablespace */
2597                                 index_tablespace = pg_strdup(optarg);
2598                                 break;
2599                         case 4:
2600                                 sample_rate = atof(optarg);
2601                                 if (sample_rate <= 0.0 || sample_rate > 1.0)
2602                                 {
2603                                         fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
2604                                         exit(1);
2605                                 }
2606                                 break;
2607                         case 5:
2608 #ifdef WIN32
2609                                 fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
2610                                 exit(1);
2611 #else
2612                                 agg_interval = atoi(optarg);
2613                                 if (agg_interval <= 0)
2614                                 {
2615                                         fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
2616                                         exit(1);
2617                                 }
2618 #endif
2619                                 break;
2620                         default:
2621                                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
2622                                 exit(1);
2623                                 break;
2624                 }
2625         }
2626
2627         /* compute a per thread delay */
2628         throttle_delay *= nthreads;
2629
2630         if (argc > optind)
2631                 dbName = argv[optind];
2632         else
2633         {
2634                 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
2635                         dbName = env;
2636                 else if (login != NULL && *login != '\0')
2637                         dbName = login;
2638                 else
2639                         dbName = "";
2640         }
2641
2642         if (is_init_mode)
2643         {
2644                 init(is_no_vacuum);
2645                 exit(0);
2646         }
2647
2648         /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
2649         if (nxacts <= 0 && duration <= 0)
2650                 nxacts = DEFAULT_NXACTS;
2651
2652         if (nclients % nthreads != 0)
2653         {
2654                 fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
2655                 exit(1);
2656         }
2657
2658         /* --sampling-rate may be used only with -l */
2659         if (sample_rate > 0.0 && !use_log)
2660         {
2661                 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
2662                 exit(1);
2663         }
2664
2665         /* -q may be used only with -i */
2666         if (use_quiet && !is_init_mode)
2667         {
2668                 fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
2669                 exit(1);
2670         }
2671
2672         /* --sampling-rate may must not be used with --aggregate-interval */
2673         if (sample_rate > 0.0 && agg_interval > 0)
2674         {
2675                 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
2676                 exit(1);
2677         }
2678
2679         if (agg_interval > 0 && (!use_log))
2680         {
2681                 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
2682                 exit(1);
2683         }
2684
2685         if ((duration > 0) && (agg_interval > duration))
2686         {
2687                 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
2688                 exit(1);
2689         }
2690
2691         if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0))
2692         {
2693                 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
2694                 exit(1);
2695         }
2696
2697         /*
2698          * is_latencies only works with multiple threads in thread-based
2699          * implementations, not fork-based ones, because it supposes that the
2700          * parent can see changes made to the per-thread execution stats by child
2701          * threads.  It seems useful enough to accept despite this limitation, but
2702          * perhaps we should FIXME someday (by passing the stats data back up
2703          * through the parent-to-child pipes).
2704          */
2705 #ifndef ENABLE_THREAD_SAFETY
2706         if (is_latencies && nthreads > 1)
2707         {
2708                 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
2709                 exit(1);
2710         }
2711 #endif
2712
2713         /*
2714          * save main process id in the global variable because process id will be
2715          * changed after fork.
2716          */
2717         main_pid = (int) getpid();
2718         progress_nclients = nclients;
2719         progress_nthreads = nthreads;
2720
2721         if (nclients > 1)
2722         {
2723                 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
2724                 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
2725
2726                 /* copy any -D switch values to all clients */
2727                 for (i = 1; i < nclients; i++)
2728                 {
2729                         int                     j;
2730
2731                         state[i].id = i;
2732                         for (j = 0; j < state[0].nvariables; j++)
2733                         {
2734                                 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
2735                                         exit(1);
2736                         }
2737                 }
2738         }
2739
2740         if (debug)
2741         {
2742                 if (duration <= 0)
2743                         printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
2744                                    pghost, pgport, nclients, nxacts, dbName);
2745                 else
2746                         printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
2747                                    pghost, pgport, nclients, duration, dbName);
2748         }
2749
2750         /* opening connection... */
2751         con = doConnect();
2752         if (con == NULL)
2753                 exit(1);
2754
2755         if (PQstatus(con) == CONNECTION_BAD)
2756         {
2757                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
2758                 fprintf(stderr, "%s", PQerrorMessage(con));
2759                 exit(1);
2760         }
2761
2762         if (ttype != 3)
2763         {
2764                 /*
2765                  * get the scaling factor that should be same as count(*) from
2766                  * pgbench_branches if this is not a custom query
2767                  */
2768                 res = PQexec(con, "select count(*) from pgbench_branches");
2769                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2770                 {
2771                         fprintf(stderr, "%s", PQerrorMessage(con));
2772                         exit(1);
2773                 }
2774                 scale = atoi(PQgetvalue(res, 0, 0));
2775                 if (scale < 0)
2776                 {
2777                         fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
2778                         exit(1);
2779                 }
2780                 PQclear(res);
2781
2782                 /* warn if we override user-given -s switch */
2783                 if (scale_given)
2784                         fprintf(stderr,
2785                         "Scale option ignored, using pgbench_branches table count = %d\n",
2786                                         scale);
2787         }
2788
2789         /*
2790          * :scale variables normally get -s or database scale, but don't override
2791          * an explicit -D switch
2792          */
2793         if (getVariable(&state[0], "scale") == NULL)
2794         {
2795                 snprintf(val, sizeof(val), "%d", scale);
2796                 for (i = 0; i < nclients; i++)
2797                 {
2798                         if (!putVariable(&state[i], "startup", "scale", val))
2799                                 exit(1);
2800                 }
2801         }
2802
2803         /*
2804          * Define a :client_id variable that is unique per connection. But don't
2805          * override an explicit -D switch.
2806          */
2807         if (getVariable(&state[0], "client_id") == NULL)
2808         {
2809                 for (i = 0; i < nclients; i++)
2810                 {
2811                         snprintf(val, sizeof(val), "%d", i);
2812                         if (!putVariable(&state[i], "startup", "client_id", val))
2813                                 exit(1);
2814                 }
2815         }
2816
2817         if (!is_no_vacuum)
2818         {
2819                 fprintf(stderr, "starting vacuum...");
2820                 executeStatement(con, "vacuum pgbench_branches");
2821                 executeStatement(con, "vacuum pgbench_tellers");
2822                 executeStatement(con, "truncate pgbench_history");
2823                 fprintf(stderr, "end.\n");
2824
2825                 if (do_vacuum_accounts)
2826                 {
2827                         fprintf(stderr, "starting vacuum pgbench_accounts...");
2828                         executeStatement(con, "vacuum analyze pgbench_accounts");
2829                         fprintf(stderr, "end.\n");
2830                 }
2831         }
2832         PQfinish(con);
2833
2834         /* set random seed */
2835         INSTR_TIME_SET_CURRENT(start_time);
2836         srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
2837
2838         /* process builtin SQL scripts */
2839         switch (ttype)
2840         {
2841                 case 0:
2842                         sql_files[0] = process_builtin(tpc_b);
2843                         num_files = 1;
2844                         break;
2845
2846                 case 1:
2847                         sql_files[0] = process_builtin(select_only);
2848                         num_files = 1;
2849                         break;
2850
2851                 case 2:
2852                         sql_files[0] = process_builtin(simple_update);
2853                         num_files = 1;
2854                         break;
2855
2856                 default:
2857                         break;
2858         }
2859
2860         /* set up thread data structures */
2861         threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
2862         for (i = 0; i < nthreads; i++)
2863         {
2864                 TState     *thread = &threads[i];
2865
2866                 thread->tid = i;
2867                 thread->state = &state[nclients / nthreads * i];
2868                 thread->nstate = nclients / nthreads;
2869                 thread->random_state[0] = random();
2870                 thread->random_state[1] = random();
2871                 thread->random_state[2] = random();
2872
2873                 if (is_latencies)
2874                 {
2875                         /* Reserve memory for the thread to store per-command latencies */
2876                         int                     t;
2877
2878                         thread->exec_elapsed = (instr_time *)
2879                                 pg_malloc(sizeof(instr_time) * num_commands);
2880                         thread->exec_count = (int *)
2881                                 pg_malloc(sizeof(int) * num_commands);
2882
2883                         for (t = 0; t < num_commands; t++)
2884                         {
2885                                 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
2886                                 thread->exec_count[t] = 0;
2887                         }
2888                 }
2889                 else
2890                 {
2891                         thread->exec_elapsed = NULL;
2892                         thread->exec_count = NULL;
2893                 }
2894         }
2895
2896         /* get start up time */
2897         INSTR_TIME_SET_CURRENT(start_time);
2898
2899         /* set alarm if duration is specified. */
2900         if (duration > 0)
2901                 setalarm(duration);
2902
2903         /* start threads */
2904         for (i = 0; i < nthreads; i++)
2905         {
2906                 TState     *thread = &threads[i];
2907
2908                 INSTR_TIME_SET_CURRENT(thread->start_time);
2909
2910                 /* the first thread (i = 0) is executed by main thread */
2911                 if (i > 0)
2912                 {
2913                         int                     err = pthread_create(&thread->thread, NULL, threadRun, thread);
2914
2915                         if (err != 0 || thread->thread == INVALID_THREAD)
2916                         {
2917                                 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
2918                                 exit(1);
2919                         }
2920                 }
2921                 else
2922                 {
2923                         thread->thread = INVALID_THREAD;
2924                 }
2925         }
2926
2927         /* wait for threads and accumulate results */
2928         INSTR_TIME_SET_ZERO(conn_total_time);
2929         for (i = 0; i < nthreads; i++)
2930         {
2931                 void       *ret = NULL;
2932
2933                 if (threads[i].thread == INVALID_THREAD)
2934                         ret = threadRun(&threads[i]);
2935                 else
2936                         pthread_join(threads[i].thread, &ret);
2937
2938                 if (ret != NULL)
2939                 {
2940                         TResult    *r = (TResult *) ret;
2941
2942                         total_xacts += r->xacts;
2943                         total_latencies += r->latencies;
2944                         total_sqlats += r->sqlats;
2945                         throttle_lag += r->throttle_lag;
2946                         if (r->throttle_lag_max > throttle_lag_max)
2947                                 throttle_lag_max = r->throttle_lag_max;
2948                         INSTR_TIME_ADD(conn_total_time, r->conn_time);
2949                         free(ret);
2950                 }
2951         }
2952         disconnect_all(state, nclients);
2953
2954         /*
2955          * XXX We compute results as though every client of every thread started
2956          * and finished at the same time.  That model can diverge noticeably from
2957          * reality for a short benchmark run involving relatively many threads.
2958          * The first thread may process notably many transactions before the last
2959          * thread begins.  Improving the model alone would bring limited benefit,
2960          * because performance during those periods of partial thread count can
2961          * easily exceed steady state performance.  This is one of the many ways
2962          * short runs convey deceptive performance figures.
2963          */
2964         INSTR_TIME_SET_CURRENT(total_time);
2965         INSTR_TIME_SUBTRACT(total_time, start_time);
2966         printResults(ttype, total_xacts, nclients, threads, nthreads,
2967                                  total_time, conn_total_time, total_latencies, total_sqlats,
2968                                  throttle_lag, throttle_lag_max);
2969
2970         return 0;
2971 }
2972
2973 static void *
2974 threadRun(void *arg)
2975 {
2976         TState     *thread = (TState *) arg;
2977         CState     *state = thread->state;
2978         TResult    *result;
2979         FILE       *logfile = NULL; /* per-thread log file */
2980         instr_time      start,
2981                                 end;
2982         int                     nstate = thread->nstate;
2983         int                     remains = nstate;               /* number of remaining clients */
2984         int                     i;
2985
2986         /* for reporting progress: */
2987         int64           thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
2988         int64           last_report = thread_start;
2989         int64           next_report = last_report + (int64) progress * 1000000;
2990         int64           last_count = 0,
2991                                 last_lats = 0,
2992                                 last_sqlats = 0,
2993                                 last_lags = 0;
2994
2995         AggVals         aggs;
2996
2997         /*
2998          * Initialize throttling rate target for all of the thread's clients.  It
2999          * might be a little more accurate to reset thread->start_time here too.
3000          * The possible drift seems too small relative to typical throttle delay
3001          * times to worry about it.
3002          */
3003         INSTR_TIME_SET_CURRENT(start);
3004         thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3005         thread->throttle_lag = 0;
3006         thread->throttle_lag_max = 0;
3007
3008         result = pg_malloc(sizeof(TResult));
3009
3010         INSTR_TIME_SET_ZERO(result->conn_time);
3011
3012         /* open log file if requested */
3013         if (use_log)
3014         {
3015                 char            logpath[64];
3016
3017                 if (thread->tid == 0)
3018                         snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3019                 else
3020                         snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3021                 logfile = fopen(logpath, "w");
3022
3023                 if (logfile == NULL)
3024                 {
3025                         fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
3026                         goto done;
3027                 }
3028         }
3029
3030         if (!is_connect)
3031         {
3032                 /* make connections to the database */
3033                 for (i = 0; i < nstate; i++)
3034                 {
3035                         if ((state[i].con = doConnect()) == NULL)
3036                                 goto done;
3037                 }
3038         }
3039
3040         /* time after thread and connections set up */
3041         INSTR_TIME_SET_CURRENT(result->conn_time);
3042         INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
3043
3044         agg_vals_init(&aggs, thread->start_time);
3045
3046         /* send start up queries in async manner */
3047         for (i = 0; i < nstate; i++)
3048         {
3049                 CState     *st = &state[i];
3050                 Command   **commands = sql_files[st->use_file];
3051                 int                     prev_ecnt = st->ecnt;
3052
3053                 st->use_file = getrand(thread, 0, num_files - 1);
3054                 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
3055                         remains--;                      /* I've aborted */
3056
3057                 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3058                 {
3059                         fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
3060                         remains--;                      /* I've aborted */
3061                         PQfinish(st->con);
3062                         st->con = NULL;
3063                 }
3064         }
3065
3066         while (remains > 0)
3067         {
3068                 fd_set          input_mask;
3069                 int                     maxsock;        /* max socket number to be waited */
3070                 int64           now_usec = 0;
3071                 int64           min_usec;
3072
3073                 FD_ZERO(&input_mask);
3074
3075                 maxsock = -1;
3076                 min_usec = INT64_MAX;
3077                 for (i = 0; i < nstate; i++)
3078                 {
3079                         CState     *st = &state[i];
3080                         Command   **commands = sql_files[st->use_file];
3081                         int                     sock;
3082
3083                         if (st->con == NULL)
3084                         {
3085                                 continue;
3086                         }
3087                         else if (st->sleeping)
3088                         {
3089                                 if (st->throttling && timer_exceeded)
3090                                 {
3091                                         /* interrupt client which has not started a transaction */
3092                                         remains--;
3093                                         st->sleeping = 0;
3094                                         st->throttling = false;
3095                                         PQfinish(st->con);
3096                                         st->con = NULL;
3097                                         continue;
3098                                 }
3099                                 else    /* just a nap from the script */
3100                                 {
3101                                         int                     this_usec;
3102
3103                                         if (min_usec == INT64_MAX)
3104                                         {
3105                                                 instr_time      now;
3106
3107                                                 INSTR_TIME_SET_CURRENT(now);
3108                                                 now_usec = INSTR_TIME_GET_MICROSEC(now);
3109                                         }
3110
3111                                         this_usec = st->until - now_usec;
3112                                         if (min_usec > this_usec)
3113                                                 min_usec = this_usec;
3114                                 }
3115                         }
3116                         else if (commands[st->state]->type == META_COMMAND)
3117                         {
3118                                 min_usec = 0;   /* the connection is ready to run */
3119                                 break;
3120                         }
3121
3122                         sock = PQsocket(st->con);
3123                         if (sock < 0)
3124                         {
3125                                 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3126                                 goto done;
3127                         }
3128
3129                         FD_SET(sock, &input_mask);
3130
3131                         if (maxsock < sock)
3132                                 maxsock = sock;
3133                 }
3134
3135                 if (min_usec > 0 && maxsock != -1)
3136                 {
3137                         int                     nsocks; /* return from select(2) */
3138
3139                         if (min_usec != INT64_MAX)
3140                         {
3141                                 struct timeval timeout;
3142
3143                                 timeout.tv_sec = min_usec / 1000000;
3144                                 timeout.tv_usec = min_usec % 1000000;
3145                                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
3146                         }
3147                         else
3148                                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3149                         if (nsocks < 0)
3150                         {
3151                                 if (errno == EINTR)
3152                                         continue;
3153                                 /* must be something wrong */
3154                                 fprintf(stderr, "select failed: %s\n", strerror(errno));
3155                                 goto done;
3156                         }
3157                 }
3158
3159                 /* ok, backend returns reply */
3160                 for (i = 0; i < nstate; i++)
3161                 {
3162                         CState     *st = &state[i];
3163                         Command   **commands = sql_files[st->use_file];
3164                         int                     prev_ecnt = st->ecnt;
3165
3166                         if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3167                                                         || commands[st->state]->type == META_COMMAND))
3168                         {
3169                                 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
3170                                         remains--;      /* I've aborted */
3171                         }
3172
3173                         if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3174                         {
3175                                 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
3176                                 remains--;              /* I've aborted */
3177                                 PQfinish(st->con);
3178                                 st->con = NULL;
3179                         }
3180                 }
3181
3182 #ifdef PTHREAD_FORK_EMULATION
3183                 /* each process reports its own progression */
3184                 if (progress)
3185                 {
3186                         instr_time      now_time;
3187                         int64           now;
3188
3189                         INSTR_TIME_SET_CURRENT(now_time);
3190                         now = INSTR_TIME_GET_MICROSEC(now_time);
3191                         if (now >= next_report)
3192                         {
3193                                 /* generate and show report */
3194                                 int64           count = 0,
3195                                                         lats = 0,
3196                                                         sqlats = 0;
3197                                 int64           lags = thread->throttle_lag;
3198                                 int64           run = now - last_report;
3199                                 double          tps,
3200                                                         total_run,
3201                                                         latency,
3202                                                         sqlat,
3203                                                         stdev,
3204                                                         lag;
3205
3206                                 for (i = 0; i < nstate; i++)
3207                                 {
3208                                         count += state[i].cnt;
3209                                         lats += state[i].txn_latencies;
3210                                         sqlats += state[i].txn_sqlats;
3211                                 }
3212
3213                                 total_run = (now - thread_start) / 1000000.0;
3214                                 tps = 1000000.0 * (count - last_count) / run;
3215                                 latency = 0.001 * (lats - last_lats) / (count - last_count);
3216                                 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3217                                 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3218                                 lag = 0.001 * (lags - last_lags) / (count - last_count);
3219
3220                                 if (throttle_delay)
3221                                         fprintf(stderr,
3222                                                         "progress %d: %.1f s, %.1f tps, "
3223                                                         "lat %.3f ms stddev %.3f, lag %.3f ms\n",
3224                                                         thread->tid, total_run, tps, latency, stdev, lag);
3225                                 else
3226                                         fprintf(stderr,
3227                                                         "progress %d: %.1f s, %.1f tps, "
3228                                                         "lat %.3f ms stddev %.3f\n",
3229                                                         thread->tid, total_run, tps, latency, stdev);
3230
3231                                 last_count = count;
3232                                 last_lats = lats;
3233                                 last_sqlats = sqlats;
3234                                 last_lags = lags;
3235                                 last_report = now;
3236                                 next_report += (int64) progress *1000000;
3237                         }
3238                 }
3239 #else
3240                 /* progress report by thread 0 for all threads */
3241                 if (progress && thread->tid == 0)
3242                 {
3243                         instr_time      now_time;
3244                         int64           now;
3245
3246                         INSTR_TIME_SET_CURRENT(now_time);
3247                         now = INSTR_TIME_GET_MICROSEC(now_time);
3248                         if (now >= next_report)
3249                         {
3250                                 /* generate and show report */
3251                                 int64           count = 0,
3252                                                         lats = 0,
3253                                                         sqlats = 0,
3254                                                         lags = 0;
3255                                 int64           run = now - last_report;
3256                                 double          tps,
3257                                                         total_run,
3258                                                         latency,
3259                                                         sqlat,
3260                                                         lag,
3261                                                         stdev;
3262
3263                                 for (i = 0; i < progress_nclients; i++)
3264                                 {
3265                                         count += state[i].cnt;
3266                                         lats += state[i].txn_latencies;
3267                                         sqlats += state[i].txn_sqlats;
3268                                 }
3269
3270                                 for (i = 0; i < progress_nthreads; i++)
3271                                         lags += thread[i].throttle_lag;
3272
3273                                 total_run = (now - thread_start) / 1000000.0;
3274                                 tps = 1000000.0 * (count - last_count) / run;
3275                                 latency = 0.001 * (lats - last_lats) / (count - last_count);
3276                                 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3277                                 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3278                                 lag = 0.001 * (lags - last_lags) / (count - last_count);
3279
3280                                 if (throttle_delay)
3281                                         fprintf(stderr,
3282                                                         "progress: %.1f s, %.1f tps, "
3283                                                         "lat %.3f ms stddev %.3f, lag %.3f ms\n",
3284                                                         total_run, tps, latency, stdev, lag);
3285                                 else
3286                                         fprintf(stderr,
3287                                                         "progress: %.1f s, %.1f tps, "
3288                                                         "lat %.3f ms stddev %.3f\n",
3289                                                         total_run, tps, latency, stdev);
3290
3291                                 last_count = count;
3292                                 last_lats = lats;
3293                                 last_sqlats = sqlats;
3294                                 last_lags = lags;
3295                                 last_report = now;
3296                                 next_report += (int64) progress *1000000;
3297                         }
3298                 }
3299 #endif   /* PTHREAD_FORK_EMULATION */
3300         }
3301
3302 done:
3303         INSTR_TIME_SET_CURRENT(start);
3304         disconnect_all(state, nstate);
3305         result->xacts = 0;
3306         result->latencies = 0;
3307         result->sqlats = 0;
3308         for (i = 0; i < nstate; i++)
3309         {
3310                 result->xacts += state[i].cnt;
3311                 result->latencies += state[i].txn_latencies;
3312                 result->sqlats += state[i].txn_sqlats;
3313         }
3314         result->throttle_lag = thread->throttle_lag;
3315         result->throttle_lag_max = thread->throttle_lag_max;
3316         INSTR_TIME_SET_CURRENT(end);
3317         INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
3318         if (logfile)
3319                 fclose(logfile);
3320         return result;
3321 }
3322
3323 /*
3324  * Support for duration option: set timer_exceeded after so many seconds.
3325  */
3326
3327 #ifndef WIN32
3328
3329 static void
3330 handle_sig_alarm(SIGNAL_ARGS)
3331 {
3332         timer_exceeded = true;
3333 }
3334
3335 static void
3336 setalarm(int seconds)
3337 {
3338         pqsignal(SIGALRM, handle_sig_alarm);
3339         alarm(seconds);
3340 }
3341
3342 #ifndef ENABLE_THREAD_SAFETY
3343
3344 /*
3345  * implements pthread using fork.
3346  */
3347
/*
 * Handle for one emulated thread, i.e. one forked child process.
 */
typedef struct fork_pthread
{
	pid_t		pid;			/* value returned by fork() */
	int			pipes[2];		/* pipe() pair: the child writes its result
								 * to [1], the parent reads it from [0] */
}	fork_pthread;
3353
3354 static int
3355 pthread_create(pthread_t *thread,
3356                            pthread_attr_t *attr,
3357                            void *(*start_routine) (void *),
3358                            void *arg)
3359 {
3360         fork_pthread *th;
3361         void       *ret;
3362         int                     rc;
3363
3364         th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
3365         if (pipe(th->pipes) < 0)
3366         {
3367                 free(th);
3368                 return errno;
3369         }
3370
3371         th->pid = fork();
3372         if (th->pid == -1)                      /* error */
3373         {
3374                 free(th);
3375                 return errno;
3376         }
3377         if (th->pid != 0)                       /* in parent process */
3378         {
3379                 close(th->pipes[1]);
3380                 *thread = th;
3381                 return 0;
3382         }
3383
3384         /* in child process */
3385         close(th->pipes[0]);
3386
3387         /* set alarm again because the child does not inherit timers */
3388         if (duration > 0)
3389                 setalarm(duration);
3390
3391         ret = start_routine(arg);
3392         rc = write(th->pipes[1], ret, sizeof(TResult));
3393         (void) rc;
3394         close(th->pipes[1]);
3395         free(th);
3396         exit(0);
3397 }
3398
3399 static int
3400 pthread_join(pthread_t th, void **thread_return)
3401 {
3402         int                     status;
3403
3404         while (waitpid(th->pid, &status, 0) != th->pid)
3405         {
3406                 if (errno != EINTR)
3407                         return errno;
3408         }
3409
3410         if (thread_return != NULL)
3411         {
3412                 /* assume result is TResult */
3413                 *thread_return = pg_malloc(sizeof(TResult));
3414                 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
3415                 {
3416                         free(*thread_return);
3417                         *thread_return = NULL;
3418                 }
3419         }
3420         close(th->pipes[0]);
3421
3422         free(th);
3423         return 0;
3424 }
3425 #endif
3426 #else                                                   /* WIN32 */
3427
3428 static VOID CALLBACK
3429 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3430 {
3431         timer_exceeded = true;
3432 }
3433
3434 static void
3435 setalarm(int seconds)
3436 {
3437         HANDLE          queue;
3438         HANDLE          timer;
3439
3440         /* This function will be called at most once, so we can cheat a bit. */
3441         queue = CreateTimerQueue();
3442         if (seconds > ((DWORD) -1) / 1000 ||
3443                 !CreateTimerQueueTimer(&timer, queue,
3444                                                            win32_timer_callback, NULL, seconds * 1000, 0,
3445                                                            WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3446         {
3447                 fprintf(stderr, "Failed to set timer\n");
3448                 exit(1);
3449         }
3450 }
3451
3452 /* partial pthread implementation for Windows */
3453
3454 typedef struct win32_pthread
3455 {
3456         HANDLE          handle;
3457         void       *(*routine) (void *);
3458         void       *arg;
3459         void       *result;
3460 } win32_pthread;
3461
3462 static unsigned __stdcall
3463 win32_pthread_run(void *arg)
3464 {
3465         win32_pthread *th = (win32_pthread *) arg;
3466
3467         th->result = th->routine(th->arg);
3468
3469         return 0;
3470 }
3471
3472 static int
3473 pthread_create(pthread_t *thread,
3474                            pthread_attr_t *attr,
3475                            void *(*start_routine) (void *),
3476                            void *arg)
3477 {
3478         int                     save_errno;
3479         win32_pthread *th;
3480
3481         th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3482         th->routine = start_routine;
3483         th->arg = arg;
3484         th->result = NULL;
3485
3486         th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3487         if (th->handle == NULL)
3488         {
3489                 save_errno = errno;
3490                 free(th);
3491                 return save_errno;
3492         }
3493
3494         *thread = th;
3495         return 0;
3496 }
3497
3498 static int
3499 pthread_join(pthread_t th, void **thread_return)
3500 {
3501         if (th == NULL || th->handle == NULL)
3502                 return errno = EINVAL;
3503
3504         if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3505         {
3506                 _dosmaperr(GetLastError());
3507                 return errno;
3508         }
3509
3510         if (thread_return)
3511                 *thread_return = th->result;
3512
3513         CloseHandle(th->handle);
3514         free(th);
3515         return 0;
3516 }
3517
3518 #endif   /* WIN32 */