granicus.if.org Git - postgresql/blob - src/bin/pgbench/pgbench.c
Review program help output for wording and formatting
[postgresql] / src / bin / pgbench / pgbench.c
1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * src/bin/pgbench/pgbench.c
8  * Copyright (c) 2000-2015, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29
#ifdef WIN32
/*
 * Must be set before winsock2.h is included so that it overrides the
 * platform default; MAXCLIENTS below is derived from it.
 */
#define FD_SETSIZE 1024
#endif							/* WIN32 */
33
34 #include "postgres_fe.h"
35
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "portability/instr_time.h"
39
40 #include <ctype.h>
41 #include <math.h>
42 #include <signal.h>
43 #include <sys/time.h>
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
46 #endif
47
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h>               /* for getrlimit */
50 #endif
51
/* Not every <math.h> defines M_PI; the Gaussian random generator needs it. */
#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif

#include "pgbench.h"

/* SQLSTATE the server reports for a reference to an undefined table */
#define ERRCODE_UNDEFINED_TABLE  "42P01"
59
/*
 * Multi-platform pthread implementations
 *
 * On Windows a minimal pthread_create()/pthread_join() pair is declared here
 * (static, so presumably defined later in this file).  Elsewhere <pthread.h>
 * is used when built with thread safety.  Otherwise there is no threading
 * at all and only a single job (-j 1) is possible.
 */

#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* No threads implementation, use none (-j 1) */
#define pthread_t void *
#endif
78
79
/********************************************************************
 * some configurable parameters */

/*
 * max number of clients allowed.  When FD_SETSIZE is known, stay 10 below
 * it, presumably to leave descriptors for things other than client
 * connections.
 */
#ifdef FD_SETSIZE
#define MAXCLIENTS	(FD_SETSIZE - 10)
#else
#define MAXCLIENTS	1024
#endif

#define LOG_STEP_SECONDS	5	/* seconds between log messages */
#define DEFAULT_NXACTS	10		/* default number of transactions per client */

#define MIN_GAUSSIAN_THRESHOLD		2.0 /* minimum threshold accepted for the
										 * gaussian random distribution */

int			nxacts = 0;			/* number of transactions per client */
int			duration = 0;		/* duration in seconds */

/*
 * scaling factor.  For example, scale = 10 will make 1000000 tuples in the
 * pgbench_accounts table.
 */
int			scale = 1;

/*
 * fillfactor.  For example, fillfactor = 90 will use only 90 percent of the
 * space during inserts and leave 10 percent free.
 */
int			fillfactor = 100;

/*
 * create foreign key constraints on the tables?  (0 = no)
 */
int			foreign_keys = 0;

/*
 * use unlogged tables?  (0 = no)
 */
int			unlogged_tables = 0;

/*
 * log sampling rate (1.0 = log everything, 0.0 = option not given)
 */
double		sample_rate = 0.0;

/*
 * When threads are throttled to a given rate limit, this is the target delay
 * to reach that rate in usec.  0 is the default and means no throttling.
 */
int64		throttle_delay = 0;

/*
 * Transactions which take longer than this limit (in usec) are counted as
 * late, and reported as such, although they are completed anyway.  When
 * throttling is enabled, execution time slots that are more than this late
 * are skipped altogether, and counted separately.  The default is 0;
 * presumably that disables the limit (confirm where it is checked).
 */
int64		latency_limit = 0;

/*
 * tablespace selection for the created tables and their indexes; NULL
 * presumably means the database default.
 */
char	   *tablespace = NULL;
char	   *index_tablespace = NULL;

/*
 * end of configurable parameters
 *********************************************************************/
148
/* Rows generated per unit of scale factor in each standard table. */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000

/*
 * The scale factor at/beyond which 32-bit integers are no longer wide enough
 * to hold the generated account numbers (naccounts * scale): 2^31 / naccounts
 * is about 21474.8.
 *
 * Although the actual threshold is 21474, we use 20000 because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000
162
bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates instead of individual
								 * transactions */
int			progress = 0;		/* seconds between thread progress reports */
int			progress_nclients = 0;	/* number of clients for progress
									 * report */
int			progress_nthreads = 0;	/* number of threads for progress
									 * report */
bool		is_connect;			/* establish connection for each transaction */
bool		is_latencies;		/* report per-command latencies */
int			main_pid;			/* main process id used in log filename */

/*
 * Connection settings passed to PQconnectdbParams() by doConnect(); empty
 * strings / NULL presumably leave the choice to libpq's defaults.
 */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
const char *progname;			/* program name for messages and
								 * fallback_application_name */

volatile bool timer_exceeded = false;	/* flag from signal handler */
183
/*
 * variable definitions: a script variable and its value, both stored as
 * strings.  Each client keeps an array of these sorted by name (see
 * putVariable) so that lookups can use bsearch().
 */
typedef struct
{
	char	   *name;			/* variable name */
	char	   *value;			/* its value */
} Variable;

#define MAX_FILES		128		/* max number of SQL script files allowed;
								 * also sizes CState.prepared[] */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
193
/*
 * structures used in custom query mode
 */

/*
 * Per-client state: the client's connection, its script variables, its
 * throttling/timing bookkeeping and its collected statistics.
 */
typedef struct
{
	PGconn	   *con;			/* connection handle to DB */
	int			id;				/* client No. */
	int			state;			/* state No. */
	int			listen;			/* 0 indicates that an async query has been
								 * sent */
	int			sleeping;		/* 1 indicates that the client is napping */
	bool		throttling;		/* whether nap is for throttling */
	Variable   *variables;		/* array of variable definitions, kept sorted
								 * by name */
	int			nvariables;		/* number of entries in variables[] */
	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
	instr_time	txn_begin;		/* used for measuring schedule lag times */
	instr_time	stmt_begin;		/* used for measuring statement latencies */
	bool		is_throttled;	/* whether transaction throttling is done */
	int			use_file;		/* index in sql_files for this client */
	bool		prepared[MAX_FILES];	/* one flag per script file; presumably
										 * "statements already prepared on this
										 * connection" -- confirm */

	/* per client collected stats */
	int			cnt;			/* xacts count */
	int			ecnt;			/* error count */
	int64		txn_latencies;	/* cumulated latencies */
	int64		txn_sqlats;		/* cumulated square latencies */
} CState;
222
/*
 * Thread state: one per worker thread, including the clients (state[]) it
 * handles, its private random-number state and its collected statistics.
 */
typedef struct
{
	int			tid;			/* thread id */
	pthread_t	thread;			/* thread handle */
	CState	   *state;			/* array of CState */
	int			nstate;			/* length of state[] */
	instr_time	start_time;		/* thread start time */
	instr_time *exec_elapsed;	/* time spent executing cmds (per Command) */
	int		   *exec_count;		/* number of cmd executions (per Command) */
	unsigned short random_state[3];		/* separate randomness for each
										 * thread, fed to pg_erand48() */
	int64		throttle_trigger;		/* previous/next throttling (us) */

	/* per thread collected stats */
	instr_time	conn_time;
	int64		throttle_lag;	/* total transaction lag behind throttling */
	int64		throttle_lag_max;		/* max transaction lag */
	int64		throttle_latency_skipped;		/* lagging transactions
												 * skipped */
	int64		latency_late;	/* late transactions */
} TState;

/* pthread_t value presumably used to mark a thread that was never started */
#define INVALID_THREAD		((pthread_t) 0)
248
/*
 * queries read from files
 */
#define SQL_COMMAND		1		/* Command.type: plain SQL statement */
#define META_COMMAND	2		/* Command.type: backslash meta-command */
#define MAX_ARGS		10		/* size of Command.argv[] and Command.cols[] */

/* How SQL commands are submitted to the server (the -M option). */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE				/* number of modes; must stay last */
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;
/* mode names, indexed by QueryMode */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
266
/* One parsed line of a transaction script. */
typedef struct
{
	char	   *line;			/* full text of command line */
	int			command_num;	/* unique index of this Command struct;
								 * presumably indexes TState.exec_elapsed and
								 * exec_count */
	int			type;			/* command type (SQL_COMMAND or META_COMMAND) */
	int			argc;			/* number of command words */
	char	   *argv[MAX_ARGS]; /* command word list */
	int			cols[MAX_ARGS]; /* corresponding column starting from 1 */
	PgBenchExpr *expr;			/* parsed expression (PgBenchExpr comes from
								 * pgbench.h) */
} Command;
277
/*
 * Statistics accumulated over one aggregation interval; presumably used when
 * logging aggregates (agg_interval) instead of individual transactions.
 */
typedef struct
{

	long		start_time;		/* when does the interval start (units not
								 * visible here -- confirm) */
	int			cnt;			/* number of transactions */
	int			skipped;		/* number of transactions skipped under --rate
								 * and --latency-limit */

	double		min_latency;	/* min/max latencies */
	double		max_latency;
	double		sum_latency;	/* sum(latency), sum(latency^2) - for
								 * estimates */
	double		sum2_latency;

	double		min_lag;
	double		max_lag;
	double		sum_lag;		/* sum(lag) */
	double		sum2_lag;		/* sum(lag*lag) */
} AggVals;
297
static Command **sql_files[MAX_FILES];	/* SQL script files, each an array of
										 * Command pointers */
static int	num_files;			/* number of script files */
static int	num_commands = 0;	/* total number of Command structs */
static int	debug = 0;			/* debug flag (print debugging output) */
302
/*
 * default scenario: a TPC-B-like transaction that updates one account,
 * teller and branch and records the change in pgbench_history.
 */
static char *tpc_b = {
	"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
	"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
	"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
	"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};

/*
 * -N case (--skip-some-updates): the default scenario without the updates of
 * pgbench_tellers and pgbench_branches.
 */
static char *simple_update = {
	"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
	"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};

/* -S case (--select-only): a single random account lookup. */
static char *select_only = {
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
};
343
/* Function prototypes: static functions defined later in this file */
static void setalarm(int seconds);
static void *threadRun(void *arg);

static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
	  AggVals *agg, bool skipped);
350
351 static void
352 usage(void)
353 {
354         printf("%s is a benchmarking tool for PostgreSQL.\n\n"
355                    "Usage:\n"
356                    "  %s [OPTION]... [DBNAME]\n"
357                    "\nInitialization options:\n"
358                    "  -i, --initialize         invokes initialization mode\n"
359                    "  -F, --fillfactor=NUM     set fill factor\n"
360                 "  -n, --no-vacuum          do not run VACUUM after initialization\n"
361         "  -q, --quiet              quiet logging (one message each 5 seconds)\n"
362                    "  -s, --scale=NUM          scaling factor\n"
363                    "  --foreign-keys           create foreign key constraints between tables\n"
364                    "  --index-tablespace=TABLESPACE\n"
365         "                           create indexes in the specified tablespace\n"
366          "  --tablespace=TABLESPACE  create tables in the specified tablespace\n"
367                    "  --unlogged-tables        create tables as unlogged tables\n"
368                    "\nBenchmarking options:\n"
369                    "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
370                    "  -C, --connect            establish new connection for each transaction\n"
371                    "  -D, --define=VARNAME=VALUE\n"
372           "                           define variable for use by custom script\n"
373                  "  -f, --file=FILENAME      read transaction script from FILENAME\n"
374                    "  -j, --jobs=NUM           number of threads (default: 1)\n"
375                    "  -l, --log                write transaction times to log file\n"
376         "  -L, --latency-limit=NUM  count transactions lasting more than NUM ms as late\n"
377                    "  -M, --protocol=simple|extended|prepared\n"
378                    "                           protocol for submitting queries (default: simple)\n"
379                    "  -n, --no-vacuum          do not run VACUUM before tests\n"
380                    "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
381                    "  -P, --progress=NUM       show thread progress report every NUM seconds\n"
382                    "  -r, --report-latencies   report average latency per command\n"
383                 "  -R, --rate=NUM           target rate in transactions per second\n"
384                    "  -s, --scale=NUM          report this scale factor in output\n"
385                    "  -S, --select-only        perform SELECT-only transactions\n"
386                    "  -t, --transactions=NUM   number of transactions each client runs (default: 10)\n"
387                  "  -T, --time=NUM           duration of benchmark test in seconds\n"
388                    "  -v, --vacuum-all         vacuum all four standard tables before tests\n"
389                    "  --aggregate-interval=NUM aggregate data over NUM seconds\n"
390                    "  --sampling-rate=NUM      fraction of transactions to log (e.g. 0.01 for 1%%)\n"
391                    "\nCommon options:\n"
392                    "  -d, --debug              print debugging output\n"
393           "  -h, --host=HOSTNAME      database server host or socket directory\n"
394                    "  -p, --port=PORT          database server port number\n"
395                    "  -U, --username=USERNAME  connect as specified database user\n"
396                  "  -V, --version            output version information, then exit\n"
397                    "  -?, --help               show this help, then exit\n"
398                    "\n"
399                    "Report bugs to <pgsql-bugs@postgresql.org>.\n",
400                    progname, progname);
401 }
402
403 /*
404  * strtoint64 -- convert a string to 64-bit integer
405  *
406  * This function is a modified version of scanint8() from
407  * src/backend/utils/adt/int8.c.
408  */
409 int64
410 strtoint64(const char *str)
411 {
412         const char *ptr = str;
413         int64           result = 0;
414         int                     sign = 1;
415
416         /*
417          * Do our own scan, rather than relying on sscanf which might be broken
418          * for long long.
419          */
420
421         /* skip leading spaces */
422         while (*ptr && isspace((unsigned char) *ptr))
423                 ptr++;
424
425         /* handle sign */
426         if (*ptr == '-')
427         {
428                 ptr++;
429
430                 /*
431                  * Do an explicit check for INT64_MIN.  Ugly though this is, it's
432                  * cleaner than trying to get the loop below to handle it portably.
433                  */
434                 if (strncmp(ptr, "9223372036854775808", 19) == 0)
435                 {
436                         result = PG_INT64_MIN;
437                         ptr += 19;
438                         goto gotdigits;
439                 }
440                 sign = -1;
441         }
442         else if (*ptr == '+')
443                 ptr++;
444
445         /* require at least one digit */
446         if (!isdigit((unsigned char) *ptr))
447                 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
448
449         /* process digits */
450         while (*ptr && isdigit((unsigned char) *ptr))
451         {
452                 int64           tmp = result * 10 + (*ptr++ - '0');
453
454                 if ((tmp / 10) != result)               /* overflow? */
455                         fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
456                 result = tmp;
457         }
458
459 gotdigits:
460
461         /* allow trailing whitespace, but not other trailing chars */
462         while (*ptr != '\0' && isspace((unsigned char) *ptr))
463                 ptr++;
464
465         if (*ptr != '\0')
466                 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
467
468         return ((sign < 0) ? -result : result);
469 }
470
471 /* random number generator: uniform distribution from min to max inclusive */
472 static int64
473 getrand(TState *thread, int64 min, int64 max)
474 {
475         /*
476          * Odd coding is so that min and max have approximately the same chance of
477          * being selected as do numbers between them.
478          *
479          * pg_erand48() is thread-safe and concurrent, which is why we use it
480          * rather than random(), which in glibc is non-reentrant, and therefore
481          * protected by a mutex, and therefore a bottleneck on machines with many
482          * CPUs.
483          */
484         return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
485 }
486
487 /*
488  * random number generator: exponential distribution from min to max inclusive.
489  * the threshold is so that the density of probability for the last cut-off max
490  * value is exp(-threshold).
491  */
492 static int64
493 getExponentialRand(TState *thread, int64 min, int64 max, double threshold)
494 {
495         double          cut,
496                                 uniform,
497                                 rand;
498
499         Assert(threshold > 0.0);
500         cut = exp(-threshold);
501         /* erand in [0, 1), uniform in (0, 1] */
502         uniform = 1.0 - pg_erand48(thread->random_state);
503
504         /*
505          * inner expresion in (cut, 1] (if threshold > 0), rand in [0, 1)
506          */
507         Assert((1.0 - cut) != 0.0);
508         rand = -log(cut + (1.0 - cut) * uniform) / threshold;
509         /* return int64 random number within between min and max */
510         return min + (int64) ((max - min + 1) * rand);
511 }
512
513 /* random number generator: gaussian distribution from min to max inclusive */
514 static int64
515 getGaussianRand(TState *thread, int64 min, int64 max, double threshold)
516 {
517         double          stdev;
518         double          rand;
519
520         /*
521          * Get user specified random number from this loop, with -threshold <
522          * stdev <= threshold
523          *
524          * This loop is executed until the number is in the expected range.
525          *
526          * As the minimum threshold is 2.0, the probability of looping is low:
527          * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
528          * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
529          * the worst case. For a 5.0 threshold value, the looping probability is
530          * about e^{-5} * 2 / pi ~ 0.43%.
531          */
532         do
533         {
534                 /*
535                  * pg_erand48 generates [0,1), but for the basic version of the
536                  * Box-Muller transform the two uniformly distributed random numbers
537                  * are expected in (0, 1] (see
538                  * http://en.wikipedia.org/wiki/Box_muller)
539                  */
540                 double          rand1 = 1.0 - pg_erand48(thread->random_state);
541                 double          rand2 = 1.0 - pg_erand48(thread->random_state);
542
543                 /* Box-Muller basic form transform */
544                 double          var_sqrt = sqrt(-2.0 * log(rand1));
545
546                 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
547
548                 /*
549                  * we may try with cos, but there may be a bias induced if the
550                  * previous value fails the test. To be on the safe side, let us try
551                  * over.
552                  */
553         }
554         while (stdev < -threshold || stdev >= threshold);
555
556         /* stdev is in [-threshold, threshold), normalization to [0,1) */
557         rand = (stdev + threshold) / (threshold * 2.0);
558
559         /* return int64 random number within between min and max */
560         return min + (int64) ((max - min + 1) * rand);
561 }
562
563 /*
564  * random number generator: generate a value, such that the series of values
565  * will approximate a Poisson distribution centered on the given value.
566  */
567 static int64
568 getPoissonRand(TState *thread, int64 center)
569 {
570         /*
571          * Use inverse transform sampling to generate a value > 0, such that the
572          * expected (i.e. average) value is the given argument.
573          */
574         double          uniform;
575
576         /* erand in [0, 1), uniform in (0, 1] */
577         uniform = 1.0 - pg_erand48(thread->random_state);
578
579         return (int64) (-log(uniform) * ((double) center) + 0.5);
580 }
581
582 /* call PQexec() and exit() on failure */
583 static void
584 executeStatement(PGconn *con, const char *sql)
585 {
586         PGresult   *res;
587
588         res = PQexec(con, sql);
589         if (PQresultStatus(res) != PGRES_COMMAND_OK)
590         {
591                 fprintf(stderr, "%s", PQerrorMessage(con));
592                 exit(1);
593         }
594         PQclear(res);
595 }
596
597 /* call PQexec() and complain, but without exiting, on failure */
598 static void
599 tryExecuteStatement(PGconn *con, const char *sql)
600 {
601         PGresult   *res;
602
603         res = PQexec(con, sql);
604         if (PQresultStatus(res) != PGRES_COMMAND_OK)
605         {
606                 fprintf(stderr, "%s", PQerrorMessage(con));
607                 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
608         }
609         PQclear(res);
610 }
611
612 /* set up a connection to the backend */
613 static PGconn *
614 doConnect(void)
615 {
616         PGconn     *conn;
617         static char *password = NULL;
618         bool            new_pass;
619
620         /*
621          * Start the connection.  Loop until we have a password if requested by
622          * backend.
623          */
624         do
625         {
626 #define PARAMS_ARRAY_SIZE       7
627
628                 const char *keywords[PARAMS_ARRAY_SIZE];
629                 const char *values[PARAMS_ARRAY_SIZE];
630
631                 keywords[0] = "host";
632                 values[0] = pghost;
633                 keywords[1] = "port";
634                 values[1] = pgport;
635                 keywords[2] = "user";
636                 values[2] = login;
637                 keywords[3] = "password";
638                 values[3] = password;
639                 keywords[4] = "dbname";
640                 values[4] = dbName;
641                 keywords[5] = "fallback_application_name";
642                 values[5] = progname;
643                 keywords[6] = NULL;
644                 values[6] = NULL;
645
646                 new_pass = false;
647
648                 conn = PQconnectdbParams(keywords, values, true);
649
650                 if (!conn)
651                 {
652                         fprintf(stderr, "connection to database \"%s\" failed\n",
653                                         dbName);
654                         return NULL;
655                 }
656
657                 if (PQstatus(conn) == CONNECTION_BAD &&
658                         PQconnectionNeedsPassword(conn) &&
659                         password == NULL)
660                 {
661                         PQfinish(conn);
662                         password = simple_prompt("Password: ", 100, false);
663                         new_pass = true;
664                 }
665         } while (new_pass);
666
667         /* check to see that the backend connection was successfully made */
668         if (PQstatus(conn) == CONNECTION_BAD)
669         {
670                 fprintf(stderr, "connection to database \"%s\" failed:\n%s",
671                                 dbName, PQerrorMessage(conn));
672                 PQfinish(conn);
673                 return NULL;
674         }
675
676         return conn;
677 }
678
679 /* throw away response from backend */
680 static void
681 discard_response(CState *state)
682 {
683         PGresult   *res;
684
685         do
686         {
687                 res = PQgetResult(state->con);
688                 if (res)
689                         PQclear(res);
690         } while (res);
691 }
692
693 static int
694 compareVariables(const void *v1, const void *v2)
695 {
696         return strcmp(((const Variable *) v1)->name,
697                                   ((const Variable *) v2)->name);
698 }
699
700 static char *
701 getVariable(CState *st, char *name)
702 {
703         Variable        key,
704                            *var;
705
706         /* On some versions of Solaris, bsearch of zero items dumps core */
707         if (st->nvariables <= 0)
708                 return NULL;
709
710         key.name = name;
711         var = (Variable *) bsearch((void *) &key,
712                                                            (void *) st->variables,
713                                                            st->nvariables,
714                                                            sizeof(Variable),
715                                                            compareVariables);
716         if (var != NULL)
717                 return var->value;
718         else
719                 return NULL;
720 }
721
/*
 * Check whether "name" is a legal variable name.  A legal name is non-empty
 * and consists only of alphanumerics (as classified by isalnum()) and
 * underscores.
 *
 * The empty name is refused because a variable named "" could never be
 * referenced from a script: parseVariable() returns NULL for a ':' that is
 * not followed by any name character.
 */
static bool
isLegalVariableName(const char *name)
{
	const unsigned char *ptr = (const unsigned char *) name;

	/* mustn't be zero-length */
	if (*ptr == '\0')
		return false;

	for (; *ptr != '\0'; ptr++)
	{
		if (!isalnum(*ptr) && *ptr != '_')
			return false;
	}

	return true;
}
736
737 static int
738 putVariable(CState *st, const char *context, char *name, char *value)
739 {
740         Variable        key,
741                            *var;
742
743         key.name = name;
744         /* On some versions of Solaris, bsearch of zero items dumps core */
745         if (st->nvariables > 0)
746                 var = (Variable *) bsearch((void *) &key,
747                                                                    (void *) st->variables,
748                                                                    st->nvariables,
749                                                                    sizeof(Variable),
750                                                                    compareVariables);
751         else
752                 var = NULL;
753
754         if (var == NULL)
755         {
756                 Variable   *newvars;
757
758                 /*
759                  * Check for the name only when declaring a new variable to avoid
760                  * overhead.
761                  */
762                 if (!isLegalVariableName(name))
763                 {
764                         fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
765                                         context, name);
766                         return false;
767                 }
768
769                 if (st->variables)
770                         newvars = (Variable *) pg_realloc(st->variables,
771                                                                         (st->nvariables + 1) * sizeof(Variable));
772                 else
773                         newvars = (Variable *) pg_malloc(sizeof(Variable));
774
775                 st->variables = newvars;
776
777                 var = &newvars[st->nvariables];
778
779                 var->name = pg_strdup(name);
780                 var->value = pg_strdup(value);
781
782                 st->nvariables++;
783
784                 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
785                           compareVariables);
786         }
787         else
788         {
789                 char       *val;
790
791                 /* dup then free, in case value is pointing at this variable */
792                 val = pg_strdup(value);
793
794                 free(var->value);
795                 var->value = val;
796         }
797
798         return true;
799 }
800
801 static char *
802 parseVariable(const char *sql, int *eaten)
803 {
804         int                     i = 0;
805         char       *name;
806
807         do
808         {
809                 i++;
810         } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
811         if (i == 1)
812                 return NULL;
813
814         name = pg_malloc(i);
815         memcpy(name, &sql[1], i - 1);
816         name[i - 1] = '\0';
817
818         *eaten = i;
819         return name;
820 }
821
822 static char *
823 replaceVariable(char **sql, char *param, int len, char *value)
824 {
825         int                     valueln = strlen(value);
826
827         if (valueln > len)
828         {
829                 size_t          offset = param - *sql;
830
831                 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
832                 param = *sql + offset;
833         }
834
835         if (valueln != len)
836                 memmove(param + valueln, param + len, strlen(param + len) + 1);
837         memcpy(param, value, valueln);
838
839         return param + valueln;
840 }
841
842 static char *
843 assignVariables(CState *st, char *sql)
844 {
845         char       *p,
846                            *name,
847                            *val;
848
849         p = sql;
850         while ((p = strchr(p, ':')) != NULL)
851         {
852                 int                     eaten;
853
854                 name = parseVariable(p, &eaten);
855                 if (name == NULL)
856                 {
857                         while (*p == ':')
858                         {
859                                 p++;
860                         }
861                         continue;
862                 }
863
864                 val = getVariable(st, name);
865                 free(name);
866                 if (val == NULL)
867                 {
868                         p++;
869                         continue;
870                 }
871
872                 p = replaceVariable(&sql, p, eaten, val);
873         }
874
875         return sql;
876 }
877
878 static void
879 getQueryParams(CState *st, const Command *command, const char **params)
880 {
881         int                     i;
882
883         for (i = 0; i < command->argc - 1; i++)
884                 params[i] = getVariable(st, command->argv[i + 1]);
885 }
886
887 /*
888  * Recursive evaluation of an expression in a pgbench script
889  * using the current state of variables.
890  * Returns whether the evaluation was ok,
891  * the value itself is returned through the retval pointer.
892  */
893 static bool
894 evaluateExpr(CState *st, PgBenchExpr *expr, int64 *retval)
895 {
896         switch (expr->etype)
897         {
898                 case ENODE_INTEGER_CONSTANT:
899                         {
900                                 *retval = expr->u.integer_constant.ival;
901                                 return true;
902                         }
903
904                 case ENODE_VARIABLE:
905                         {
906                                 char       *var;
907
908                                 if ((var = getVariable(st, expr->u.variable.varname)) == NULL)
909                                 {
910                                         fprintf(stderr, "undefined variable \"%s\"\n",
911                                                         expr->u.variable.varname);
912                                         return false;
913                                 }
914                                 *retval = strtoint64(var);
915                                 return true;
916                         }
917
918                 case ENODE_OPERATOR:
919                         {
920                                 int64           lval;
921                                 int64           rval;
922
923                                 if (!evaluateExpr(st, expr->u.operator.lexpr, &lval))
924                                         return false;
925                                 if (!evaluateExpr(st, expr->u.operator.rexpr, &rval))
926                                         return false;
927                                 switch (expr->u.operator.operator)
928                                 {
929                                         case '+':
930                                                 *retval = lval + rval;
931                                                 return true;
932
933                                         case '-':
934                                                 *retval = lval - rval;
935                                                 return true;
936
937                                         case '*':
938                                                 *retval = lval * rval;
939                                                 return true;
940
941                                         case '/':
942                                                 if (rval == 0)
943                                                 {
944                                                         fprintf(stderr, "division by zero\n");
945                                                         return false;
946                                                 }
947                                                 *retval = lval / rval;
948                                                 return true;
949
950                                         case '%':
951                                                 if (rval == 0)
952                                                 {
953                                                         fprintf(stderr, "division by zero\n");
954                                                         return false;
955                                                 }
956                                                 *retval = lval % rval;
957                                                 return true;
958                                 }
959
960                                 fprintf(stderr, "bad operator\n");
961                                 return false;
962                         }
963
964                 default:
965                         break;
966         }
967
968         fprintf(stderr, "bad expression\n");
969         return false;
970 }
971
972 /*
973  * Run a shell command. The result is assigned to the variable if not NULL.
974  * Return true if succeeded, or false on error.
975  */
976 static bool
977 runShellCommand(CState *st, char *variable, char **argv, int argc)
978 {
979         char            command[SHELL_COMMAND_SIZE];
980         int                     i,
981                                 len = 0;
982         FILE       *fp;
983         char            res[64];
984         char       *endptr;
985         int                     retval;
986
987         /*----------
988          * Join arguments with whitespace separators. Arguments starting with
989          * exactly one colon are treated as variables:
990          *      name - append a string "name"
991          *      :var - append a variable named 'var'
992          *      ::name - append a string ":name"
993          *----------
994          */
995         for (i = 0; i < argc; i++)
996         {
997                 char       *arg;
998                 int                     arglen;
999
1000                 if (argv[i][0] != ':')
1001                 {
1002                         arg = argv[i];          /* a string literal */
1003                 }
1004                 else if (argv[i][1] == ':')
1005                 {
1006                         arg = argv[i] + 1;      /* a string literal starting with colons */
1007                 }
1008                 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1009                 {
1010                         fprintf(stderr, "%s: undefined variable \"%s\"\n",
1011                                         argv[0], argv[i]);
1012                         return false;
1013                 }
1014
1015                 arglen = strlen(arg);
1016                 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1017                 {
1018                         fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1019                         return false;
1020                 }
1021
1022                 if (i > 0)
1023                         command[len++] = ' ';
1024                 memcpy(command + len, arg, arglen);
1025                 len += arglen;
1026         }
1027
1028         command[len] = '\0';
1029
1030         /* Fast path for non-assignment case */
1031         if (variable == NULL)
1032         {
1033                 if (system(command))
1034                 {
1035                         if (!timer_exceeded)
1036                                 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1037                         return false;
1038                 }
1039                 return true;
1040         }
1041
1042         /* Execute the command with pipe and read the standard output. */
1043         if ((fp = popen(command, "r")) == NULL)
1044         {
1045                 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1046                 return false;
1047         }
1048         if (fgets(res, sizeof(res), fp) == NULL)
1049         {
1050                 if (!timer_exceeded)
1051                         fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1052                 (void) pclose(fp);
1053                 return false;
1054         }
1055         if (pclose(fp) < 0)
1056         {
1057                 fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1058                 return false;
1059         }
1060
1061         /* Check whether the result is an integer and assign it to the variable */
1062         retval = (int) strtol(res, &endptr, 10);
1063         while (*endptr != '\0' && isspace((unsigned char) *endptr))
1064                 endptr++;
1065         if (*res == '\0' || *endptr != '\0')
1066         {
1067                 fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1068                                 argv[0], res);
1069                 return false;
1070         }
1071         snprintf(res, sizeof(res), "%d", retval);
1072         if (!putVariable(st, "setshell", variable, res))
1073                 return false;
1074
1075 #ifdef DEBUG
1076         printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
1077 #endif
1078         return true;
1079 }
1080
1081 #define MAX_PREPARE_NAME                32
1082 static void
1083 preparedStatementName(char *buffer, int file, int state)
1084 {
1085         sprintf(buffer, "P%d_%d", file, state);
1086 }
1087
1088 static bool
1089 clientDone(CState *st, bool ok)
1090 {
1091         (void) ok;                                      /* unused */
1092
1093         if (st->con != NULL)
1094         {
1095                 PQfinish(st->con);
1096                 st->con = NULL;
1097         }
1098         return false;                           /* always false */
1099 }
1100
1101 static void
1102 agg_vals_init(AggVals *aggs, instr_time start)
1103 {
1104         /* basic counters */
1105         aggs->cnt = 0;                          /* number of transactions (includes skipped) */
1106         aggs->skipped = 0;                      /* xacts skipped under --rate --latency-limit */
1107
1108         aggs->sum_latency = 0;          /* SUM(latency) */
1109         aggs->sum2_latency = 0;         /* SUM(latency*latency) */
1110
1111         /* min and max transaction duration */
1112         aggs->min_latency = 0;
1113         aggs->max_latency = 0;
1114
1115         /* schedule lag counters */
1116         aggs->sum_lag = 0;
1117         aggs->sum2_lag = 0;
1118         aggs->min_lag = 0;
1119         aggs->max_lag = 0;
1120
1121         /* start of the current interval */
1122         aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
1123 }
1124
1125 /* return false iff client should be disconnected */
1126 static bool
1127 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
1128 {
1129         PGresult   *res;
1130         Command   **commands;
1131         bool            trans_needs_throttle = false;
1132         instr_time      now;
1133
1134         /*
1135          * gettimeofday() isn't free, so we get the current timestamp lazily the
1136          * first time it's needed, and reuse the same value throughout this
1137          * function after that. This also ensures that e.g. the calculated latency
1138          * reported in the log file and in the totals are the same. Zero means
1139          * "not set yet". Reset "now" when we step to the next command with "goto
1140          * top", though.
1141          */
1142 top:
1143         INSTR_TIME_SET_ZERO(now);
1144
1145         commands = sql_files[st->use_file];
1146
1147         /*
1148          * Handle throttling once per transaction by sleeping.  It is simpler to
1149          * do this here rather than at the end, because so much complicated logic
1150          * happens below when statements finish.
1151          */
1152         if (throttle_delay && !st->is_throttled)
1153         {
1154                 /*
1155                  * Generate a delay such that the series of delays will approximate a
1156                  * Poisson distribution centered on the throttle_delay time.
1157                  *
1158                  * If transactions are too slow or a given wait is shorter than a
1159                  * transaction, the next transaction will start right away.
1160                  */
1161                 int64           wait = getPoissonRand(thread, throttle_delay);
1162
1163                 thread->throttle_trigger += wait;
1164                 st->txn_scheduled = thread->throttle_trigger;
1165
1166                 /*
1167                  * If this --latency-limit is used, and this slot is already late so
1168                  * that the transaction will miss the latency limit even if it
1169                  * completed immediately, we skip this time slot and iterate till the
1170                  * next slot that isn't late yet.
1171                  */
1172                 if (latency_limit)
1173                 {
1174                         int64           now_us;
1175
1176                         if (INSTR_TIME_IS_ZERO(now))
1177                                 INSTR_TIME_SET_CURRENT(now);
1178                         now_us = INSTR_TIME_GET_MICROSEC(now);
1179                         while (thread->throttle_trigger < now_us - latency_limit)
1180                         {
1181                                 thread->throttle_latency_skipped++;
1182
1183                                 if (logfile)
1184                                         doLog(thread, st, logfile, &now, agg, true);
1185
1186                                 wait = getPoissonRand(thread, throttle_delay);
1187                                 thread->throttle_trigger += wait;
1188                                 st->txn_scheduled = thread->throttle_trigger;
1189                         }
1190                 }
1191
1192                 st->sleeping = 1;
1193                 st->throttling = true;
1194                 st->is_throttled = true;
1195                 if (debug)
1196                         fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
1197                                         st->id, wait);
1198         }
1199
1200         if (st->sleeping)
1201         {                                                       /* are we sleeping? */
1202                 int64           now_us;
1203
1204                 if (INSTR_TIME_IS_ZERO(now))
1205                         INSTR_TIME_SET_CURRENT(now);
1206                 now_us = INSTR_TIME_GET_MICROSEC(now);
1207                 if (st->txn_scheduled <= now_us)
1208                 {
1209                         st->sleeping = 0;       /* Done sleeping, go ahead with next command */
1210                         if (st->throttling)
1211                         {
1212                                 /* Measure lag of throttled transaction relative to target */
1213                                 int64           lag = now_us - st->txn_scheduled;
1214
1215                                 thread->throttle_lag += lag;
1216                                 if (lag > thread->throttle_lag_max)
1217                                         thread->throttle_lag_max = lag;
1218                                 st->throttling = false;
1219                         }
1220                 }
1221                 else
1222                         return true;            /* Still sleeping, nothing to do here */
1223         }
1224
1225         if (st->listen)
1226         {                                                       /* are we receiver? */
1227                 if (commands[st->state]->type == SQL_COMMAND)
1228                 {
1229                         if (debug)
1230                                 fprintf(stderr, "client %d receiving\n", st->id);
1231                         if (!PQconsumeInput(st->con))
1232                         {                                       /* there's something wrong */
1233                                 fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
1234                                 return clientDone(st, false);
1235                         }
1236                         if (PQisBusy(st->con))
1237                                 return true;    /* don't have the whole result yet */
1238                 }
1239
1240                 /*
1241                  * command finished: accumulate per-command execution times in
1242                  * thread-local data structure, if per-command latencies are requested
1243                  */
1244                 if (is_latencies)
1245                 {
1246                         int                     cnum = commands[st->state]->command_num;
1247
1248                         if (INSTR_TIME_IS_ZERO(now))
1249                                 INSTR_TIME_SET_CURRENT(now);
1250                         INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1251                                                                   now, st->stmt_begin);
1252                         thread->exec_count[cnum]++;
1253                 }
1254
1255                 /* transaction finished: calculate latency and log the transaction */
1256                 if (commands[st->state + 1] == NULL)
1257                 {
1258                         /* only calculate latency if an option is used that needs it */
1259                         if (progress || throttle_delay || latency_limit)
1260                         {
1261                                 int64           latency;
1262
1263                                 if (INSTR_TIME_IS_ZERO(now))
1264                                         INSTR_TIME_SET_CURRENT(now);
1265
1266                                 latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
1267
1268                                 st->txn_latencies += latency;
1269
1270                                 /*
1271                                  * XXX In a long benchmark run of high-latency transactions,
1272                                  * this int64 addition eventually overflows.  For example, 100
1273                                  * threads running 10s transactions will overflow it in 2.56
1274                                  * hours.  With a more-typical OLTP workload of .1s
1275                                  * transactions, overflow would take 256 hours.
1276                                  */
1277                                 st->txn_sqlats += latency * latency;
1278
1279                                 /* record over the limit transactions if needed. */
1280                                 if (latency_limit && latency > latency_limit)
1281                                         thread->latency_late++;
1282                         }
1283
1284                         /* record the time it took in the log */
1285                         if (logfile)
1286                                 doLog(thread, st, logfile, &now, agg, false);
1287                 }
1288
1289                 if (commands[st->state]->type == SQL_COMMAND)
1290                 {
1291                         /*
1292                          * Read and discard the query result; note this is not included in
1293                          * the statement latency numbers.
1294                          */
1295                         res = PQgetResult(st->con);
1296                         switch (PQresultStatus(res))
1297                         {
1298                                 case PGRES_COMMAND_OK:
1299                                 case PGRES_TUPLES_OK:
1300                                         break;          /* OK */
1301                                 default:
1302                                         fprintf(stderr, "client %d aborted in state %d: %s",
1303                                                         st->id, st->state, PQerrorMessage(st->con));
1304                                         PQclear(res);
1305                                         return clientDone(st, false);
1306                         }
1307                         PQclear(res);
1308                         discard_response(st);
1309                 }
1310
1311                 if (commands[st->state + 1] == NULL)
1312                 {
1313                         if (is_connect)
1314                         {
1315                                 PQfinish(st->con);
1316                                 st->con = NULL;
1317                         }
1318
1319                         ++st->cnt;
1320                         if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1321                                 return clientDone(st, true);    /* exit success */
1322                 }
1323
1324                 /* increment state counter */
1325                 st->state++;
1326                 if (commands[st->state] == NULL)
1327                 {
1328                         st->state = 0;
1329                         st->use_file = (int) getrand(thread, 0, num_files - 1);
1330                         commands = sql_files[st->use_file];
1331                         st->is_throttled = false;
1332
1333                         /*
1334                          * No transaction is underway anymore, which means there is
1335                          * nothing to listen to right now.  When throttling rate limits
1336                          * are active, a sleep will happen next, as the next transaction
1337                          * starts.  And then in any case the next SQL command will set
1338                          * listen back to 1.
1339                          */
1340                         st->listen = 0;
1341                         trans_needs_throttle = (throttle_delay > 0);
1342                 }
1343         }
1344
1345         if (st->con == NULL)
1346         {
1347                 instr_time      start,
1348                                         end;
1349
1350                 INSTR_TIME_SET_CURRENT(start);
1351                 if ((st->con = doConnect()) == NULL)
1352                 {
1353                         fprintf(stderr, "client %d aborted while establishing connection\n",
1354                                         st->id);
1355                         return clientDone(st, false);
1356                 }
1357                 INSTR_TIME_SET_CURRENT(end);
1358                 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1359         }
1360
1361         /*
1362          * This ensures that a throttling delay is inserted before proceeding with
1363          * sql commands, after the first transaction. The first transaction
1364          * throttling is performed when first entering doCustom.
1365          */
1366         if (trans_needs_throttle)
1367         {
1368                 trans_needs_throttle = false;
1369                 goto top;
1370         }
1371
1372         /* Record transaction start time under logging, progress or throttling */
1373         if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
1374         {
1375                 INSTR_TIME_SET_CURRENT(st->txn_begin);
1376
1377                 /*
1378                  * When not throttling, this is also the transaction's scheduled start
1379                  * time.
1380                  */
1381                 if (!throttle_delay)
1382                         st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
1383         }
1384
1385         /* Record statement start time if per-command latencies are requested */
1386         if (is_latencies)
1387                 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1388
1389         if (commands[st->state]->type == SQL_COMMAND)
1390         {
1391                 const Command *command = commands[st->state];
1392                 int                     r;
1393
1394                 if (querymode == QUERY_SIMPLE)
1395                 {
1396                         char       *sql;
1397
1398                         sql = pg_strdup(command->argv[0]);
1399                         sql = assignVariables(st, sql);
1400
1401                         if (debug)
1402                                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1403                         r = PQsendQuery(st->con, sql);
1404                         free(sql);
1405                 }
1406                 else if (querymode == QUERY_EXTENDED)
1407                 {
1408                         const char *sql = command->argv[0];
1409                         const char *params[MAX_ARGS];
1410
1411                         getQueryParams(st, command, params);
1412
1413                         if (debug)
1414                                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1415                         r = PQsendQueryParams(st->con, sql, command->argc - 1,
1416                                                                   NULL, params, NULL, NULL, 0);
1417                 }
1418                 else if (querymode == QUERY_PREPARED)
1419                 {
1420                         char            name[MAX_PREPARE_NAME];
1421                         const char *params[MAX_ARGS];
1422
1423                         if (!st->prepared[st->use_file])
1424                         {
1425                                 int                     j;
1426
1427                                 for (j = 0; commands[j] != NULL; j++)
1428                                 {
1429                                         PGresult   *res;
1430                                         char            name[MAX_PREPARE_NAME];
1431
1432                                         if (commands[j]->type != SQL_COMMAND)
1433                                                 continue;
1434                                         preparedStatementName(name, st->use_file, j);
1435                                         res = PQprepare(st->con, name,
1436                                                   commands[j]->argv[0], commands[j]->argc - 1, NULL);
1437                                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1438                                                 fprintf(stderr, "%s", PQerrorMessage(st->con));
1439                                         PQclear(res);
1440                                 }
1441                                 st->prepared[st->use_file] = true;
1442                         }
1443
1444                         getQueryParams(st, command, params);
1445                         preparedStatementName(name, st->use_file, st->state);
1446
1447                         if (debug)
1448                                 fprintf(stderr, "client %d sending %s\n", st->id, name);
1449                         r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1450                                                                         params, NULL, NULL, 0);
1451                 }
1452                 else    /* unknown sql mode */
1453                         r = 0;
1454
1455                 if (r == 0)
1456                 {
1457                         if (debug)
1458                                 fprintf(stderr, "client %d could not send %s\n",
1459                                                 st->id, command->argv[0]);
1460                         st->ecnt++;
1461                 }
1462                 else
1463                         st->listen = 1;         /* flags that should be listened */
1464         }
1465         else if (commands[st->state]->type == META_COMMAND)
1466         {
1467                 int                     argc = commands[st->state]->argc,
1468                                         i;
1469                 char      **argv = commands[st->state]->argv;
1470
1471                 if (debug)
1472                 {
1473                         fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1474                         for (i = 1; i < argc; i++)
1475                                 fprintf(stderr, " %s", argv[i]);
1476                         fprintf(stderr, "\n");
1477                 }
1478
1479                 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1480                 {
1481                         char       *var;
1482                         int64           min,
1483                                                 max;
1484                         double          threshold = 0;
1485                         char            res[64];
1486
1487                         if (*argv[2] == ':')
1488                         {
1489                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1490                                 {
1491                                         fprintf(stderr, "%s: undefined variable \"%s\"\n",
1492                                                         argv[0], argv[2]);
1493                                         st->ecnt++;
1494                                         return true;
1495                                 }
1496                                 min = strtoint64(var);
1497                         }
1498                         else
1499                                 min = strtoint64(argv[2]);
1500
1501                         if (*argv[3] == ':')
1502                         {
1503                                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1504                                 {
1505                                         fprintf(stderr, "%s: undefined variable \"%s\"\n",
1506                                                         argv[0], argv[3]);
1507                                         st->ecnt++;
1508                                         return true;
1509                                 }
1510                                 max = strtoint64(var);
1511                         }
1512                         else
1513                                 max = strtoint64(argv[3]);
1514
1515                         if (max < min)
1516                         {
1517                                 fprintf(stderr, "%s: \\setrandom maximum is less than minimum\n",
1518                                                 argv[0]);
1519                                 st->ecnt++;
1520                                 return true;
1521                         }
1522
1523                         /*
1524                          * Generate random number functions need to be able to subtract
1525                          * max from min and add one to the result without overflowing.
1526                          * Since we know max > min, we can detect overflow just by
1527                          * checking for a negative result. But we must check both that the
1528                          * subtraction doesn't overflow, and that adding one to the result
1529                          * doesn't overflow either.
1530                          */
1531                         if (max - min < 0 || (max - min) + 1 < 0)
1532                         {
1533                                 fprintf(stderr, "%s: \\setrandom range is too large\n",
1534                                                 argv[0]);
1535                                 st->ecnt++;
1536                                 return true;
1537                         }
1538
1539                         if (argc == 4 ||        /* uniform without or with "uniform" keyword */
1540                                 (argc == 5 && pg_strcasecmp(argv[4], "uniform") == 0))
1541                         {
1542 #ifdef DEBUG
1543                                 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1544 #endif
1545                                 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1546                         }
1547                         else if (argc == 6 &&
1548                                          ((pg_strcasecmp(argv[4], "gaussian") == 0) ||
1549                                           (pg_strcasecmp(argv[4], "exponential") == 0)))
1550                         {
1551                                 if (*argv[5] == ':')
1552                                 {
1553                                         if ((var = getVariable(st, argv[5] + 1)) == NULL)
1554                                         {
1555                                                 fprintf(stderr, "%s: invalid threshold number: \"%s\"\n",
1556                                                                 argv[0], argv[5]);
1557                                                 st->ecnt++;
1558                                                 return true;
1559                                         }
1560                                         threshold = strtod(var, NULL);
1561                                 }
1562                                 else
1563                                         threshold = strtod(argv[5], NULL);
1564
1565                                 if (pg_strcasecmp(argv[4], "gaussian") == 0)
1566                                 {
1567                                         if (threshold < MIN_GAUSSIAN_THRESHOLD)
1568                                         {
1569                                                 fprintf(stderr, "gaussian threshold must be at least %f (not \"%s\")\n", MIN_GAUSSIAN_THRESHOLD, argv[5]);
1570                                                 st->ecnt++;
1571                                                 return true;
1572                                         }
1573 #ifdef DEBUG
1574                                         printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getGaussianRand(thread, min, max, threshold));
1575 #endif
1576                                         snprintf(res, sizeof(res), INT64_FORMAT, getGaussianRand(thread, min, max, threshold));
1577                                 }
1578                                 else if (pg_strcasecmp(argv[4], "exponential") == 0)
1579                                 {
1580                                         if (threshold <= 0.0)
1581                                         {
1582                                                 fprintf(stderr, "exponential threshold must be greater than zero (not \"%s\")\n", argv[5]);
1583                                                 st->ecnt++;
1584                                                 return true;
1585                                         }
1586 #ifdef DEBUG
1587                                         printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getExponentialRand(thread, min, max, threshold));
1588 #endif
1589                                         snprintf(res, sizeof(res), INT64_FORMAT, getExponentialRand(thread, min, max, threshold));
1590                                 }
1591                         }
1592                         else    /* this means an error somewhere in the parsing phase... */
1593                         {
1594                                 fprintf(stderr, "%s: invalid arguments for \\setrandom\n",
1595                                                 argv[0]);
1596                                 st->ecnt++;
1597                                 return true;
1598                         }
1599
1600                         if (!putVariable(st, argv[0], argv[1], res))
1601                         {
1602                                 st->ecnt++;
1603                                 return true;
1604                         }
1605
1606                         st->listen = 1;
1607                 }
1608                 else if (pg_strcasecmp(argv[0], "set") == 0)
1609                 {
1610                         char            res[64];
1611                         PgBenchExpr *expr = commands[st->state]->expr;
1612                         int64           result;
1613
1614                         if (!evaluateExpr(st, expr, &result))
1615                         {
1616                                 st->ecnt++;
1617                                 return true;
1618                         }
1619                         sprintf(res, INT64_FORMAT, result);
1620
1621                         if (!putVariable(st, argv[0], argv[1], res))
1622                         {
1623                                 st->ecnt++;
1624                                 return true;
1625                         }
1626
1627                         st->listen = 1;
1628                 }
1629                 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1630                 {
1631                         char       *var;
1632                         int                     usec;
1633                         instr_time      now;
1634
1635                         if (*argv[1] == ':')
1636                         {
1637                                 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1638                                 {
1639                                         fprintf(stderr, "%s: undefined variable \"%s\"\n",
1640                                                         argv[0], argv[1]);
1641                                         st->ecnt++;
1642                                         return true;
1643                                 }
1644                                 usec = atoi(var);
1645                         }
1646                         else
1647                                 usec = atoi(argv[1]);
1648
1649                         if (argc > 2)
1650                         {
1651                                 if (pg_strcasecmp(argv[2], "ms") == 0)
1652                                         usec *= 1000;
1653                                 else if (pg_strcasecmp(argv[2], "s") == 0)
1654                                         usec *= 1000000;
1655                         }
1656                         else
1657                                 usec *= 1000000;
1658
1659                         INSTR_TIME_SET_CURRENT(now);
1660                         st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
1661                         st->sleeping = 1;
1662
1663                         st->listen = 1;
1664                 }
1665                 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1666                 {
1667                         bool            ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1668
1669                         if (timer_exceeded) /* timeout */
1670                                 return clientDone(st, true);
1671                         else if (!ret)          /* on error */
1672                         {
1673                                 st->ecnt++;
1674                                 return true;
1675                         }
1676                         else    /* succeeded */
1677                                 st->listen = 1;
1678                 }
1679                 else if (pg_strcasecmp(argv[0], "shell") == 0)
1680                 {
1681                         bool            ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1682
1683                         if (timer_exceeded) /* timeout */
1684                                 return clientDone(st, true);
1685                         else if (!ret)          /* on error */
1686                         {
1687                                 st->ecnt++;
1688                                 return true;
1689                         }
1690                         else    /* succeeded */
1691                                 st->listen = 1;
1692                 }
1693                 goto top;
1694         }
1695
1696         return true;
1697 }
1698
1699 /*
1700  * print log entry after completing one transaction.
1701  */
1702 static void
1703 doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
1704           bool skipped)
1705 {
1706         double          lag;
1707         double          latency;
1708
1709         /*
1710          * Skip the log entry if sampling is enabled and this row doesn't belong
1711          * to the random sample.
1712          */
1713         if (sample_rate != 0.0 &&
1714                 pg_erand48(thread->random_state) > sample_rate)
1715                 return;
1716
1717         if (INSTR_TIME_IS_ZERO(*now))
1718                 INSTR_TIME_SET_CURRENT(*now);
1719
1720         latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
1721         if (skipped)
1722                 lag = latency;
1723         else
1724                 lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
1725
1726         /* should we aggregate the results or not? */
1727         if (agg_interval > 0)
1728         {
1729                 /*
1730                  * Are we still in the same interval? If yes, accumulate the values
1731                  * (print them otherwise)
1732                  */
1733                 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
1734                 {
1735                         agg->cnt += 1;
1736                         if (skipped)
1737                         {
1738                                 /*
1739                                  * there is no latency to record if the transaction was
1740                                  * skipped
1741                                  */
1742                                 agg->skipped += 1;
1743                         }
1744                         else
1745                         {
1746                                 agg->sum_latency += latency;
1747                                 agg->sum2_latency += latency * latency;
1748
1749                                 /* first in this aggregation interval */
1750                                 if ((agg->cnt == 1) || (latency < agg->min_latency))
1751                                         agg->min_latency = latency;
1752
1753                                 if ((agg->cnt == 1) || (latency > agg->max_latency))
1754                                         agg->max_latency = latency;
1755
1756                                 /* and the same for schedule lag */
1757                                 if (throttle_delay)
1758                                 {
1759                                         agg->sum_lag += lag;
1760                                         agg->sum2_lag += lag * lag;
1761
1762                                         if ((agg->cnt == 1) || (lag < agg->min_lag))
1763                                                 agg->min_lag = lag;
1764                                         if ((agg->cnt == 1) || (lag > agg->max_lag))
1765                                                 agg->max_lag = lag;
1766                                 }
1767                         }
1768                 }
1769                 else
1770                 {
1771                         /*
1772                          * Loop until we reach the interval of the current transaction
1773                          * (and print all the empty intervals in between).
1774                          */
1775                         while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
1776                         {
1777                                 /*
1778                                  * This is a non-Windows branch (thanks to the ifdef in
1779                                  * usage), so we don't need to handle this in a special way
1780                                  * (see below).
1781                                  */
1782                                 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
1783                                                 agg->start_time,
1784                                                 agg->cnt,
1785                                                 agg->sum_latency,
1786                                                 agg->sum2_latency,
1787                                                 agg->min_latency,
1788                                                 agg->max_latency);
1789                                 if (throttle_delay)
1790                                 {
1791                                         fprintf(logfile, " %.0f %.0f %.0f %.0f",
1792                                                         agg->sum_lag,
1793                                                         agg->sum2_lag,
1794                                                         agg->min_lag,
1795                                                         agg->max_lag);
1796                                         if (latency_limit)
1797                                                 fprintf(logfile, " %d", agg->skipped);
1798                                 }
1799                                 fputc('\n', logfile);
1800
1801                                 /* move to the next inteval */
1802                                 agg->start_time = agg->start_time + agg_interval;
1803
1804                                 /* reset for "no transaction" intervals */
1805                                 agg->cnt = 0;
1806                                 agg->skipped = 0;
1807                                 agg->min_latency = 0;
1808                                 agg->max_latency = 0;
1809                                 agg->sum_latency = 0;
1810                                 agg->sum2_latency = 0;
1811                                 agg->min_lag = 0;
1812                                 agg->max_lag = 0;
1813                                 agg->sum_lag = 0;
1814                                 agg->sum2_lag = 0;
1815                         }
1816
1817                         /* reset the values to include only the current transaction. */
1818                         agg->cnt = 1;
1819                         agg->skipped = skipped ? 1 : 0;
1820                         agg->min_latency = latency;
1821                         agg->max_latency = latency;
1822                         agg->sum_latency = skipped ? 0.0 : latency;
1823                         agg->sum2_latency = skipped ? 0.0 : latency * latency;
1824                         agg->min_lag = lag;
1825                         agg->max_lag = lag;
1826                         agg->sum_lag = lag;
1827                         agg->sum2_lag = lag * lag;
1828                 }
1829         }
1830         else
1831         {
1832                 /* no, print raw transactions */
1833 #ifndef WIN32
1834
1835                 /* This is more than we really ought to know about instr_time */
1836                 if (skipped)
1837                         fprintf(logfile, "%d %d skipped %d %ld %ld",
1838                                         st->id, st->cnt, st->use_file,
1839                                         (long) now->tv_sec, (long) now->tv_usec);
1840                 else
1841                         fprintf(logfile, "%d %d %.0f %d %ld %ld",
1842                                         st->id, st->cnt, latency, st->use_file,
1843                                         (long) now->tv_sec, (long) now->tv_usec);
1844 #else
1845
1846                 /* On Windows, instr_time doesn't provide a timestamp anyway */
1847                 if (skipped)
1848                         fprintf(logfile, "%d %d skipped %d 0 0",
1849                                         st->id, st->cnt, st->use_file);
1850                 else
1851                         fprintf(logfile, "%d %d %.0f %d 0 0",
1852                                         st->id, st->cnt, latency, st->use_file);
1853 #endif
1854                 if (throttle_delay)
1855                         fprintf(logfile, " %.0f", lag);
1856                 fputc('\n', logfile);
1857         }
1858 }
1859
1860 /* discard connections */
1861 static void
1862 disconnect_all(CState *state, int length)
1863 {
1864         int                     i;
1865
1866         for (i = 0; i < length; i++)
1867         {
1868                 if (state[i].con)
1869                 {
1870                         PQfinish(state[i].con);
1871                         state[i].con = NULL;
1872                 }
1873         }
1874 }
1875
1876 /* create tables and setup data */
1877 static void
1878 init(bool is_no_vacuum)
1879 {
1880 /*
1881  * The scale factor at/beyond which 32-bit integers are insufficient for
1882  * storing TPC-B account IDs.
1883  *
1884  * Although the actual threshold is 21474, we use 20000 because it is easier to
1885  * document and remember, and isn't that far away from the real threshold.
1886  */
1887 #define SCALE_32BIT_THRESHOLD 20000
1888
1889         /*
1890          * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1891          * fields in these table declarations were intended to comply with that.
1892          * The pgbench_accounts table complies with that because the "filler"
1893          * column is set to blank-padded empty string. But for all other tables
1894          * the columns default to NULL and so don't actually take any space.  We
1895          * could fix that by giving them non-null default values.  However, that
1896          * would completely break comparability of pgbench results with prior
1897          * versions. Since pgbench has never pretended to be fully TPC-B compliant
1898          * anyway, we stick with the historical behavior.
1899          */
1900         struct ddlinfo
1901         {
1902                 const char *table;              /* table name */
1903                 const char *smcols;             /* column decls if accountIDs are 32 bits */
1904                 const char *bigcols;    /* column decls if accountIDs are 64 bits */
1905                 int                     declare_fillfactor;
1906         };
1907         static const struct ddlinfo DDLs[] = {
1908                 {
1909                         "pgbench_history",
1910                         "tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
1911                         "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1912                         0
1913                 },
1914                 {
1915                         "pgbench_tellers",
1916                         "tid int not null,bid int,tbalance int,filler char(84)",
1917                         "tid int not null,bid int,tbalance int,filler char(84)",
1918                         1
1919                 },
1920                 {
1921                         "pgbench_accounts",
1922                         "aid    int not null,bid int,abalance int,filler char(84)",
1923                         "aid bigint not null,bid int,abalance int,filler char(84)",
1924                         1
1925                 },
1926                 {
1927                         "pgbench_branches",
1928                         "bid int not null,bbalance int,filler char(88)",
1929                         "bid int not null,bbalance int,filler char(88)",
1930                         1
1931                 }
1932         };
1933         static const char *const DDLINDEXes[] = {
1934                 "alter table pgbench_branches add primary key (bid)",
1935                 "alter table pgbench_tellers add primary key (tid)",
1936                 "alter table pgbench_accounts add primary key (aid)"
1937         };
1938         static const char *const DDLKEYs[] = {
1939                 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1940                 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1941                 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1942                 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1943                 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1944         };
1945
1946         PGconn     *con;
1947         PGresult   *res;
1948         char            sql[256];
1949         int                     i;
1950         int64           k;
1951
1952         /* used to track elapsed time and estimate of the remaining time */
1953         instr_time      start,
1954                                 diff;
1955         double          elapsed_sec,
1956                                 remaining_sec;
1957         int                     log_interval = 1;
1958
1959         if ((con = doConnect()) == NULL)
1960                 exit(1);
1961
1962         for (i = 0; i < lengthof(DDLs); i++)
1963         {
1964                 char            opts[256];
1965                 char            buffer[256];
1966                 const struct ddlinfo *ddl = &DDLs[i];
1967                 const char *cols;
1968
1969                 /* Remove old table, if it exists. */
1970                 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1971                 executeStatement(con, buffer);
1972
1973                 /* Construct new create table statement. */
1974                 opts[0] = '\0';
1975                 if (ddl->declare_fillfactor)
1976                         snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1977                                          " with (fillfactor=%d)", fillfactor);
1978                 if (tablespace != NULL)
1979                 {
1980                         char       *escape_tablespace;
1981
1982                         escape_tablespace = PQescapeIdentifier(con, tablespace,
1983                                                                                                    strlen(tablespace));
1984                         snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1985                                          " tablespace %s", escape_tablespace);
1986                         PQfreemem(escape_tablespace);
1987                 }
1988
1989                 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
1990
1991                 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
1992                                  unlogged_tables ? " unlogged" : "",
1993                                  ddl->table, cols, opts);
1994
1995                 executeStatement(con, buffer);
1996         }
1997
1998         executeStatement(con, "begin");
1999
2000         for (i = 0; i < nbranches * scale; i++)
2001         {
2002                 /* "filler" column defaults to NULL */
2003                 snprintf(sql, sizeof(sql),
2004                                  "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2005                                  i + 1);
2006                 executeStatement(con, sql);
2007         }
2008
2009         for (i = 0; i < ntellers * scale; i++)
2010         {
2011                 /* "filler" column defaults to NULL */
2012                 snprintf(sql, sizeof(sql),
2013                         "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2014                                  i + 1, i / ntellers + 1);
2015                 executeStatement(con, sql);
2016         }
2017
2018         executeStatement(con, "commit");
2019
2020         /*
2021          * fill the pgbench_accounts table with some data
2022          */
2023         fprintf(stderr, "creating tables...\n");
2024
2025         executeStatement(con, "begin");
2026         executeStatement(con, "truncate pgbench_accounts");
2027
2028         res = PQexec(con, "copy pgbench_accounts from stdin");
2029         if (PQresultStatus(res) != PGRES_COPY_IN)
2030         {
2031                 fprintf(stderr, "%s", PQerrorMessage(con));
2032                 exit(1);
2033         }
2034         PQclear(res);
2035
2036         INSTR_TIME_SET_CURRENT(start);
2037
2038         for (k = 0; k < (int64) naccounts * scale; k++)
2039         {
2040                 int64           j = k + 1;
2041
2042                 /* "filler" column defaults to blank padded empty string */
2043                 snprintf(sql, sizeof(sql),
2044                                  INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2045                                  j, k / naccounts + 1, 0);
2046                 if (PQputline(con, sql))
2047                 {
2048                         fprintf(stderr, "PQputline failed\n");
2049                         exit(1);
2050                 }
2051
2052                 /*
2053                  * If we want to stick with the original logging, print a message each
2054                  * 100k inserted rows.
2055                  */
2056                 if ((!use_quiet) && (j % 100000 == 0))
2057                 {
2058                         INSTR_TIME_SET_CURRENT(diff);
2059                         INSTR_TIME_SUBTRACT(diff, start);
2060
2061                         elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2062                         remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2063
2064                         fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2065                                         j, (int64) naccounts * scale,
2066                                         (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2067                                         elapsed_sec, remaining_sec);
2068                 }
2069                 /* let's not call the timing for each row, but only each 100 rows */
2070                 else if (use_quiet && (j % 100 == 0))
2071                 {
2072                         INSTR_TIME_SET_CURRENT(diff);
2073                         INSTR_TIME_SUBTRACT(diff, start);
2074
2075                         elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2076                         remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2077
2078                         /* have we reached the next interval (or end)? */
2079                         if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2080                         {
2081                                 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2082                                                 j, (int64) naccounts * scale,
2083                                                 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2084
2085                                 /* skip to the next interval */
2086                                 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
2087                         }
2088                 }
2089
2090         }
2091         if (PQputline(con, "\\.\n"))
2092         {
2093                 fprintf(stderr, "very last PQputline failed\n");
2094                 exit(1);
2095         }
2096         if (PQendcopy(con))
2097         {
2098                 fprintf(stderr, "PQendcopy failed\n");
2099                 exit(1);
2100         }
2101         executeStatement(con, "commit");
2102
2103         /* vacuum */
2104         if (!is_no_vacuum)
2105         {
2106                 fprintf(stderr, "vacuum...\n");
2107                 executeStatement(con, "vacuum analyze pgbench_branches");
2108                 executeStatement(con, "vacuum analyze pgbench_tellers");
2109                 executeStatement(con, "vacuum analyze pgbench_accounts");
2110                 executeStatement(con, "vacuum analyze pgbench_history");
2111         }
2112
2113         /*
2114          * create indexes
2115          */
2116         fprintf(stderr, "set primary keys...\n");
2117         for (i = 0; i < lengthof(DDLINDEXes); i++)
2118         {
2119                 char            buffer[256];
2120
2121                 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2122
2123                 if (index_tablespace != NULL)
2124                 {
2125                         char       *escape_tablespace;
2126
2127                         escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2128                                                                                                    strlen(index_tablespace));
2129                         snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2130                                          " using index tablespace %s", escape_tablespace);
2131                         PQfreemem(escape_tablespace);
2132                 }
2133
2134                 executeStatement(con, buffer);
2135         }
2136
2137         /*
2138          * create foreign keys
2139          */
2140         if (foreign_keys)
2141         {
2142                 fprintf(stderr, "set foreign keys...\n");
2143                 for (i = 0; i < lengthof(DDLKEYs); i++)
2144                 {
2145                         executeStatement(con, DDLKEYs[i]);
2146                 }
2147         }
2148
2149         fprintf(stderr, "done.\n");
2150         PQfinish(con);
2151 }
2152
2153 /*
2154  * Parse the raw sql and replace :param to $n.
2155  */
2156 static bool
2157 parseQuery(Command *cmd, const char *raw_sql)
2158 {
2159         char       *sql,
2160                            *p;
2161
2162         sql = pg_strdup(raw_sql);
2163         cmd->argc = 1;
2164
2165         p = sql;
2166         while ((p = strchr(p, ':')) != NULL)
2167         {
2168                 char            var[12];
2169                 char       *name;
2170                 int                     eaten;
2171
2172                 name = parseVariable(p, &eaten);
2173                 if (name == NULL)
2174                 {
2175                         while (*p == ':')
2176                         {
2177                                 p++;
2178                         }
2179                         continue;
2180                 }
2181
2182                 if (cmd->argc >= MAX_ARGS)
2183                 {
2184                         fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2185                         pg_free(name);
2186                         return false;
2187                 }
2188
2189                 sprintf(var, "$%d", cmd->argc);
2190                 p = replaceVariable(&sql, p, eaten, var);
2191
2192                 cmd->argv[cmd->argc] = name;
2193                 cmd->argc++;
2194         }
2195
2196         cmd->argv[0] = sql;
2197         return true;
2198 }
2199
2200 void pg_attribute_noreturn()
2201 syntax_error(const char *source, const int lineno,
2202                          const char *line, const char *command,
2203                          const char *msg, const char *more, const int column)
2204 {
2205         fprintf(stderr, "%s:%d: %s", source, lineno, msg);
2206         if (more != NULL)
2207                 fprintf(stderr, " (%s)", more);
2208         if (column != -1)
2209                 fprintf(stderr, " at column %d", column);
2210         fprintf(stderr, " in command \"%s\"\n", command);
2211         if (line != NULL)
2212         {
2213                 fprintf(stderr, "%s\n", line);
2214                 if (column != -1)
2215                 {
2216                         int                     i;
2217
2218                         for (i = 0; i < column - 1; i++)
2219                                 fprintf(stderr, " ");
2220                         fprintf(stderr, "^ error found here\n");
2221                 }
2222         }
2223         exit(1);
2224 }
2225
2226 /* Parse a command; return a Command struct, or NULL if it's a comment */
2227 static Command *
2228 process_commands(char *buf, const char *source, const int lineno)
2229 {
2230         const char      delim[] = " \f\n\r\t\v";
2231
2232         Command    *my_commands;
2233         int                     j;
2234         char       *p,
2235                            *tok;
2236
2237         /* Make the string buf end at the next newline */
2238         if ((p = strchr(buf, '\n')) != NULL)
2239                 *p = '\0';
2240
2241         /* Skip leading whitespace */
2242         p = buf;
2243         while (isspace((unsigned char) *p))
2244                 p++;
2245
2246         /* If the line is empty or actually a comment, we're done */
2247         if (*p == '\0' || strncmp(p, "--", 2) == 0)
2248                 return NULL;
2249
2250         /* Allocate and initialize Command structure */
2251         my_commands = (Command *) pg_malloc(sizeof(Command));
2252         my_commands->line = pg_strdup(buf);
2253         my_commands->command_num = num_commands++;
2254         my_commands->type = 0;          /* until set */
2255         my_commands->argc = 0;
2256
2257         if (*p == '\\')
2258         {
2259                 int                     max_args = -1;
2260
2261                 my_commands->type = META_COMMAND;
2262
2263                 j = 0;
2264                 tok = strtok(++p, delim);
2265
2266                 if (tok != NULL && pg_strcasecmp(tok, "set") == 0)
2267                         max_args = 2;
2268
2269                 while (tok != NULL)
2270                 {
2271                         my_commands->cols[j] = tok - buf + 1;
2272                         my_commands->argv[j++] = pg_strdup(tok);
2273                         my_commands->argc++;
2274                         if (max_args >= 0 && my_commands->argc >= max_args)
2275                                 tok = strtok(NULL, "");
2276                         else
2277                                 tok = strtok(NULL, delim);
2278                 }
2279
2280                 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
2281                 {
2282                         /*
2283                          * parsing: \setrandom variable min max [uniform] \setrandom
2284                          * variable min max (gaussian|exponential) threshold
2285                          */
2286
2287                         if (my_commands->argc < 4)
2288                         {
2289                                 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2290                                                          "missing arguments", NULL, -1);
2291                         }
2292
2293                         /* argc >= 4 */
2294
2295                         if (my_commands->argc == 4 ||           /* uniform without/with
2296                                                                                                  * "uniform" keyword */
2297                                 (my_commands->argc == 5 &&
2298                                  pg_strcasecmp(my_commands->argv[4], "uniform") == 0))
2299                         {
2300                                 /* nothing to do */
2301                         }
2302                         else if (                       /* argc >= 5 */
2303                                          (pg_strcasecmp(my_commands->argv[4], "gaussian") == 0) ||
2304                                    (pg_strcasecmp(my_commands->argv[4], "exponential") == 0))
2305                         {
2306                                 if (my_commands->argc < 6)
2307                                 {
2308                                         syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2309                                          "missing threshold argument", my_commands->argv[4], -1);
2310                                 }
2311                                 else if (my_commands->argc > 6)
2312                                 {
2313                                         syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2314                                                                  "too many arguments", my_commands->argv[4],
2315                                                                  my_commands->cols[6]);
2316                                 }
2317                         }
2318                         else    /* cannot parse, unexpected arguments */
2319                         {
2320                                 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2321                                                          "unexpected argument", my_commands->argv[4],
2322                                                          my_commands->cols[4]);
2323                         }
2324                 }
2325                 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
2326                 {
2327                         if (my_commands->argc < 3)
2328                         {
2329                                 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2330                                                          "missing argument", NULL, -1);
2331                         }
2332
2333                         expr_scanner_init(my_commands->argv[2], source, lineno,
2334                                                           my_commands->line, my_commands->argv[0],
2335                                                           my_commands->cols[2] - 1);
2336
2337                         if (expr_yyparse() != 0)
2338                         {
2339                                 /* dead code: exit done from syntax_error called by yyerror */
2340                                 exit(1);
2341                         }
2342
2343                         my_commands->expr = expr_parse_result;
2344
2345                         expr_scanner_finish();
2346                 }
2347                 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
2348                 {
2349                         if (my_commands->argc < 2)
2350                         {
2351                                 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2352                                                          "missing argument", NULL, -1);
2353                         }
2354
2355                         /*
2356                          * Split argument into number and unit to allow "sleep 1ms" etc.
2357                          * We don't have to terminate the number argument with null
2358                          * because it will be parsed with atoi, which ignores trailing
2359                          * non-digit characters.
2360                          */
2361                         if (my_commands->argv[1][0] != ':')
2362                         {
2363                                 char       *c = my_commands->argv[1];
2364
2365                                 while (isdigit((unsigned char) *c))
2366                                         c++;
2367                                 if (*c)
2368                                 {
2369                                         my_commands->argv[2] = c;
2370                                         if (my_commands->argc < 3)
2371                                                 my_commands->argc = 3;
2372                                 }
2373                         }
2374
2375                         if (my_commands->argc >= 3)
2376                         {
2377                                 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
2378                                         pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
2379                                         pg_strcasecmp(my_commands->argv[2], "s") != 0)
2380                                 {
2381                                         syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2382                                                                  "unknown time unit, must be us, ms or s",
2383                                                                  my_commands->argv[2], my_commands->cols[2]);
2384                                 }
2385                         }
2386
2387                         /* this should be an error?! */
2388                         for (j = 3; j < my_commands->argc; j++)
2389                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
2390                                                 my_commands->argv[0], my_commands->argv[j]);
2391                 }
2392                 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
2393                 {
2394                         if (my_commands->argc < 3)
2395                         {
2396                                 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2397                                                          "missing argument", NULL, -1);
2398                         }
2399                 }
2400                 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
2401                 {
2402                         if (my_commands->argc < 1)
2403                         {
2404                                 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2405                                                          "missing command", NULL, -1);
2406                         }
2407                 }
2408                 else
2409                 {
2410                         syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2411                                                  "invalid command", NULL, -1);
2412                 }
2413         }
2414         else
2415         {
2416                 my_commands->type = SQL_COMMAND;
2417
2418                 switch (querymode)
2419                 {
2420                         case QUERY_SIMPLE:
2421                                 my_commands->argv[0] = pg_strdup(p);
2422                                 my_commands->argc++;
2423                                 break;
2424                         case QUERY_EXTENDED:
2425                         case QUERY_PREPARED:
2426                                 if (!parseQuery(my_commands, p))
2427                                         exit(1);
2428                                 break;
2429                         default:
2430                                 exit(1);
2431                 }
2432         }
2433
2434         return my_commands;
2435 }
2436
/*
 * Read one line from fd and return it in a malloc'd buffer, including the
 * trailing newline if there was one.  Lines of any length are supported.
 *
 * Returns NULL when nothing could be read: at EOF, or on a read error before
 * any data arrived (callers can distinguish the two with ferror()).
 *
 * The buffer will typically be larger than necessary, but we don't care
 * in this program, because we'll free it as soon as we've parsed the line.
 */
static char *
read_line_from_file(FILE *fd)
{
	char		tmpbuf[BUFSIZ];
	char	   *buf;
	size_t		buflen = BUFSIZ;
	size_t		used = 0;

	/*
	 * Allocate with pg_malloc (not palloc) so that allocation, growth via
	 * pg_realloc, and release all go through the same malloc-based allocator.
	 */
	buf = (char *) pg_malloc(buflen);
	buf[0] = '\0';

	while (fgets(tmpbuf, sizeof(tmpbuf), fd) != NULL)
	{
		size_t		thislen = strlen(tmpbuf);

		/* Append tmpbuf (and its terminator) to whatever we had already */
		memcpy(buf + used, tmpbuf, thislen + 1);
		used += thislen;

		/* Done if we collected a newline */
		if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
			break;

		/* Else, enlarge buf to ensure we can append next bufferload */
		buflen += BUFSIZ;
		buf = (char *) pg_realloc(buf, buflen);
	}

	if (used > 0)
		return buf;

	/* Reached EOF (or a read error) with nothing collected */
	pg_free(buf);
	return NULL;
}
2479
2480 static int
2481 process_file(char *filename)
2482 {
2483 #define COMMANDS_ALLOC_NUM 128
2484
2485         Command   **my_commands;
2486         FILE       *fd;
2487         int                     lineno,
2488                                 index;
2489         char       *buf;
2490         int                     alloc_num;
2491
2492         if (num_files >= MAX_FILES)
2493         {
2494                 fprintf(stderr, "at most %d SQL files are allowed\n", MAX_FILES);
2495                 exit(1);
2496         }
2497
2498         alloc_num = COMMANDS_ALLOC_NUM;
2499         my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2500
2501         if (strcmp(filename, "-") == 0)
2502                 fd = stdin;
2503         else if ((fd = fopen(filename, "r")) == NULL)
2504         {
2505                 fprintf(stderr, "could not open file \"%s\": %s\n",
2506                                 filename, strerror(errno));
2507                 pg_free(my_commands);
2508                 return false;
2509         }
2510
2511         lineno = 0;
2512         index = 0;
2513
2514         while ((buf = read_line_from_file(fd)) != NULL)
2515         {
2516                 Command    *command;
2517
2518                 lineno += 1;
2519
2520                 command = process_commands(buf, filename, lineno);
2521
2522                 free(buf);
2523
2524                 if (command == NULL)
2525                         continue;
2526
2527                 my_commands[index] = command;
2528                 index++;
2529
2530                 if (index >= alloc_num)
2531                 {
2532                         alloc_num += COMMANDS_ALLOC_NUM;
2533                         my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2534                 }
2535         }
2536         fclose(fd);
2537
2538         my_commands[index] = NULL;
2539
2540         sql_files[num_files++] = my_commands;
2541
2542         return true;
2543 }
2544
2545 static Command **
2546 process_builtin(char *tb, const char *source)
2547 {
2548 #define COMMANDS_ALLOC_NUM 128
2549
2550         Command   **my_commands;
2551         int                     lineno,
2552                                 index;
2553         char            buf[BUFSIZ];
2554         int                     alloc_num;
2555
2556         alloc_num = COMMANDS_ALLOC_NUM;
2557         my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2558
2559         lineno = 0;
2560         index = 0;
2561
2562         for (;;)
2563         {
2564                 char       *p;
2565                 Command    *command;
2566
2567                 p = buf;
2568                 while (*tb && *tb != '\n')
2569                         *p++ = *tb++;
2570
2571                 if (*tb == '\0')
2572                         break;
2573
2574                 if (*tb == '\n')
2575                         tb++;
2576
2577                 *p = '\0';
2578
2579                 lineno += 1;
2580
2581                 command = process_commands(buf, source, lineno);
2582                 if (command == NULL)
2583                         continue;
2584
2585                 my_commands[index] = command;
2586                 index++;
2587
2588                 if (index >= alloc_num)
2589                 {
2590                         alloc_num += COMMANDS_ALLOC_NUM;
2591                         my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2592                 }
2593         }
2594
2595         my_commands[index] = NULL;
2596
2597         return my_commands;
2598 }
2599
2600 /* print out results */
2601 static void
2602 printResults(int ttype, int64 normal_xacts, int nclients,
2603                          TState *threads, int nthreads,
2604                          instr_time total_time, instr_time conn_total_time,
2605                          int64 total_latencies, int64 total_sqlats,
2606                          int64 throttle_lag, int64 throttle_lag_max,
2607                          int64 throttle_latency_skipped, int64 latency_late)
2608 {
2609         double          time_include,
2610                                 tps_include,
2611                                 tps_exclude;
2612         char       *s;
2613
2614         time_include = INSTR_TIME_GET_DOUBLE(total_time);
2615         tps_include = normal_xacts / time_include;
2616         tps_exclude = normal_xacts / (time_include -
2617                                                 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
2618
2619         if (ttype == 0)
2620                 s = "TPC-B (sort of)";
2621         else if (ttype == 2)
2622                 s = "Update only pgbench_accounts";
2623         else if (ttype == 1)
2624                 s = "SELECT only";
2625         else
2626                 s = "Custom query";
2627
2628         printf("transaction type: %s\n", s);
2629         printf("scaling factor: %d\n", scale);
2630         printf("query mode: %s\n", QUERYMODE[querymode]);
2631         printf("number of clients: %d\n", nclients);
2632         printf("number of threads: %d\n", nthreads);
2633         if (duration <= 0)
2634         {
2635                 printf("number of transactions per client: %d\n", nxacts);
2636                 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2637                            normal_xacts, (int64) nxacts * nclients);
2638         }
2639         else
2640         {
2641                 printf("duration: %d s\n", duration);
2642                 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2643                            normal_xacts);
2644         }
2645
2646         /* Remaining stats are nonsensical if we failed to execute any xacts */
2647         if (normal_xacts <= 0)
2648                 return;
2649
2650         if (throttle_delay && latency_limit)
2651                 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
2652                            throttle_latency_skipped,
2653                            100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
2654
2655         if (latency_limit)
2656                 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
2657                            latency_limit / 1000.0, latency_late,
2658                    100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
2659
2660         if (throttle_delay || progress || latency_limit)
2661         {
2662                 /* compute and show latency average and standard deviation */
2663                 double          latency = 0.001 * total_latencies / normal_xacts;
2664                 double          sqlat = (double) total_sqlats / normal_xacts;
2665
2666                 printf("latency average: %.3f ms\n"
2667                            "latency stddev: %.3f ms\n",
2668                            latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2669         }
2670         else
2671         {
2672                 /* only an average latency computed from the duration is available */
2673                 printf("latency average: %.3f ms\n",
2674                            1000.0 * duration * nclients / normal_xacts);
2675         }
2676
2677         if (throttle_delay)
2678         {
2679                 /*
2680                  * Report average transaction lag under rate limit throttling.  This
2681                  * is the delay between scheduled and actual start times for the
2682                  * transaction.  The measured lag may be caused by thread/client load,
2683                  * the database load, or the Poisson throttling process.
2684                  */
2685                 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2686                            0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2687         }
2688
2689         printf("tps = %f (including connections establishing)\n", tps_include);
2690         printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2691
2692         /* Report per-command latencies */
2693         if (is_latencies)
2694         {
2695                 int                     i;
2696
2697                 for (i = 0; i < num_files; i++)
2698                 {
2699                         Command   **commands;
2700
2701                         if (num_files > 1)
2702                                 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2703                         else
2704                                 printf("statement latencies in milliseconds:\n");
2705
2706                         for (commands = sql_files[i]; *commands != NULL; commands++)
2707                         {
2708                                 Command    *command = *commands;
2709                                 int                     cnum = command->command_num;
2710                                 double          total_time;
2711                                 instr_time      total_exec_elapsed;
2712                                 int                     total_exec_count;
2713                                 int                     t;
2714
2715                                 /* Accumulate per-thread data for command */
2716                                 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2717                                 total_exec_count = 0;
2718                                 for (t = 0; t < nthreads; t++)
2719                                 {
2720                                         TState     *thread = &threads[t];
2721
2722                                         INSTR_TIME_ADD(total_exec_elapsed,
2723                                                                    thread->exec_elapsed[cnum]);
2724                                         total_exec_count += thread->exec_count[cnum];
2725                                 }
2726
2727                                 if (total_exec_count > 0)
2728                                         total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2729                                 else
2730                                         total_time = 0.0;
2731
2732                                 printf("\t%f\t%s\n", total_time, command->line);
2733                         }
2734                 }
2735         }
2736 }
2737
2738
2739 int
2740 main(int argc, char **argv)
2741 {
2742         static struct option long_options[] = {
2743                 /* systematic long/short named options */
2744                 {"client", required_argument, NULL, 'c'},
2745                 {"connect", no_argument, NULL, 'C'},
2746                 {"debug", no_argument, NULL, 'd'},
2747                 {"define", required_argument, NULL, 'D'},
2748                 {"file", required_argument, NULL, 'f'},
2749                 {"fillfactor", required_argument, NULL, 'F'},
2750                 {"host", required_argument, NULL, 'h'},
2751                 {"initialize", no_argument, NULL, 'i'},
2752                 {"jobs", required_argument, NULL, 'j'},
2753                 {"log", no_argument, NULL, 'l'},
2754                 {"no-vacuum", no_argument, NULL, 'n'},
2755                 {"port", required_argument, NULL, 'p'},
2756                 {"progress", required_argument, NULL, 'P'},
2757                 {"protocol", required_argument, NULL, 'M'},
2758                 {"quiet", no_argument, NULL, 'q'},
2759                 {"report-latencies", no_argument, NULL, 'r'},
2760                 {"scale", required_argument, NULL, 's'},
2761                 {"select-only", no_argument, NULL, 'S'},
2762                 {"skip-some-updates", no_argument, NULL, 'N'},
2763                 {"time", required_argument, NULL, 'T'},
2764                 {"transactions", required_argument, NULL, 't'},
2765                 {"username", required_argument, NULL, 'U'},
2766                 {"vacuum-all", no_argument, NULL, 'v'},
2767                 /* long-named only options */
2768                 {"foreign-keys", no_argument, &foreign_keys, 1},
2769                 {"index-tablespace", required_argument, NULL, 3},
2770                 {"tablespace", required_argument, NULL, 2},
2771                 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2772                 {"sampling-rate", required_argument, NULL, 4},
2773                 {"aggregate-interval", required_argument, NULL, 5},
2774                 {"rate", required_argument, NULL, 'R'},
2775                 {"latency-limit", required_argument, NULL, 'L'},
2776                 {NULL, 0, NULL, 0}
2777         };
2778
2779         int                     c;
2780         int                     nclients = 1;   /* default number of simulated clients */
2781         int                     nthreads = 1;   /* default number of threads */
2782         int                     is_init_mode = 0;               /* initialize mode? */
2783         int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
2784         int                     do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2785         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
2786                                                                  * 2: skip update of branches and tellers */
2787         int                     optindex;
2788         char       *filename = NULL;
2789         bool            scale_given = false;
2790
2791         bool            benchmarking_option_set = false;
2792         bool            initialization_option_set = false;
2793
2794         CState     *state;                      /* status of clients */
2795         TState     *threads;            /* array of thread */
2796
2797         instr_time      start_time;             /* start up time */
2798         instr_time      total_time;
2799         instr_time      conn_total_time;
2800         int64           total_xacts = 0;
2801         int64           total_latencies = 0;
2802         int64           total_sqlats = 0;
2803         int64           throttle_lag = 0;
2804         int64           throttle_lag_max = 0;
2805         int64           throttle_latency_skipped = 0;
2806         int64           latency_late = 0;
2807
2808         int                     i;
2809         int                     nclients_dealt;
2810
2811 #ifdef HAVE_GETRLIMIT
2812         struct rlimit rlim;
2813 #endif
2814
2815         PGconn     *con;
2816         PGresult   *res;
2817         char       *env;
2818
2819         char            val[64];
2820
2821         progname = get_progname(argv[0]);
2822
2823         if (argc > 1)
2824         {
2825                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2826                 {
2827                         usage();
2828                         exit(0);
2829                 }
2830                 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2831                 {
2832                         puts("pgbench (PostgreSQL) " PG_VERSION);
2833                         exit(0);
2834                 }
2835         }
2836
2837 #ifdef WIN32
2838         /* stderr is buffered on Win32. */
2839         setvbuf(stderr, NULL, _IONBF, 0);
2840 #endif
2841
2842         if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2843                 pghost = env;
2844         if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2845                 pgport = env;
2846         else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2847                 login = env;
2848
2849         state = (CState *) pg_malloc(sizeof(CState));
2850         memset(state, 0, sizeof(CState));
2851
2852         while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
2853         {
2854                 switch (c)
2855                 {
2856                         case 'i':
2857                                 is_init_mode++;
2858                                 break;
2859                         case 'h':
2860                                 pghost = pg_strdup(optarg);
2861                                 break;
2862                         case 'n':
2863                                 is_no_vacuum++;
2864                                 break;
2865                         case 'v':
2866                                 do_vacuum_accounts++;
2867                                 break;
2868                         case 'p':
2869                                 pgport = pg_strdup(optarg);
2870                                 break;
2871                         case 'd':
2872                                 debug++;
2873                                 break;
2874                         case 'S':
2875                                 ttype = 1;
2876                                 benchmarking_option_set = true;
2877                                 break;
2878                         case 'N':
2879                                 ttype = 2;
2880                                 benchmarking_option_set = true;
2881                                 break;
2882                         case 'c':
2883                                 benchmarking_option_set = true;
2884                                 nclients = atoi(optarg);
2885                                 if (nclients <= 0 || nclients > MAXCLIENTS)
2886                                 {
2887                                         fprintf(stderr, "invalid number of clients: \"%s\"\n",
2888                                                         optarg);
2889                                         exit(1);
2890                                 }
2891 #ifdef HAVE_GETRLIMIT
2892 #ifdef RLIMIT_NOFILE                    /* most platforms use RLIMIT_NOFILE */
2893                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2894 #else                                                   /* but BSD doesn't ... */
2895                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2896 #endif   /* RLIMIT_NOFILE */
2897                                 {
2898                                         fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2899                                         exit(1);
2900                                 }
2901                                 if (rlim.rlim_cur < nclients + 3)
2902                                 {
2903                                         fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
2904                                                         nclients + 3, (long) rlim.rlim_cur);
2905                                         fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
2906                                         exit(1);
2907                                 }
2908 #endif   /* HAVE_GETRLIMIT */
2909                                 break;
2910                         case 'j':                       /* jobs */
2911                                 benchmarking_option_set = true;
2912                                 nthreads = atoi(optarg);
2913                                 if (nthreads <= 0)
2914                                 {
2915                                         fprintf(stderr, "invalid number of threads: \"%s\"\n",
2916                                                         optarg);
2917                                         exit(1);
2918                                 }
2919 #ifndef ENABLE_THREAD_SAFETY
2920                                 if (nthreads != 1)
2921                                 {
2922                                         fprintf(stderr, "threads are not supported on this platform; use -j1\n");
2923                                         exit(1);
2924                                 }
2925 #endif   /* !ENABLE_THREAD_SAFETY */
2926                                 break;
2927                         case 'C':
2928                                 benchmarking_option_set = true;
2929                                 is_connect = true;
2930                                 break;
2931                         case 'r':
2932                                 benchmarking_option_set = true;
2933                                 is_latencies = true;
2934                                 break;
2935                         case 's':
2936                                 scale_given = true;
2937                                 scale = atoi(optarg);
2938                                 if (scale <= 0)
2939                                 {
2940                                         fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
2941                                         exit(1);
2942                                 }
2943                                 break;
2944                         case 't':
2945                                 benchmarking_option_set = true;
2946                                 if (duration > 0)
2947                                 {
2948                                         fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2949                                         exit(1);
2950                                 }
2951                                 nxacts = atoi(optarg);
2952                                 if (nxacts <= 0)
2953                                 {
2954                                         fprintf(stderr, "invalid number of transactions: \"%s\"\n",
2955                                                         optarg);
2956                                         exit(1);
2957                                 }
2958                                 break;
2959                         case 'T':
2960                                 benchmarking_option_set = true;
2961                                 if (nxacts > 0)
2962                                 {
2963                                         fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2964                                         exit(1);
2965                                 }
2966                                 duration = atoi(optarg);
2967                                 if (duration <= 0)
2968                                 {
2969                                         fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
2970                                         exit(1);
2971                                 }
2972                                 break;
2973                         case 'U':
2974                                 login = pg_strdup(optarg);
2975                                 break;
2976                         case 'l':
2977                                 benchmarking_option_set = true;
2978                                 use_log = true;
2979                                 break;
2980                         case 'q':
2981                                 initialization_option_set = true;
2982                                 use_quiet = true;
2983                                 break;
2984                         case 'f':
2985                                 benchmarking_option_set = true;
2986                                 ttype = 3;
2987                                 filename = pg_strdup(optarg);
2988                                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2989                                         exit(1);
2990                                 break;
2991                         case 'D':
2992                                 {
2993                                         char       *p;
2994
2995                                         benchmarking_option_set = true;
2996
2997                                         if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2998                                         {
2999                                                 fprintf(stderr, "invalid variable definition: \"%s\"\n",
3000                                                                 optarg);
3001                                                 exit(1);
3002                                         }
3003
3004                                         *p++ = '\0';
3005                                         if (!putVariable(&state[0], "option", optarg, p))
3006                                                 exit(1);
3007                                 }
3008                                 break;
3009                         case 'F':
3010                                 initialization_option_set = true;
3011                                 fillfactor = atoi(optarg);
3012                                 if (fillfactor < 10 || fillfactor > 100)
3013                                 {
3014                                         fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3015                                         exit(1);
3016                                 }
3017                                 break;
3018                         case 'M':
3019                                 benchmarking_option_set = true;
3020                                 if (num_files > 0)
3021                                 {
3022                                         fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f)\n");
3023                                         exit(1);
3024                                 }
3025                                 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3026                                         if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3027                                                 break;
3028                                 if (querymode >= NUM_QUERYMODE)
3029                                 {
3030                                         fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3031                                                         optarg);
3032                                         exit(1);
3033                                 }
3034                                 break;
3035                         case 'P':
3036                                 benchmarking_option_set = true;
3037                                 progress = atoi(optarg);
3038                                 if (progress <= 0)
3039                                 {
3040                                         fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3041                                                         optarg);
3042                                         exit(1);
3043                                 }
3044                                 break;
3045                         case 'R':
3046                                 {
3047                                         /* get a double from the beginning of option value */
3048                                         double          throttle_value = atof(optarg);
3049
3050                                         benchmarking_option_set = true;
3051
3052                                         if (throttle_value <= 0.0)
3053                                         {
3054                                                 fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3055                                                 exit(1);
3056                                         }
3057                                         /* Invert rate limit into a time offset */
3058                                         throttle_delay = (int64) (1000000.0 / throttle_value);
3059                                 }
3060                                 break;
3061                         case 'L':
3062                                 {
3063                                         double          limit_ms = atof(optarg);
3064
3065                                         if (limit_ms <= 0.0)
3066                                         {
3067                                                 fprintf(stderr, "invalid latency limit: \"%s\"\n",
3068                                                                 optarg);
3069                                                 exit(1);
3070                                         }
3071                                         benchmarking_option_set = true;
3072                                         latency_limit = (int64) (limit_ms * 1000);
3073                                 }
3074                                 break;
3075                         case 0:
3076                                 /* This covers long options which take no argument. */
3077                                 if (foreign_keys || unlogged_tables)
3078                                         initialization_option_set = true;
3079                                 break;
3080                         case 2:                         /* tablespace */
3081                                 initialization_option_set = true;
3082                                 tablespace = pg_strdup(optarg);
3083                                 break;
3084                         case 3:                         /* index-tablespace */
3085                                 initialization_option_set = true;
3086                                 index_tablespace = pg_strdup(optarg);
3087                                 break;
3088                         case 4:
3089                                 benchmarking_option_set = true;
3090                                 sample_rate = atof(optarg);
3091                                 if (sample_rate <= 0.0 || sample_rate > 1.0)
3092                                 {
3093                                         fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3094                                         exit(1);
3095                                 }
3096                                 break;
3097                         case 5:
3098 #ifdef WIN32
3099                                 fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
3100                                 exit(1);
3101 #else
3102                                 benchmarking_option_set = true;
3103                                 agg_interval = atoi(optarg);
3104                                 if (agg_interval <= 0)
3105                                 {
3106                                         fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
3107                                                         optarg);
3108                                         exit(1);
3109                                 }
3110 #endif
3111                                 break;
3112                         default:
3113                                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3114                                 exit(1);
3115                                 break;
3116                 }
3117         }
3118
3119         /*
3120          * Don't need more threads than there are clients.  (This is not merely an
3121          * optimization; throttle_delay is calculated incorrectly below if some
3122          * threads have no clients assigned to them.)
3123          */
3124         if (nthreads > nclients)
3125                 nthreads = nclients;
3126
3127         /* compute a per thread delay */
3128         throttle_delay *= nthreads;
3129
3130         if (argc > optind)
3131                 dbName = argv[optind];
3132         else
3133         {
3134                 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3135                         dbName = env;
3136                 else if (login != NULL && *login != '\0')
3137                         dbName = login;
3138                 else
3139                         dbName = "";
3140         }
3141
3142         if (is_init_mode)
3143         {
3144                 if (benchmarking_option_set)
3145                 {
3146                         fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
3147                         exit(1);
3148                 }
3149
3150                 init(is_no_vacuum);
3151                 exit(0);
3152         }
3153         else
3154         {
3155                 if (initialization_option_set)
3156                 {
3157                         fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
3158                         exit(1);
3159                 }
3160         }
3161
3162         /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3163         if (nxacts <= 0 && duration <= 0)
3164                 nxacts = DEFAULT_NXACTS;
3165
3166         /* --sampling-rate may be used only with -l */
3167         if (sample_rate > 0.0 && !use_log)
3168         {
3169                 fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
3170                 exit(1);
3171         }
3172
3173         /* --sampling-rate must not be used with --aggregate-interval */
3174         if (sample_rate > 0.0 && agg_interval > 0)
3175         {
3176                 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
3177                 exit(1);
3178         }
3179
3180         if (agg_interval > 0 && !use_log)
3181         {
3182                 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3183                 exit(1);
3184         }
3185
3186         if (duration > 0 && agg_interval > duration)
3187         {
3188                 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
3189                 exit(1);
3190         }
3191
3192         if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
3193         {
3194                 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3195                 exit(1);
3196         }
3197
3198         /*
3199          * save main process id in the global variable because process id will be
3200          * changed after fork.
3201          */
3202         main_pid = (int) getpid();
3203         progress_nclients = nclients;
3204         progress_nthreads = nthreads;
3205
3206         if (nclients > 1)
3207         {
3208                 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3209                 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3210
3211                 /* copy any -D switch values to all clients */
3212                 for (i = 1; i < nclients; i++)
3213                 {
3214                         int                     j;
3215
3216                         state[i].id = i;
3217                         for (j = 0; j < state[0].nvariables; j++)
3218                         {
3219                                 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
3220                                         exit(1);
3221                         }
3222                 }
3223         }
3224
3225         if (debug)
3226         {
3227                 if (duration <= 0)
3228                         printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3229                                    pghost, pgport, nclients, nxacts, dbName);
3230                 else
3231                         printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3232                                    pghost, pgport, nclients, duration, dbName);
3233         }
3234
3235         /* opening connection... */
3236         con = doConnect();
3237         if (con == NULL)
3238                 exit(1);
3239
3240         if (PQstatus(con) == CONNECTION_BAD)
3241         {
3242                 fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
3243                 fprintf(stderr, "%s", PQerrorMessage(con));
3244                 exit(1);
3245         }
3246
3247         if (ttype != 3)
3248         {
3249                 /*
3250                  * if this is not a custom query, get the scaling factor, which should
3251                  * be the same as count(*) from pgbench_branches
3252                  */
3253                 res = PQexec(con, "select count(*) from pgbench_branches");
3254                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3255                 {
3256                         char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
3257
3258                         fprintf(stderr, "%s", PQerrorMessage(con));
3259                         if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
3260                         {
3261                                 fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
3262                         }
3263
3264                         exit(1);
3265                 }
3266                 scale = atoi(PQgetvalue(res, 0, 0));
3267                 if (scale < 0)
3268                 {
3269                         fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
3270                                         PQgetvalue(res, 0, 0));
3271                         exit(1);
3272                 }
3273                 PQclear(res);
3274
3275                 /* warn if we override user-given -s switch */
3276                 if (scale_given)
3277                         fprintf(stderr,
3278                                         "scale option ignored, using count from pgbench_branches table (%d)\n",
3279                                         scale);
3280         }
3281
3282         /*
3283          * :scale variables normally get -s or database scale, but don't override
3284          * an explicit -D switch
3285          */
3286         if (getVariable(&state[0], "scale") == NULL)
3287         {
3288                 snprintf(val, sizeof(val), "%d", scale);
3289                 for (i = 0; i < nclients; i++)
3290                 {
3291                         if (!putVariable(&state[i], "startup", "scale", val))
3292                                 exit(1);
3293                 }
3294         }
3295
3296         /*
3297          * Define a :client_id variable that is unique per connection. But don't
3298          * override an explicit -D switch.
3299          */
3300         if (getVariable(&state[0], "client_id") == NULL)
3301         {
3302                 for (i = 0; i < nclients; i++)
3303                 {
3304                         snprintf(val, sizeof(val), "%d", i);
3305                         if (!putVariable(&state[i], "startup", "client_id", val))
3306                                 exit(1);
3307                 }
3308         }
3309
3310         if (!is_no_vacuum)
3311         {
3312                 fprintf(stderr, "starting vacuum...");
3313                 tryExecuteStatement(con, "vacuum pgbench_branches");
3314                 tryExecuteStatement(con, "vacuum pgbench_tellers");
3315                 tryExecuteStatement(con, "truncate pgbench_history");
3316                 fprintf(stderr, "end.\n");
3317
3318                 if (do_vacuum_accounts)
3319                 {
3320                         fprintf(stderr, "starting vacuum pgbench_accounts...");
3321                         tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
3322                         fprintf(stderr, "end.\n");
3323                 }
3324         }
3325         PQfinish(con);
3326
3327         /* set random seed */
3328         INSTR_TIME_SET_CURRENT(start_time);
3329         srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3330
3331         /* process builtin SQL scripts */
3332         switch (ttype)
3333         {
3334                 case 0:
3335                         sql_files[0] = process_builtin(tpc_b,
3336                                                                                    "<builtin: TPC-B (sort of)>");
3337                         num_files = 1;
3338                         break;
3339
3340                 case 1:
3341                         sql_files[0] = process_builtin(select_only,
3342                                                                                    "<builtin: select only>");
3343                         num_files = 1;
3344                         break;
3345
3346                 case 2:
3347                         sql_files[0] = process_builtin(simple_update,
3348                                                                                    "<builtin: simple update>");
3349                         num_files = 1;
3350                         break;
3351
3352                 default:
3353                         break;
3354         }
3355
3356         /* set up thread data structures */
3357         threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3358         nclients_dealt = 0;
3359
3360         for (i = 0; i < nthreads; i++)
3361         {
3362                 TState     *thread = &threads[i];
3363
3364                 thread->tid = i;
3365                 thread->state = &state[nclients_dealt];
3366                 thread->nstate =
3367                         (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
3368                 thread->random_state[0] = random();
3369                 thread->random_state[1] = random();
3370                 thread->random_state[2] = random();
3371                 thread->throttle_latency_skipped = 0;
3372                 thread->latency_late = 0;
3373
3374                 nclients_dealt += thread->nstate;
3375
3376                 if (is_latencies)
3377                 {
3378                         /* Reserve memory for the thread to store per-command latencies */
3379                         int                     t;
3380
3381                         thread->exec_elapsed = (instr_time *)
3382                                 pg_malloc(sizeof(instr_time) * num_commands);
3383                         thread->exec_count = (int *)
3384                                 pg_malloc(sizeof(int) * num_commands);
3385
3386                         for (t = 0; t < num_commands; t++)
3387                         {
3388                                 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
3389                                 thread->exec_count[t] = 0;
3390                         }
3391                 }
3392                 else
3393                 {
3394                         thread->exec_elapsed = NULL;
3395                         thread->exec_count = NULL;
3396                 }
3397         }
3398
3399         /* all clients must be assigned to a thread */
3400         Assert(nclients_dealt == nclients);
3401
3402         /* get start up time */
3403         INSTR_TIME_SET_CURRENT(start_time);
3404
3405         /* set alarm if duration is specified. */
3406         if (duration > 0)
3407                 setalarm(duration);
3408
3409         /* start threads */
3410 #ifdef ENABLE_THREAD_SAFETY
3411         for (i = 0; i < nthreads; i++)
3412         {
3413                 TState     *thread = &threads[i];
3414
3415                 INSTR_TIME_SET_CURRENT(thread->start_time);
3416
3417                 /* the first thread (i = 0) is executed by the main thread */
3418                 if (i > 0)
3419                 {
3420                         int                     err = pthread_create(&thread->thread, NULL, threadRun, thread);
3421
3422                         if (err != 0 || thread->thread == INVALID_THREAD)
3423                         {
3424                                 fprintf(stderr, "could not create thread: %s\n", strerror(err));
3425                                 exit(1);
3426                         }
3427                 }
3428                 else
3429                 {
3430                         thread->thread = INVALID_THREAD;
3431                 }
3432         }
3433 #else
3434         INSTR_TIME_SET_CURRENT(threads[0].start_time);
3435         threads[0].thread = INVALID_THREAD;
3436 #endif   /* ENABLE_THREAD_SAFETY */
3437
3438         /* wait for threads and accumulate results */
3439         INSTR_TIME_SET_ZERO(conn_total_time);
3440         for (i = 0; i < nthreads; i++)
3441         {
3442                 TState     *thread = &threads[i];
3443                 int                     j;
3444
3445 #ifdef ENABLE_THREAD_SAFETY
3446                 if (threads[i].thread == INVALID_THREAD)
3447                         /* actually run this thread directly in the main thread */
3448                         (void) threadRun(thread);
3449                 else
3450                         /* wait for other threads; should we check that 0 is returned? */
3451                         pthread_join(thread->thread, NULL);
3452 #else
3453                 (void) threadRun(thread);
3454 #endif   /* ENABLE_THREAD_SAFETY */
3455
3456                 /* thread level stats */
3457                 throttle_lag += thread->throttle_lag;
3458                 throttle_latency_skipped += threads->throttle_latency_skipped;
3459                 latency_late += thread->latency_late;
3460                 if (throttle_lag_max > thread->throttle_lag_max)
3461                         throttle_lag_max = thread->throttle_lag_max;
3462                 INSTR_TIME_ADD(conn_total_time, thread->conn_time);
3463
3464                 /* client-level stats */
3465                 for (j = 0; j < thread->nstate; j++)
3466                 {
3467                         total_xacts += thread->state[j].cnt;
3468                         total_latencies += thread->state[j].txn_latencies;
3469                         total_sqlats += thread->state[j].txn_sqlats;
3470                 }
3471         }
3472         disconnect_all(state, nclients);
3473
3474         /*
3475          * XXX We compute results as though every client of every thread started
3476          * and finished at the same time.  That model can diverge noticeably from
3477          * reality for a short benchmark run involving relatively many threads.
3478          * The first thread may process notably many transactions before the last
3479          * thread begins.  Improving the model alone would bring limited benefit,
3480          * because performance during those periods of partial thread count can
3481          * easily exceed steady state performance.  This is one of the many ways
3482          * short runs convey deceptive performance figures.
3483          */
3484         INSTR_TIME_SET_CURRENT(total_time);
3485         INSTR_TIME_SUBTRACT(total_time, start_time);
3486         printResults(ttype, total_xacts, nclients, threads, nthreads,
3487                                  total_time, conn_total_time, total_latencies, total_sqlats,
3488                                  throttle_lag, throttle_lag_max, throttle_latency_skipped,
3489                                  latency_late);
3490
3491         return 0;
3492 }
3493
3494 static void *
3495 threadRun(void *arg)
3496 {
3497         TState     *thread = (TState *) arg;
3498         CState     *state = thread->state;
3499         FILE       *logfile = NULL; /* per-thread log file */
3500         instr_time      start,
3501                                 end;
3502         int                     nstate = thread->nstate;
3503         int                     remains = nstate;               /* number of remaining clients */
3504         int                     i;
3505
3506         /* for reporting progress: */
3507         int64           thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
3508         int64           last_report = thread_start;
3509         int64           next_report = last_report + (int64) progress * 1000000;
3510         int64           last_count = 0,
3511                                 last_lats = 0,
3512                                 last_sqlats = 0,
3513                                 last_lags = 0,
3514                                 last_skipped = 0;
3515
3516         AggVals         aggs;
3517
3518         /*
3519          * Initialize throttling rate target for all of the thread's clients.  It
3520          * might be a little more accurate to reset thread->start_time here too.
3521          * The possible drift seems too small relative to typical throttle delay
3522          * times to worry about it.
3523          */
3524         INSTR_TIME_SET_CURRENT(start);
3525         thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3526         thread->throttle_lag = 0;
3527         thread->throttle_lag_max = 0;
3528
3529         INSTR_TIME_SET_ZERO(thread->conn_time);
3530
3531         /* open log file if requested */
3532         if (use_log)
3533         {
3534                 char            logpath[64];
3535
3536                 if (thread->tid == 0)
3537                         snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3538                 else
3539                         snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3540                 logfile = fopen(logpath, "w");
3541
3542                 if (logfile == NULL)
3543                 {
3544                         fprintf(stderr, "could not open logfile \"%s\": %s\n",
3545                                         logpath, strerror(errno));
3546                         goto done;
3547                 }
3548         }
3549
3550         if (!is_connect)
3551         {
3552                 /* make connections to the database */
3553                 for (i = 0; i < nstate; i++)
3554                 {
3555                         if ((state[i].con = doConnect()) == NULL)
3556                                 goto done;
3557                 }
3558         }
3559
3560         /* time after thread and connections set up */
3561         INSTR_TIME_SET_CURRENT(thread->conn_time);
3562         INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
3563
3564         agg_vals_init(&aggs, thread->start_time);
3565
3566         /* send start up queries in async manner */
3567         for (i = 0; i < nstate; i++)
3568         {
3569                 CState     *st = &state[i];
3570                 Command   **commands = sql_files[st->use_file];
3571                 int                     prev_ecnt = st->ecnt;
3572
3573                 st->use_file = getrand(thread, 0, num_files - 1);
3574                 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3575                         remains--;                      /* I've aborted */
3576
3577                 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3578                 {
3579                         fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3580                                         i, st->state);
3581                         remains--;                      /* I've aborted */
3582                         PQfinish(st->con);
3583                         st->con = NULL;
3584                 }
3585         }
3586
3587         while (remains > 0)
3588         {
3589                 fd_set          input_mask;
3590                 int                     maxsock;        /* max socket number to be waited */
3591                 int64           now_usec = 0;
3592                 int64           min_usec;
3593
3594                 FD_ZERO(&input_mask);
3595
3596                 maxsock = -1;
3597                 min_usec = PG_INT64_MAX;
3598                 for (i = 0; i < nstate; i++)
3599                 {
3600                         CState     *st = &state[i];
3601                         Command   **commands = sql_files[st->use_file];
3602                         int                     sock;
3603
3604                         if (st->con == NULL)
3605                         {
3606                                 continue;
3607                         }
3608                         else if (st->sleeping)
3609                         {
3610                                 if (st->throttling && timer_exceeded)
3611                                 {
3612                                         /* interrupt client which has not started a transaction */
3613                                         remains--;
3614                                         st->sleeping = 0;
3615                                         st->throttling = false;
3616                                         PQfinish(st->con);
3617                                         st->con = NULL;
3618                                         continue;
3619                                 }
3620                                 else    /* just a nap from the script */
3621                                 {
3622                                         int                     this_usec;
3623
3624                                         if (min_usec == PG_INT64_MAX)
3625                                         {
3626                                                 instr_time      now;
3627
3628                                                 INSTR_TIME_SET_CURRENT(now);
3629                                                 now_usec = INSTR_TIME_GET_MICROSEC(now);
3630                                         }
3631
3632                                         this_usec = st->txn_scheduled - now_usec;
3633                                         if (min_usec > this_usec)
3634                                                 min_usec = this_usec;
3635                                 }
3636                         }
3637                         else if (commands[st->state]->type == META_COMMAND)
3638                         {
3639                                 min_usec = 0;   /* the connection is ready to run */
3640                                 break;
3641                         }
3642
3643                         sock = PQsocket(st->con);
3644                         if (sock < 0)
3645                         {
3646                                 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3647                                 goto done;
3648                         }
3649
3650                         FD_SET(sock, &input_mask);
3651
3652                         if (maxsock < sock)
3653                                 maxsock = sock;
3654                 }
3655
3656                 /* also wake up to print the next progress report on time */
3657                 if (progress && min_usec > 0 && thread->tid == 0)
3658                 {
3659                         /* get current time if needed */
3660                         if (now_usec == 0)
3661                         {
3662                                 instr_time      now;
3663
3664                                 INSTR_TIME_SET_CURRENT(now);
3665                                 now_usec = INSTR_TIME_GET_MICROSEC(now);
3666                         }
3667
3668                         if (now_usec >= next_report)
3669                                 min_usec = 0;
3670                         else if ((next_report - now_usec) < min_usec)
3671                                 min_usec = next_report - now_usec;
3672                 }
3673
3674                 /*
3675                  * Sleep until we receive data from the server, or a nap-time
3676                  * specified in the script ends, or it's time to print a progress
3677                  * report.
3678                  */
3679                 if (min_usec > 0 && maxsock != -1)
3680                 {
3681                         int                     nsocks; /* return from select(2) */
3682
3683                         if (min_usec != PG_INT64_MAX)
3684                         {
3685                                 struct timeval timeout;
3686
3687                                 timeout.tv_sec = min_usec / 1000000;
3688                                 timeout.tv_usec = min_usec % 1000000;
3689                                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
3690                         }
3691                         else
3692                                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3693                         if (nsocks < 0)
3694                         {
3695                                 if (errno == EINTR)
3696                                         continue;
3697                                 /* must be something wrong */
3698                                 fprintf(stderr, "select() failed: %s\n", strerror(errno));
3699                                 goto done;
3700                         }
3701                 }
3702
3703                 /* ok, backend returns reply */
3704                 for (i = 0; i < nstate; i++)
3705                 {
3706                         CState     *st = &state[i];
3707                         Command   **commands = sql_files[st->use_file];
3708                         int                     prev_ecnt = st->ecnt;
3709
3710                         if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3711                                                         || commands[st->state]->type == META_COMMAND))
3712                         {
3713                                 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3714                                         remains--;      /* I've aborted */
3715                         }
3716
3717                         if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3718                         {
3719                                 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3720                                                 i, st->state);
3721                                 remains--;              /* I've aborted */
3722                                 PQfinish(st->con);
3723                                 st->con = NULL;
3724                         }
3725                 }
3726
3727                 /* progress report by thread 0 for all threads */
3728                 if (progress && thread->tid == 0)
3729                 {
3730                         instr_time      now_time;
3731                         int64           now;
3732
3733                         INSTR_TIME_SET_CURRENT(now_time);
3734                         now = INSTR_TIME_GET_MICROSEC(now_time);
3735                         if (now >= next_report)
3736                         {
3737                                 /* generate and show report */
3738                                 int64           count = 0,
3739                                                         lats = 0,
3740                                                         sqlats = 0,
3741                                                         lags = 0,
3742                                                         skipped = 0;
3743                                 int64           run = now - last_report;
3744                                 double          tps,
3745                                                         total_run,
3746                                                         latency,
3747                                                         sqlat,
3748                                                         lag,
3749                                                         stdev;
3750
3751                                 /*
3752                                  * Add up the statistics of all threads.
3753                                  *
3754                                  * XXX: No locking. There is no guarantee that we get an
3755                                  * atomic snapshot of the transaction count and latencies, so
3756                                  * these figures can well be off by a small amount. The
3757                                  * progress is report's purpose is to give a quick overview of
3758                                  * how the test is going, so that shouldn't matter too much.
3759                                  * (If a read from a 64-bit integer is not atomic, you might
3760                                  * get a "torn" read and completely bogus latencies though!)
3761                                  */
3762                                 for (i = 0; i < progress_nclients; i++)
3763                                 {
3764                                         count += state[i].cnt;
3765                                         lats += state[i].txn_latencies;
3766                                         sqlats += state[i].txn_sqlats;
3767                                 }
3768
3769                                 for (i = 0; i < progress_nthreads; i++)
3770                                 {
3771                                         skipped += thread[i].throttle_latency_skipped;
3772                                         lags += thread[i].throttle_lag;
3773                                 }
3774
3775                                 total_run = (now - thread_start) / 1000000.0;
3776                                 tps = 1000000.0 * (count - last_count) / run;
3777                                 latency = 0.001 * (lats - last_lats) / (count - last_count);
3778                                 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3779                                 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3780                                 lag = 0.001 * (lags - last_lags) / (count - last_count);
3781
3782                                 fprintf(stderr,
3783                                                 "progress: %.1f s, %.1f tps, "
3784                                                 "lat %.3f ms stddev %.3f",
3785                                                 total_run, tps, latency, stdev);
3786                                 if (throttle_delay)
3787                                 {
3788                                         fprintf(stderr, ", lag %.3f ms", lag);
3789                                         if (latency_limit)
3790                                                 fprintf(stderr, ", " INT64_FORMAT " skipped",
3791                                                                 skipped - last_skipped);
3792                                 }
3793                                 fprintf(stderr, "\n");
3794
3795                                 last_count = count;
3796                                 last_lats = lats;
3797                                 last_sqlats = sqlats;
3798                                 last_lags = lags;
3799                                 last_report = now;
3800                                 last_skipped = skipped;
3801
3802                                 /*
3803                                  * Ensure that the next report is in the future, in case
3804                                  * pgbench/postgres got stuck somewhere.
3805                                  */
3806                                 do
3807                                 {
3808                                         next_report += (int64) progress *1000000;
3809                                 } while (now >= next_report);
3810                         }
3811                 }
3812         }
3813
3814 done:
3815         INSTR_TIME_SET_CURRENT(start);
3816         disconnect_all(state, nstate);
3817         INSTR_TIME_SET_CURRENT(end);
3818         INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
3819         if (logfile)
3820                 fclose(logfile);
3821         return NULL;
3822 }
3823
3824 /*
3825  * Support for duration option: set timer_exceeded after so many seconds.
3826  */
3827
3828 #ifndef WIN32
3829
3830 static void
3831 handle_sig_alarm(SIGNAL_ARGS)
3832 {
3833         timer_exceeded = true;
3834 }
3835
3836 static void
3837 setalarm(int seconds)
3838 {
3839         pqsignal(SIGALRM, handle_sig_alarm);
3840         alarm(seconds);
3841 }
3842
3843 #else                                                   /* WIN32 */
3844
3845 static VOID CALLBACK
3846 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3847 {
3848         timer_exceeded = true;
3849 }
3850
3851 static void
3852 setalarm(int seconds)
3853 {
3854         HANDLE          queue;
3855         HANDLE          timer;
3856
3857         /* This function will be called at most once, so we can cheat a bit. */
3858         queue = CreateTimerQueue();
3859         if (seconds > ((DWORD) -1) / 1000 ||
3860                 !CreateTimerQueueTimer(&timer, queue,
3861                                                            win32_timer_callback, NULL, seconds * 1000, 0,
3862                                                            WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3863         {
3864                 fprintf(stderr, "failed to set timer\n");
3865                 exit(1);
3866         }
3867 }
3868
3869 /* partial pthread implementation for Windows */
3870
3871 typedef struct win32_pthread
3872 {
3873         HANDLE          handle;
3874         void       *(*routine) (void *);
3875         void       *arg;
3876         void       *result;
3877 } win32_pthread;
3878
3879 static unsigned __stdcall
3880 win32_pthread_run(void *arg)
3881 {
3882         win32_pthread *th = (win32_pthread *) arg;
3883
3884         th->result = th->routine(th->arg);
3885
3886         return 0;
3887 }
3888
3889 static int
3890 pthread_create(pthread_t *thread,
3891                            pthread_attr_t *attr,
3892                            void *(*start_routine) (void *),
3893                            void *arg)
3894 {
3895         int                     save_errno;
3896         win32_pthread *th;
3897
3898         th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3899         th->routine = start_routine;
3900         th->arg = arg;
3901         th->result = NULL;
3902
3903         th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3904         if (th->handle == NULL)
3905         {
3906                 save_errno = errno;
3907                 free(th);
3908                 return save_errno;
3909         }
3910
3911         *thread = th;
3912         return 0;
3913 }
3914
3915 static int
3916 pthread_join(pthread_t th, void **thread_return)
3917 {
3918         if (th == NULL || th->handle == NULL)
3919                 return errno = EINVAL;
3920
3921         if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3922         {
3923                 _dosmaperr(GetLastError());
3924                 return errno;
3925         }
3926
3927         if (thread_return)
3928                 *thread_return = th->result;
3929
3930         CloseHandle(th->handle);
3931         free(th);
3932         return 0;
3933 }
3934
3935 #endif   /* WIN32 */