4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2015, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "portability/instr_time.h"
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h> /* for getrlimit */
53 #define M_PI 3.14159265358979323846
59 * Multi-platform pthread implementations
63 /* Use native win32 threads on Windows */
64 typedef struct win32_pthread *pthread_t;
65 typedef int pthread_attr_t;
67 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
68 static int pthread_join(pthread_t th, void **thread_return);
69 #elif defined(ENABLE_THREAD_SAFETY)
70 /* Use platform-dependent pthread capability */
73 /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
74 #define PTHREAD_FORK_EMULATION
77 #define pthread_t pg_pthread_t
78 #define pthread_attr_t pg_pthread_attr_t
79 #define pthread_create pg_pthread_create
80 #define pthread_join pg_pthread_join
82 typedef struct fork_pthread *pthread_t;
83 typedef int pthread_attr_t;
85 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
86 static int pthread_join(pthread_t th, void **thread_return);
90 /********************************************************************
91 * some configurable parameters */
93 /* max number of clients allowed */
95 #define MAXCLIENTS (FD_SETSIZE - 10)
97 #define MAXCLIENTS 1024
100 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
101 #define DEFAULT_NXACTS 10 /* default nxacts */
103 #define MIN_GAUSSIAN_THRESHOLD 2.0 /* minimum threshold for gauss */
105 int nxacts = 0; /* number of transactions per client */
106 int duration = 0; /* duration in seconds */
109 * scaling factor. for example, scale = 10 will make 1000000 tuples in
110 * pgbench_accounts table.
115 * fillfactor. for example, fillfactor = 90 will use only 90 percent
116 * space during inserts and leave 10 percent free.
118 int fillfactor = 100;
121 * create foreign key constraints on the tables?
123 int foreign_keys = 0;
126 * use unlogged tables?
128 int unlogged_tables = 0;
131 * log sampling rate (1.0 = log everything, 0.0 = option not given)
133 double sample_rate = 0.0;
136 * When threads are throttled to a given rate limit, this is the target delay
137 * to reach that rate in usec. 0 is the default and means no throttling.
139 int64 throttle_delay = 0;
142 * Transactions which take longer than this limit (in usec) are counted as
143 * late, and reported as such, although they are completed anyway. When
144 * throttling is enabled, execution time slots that are more than this late
145 * are skipped altogether, and counted separately.
147 int64 latency_limit = 0;
150 * tablespace selection
152 char *tablespace = NULL;
153 char *index_tablespace = NULL;
156 * end of configurable parameters
157 *********************************************************************/
159 #define nbranches 1 /* Makes little sense to change this. Change
162 #define naccounts 100000
165 * The scale factor at/beyond which 32bit integers are incapable of storing
168 * Although the actual threshold is 21474, we use 20000 because it is easier to
169 * document and remember, and isn't that far away from the real threshold.
171 #define SCALE_32BIT_THRESHOLD 20000
173 bool use_log; /* log transaction latencies to a file */
174 bool use_quiet; /* quiet logging onto stderr */
175 int agg_interval; /* log aggregates instead of individual
177 int progress = 0; /* thread progress report every this seconds */
178 int progress_nclients = 0; /* number of clients for progress
180 int progress_nthreads = 0; /* number of threads for progress
182 bool is_connect; /* establish connection for each transaction */
183 bool is_latencies; /* report per-command latencies */
184 int main_pid; /* main process id used in log filename */
190 const char *progname;
192 volatile bool timer_exceeded = false; /* flag from signal handler */
/* variable definitions */
typedef struct
{
	char	   *name;			/* variable name */
	char	   *value;			/* its value */
} Variable;

#define MAX_FILES		128		/* max number of SQL script files allowed */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
205 * structures used in custom query mode
210 PGconn *con; /* connection handle to DB */
211 int id; /* client No. */
212 int state; /* state No. */
213 int cnt; /* xacts count */
214 int ecnt; /* error count */
215 int listen; /* 0 indicates that an async query has been
217 int sleeping; /* 1 indicates that the client is napping */
218 bool throttling; /* whether nap is for throttling */
219 Variable *variables; /* array of variable definitions */
221 int64 txn_scheduled; /* scheduled start time of transaction (usec) */
222 instr_time txn_begin; /* used for measuring schedule lag times */
223 instr_time stmt_begin; /* used for measuring statement latencies */
224 int64 txn_latencies; /* cumulated latencies */
225 int64 txn_sqlats; /* cumulated square latencies */
226 bool is_throttled; /* whether transaction throttling is done */
227 int use_file; /* index in sql_files for this client */
228 bool prepared[MAX_FILES];
232 * Thread state and result
236 int tid; /* thread id */
237 pthread_t thread; /* thread handle */
238 CState *state; /* array of CState */
239 int nstate; /* length of state[] */
240 instr_time start_time; /* thread start time */
241 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
242 int *exec_count; /* number of cmd executions (per Command) */
243 unsigned short random_state[3]; /* separate randomness for each thread */
244 int64 throttle_trigger; /* previous/next throttling (us) */
245 int64 throttle_lag; /* total transaction lag behind throttling */
246 int64 throttle_lag_max; /* max transaction lag */
247 int64 throttle_latency_skipped; /* lagging transactions skipped */
248 int64 latency_late; /* late transactions */
251 #define INVALID_THREAD ((pthread_t) 0)
255 instr_time conn_time;
260 int64 throttle_lag_max;
261 int64 throttle_latency_skipped;
/*
 * queries read from files
 */
#define SQL_COMMAND		1
#define META_COMMAND	2
#define MAX_ARGS		10

typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;

/* printable names, indexed by QueryMode */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
285 char *line; /* full text of command line */
286 int command_num; /* unique index of this Command struct */
287 int type; /* command type (SQL_COMMAND or META_COMMAND) */
288 int argc; /* number of command words */
289 char *argv[MAX_ARGS]; /* command word list */
290 int cols[MAX_ARGS]; /* corresponding column starting from 1 */
291 PgBenchExpr *expr; /* parsed expression */
/* Accumulators for one aggregation interval of the transaction log */
typedef struct
{
	long		start_time;		/* when does the interval start */
	int			cnt;			/* number of transactions */
	int			skipped;		/* number of transactions skipped under
								 * --rate and --latency-limit */

	double		min_latency;	/* min/max latencies */
	double		max_latency;
	double		sum_latency;	/* sum(latency), sum(latency^2) - for
								 * estimates */
	double		sum2_latency;

	double		min_lag;		/* min/max schedule lag */
	double		max_lag;
	double		sum_lag;		/* sum(lag) */
	double		sum2_lag;		/* sum(lag*lag) */
} AggVals;
314 static Command **sql_files[MAX_FILES]; /* SQL script files */
315 static int num_files; /* number of script files */
316 static int num_commands = 0; /* total number of Command structs */
317 static int debug = 0; /* debug flag */
319 /* default scenario */
320 static char *tpc_b = {
321 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
322 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
323 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
324 "\\setrandom aid 1 :naccounts\n"
325 "\\setrandom bid 1 :nbranches\n"
326 "\\setrandom tid 1 :ntellers\n"
327 "\\setrandom delta -5000 5000\n"
329 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
330 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
331 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
332 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
333 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
338 static char *simple_update = {
339 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
340 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
341 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
342 "\\setrandom aid 1 :naccounts\n"
343 "\\setrandom bid 1 :nbranches\n"
344 "\\setrandom tid 1 :ntellers\n"
345 "\\setrandom delta -5000 5000\n"
347 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
348 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
349 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
354 static char *select_only = {
355 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
356 "\\setrandom aid 1 :naccounts\n"
357 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
360 /* Function prototypes */
361 static void setalarm(int seconds);
362 static void *threadRun(void *arg);
364 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
365 AggVals *agg, bool skipped);
370 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
372 " %s [OPTION]... [DBNAME]\n"
373 "\nInitialization options:\n"
374 " -i, --initialize invokes initialization mode\n"
375 " -F, --fillfactor=NUM set fill factor\n"
376 " -n, --no-vacuum do not run VACUUM after initialization\n"
377 " -q, --quiet quiet logging (one message each 5 seconds)\n"
378 " -s, --scale=NUM scaling factor\n"
379 " --foreign-keys create foreign key constraints between tables\n"
380 " --index-tablespace=TABLESPACE\n"
381 " create indexes in the specified tablespace\n"
382 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
383 " --unlogged-tables create tables as unlogged tables\n"
384 "\nBenchmarking options:\n"
385 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
386 " -C, --connect establish new connection for each transaction\n"
387 " -D, --define=VARNAME=VALUE\n"
388 " define variable for use by custom script\n"
389 " -f, --file=FILENAME read transaction script from FILENAME\n"
390 " -j, --jobs=NUM number of threads (default: 1)\n"
391 " -l, --log write transaction times to log file\n"
392 " -L, --latency-limit=NUM count transactions lasting more than NUM ms\n"
394 " -M, --protocol=simple|extended|prepared\n"
395 " protocol for submitting queries (default: simple)\n"
396 " -n, --no-vacuum do not run VACUUM before tests\n"
397 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
398 " -P, --progress=NUM show thread progress report every NUM seconds\n"
399 " -r, --report-latencies report average latency per command\n"
400 " -R, --rate=NUM target rate in transactions per second\n"
401 " -s, --scale=NUM report this scale factor in output\n"
402 " -S, --select-only perform SELECT-only transactions\n"
403 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
404 " -T, --time=NUM duration of benchmark test in seconds\n"
405 " -v, --vacuum-all vacuum all four standard tables before tests\n"
406 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
407 " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
408 "\nCommon options:\n"
409 " -d, --debug print debugging output\n"
410 " -h, --host=HOSTNAME database server host or socket directory\n"
411 " -p, --port=PORT database server port number\n"
412 " -U, --username=USERNAME connect as specified database user\n"
413 " -V, --version output version information, then exit\n"
414 " -?, --help show this help, then exit\n"
416 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
421 * strtoint64 -- convert a string to 64-bit integer
423 * This function is a modified version of scanint8() from
424 * src/backend/utils/adt/int8.c.
427 strtoint64(const char *str)
429 const char *ptr = str;
434 * Do our own scan, rather than relying on sscanf which might be broken
438 /* skip leading spaces */
439 while (*ptr && isspace((unsigned char) *ptr))
448 * Do an explicit check for INT64_MIN. Ugly though this is, it's
449 * cleaner than trying to get the loop below to handle it portably.
451 if (strncmp(ptr, "9223372036854775808", 19) == 0)
453 result = PG_INT64_MIN;
459 else if (*ptr == '+')
462 /* require at least one digit */
463 if (!isdigit((unsigned char) *ptr))
464 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
467 while (*ptr && isdigit((unsigned char) *ptr))
469 int64 tmp = result * 10 + (*ptr++ - '0');
471 if ((tmp / 10) != result) /* overflow? */
472 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
478 /* allow trailing whitespace, but not other trailing chars */
479 while (*ptr != '\0' && isspace((unsigned char) *ptr))
483 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
485 return ((sign < 0) ? -result : result);
488 /* random number generator: uniform distribution from min to max inclusive */
490 getrand(TState *thread, int64 min, int64 max)
493 * Odd coding is so that min and max have approximately the same chance of
494 * being selected as do numbers between them.
496 * pg_erand48() is thread-safe and concurrent, which is why we use it
497 * rather than random(), which in glibc is non-reentrant, and therefore
498 * protected by a mutex, and therefore a bottleneck on machines with many
501 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
505 * random number generator: exponential distribution from min to max inclusive.
506 * the threshold is so that the density of probability for the last cut-off max
507 * value is exp(-threshold).
510 getExponentialRand(TState *thread, int64 min, int64 max, double threshold)
512 double cut, uniform, rand;
513 Assert(threshold > 0.0);
514 cut = exp(-threshold);
515 /* erand in [0, 1), uniform in (0, 1] */
516 uniform = 1.0 - pg_erand48(thread->random_state);
518 * inner expresion in (cut, 1] (if threshold > 0),
521 Assert((1.0 - cut) != 0.0);
522 rand = - log(cut + (1.0 - cut) * uniform) / threshold;
523 /* return int64 random number within between min and max */
524 return min + (int64)((max - min + 1) * rand);
527 /* random number generator: gaussian distribution from min to max inclusive */
529 getGaussianRand(TState *thread, int64 min, int64 max, double threshold)
535 * Get user specified random number from this loop, with
536 * -threshold < stdev <= threshold
538 * This loop is executed until the number is in the expected range.
540 * As the minimum threshold is 2.0, the probability of looping is low:
541 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the average
542 * sinus multiplier as 2/pi, we have a 8.6% looping probability in the
543 * worst case. For a 5.0 threshold value, the looping probability
544 * is about e^{-5} * 2 / pi ~ 0.43%.
549 * pg_erand48 generates [0,1), but for the basic version of the
550 * Box-Muller transform the two uniformly distributed random numbers
551 * are expected in (0, 1] (see http://en.wikipedia.org/wiki/Box_muller)
553 double rand1 = 1.0 - pg_erand48(thread->random_state);
554 double rand2 = 1.0 - pg_erand48(thread->random_state);
556 /* Box-Muller basic form transform */
557 double var_sqrt = sqrt(-2.0 * log(rand1));
558 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
561 * we may try with cos, but there may be a bias induced if the previous
562 * value fails the test. To be on the safe side, let us try over.
565 while (stdev < -threshold || stdev >= threshold);
567 /* stdev is in [-threshold, threshold), normalization to [0,1) */
568 rand = (stdev + threshold) / (threshold * 2.0);
570 /* return int64 random number within between min and max */
571 return min + (int64)((max - min + 1) * rand);
575 * random number generator: generate a value, such that the series of values
576 * will approximate a Poisson distribution centered on the given value.
579 getPoissonRand(TState *thread, int64 center)
582 * Use inverse transform sampling to generate a value > 0, such that the
583 * expected (i.e. average) value is the given argument.
587 /* erand in [0, 1), uniform in (0, 1] */
588 uniform = 1.0 - pg_erand48(thread->random_state);
590 return (int64) (-log(uniform) * ((double) center) + 0.5);
593 /* call PQexec() and exit() on failure */
595 executeStatement(PGconn *con, const char *sql)
599 res = PQexec(con, sql);
600 if (PQresultStatus(res) != PGRES_COMMAND_OK)
602 fprintf(stderr, "%s", PQerrorMessage(con));
608 /* set up a connection to the backend */
613 static char *password = NULL;
617 * Start the connection. Loop until we have a password if requested by
622 #define PARAMS_ARRAY_SIZE 7
624 const char *keywords[PARAMS_ARRAY_SIZE];
625 const char *values[PARAMS_ARRAY_SIZE];
627 keywords[0] = "host";
629 keywords[1] = "port";
631 keywords[2] = "user";
633 keywords[3] = "password";
634 values[3] = password;
635 keywords[4] = "dbname";
637 keywords[5] = "fallback_application_name";
638 values[5] = progname;
644 conn = PQconnectdbParams(keywords, values, true);
648 fprintf(stderr, "Connection to database \"%s\" failed\n",
653 if (PQstatus(conn) == CONNECTION_BAD &&
654 PQconnectionNeedsPassword(conn) &&
658 password = simple_prompt("Password: ", 100, false);
663 /* check to see that the backend connection was successfully made */
664 if (PQstatus(conn) == CONNECTION_BAD)
666 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
667 dbName, PQerrorMessage(conn));
675 /* throw away response from backend */
677 discard_response(CState *state)
683 res = PQgetResult(state->con);
690 compareVariables(const void *v1, const void *v2)
692 return strcmp(((const Variable *) v1)->name,
693 ((const Variable *) v2)->name);
697 getVariable(CState *st, char *name)
702 /* On some versions of Solaris, bsearch of zero items dumps core */
703 if (st->nvariables <= 0)
707 var = (Variable *) bsearch((void *) &key,
708 (void *) st->variables,
/*
 * check whether the name consists of alphabets, numerals and underscores.
 *
 * An empty name is rejected: it could never be referenced as ":name".
 */
static bool
isLegalVariableName(const char *name)
{
	int			i;

	for (i = 0; name[i] != '\0'; i++)
	{
		if (!isalnum((unsigned char) name[i]) && name[i] != '_')
			return false;
	}

	return (i > 0);				/* must be non-empty */
}
734 putVariable(CState *st, const char *context, char *name, char *value)
740 /* On some versions of Solaris, bsearch of zero items dumps core */
741 if (st->nvariables > 0)
742 var = (Variable *) bsearch((void *) &key,
743 (void *) st->variables,
755 * Check for the name only when declaring a new variable to avoid
758 if (!isLegalVariableName(name))
760 fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
765 newvars = (Variable *) pg_realloc(st->variables,
766 (st->nvariables + 1) * sizeof(Variable));
768 newvars = (Variable *) pg_malloc(sizeof(Variable));
770 st->variables = newvars;
772 var = &newvars[st->nvariables];
774 var->name = pg_strdup(name);
775 var->value = pg_strdup(value);
779 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
786 /* dup then free, in case value is pointing at this variable */
787 val = pg_strdup(value);
/*
 * Extract the variable name following the ':' at sql[0].
 *
 * Returns a newly allocated copy of the name and sets *eaten to the number
 * of characters consumed, including the colon.  Returns NULL, leaving *eaten
 * untouched, if the colon is not followed by a name character.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			i = 0;
	char	   *name;

	/* sql[0] is the colon; scan the name characters that follow it */
	do
	{
		i++;
	} while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
	if (i == 1)
		return NULL;

	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
/*
 * Replace the len bytes at param, which points into *sql, with value.
 *
 * *sql is enlarged with pg_realloc() when the value is longer than the text
 * it replaces, so it may move.  Returns a pointer just past the inserted
 * value within the (possibly moved) string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			valueln = strlen(value);

	if (valueln > len)
	{
		/* remember the position: realloc may move the buffer */
		size_t		offset = param - *sql;

		*sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
		param = *sql + offset;
	}

	/* shift the tail, terminator included, to its final position */
	if (valueln != len)
		memmove(param + valueln, param + len, strlen(param + len) + 1);
	memcpy(param, value, valueln);

	return param + valueln;
}
838 assignVariables(CState *st, char *sql)
845 while ((p = strchr(p, ':')) != NULL)
849 name = parseVariable(p, &eaten);
859 val = getVariable(st, name);
867 p = replaceVariable(&sql, p, eaten, val);
874 getQueryParams(CState *st, const Command *command, const char **params)
878 for (i = 0; i < command->argc - 1; i++)
879 params[i] = getVariable(st, command->argv[i + 1]);
883 * Recursive evaluation of an expression in a pgbench script
884 * using the current state of variables.
885 * Returns whether the evaluation was ok,
886 * the value itself is returned through the retval pointer.
889 evaluateExpr(CState *st, PgBenchExpr *expr, int64 *retval)
893 case ENODE_INTEGER_CONSTANT:
895 *retval = expr->u.integer_constant.ival;
903 if ((var = getVariable(st, expr->u.variable.varname)) == NULL)
905 fprintf(stderr, "undefined variable %s\n",
906 expr->u.variable.varname);
909 *retval = strtoint64(var);
918 if (!evaluateExpr(st, expr->u.operator.lexpr, &lval))
920 if (!evaluateExpr(st, expr->u.operator.rexpr, &rval))
922 switch (expr->u.operator.operator)
925 *retval = lval + rval;
929 *retval = lval - rval;
933 *retval = lval * rval;
939 fprintf(stderr, "division by zero\n");
942 *retval = lval / rval;
948 fprintf(stderr, "division by zero\n");
951 *retval = lval % rval;
955 fprintf(stderr, "bad operator\n");
963 fprintf(stderr, "bad expression\n");
968 * Run a shell command. The result is assigned to the variable if not NULL.
969 * Return true if succeeded, or false on error.
972 runShellCommand(CState *st, char *variable, char **argv, int argc)
974 char command[SHELL_COMMAND_SIZE];
983 * Join arguments with whitespace separators. Arguments starting with
984 * exactly one colon are treated as variables:
985 * name - append a string "name"
986 * :var - append a variable named 'var'
987 * ::name - append a string ":name"
990 for (i = 0; i < argc; i++)
995 if (argv[i][0] != ':')
997 arg = argv[i]; /* a string literal */
999 else if (argv[i][1] == ':')
1001 arg = argv[i] + 1; /* a string literal starting with colons */
1003 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1005 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
1009 arglen = strlen(arg);
1010 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1012 fprintf(stderr, "%s: too long shell command\n", argv[0]);
1017 command[len++] = ' ';
1018 memcpy(command + len, arg, arglen);
1022 command[len] = '\0';
1024 /* Fast path for non-assignment case */
1025 if (variable == NULL)
1027 if (system(command))
1029 if (!timer_exceeded)
1030 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
1036 /* Execute the command with pipe and read the standard output. */
1037 if ((fp = popen(command, "r")) == NULL)
1039 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
1042 if (fgets(res, sizeof(res), fp) == NULL)
1044 if (!timer_exceeded)
1045 fprintf(stderr, "%s: cannot read the result\n", argv[0]);
1051 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
1055 /* Check whether the result is an integer and assign it to the variable */
1056 retval = (int) strtol(res, &endptr, 10);
1057 while (*endptr != '\0' && isspace((unsigned char) *endptr))
1059 if (*res == '\0' || *endptr != '\0')
1061 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
1064 snprintf(res, sizeof(res), "%d", retval);
1065 if (!putVariable(st, "setshell", variable, res))
1069 printf("shell parameter name: %s, value: %s\n", argv[1], res);
#define MAX_PREPARE_NAME		32

/*
 * Build the server-side name of the prepared statement for the command at
 * index "state" of script number "file".
 *
 * buffer must have room for at least MAX_PREPARE_NAME bytes.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	/* bounded write: never exceed the documented buffer size */
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
1082 clientDone(CState *st, bool ok)
1084 (void) ok; /* unused */
1086 if (st->con != NULL)
1091 return false; /* always false */
1096 agg_vals_init(AggVals *aggs, instr_time start)
1098 /* basic counters */
1099 aggs->cnt = 0; /* number of transactions (includes skipped) */
1100 aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
1102 aggs->sum_latency = 0; /* SUM(latency) */
1103 aggs->sum2_latency = 0; /* SUM(latency*latency) */
1105 /* min and max transaction duration */
1106 aggs->min_latency = 0;
1107 aggs->max_latency = 0;
1109 /* schedule lag counters */
1115 /* start of the current interval */
1116 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
1119 /* return false iff client should be disconnected */
1121 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
1125 bool trans_needs_throttle = false;
1129 * gettimeofday() isn't free, so we get the current timestamp lazily the
1130 * first time it's needed, and reuse the same value throughout this
1131 * function after that. This also ensures that e.g. the calculated latency
1132 * reported in the log file and in the totals are the same. Zero means
1135 INSTR_TIME_SET_ZERO(now);
1138 commands = sql_files[st->use_file];
1141 * Handle throttling once per transaction by sleeping. It is simpler to
1142 * do this here rather than at the end, because so much complicated logic
1143 * happens below when statements finish.
1145 if (throttle_delay && !st->is_throttled)
1148 * Generate a delay such that the series of delays will approximate a
1149 * Poisson distribution centered on the throttle_delay time.
1151 * If transactions are too slow or a given wait is shorter than a
1152 * transaction, the next transaction will start right away.
1154 int64 wait = getPoissonRand(thread, throttle_delay);
1156 thread->throttle_trigger += wait;
1157 st->txn_scheduled = thread->throttle_trigger;
1160 * If this --latency-limit is used, and this slot is already late so
1161 * that the transaction will miss the latency limit even if it
1162 * completed immediately, we skip this time slot and iterate till the
1163 * next slot that isn't late yet.
1169 if (INSTR_TIME_IS_ZERO(now))
1170 INSTR_TIME_SET_CURRENT(now);
1171 now_us = INSTR_TIME_GET_MICROSEC(now);
1172 while (thread->throttle_trigger < now_us - latency_limit)
1174 thread->throttle_latency_skipped++;
1177 doLog(thread, st, logfile, &now, agg, true);
1179 wait = getPoissonRand(thread, throttle_delay);
1180 thread->throttle_trigger += wait;
1181 st->txn_scheduled = thread->throttle_trigger;
1186 st->throttling = true;
1187 st->is_throttled = true;
1189 fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
1194 { /* are we sleeping? */
1197 if (INSTR_TIME_IS_ZERO(now))
1198 INSTR_TIME_SET_CURRENT(now);
1199 now_us = INSTR_TIME_GET_MICROSEC(now);
1200 if (st->txn_scheduled <= now_us)
1202 st->sleeping = 0; /* Done sleeping, go ahead with next command */
1205 /* Measure lag of throttled transaction relative to target */
1206 int64 lag = now_us - st->txn_scheduled;
1208 thread->throttle_lag += lag;
1209 if (lag > thread->throttle_lag_max)
1210 thread->throttle_lag_max = lag;
1211 st->throttling = false;
1215 return true; /* Still sleeping, nothing to do here */
1219 { /* are we receiver? */
1220 if (commands[st->state]->type == SQL_COMMAND)
1223 fprintf(stderr, "client %d receiving\n", st->id);
1224 if (!PQconsumeInput(st->con))
1225 { /* there's something wrong */
1226 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
1227 return clientDone(st, false);
1229 if (PQisBusy(st->con))
1230 return true; /* don't have the whole result yet */
1234 * command finished: accumulate per-command execution times in
1235 * thread-local data structure, if per-command latencies are requested
1239 int cnum = commands[st->state]->command_num;
1241 if (INSTR_TIME_IS_ZERO(now))
1242 INSTR_TIME_SET_CURRENT(now);
1243 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1244 now, st->stmt_begin);
1245 thread->exec_count[cnum]++;
1248 /* transaction finished: calculate latency and log the transaction */
1249 if (commands[st->state + 1] == NULL)
1251 /* only calculate latency if an option is used that needs it */
1252 if (progress || throttle_delay || latency_limit)
1256 if (INSTR_TIME_IS_ZERO(now))
1257 INSTR_TIME_SET_CURRENT(now);
1259 latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
1261 st->txn_latencies += latency;
1264 * XXX In a long benchmark run of high-latency transactions,
1265 * this int64 addition eventually overflows. For example, 100
1266 * threads running 10s transactions will overflow it in 2.56
1267 * hours. With a more-typical OLTP workload of .1s
1268 * transactions, overflow would take 256 hours.
1270 st->txn_sqlats += latency * latency;
1272 /* record over the limit transactions if needed. */
1273 if (latency_limit && latency > latency_limit)
1274 thread->latency_late++;
1277 /* record the time it took in the log */
1279 doLog(thread, st, logfile, &now, agg, false);
1282 if (commands[st->state]->type == SQL_COMMAND)
1285 * Read and discard the query result; note this is not included in
1286 * the statement latency numbers.
1288 res = PQgetResult(st->con);
1289 switch (PQresultStatus(res))
1291 case PGRES_COMMAND_OK:
1292 case PGRES_TUPLES_OK:
1295 fprintf(stderr, "Client %d aborted in state %d: %s",
1296 st->id, st->state, PQerrorMessage(st->con));
1298 return clientDone(st, false);
1301 discard_response(st);
1304 if (commands[st->state + 1] == NULL)
1313 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1314 return clientDone(st, true); /* exit success */
1317 /* increment state counter */
1319 if (commands[st->state] == NULL)
1322 st->use_file = (int) getrand(thread, 0, num_files - 1);
1323 commands = sql_files[st->use_file];
1324 st->is_throttled = false;
1327 * No transaction is underway anymore, which means there is
1328 * nothing to listen to right now. When throttling rate limits
1329 * are active, a sleep will happen next, as the next transaction
1330 * starts. And then in any case the next SQL command will set
1334 trans_needs_throttle = (throttle_delay > 0);
1338 if (st->con == NULL)
1343 INSTR_TIME_SET_CURRENT(start);
1344 if ((st->con = doConnect()) == NULL)
1346 fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
1347 return clientDone(st, false);
1349 INSTR_TIME_SET_CURRENT(end);
1350 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1354 * This ensures that a throttling delay is inserted before proceeding with
1355 * sql commands, after the first transaction. The first transaction
1356 * throttling is performed when first entering doCustom.
1358 if (trans_needs_throttle)
1360 trans_needs_throttle = false;
1364 /* Record transaction start time under logging, progress or throttling */
1365 if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
1367 INSTR_TIME_SET_CURRENT(st->txn_begin);
1370 * When not throttling, this is also the transaction's scheduled start
1373 if (!throttle_delay)
1374 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
1377 /* Record statement start time if per-command latencies are requested */
1379 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1381 if (commands[st->state]->type == SQL_COMMAND)
1383 const Command *command = commands[st->state];
1386 if (querymode == QUERY_SIMPLE)
1390 sql = pg_strdup(command->argv[0]);
1391 sql = assignVariables(st, sql);
1394 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1395 r = PQsendQuery(st->con, sql);
1398 else if (querymode == QUERY_EXTENDED)
1400 const char *sql = command->argv[0];
1401 const char *params[MAX_ARGS];
1403 getQueryParams(st, command, params);
1406 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1407 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1408 NULL, params, NULL, NULL, 0);
1410 else if (querymode == QUERY_PREPARED)
1412 char name[MAX_PREPARE_NAME];
1413 const char *params[MAX_ARGS];
1415 if (!st->prepared[st->use_file])
1419 for (j = 0; commands[j] != NULL; j++)
1422 char name[MAX_PREPARE_NAME];
1424 if (commands[j]->type != SQL_COMMAND)
1426 preparedStatementName(name, st->use_file, j);
1427 res = PQprepare(st->con, name,
1428 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1429 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1430 fprintf(stderr, "%s", PQerrorMessage(st->con));
1433 st->prepared[st->use_file] = true;
1436 getQueryParams(st, command, params);
1437 preparedStatementName(name, st->use_file, st->state);
1440 fprintf(stderr, "client %d sending %s\n", st->id, name);
1441 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1442 params, NULL, NULL, 0);
1444 else /* unknown sql mode */
1450 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
1454 st->listen = 1; /* flags that should be listened */
1456 else if (commands[st->state]->type == META_COMMAND)
1458 int argc = commands[st->state]->argc,
1460 char **argv = commands[st->state]->argv;
1464 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1465 for (i = 1; i < argc; i++)
1466 fprintf(stderr, " %s", argv[i]);
1467 fprintf(stderr, "\n");
1470 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1475 double threshold = 0;
1478 if (*argv[2] == ':')
1480 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1482 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1486 min = strtoint64(var);
1489 min = strtoint64(argv[2]);
1494 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
1500 if (*argv[3] == ':')
1502 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1504 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
1508 max = strtoint64(var);
1511 max = strtoint64(argv[3]);
1515 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
1521 * Generate random number functions need to be able to subtract
1522 * max from min and add one to the result without overflowing.
1523 * Since we know max > min, we can detect overflow just by checking
1524 * for a negative result. But we must check both that the subtraction
1525 * doesn't overflow, and that adding one to the result doesn't overflow either.
1527 if (max - min < 0 || (max - min) + 1 < 0)
1529 fprintf(stderr, "%s: range too large\n", argv[0]);
1534 if (argc == 4 || /* uniform without or with "uniform" keyword */
1535 (argc == 5 && pg_strcasecmp(argv[4], "uniform") == 0))
1538 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1540 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1542 else if (argc == 6 &&
1543 ((pg_strcasecmp(argv[4], "gaussian") == 0) ||
1544 (pg_strcasecmp(argv[4], "exponential") == 0)))
1546 if (*argv[5] == ':')
1548 if ((var = getVariable(st, argv[5] + 1)) == NULL)
1550 fprintf(stderr, "%s: invalid threshold number %s\n", argv[0], argv[5]);
1554 threshold = strtod(var, NULL);
1557 threshold = strtod(argv[5], NULL);
1559 if (pg_strcasecmp(argv[4], "gaussian") == 0)
1561 if (threshold < MIN_GAUSSIAN_THRESHOLD)
1563 fprintf(stderr, "%s: gaussian threshold must be at least %f\n,", argv[5], MIN_GAUSSIAN_THRESHOLD);
1568 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getGaussianRand(thread, min, max, threshold));
1570 snprintf(res, sizeof(res), INT64_FORMAT, getGaussianRand(thread, min, max, threshold));
1572 else if (pg_strcasecmp(argv[4], "exponential") == 0)
1574 if (threshold <= 0.0)
1576 fprintf(stderr, "%s: exponential threshold must be strictly positive\n,", argv[5]);
1581 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getExponentialRand(thread, min, max, threshold));
1583 snprintf(res, sizeof(res), INT64_FORMAT, getExponentialRand(thread, min, max, threshold));
1586 else /* this means an error somewhere in the parsing phase... */
1588 fprintf(stderr, "%s: unexpected arguments\n", argv[0]);
1593 if (!putVariable(st, argv[0], argv[1], res))
1601 else if (pg_strcasecmp(argv[0], "set") == 0)
1604 PgBenchExpr *expr = commands[st->state]->expr;
1607 if (!evaluateExpr(st, expr, &result))
1612 sprintf(res, INT64_FORMAT, result);
1614 if (!putVariable(st, argv[0], argv[1], res))
1622 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1628 if (*argv[1] == ':')
1630 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1632 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
1639 usec = atoi(argv[1]);
1643 if (pg_strcasecmp(argv[2], "ms") == 0)
1645 else if (pg_strcasecmp(argv[2], "s") == 0)
1651 INSTR_TIME_SET_CURRENT(now);
1652 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
1657 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1659 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1661 if (timer_exceeded) /* timeout */
1662 return clientDone(st, true);
1663 else if (!ret) /* on error */
1668 else /* succeeded */
1671 else if (pg_strcasecmp(argv[0], "shell") == 0)
1673 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1675 if (timer_exceeded) /* timeout */
1676 return clientDone(st, true);
1677 else if (!ret) /* on error */
1682 else /* succeeded */
1692 * print log entry after completing one transaction.
1695 doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
1702 * Skip the log entry if sampling is enabled and this row doesn't belong
1703 * to the random sample.
1705 if (sample_rate != 0.0 &&
1706 pg_erand48(thread->random_state) > sample_rate)
1709 if (INSTR_TIME_IS_ZERO(*now))
1710 INSTR_TIME_SET_CURRENT(*now);
1712 latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
1716 lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
1718 /* should we aggregate the results or not? */
1719 if (agg_interval > 0)
1722 * Are we still in the same interval? If yes, accumulate the values
1723 * (print them otherwise)
1725 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
1730 /* there is no latency to record if the transaction was skipped */
1735 agg->sum_latency += latency;
1736 agg->sum2_latency += latency * latency;
1738 /* first in this aggregation interval */
1739 if ((agg->cnt == 1) || (latency < agg->min_latency))
1740 agg->min_latency = latency;
1742 if ((agg->cnt == 1) || (latency > agg->max_latency))
1743 agg->max_latency = latency;
1745 /* and the same for schedule lag */
1748 agg->sum_lag += lag;
1749 agg->sum2_lag += lag * lag;
1751 if ((agg->cnt == 1) || (lag < agg->min_lag))
1753 if ((agg->cnt == 1) || (lag > agg->max_lag))
1761 * Loop until we reach the interval of the current transaction
1762 * (and print all the empty intervals in between).
1764 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
1767 * This is a non-Windows branch (thanks to the
1768 * ifdef in usage), so we don't need to handle
1769 * this in a special way (see below).
1771 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
1780 fprintf(logfile, " %.0f %.0f %.0f %.0f",
1786 fprintf(logfile, " %d", agg->skipped);
1788 fputc('\n', logfile);
1790 /* move to the next inteval */
1791 agg->start_time = agg->start_time + agg_interval;
1793 /* reset for "no transaction" intervals */
1796 agg->min_latency = 0;
1797 agg->max_latency = 0;
1798 agg->sum_latency = 0;
1799 agg->sum2_latency = 0;
1806 /* reset the values to include only the current transaction. */
1808 agg->skipped = skipped ? 1 : 0;
1809 agg->min_latency = latency;
1810 agg->max_latency = latency;
1811 agg->sum_latency = skipped ? 0.0 : latency;
1812 agg->sum2_latency = skipped ? 0.0 : latency * latency;
1816 agg->sum2_lag = lag * lag;
1821 /* no, print raw transactions */
1824 /* This is more than we really ought to know about instr_time */
1826 fprintf(logfile, "%d %d skipped %d %ld %ld",
1827 st->id, st->cnt, st->use_file,
1828 (long) now->tv_sec, (long) now->tv_usec);
1830 fprintf(logfile, "%d %d %.0f %d %ld %ld",
1831 st->id, st->cnt, latency, st->use_file,
1832 (long) now->tv_sec, (long) now->tv_usec);
1835 /* On Windows, instr_time doesn't provide a timestamp anyway */
1837 fprintf(logfile, "%d %d skipped %d 0 0",
1838 st->id, st->cnt, st->use_file);
1840 fprintf(logfile, "%d %d %.0f %d 0 0",
1841 st->id, st->cnt, latency, st->use_file);
1844 fprintf(logfile, " %.0f", lag);
1845 fputc('\n', logfile);
1849 /* discard connections */
1851 disconnect_all(CState *state, int length)
1855 for (i = 0; i < length; i++)
1859 PQfinish(state[i].con);
1860 state[i].con = NULL;
1865 /* create tables and setup data */
1867 init(bool is_no_vacuum)
1870 * The scale factor at/beyond which 32-bit integers are insufficient for
1871 * storing TPC-B account IDs.
1873 * Although the actual threshold is 21474, we use 20000 because it is easier to
1874 * document and remember, and isn't that far away from the real threshold.
1876 #define SCALE_32BIT_THRESHOLD 20000
1879 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1880 * fields in these table declarations were intended to comply with that.
1881 * The pgbench_accounts table complies with that because the "filler"
1882 * column is set to blank-padded empty string. But for all other tables
1883 * the columns default to NULL and so don't actually take any space. We
1884 * could fix that by giving them non-null default values. However, that
1885 * would completely break comparability of pgbench results with prior
1886 * versions. Since pgbench has never pretended to be fully TPC-B compliant
1887 * anyway, we stick with the historical behavior.
1891 const char *table; /* table name */
1892 const char *smcols; /* column decls if accountIDs are 32 bits */
1893 const char *bigcols; /* column decls if accountIDs are 64 bits */
1894 int declare_fillfactor;
1896 static const struct ddlinfo DDLs[] = {
1899 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1900 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1905 "tid int not null,bid int,tbalance int,filler char(84)",
1906 "tid int not null,bid int,tbalance int,filler char(84)",
1911 "aid int not null,bid int,abalance int,filler char(84)",
1912 "aid bigint not null,bid int,abalance int,filler char(84)",
1917 "bid int not null,bbalance int,filler char(88)",
1918 "bid int not null,bbalance int,filler char(88)",
1922 static const char *const DDLINDEXes[] = {
1923 "alter table pgbench_branches add primary key (bid)",
1924 "alter table pgbench_tellers add primary key (tid)",
1925 "alter table pgbench_accounts add primary key (aid)"
1927 static const char *const DDLKEYs[] = {
1928 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1929 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1930 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1931 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1932 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1941 /* used to track elapsed time and estimate of the remaining time */
1946 int log_interval = 1;
1948 if ((con = doConnect()) == NULL)
1951 for (i = 0; i < lengthof(DDLs); i++)
1955 const struct ddlinfo *ddl = &DDLs[i];
1958 /* Remove old table, if it exists. */
1959 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1960 executeStatement(con, buffer);
1962 /* Construct new create table statement. */
1964 if (ddl->declare_fillfactor)
1965 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1966 " with (fillfactor=%d)", fillfactor);
1967 if (tablespace != NULL)
1969 char *escape_tablespace;
1971 escape_tablespace = PQescapeIdentifier(con, tablespace,
1972 strlen(tablespace));
1973 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1974 " tablespace %s", escape_tablespace);
1975 PQfreemem(escape_tablespace);
1978 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
1980 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
1981 unlogged_tables ? " unlogged" : "",
1982 ddl->table, cols, opts);
1984 executeStatement(con, buffer);
1987 executeStatement(con, "begin");
1989 for (i = 0; i < nbranches * scale; i++)
1991 /* "filler" column defaults to NULL */
1992 snprintf(sql, sizeof(sql),
1993 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
1995 executeStatement(con, sql);
1998 for (i = 0; i < ntellers * scale; i++)
2000 /* "filler" column defaults to NULL */
2001 snprintf(sql, sizeof(sql),
2002 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2003 i + 1, i / ntellers + 1);
2004 executeStatement(con, sql);
2007 executeStatement(con, "commit");
2010 * fill the pgbench_accounts table with some data
2012 fprintf(stderr, "creating tables...\n");
2014 executeStatement(con, "begin");
2015 executeStatement(con, "truncate pgbench_accounts");
2017 res = PQexec(con, "copy pgbench_accounts from stdin");
2018 if (PQresultStatus(res) != PGRES_COPY_IN)
2020 fprintf(stderr, "%s", PQerrorMessage(con));
2025 INSTR_TIME_SET_CURRENT(start);
2027 for (k = 0; k < (int64) naccounts * scale; k++)
2031 /* "filler" column defaults to blank padded empty string */
2032 snprintf(sql, sizeof(sql),
2033 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2034 j, k / naccounts + 1, 0);
2035 if (PQputline(con, sql))
2037 fprintf(stderr, "PQputline failed\n");
2042 * If we want to stick with the original logging, print a message each
2043 * 100k inserted rows.
2045 if ((!use_quiet) && (j % 100000 == 0))
2047 INSTR_TIME_SET_CURRENT(diff);
2048 INSTR_TIME_SUBTRACT(diff, start);
2050 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2051 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2053 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2054 j, (int64) naccounts * scale,
2055 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2056 elapsed_sec, remaining_sec);
2058 /* let's not call the timing for each row, but only each 100 rows */
2059 else if (use_quiet && (j % 100 == 0))
2061 INSTR_TIME_SET_CURRENT(diff);
2062 INSTR_TIME_SUBTRACT(diff, start);
2064 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2065 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2067 /* have we reached the next interval (or end)? */
2068 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2070 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2071 j, (int64) naccounts * scale,
2072 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2074 /* skip to the next interval */
2075 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
2080 if (PQputline(con, "\\.\n"))
2082 fprintf(stderr, "very last PQputline failed\n");
2087 fprintf(stderr, "PQendcopy failed\n");
2090 executeStatement(con, "commit");
2095 fprintf(stderr, "vacuum...\n");
2096 executeStatement(con, "vacuum analyze pgbench_branches");
2097 executeStatement(con, "vacuum analyze pgbench_tellers");
2098 executeStatement(con, "vacuum analyze pgbench_accounts");
2099 executeStatement(con, "vacuum analyze pgbench_history");
2105 fprintf(stderr, "set primary keys...\n");
2106 for (i = 0; i < lengthof(DDLINDEXes); i++)
2110 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2112 if (index_tablespace != NULL)
2114 char *escape_tablespace;
2116 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2117 strlen(index_tablespace));
2118 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2119 " using index tablespace %s", escape_tablespace);
2120 PQfreemem(escape_tablespace);
2123 executeStatement(con, buffer);
2127 * create foreign keys
2131 fprintf(stderr, "set foreign keys...\n");
2132 for (i = 0; i < lengthof(DDLKEYs); i++)
2134 executeStatement(con, DDLKEYs[i]);
2138 fprintf(stderr, "done.\n");
2143 * Parse the raw sql and replace :param to $n.
2146 parseQuery(Command *cmd, const char *raw_sql)
2151 sql = pg_strdup(raw_sql);
2155 while ((p = strchr(p, ':')) != NULL)
2161 name = parseVariable(p, &eaten);
2171 if (cmd->argc >= MAX_ARGS)
2173 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2178 sprintf(var, "$%d", cmd->argc);
2179 p = replaceVariable(&sql, p, eaten, var);
2181 cmd->argv[cmd->argc] = name;
/*
 * Report a script syntax error on stderr and terminate the program.
 *
 *	source	 name of the script being parsed
 *	lineno	 line number within the script
 *	line	 full text of the offending line, or NULL to omit it
 *	command	 name of the command being parsed
 *	msg		 description of the problem
 *	more	 optional extra detail shown in parentheses, or NULL
 *	column	 1-based column of the error, or -1 if unknown
 *
 * This function does not return; it exits with status 1.
 */
void
syntax_error(const char *source, const int lineno,
			 const char *line, const char *command,
			 const char *msg, const char *more, const int column)
{
	const bool	have_column = (column != -1);

	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
	if (more != NULL)
		fprintf(stderr, " (%s)", more);
	if (have_column)
		fprintf(stderr, " at column %d", column);
	fprintf(stderr, " in command \"%s\"\n", command);

	if (line != NULL)
	{
		fprintf(stderr, "%s\n", line);

		/* draw a caret below the offending column */
		if (have_column)
		{
			int			pad = column - 1;

			while (pad-- > 0)
				fputc(' ', stderr);
			fprintf(stderr, "^ error found here\n");
		}
	}

	exit(1);
}
2215 /* Parse a command; return a Command struct, or NULL if it's a comment */
2217 process_commands(char *buf, const char *source, const int lineno)
2219 const char delim[] = " \f\n\r\t\v";
2221 Command *my_commands;
2226 /* Make the string buf end at the next newline */
2227 if ((p = strchr(buf, '\n')) != NULL)
2230 /* Skip leading whitespace */
2232 while (isspace((unsigned char) *p))
2235 /* If the line is empty or actually a comment, we're done */
2236 if (*p == '\0' || strncmp(p, "--", 2) == 0)
2239 /* Allocate and initialize Command structure */
2240 my_commands = (Command *) pg_malloc(sizeof(Command));
2241 my_commands->line = pg_strdup(buf);
2242 my_commands->command_num = num_commands++;
2243 my_commands->type = 0; /* until set */
2244 my_commands->argc = 0;
2249 my_commands->type = META_COMMAND;
2252 tok = strtok(++p, delim);
2254 if (tok != NULL && pg_strcasecmp(tok, "set") == 0)
2259 my_commands->cols[j] = tok - buf + 1;
2260 my_commands->argv[j++] = pg_strdup(tok);
2261 my_commands->argc++;
2262 if (max_args >= 0 && my_commands->argc >= max_args)
2263 tok = strtok(NULL, "");
2265 tok = strtok(NULL, delim);
2268 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
2271 * \setrandom variable min max [uniform]
2272 * \setrandom variable min max (gaussian|exponential) threshold
2275 if (my_commands->argc < 4)
2277 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2278 "missing arguments", NULL, -1);
2283 if (my_commands->argc == 4 || /* uniform without/with "uniform" keyword */
2284 (my_commands->argc == 5 &&
2285 pg_strcasecmp(my_commands->argv[4], "uniform") == 0))
2289 else if (/* argc >= 5 */
2290 (pg_strcasecmp(my_commands->argv[4], "gaussian") == 0) ||
2291 (pg_strcasecmp(my_commands->argv[4], "exponential") == 0))
2293 if (my_commands->argc < 6)
2295 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2296 "missing threshold argument", my_commands->argv[4], -1);
2298 else if (my_commands->argc > 6)
2300 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2301 "too many arguments", my_commands->argv[4],
2302 my_commands->cols[6]);
2305 else /* cannot parse, unexpected arguments */
2307 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2308 "unexpected argument", my_commands->argv[4],
2309 my_commands->cols[4]);
2312 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
2314 if (my_commands->argc < 3)
2316 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2317 "missing argument", NULL, -1);
2320 expr_scanner_init(my_commands->argv[2], source, lineno,
2321 my_commands->line, my_commands->argv[0],
2322 my_commands->cols[2] - 1);
2324 if (expr_yyparse() != 0)
2326 /* dead code: exit done from syntax_error called by yyerror */
2330 my_commands->expr = expr_parse_result;
2332 expr_scanner_finish();
2334 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
2336 if (my_commands->argc < 2)
2338 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2339 "missing argument", NULL, -1);
2343 * Split argument into number and unit to allow "sleep 1ms" etc.
2344 * We don't have to terminate the number argument with null
2345 * because it will be parsed with atoi, which ignores trailing
2346 * non-digit characters.
2348 if (my_commands->argv[1][0] != ':')
2350 char *c = my_commands->argv[1];
2352 while (isdigit((unsigned char) *c))
2356 my_commands->argv[2] = c;
2357 if (my_commands->argc < 3)
2358 my_commands->argc = 3;
2362 if (my_commands->argc >= 3)
2364 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
2365 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
2366 pg_strcasecmp(my_commands->argv[2], "s") != 0)
2368 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2369 "unknown time unit, must be us, ms or s",
2370 my_commands->argv[2], my_commands->cols[2]);
2374 /* this should be an error?! */
2375 for (j = 3; j < my_commands->argc; j++)
2376 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
2377 my_commands->argv[0], my_commands->argv[j]);
2379 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
2381 if (my_commands->argc < 3)
2383 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2384 "missing argument", NULL, -1);
2387 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
2389 if (my_commands->argc < 1)
2391 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2392 "missing command", NULL, -1);
2397 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2398 "invalid command", NULL, -1);
2403 my_commands->type = SQL_COMMAND;
2408 my_commands->argv[0] = pg_strdup(p);
2409 my_commands->argc++;
2411 case QUERY_EXTENDED:
2412 case QUERY_PREPARED:
2413 if (!parseQuery(my_commands, p))
/*
 * Read a line from fd, and return it in a malloc'd buffer.
 * Return NULL at EOF.
 *
 * A "line" ends at the first newline, which is kept in the result, or at
 * end of input if the last line has no newline.  Lines longer than BUFSIZ
 * are collected by growing the buffer one BUFSIZ at a time.
 *
 * The buffer will typically be larger than necessary, but we don't care
 * in this program, because we'll free it as soon as we've parsed the line.
 *
 * The buffer is obtained from the pg_malloc family throughout, so the
 * initial allocation, the growth step and the release all match.
 */
static char *
read_line_from_file(FILE *fd)
{
	char		tmpbuf[BUFSIZ];
	char	   *buf;
	size_t		buflen = BUFSIZ;
	size_t		used = 0;

	buf = (char *) pg_malloc(buflen);
	buf[0] = '\0';

	while (fgets(tmpbuf, sizeof(tmpbuf), fd) != NULL)
	{
		size_t		thislen = strlen(tmpbuf);

		/* Append tmpbuf to whatever we had already */
		memcpy(buf + used, tmpbuf, thislen + 1);
		used += thislen;

		/* Done if we collected a newline */
		if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
			break;

		/* Else, enlarge buf to ensure we can append next bufferload */
		buflen += BUFSIZ;
		buf = (char *) pg_realloc(buf, buflen);
	}

	if (used > 0)
		return buf;

	/* Reached EOF */
	pg_free(buf);
	return NULL;
}
2468 process_file(char *filename)
2470 #define COMMANDS_ALLOC_NUM 128
2472 Command **my_commands;
2478 if (num_files >= MAX_FILES)
2480 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
2484 alloc_num = COMMANDS_ALLOC_NUM;
2485 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2487 if (strcmp(filename, "-") == 0)
2489 else if ((fd = fopen(filename, "r")) == NULL)
2491 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
2492 pg_free(my_commands);
2499 while ((buf = read_line_from_file(fd)) != NULL)
2504 command = process_commands(buf, filename, lineno);
2508 if (command == NULL)
2511 my_commands[index] = command;
2514 if (index >= alloc_num)
2516 alloc_num += COMMANDS_ALLOC_NUM;
2517 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2522 my_commands[index] = NULL;
2524 sql_files[num_files++] = my_commands;
2530 process_builtin(char *tb, const char *source)
2532 #define COMMANDS_ALLOC_NUM 128
2534 Command **my_commands;
2539 alloc_num = COMMANDS_ALLOC_NUM;
2540 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2551 while (*tb && *tb != '\n')
2564 command = process_commands(buf, source, lineno);
2565 if (command == NULL)
2568 my_commands[index] = command;
2571 if (index >= alloc_num)
2573 alloc_num += COMMANDS_ALLOC_NUM;
2574 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2578 my_commands[index] = NULL;
2583 /* print out results */
2585 printResults(int ttype, int64 normal_xacts, int nclients,
2586 TState *threads, int nthreads,
2587 instr_time total_time, instr_time conn_total_time,
2588 int64 total_latencies, int64 total_sqlats,
2589 int64 throttle_lag, int64 throttle_lag_max,
2590 int64 throttle_latency_skipped, int64 latency_late)
2592 double time_include,
2597 time_include = INSTR_TIME_GET_DOUBLE(total_time);
2598 tps_include = normal_xacts / time_include;
2599 tps_exclude = normal_xacts / (time_include -
2600 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
2603 s = "TPC-B (sort of)";
2604 else if (ttype == 2)
2605 s = "Update only pgbench_accounts";
2606 else if (ttype == 1)
2611 printf("transaction type: %s\n", s);
2612 printf("scaling factor: %d\n", scale);
2613 printf("query mode: %s\n", QUERYMODE[querymode]);
2614 printf("number of clients: %d\n", nclients);
2615 printf("number of threads: %d\n", nthreads);
2618 printf("number of transactions per client: %d\n", nxacts);
2619 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2620 normal_xacts, (int64) nxacts * nclients);
2624 printf("duration: %d s\n", duration);
2625 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2629 /* Remaining stats are nonsensical if we failed to execute any xacts */
2630 if (normal_xacts <= 0)
2633 if (throttle_delay && latency_limit)
2634 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
2635 throttle_latency_skipped,
2636 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
2639 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
2640 latency_limit / 1000.0, latency_late,
2641 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
2643 if (throttle_delay || progress || latency_limit)
2645 /* compute and show latency average and standard deviation */
2646 double latency = 0.001 * total_latencies / normal_xacts;
2647 double sqlat = (double) total_sqlats / normal_xacts;
2649 printf("latency average: %.3f ms\n"
2650 "latency stddev: %.3f ms\n",
2651 latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2655 /* only an average latency computed from the duration is available */
2656 printf("latency average: %.3f ms\n",
2657 1000.0 * duration * nclients / normal_xacts);
2663 * Report average transaction lag under rate limit throttling. This
2664 * is the delay between scheduled and actual start times for the
2665 * transaction. The measured lag may be caused by thread/client load,
2666 * the database load, or the Poisson throttling process.
2668 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2669 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2672 printf("tps = %f (including connections establishing)\n", tps_include);
2673 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2675 /* Report per-command latencies */
2680 for (i = 0; i < num_files; i++)
2685 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2687 printf("statement latencies in milliseconds:\n");
2689 for (commands = sql_files[i]; *commands != NULL; commands++)
2691 Command *command = *commands;
2692 int cnum = command->command_num;
2694 instr_time total_exec_elapsed;
2695 int total_exec_count;
2698 /* Accumulate per-thread data for command */
2699 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2700 total_exec_count = 0;
2701 for (t = 0; t < nthreads; t++)
2703 TState *thread = &threads[t];
2705 INSTR_TIME_ADD(total_exec_elapsed,
2706 thread->exec_elapsed[cnum]);
2707 total_exec_count += thread->exec_count[cnum];
2710 if (total_exec_count > 0)
2711 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2715 printf("\t%f\t%s\n", total_time, command->line);
2723 main(int argc, char **argv)
2725 static struct option long_options[] = {
2726 /* systematic long/short named options */
2727 {"client", required_argument, NULL, 'c'},
2728 {"connect", no_argument, NULL, 'C'},
2729 {"debug", no_argument, NULL, 'd'},
2730 {"define", required_argument, NULL, 'D'},
2731 {"file", required_argument, NULL, 'f'},
2732 {"fillfactor", required_argument, NULL, 'F'},
2733 {"host", required_argument, NULL, 'h'},
2734 {"initialize", no_argument, NULL, 'i'},
2735 {"jobs", required_argument, NULL, 'j'},
2736 {"log", no_argument, NULL, 'l'},
2737 {"no-vacuum", no_argument, NULL, 'n'},
2738 {"port", required_argument, NULL, 'p'},
2739 {"progress", required_argument, NULL, 'P'},
2740 {"protocol", required_argument, NULL, 'M'},
2741 {"quiet", no_argument, NULL, 'q'},
2742 {"report-latencies", no_argument, NULL, 'r'},
2743 {"scale", required_argument, NULL, 's'},
2744 {"select-only", no_argument, NULL, 'S'},
2745 {"skip-some-updates", no_argument, NULL, 'N'},
2746 {"time", required_argument, NULL, 'T'},
2747 {"transactions", required_argument, NULL, 't'},
2748 {"username", required_argument, NULL, 'U'},
2749 {"vacuum-all", no_argument, NULL, 'v'},
2750 /* long-named only options */
2751 {"foreign-keys", no_argument, &foreign_keys, 1},
2752 {"index-tablespace", required_argument, NULL, 3},
2753 {"tablespace", required_argument, NULL, 2},
2754 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2755 {"sampling-rate", required_argument, NULL, 4},
2756 {"aggregate-interval", required_argument, NULL, 5},
2757 {"rate", required_argument, NULL, 'R'},
2758 {"latency-limit", required_argument, NULL, 'L'},
2763 int nclients = 1; /* default number of simulated clients */
2764 int nthreads = 1; /* default number of threads */
2765 int is_init_mode = 0; /* initialize mode? */
2766 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2767 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2768 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2769 * 2: skip update of branches and tellers */
2771 char *filename = NULL;
2772 bool scale_given = false;
2774 bool benchmarking_option_set = false;
2775 bool initialization_option_set = false;
2777 CState *state; /* status of clients */
2778 TState *threads; /* array of thread */
2780 instr_time start_time; /* start up time */
2781 instr_time total_time;
2782 instr_time conn_total_time;
2783 int64 total_xacts = 0;
2784 int64 total_latencies = 0;
2785 int64 total_sqlats = 0;
2786 int64 throttle_lag = 0;
2787 int64 throttle_lag_max = 0;
2788 int64 throttle_latency_skipped = 0;
2789 int64 latency_late = 0;
2793 #ifdef HAVE_GETRLIMIT
2803 progname = get_progname(argv[0]);
2807 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2812 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2814 puts("pgbench (PostgreSQL) " PG_VERSION);
2820 /* stderr is buffered on Win32. */
2821 setvbuf(stderr, NULL, _IONBF, 0);
2824 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2826 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2828 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2831 state = (CState *) pg_malloc(sizeof(CState));
2832 memset(state, 0, sizeof(CState));
2834 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
2842 pghost = pg_strdup(optarg);
2848 do_vacuum_accounts++;
2851 pgport = pg_strdup(optarg);
2858 benchmarking_option_set = true;
2862 benchmarking_option_set = true;
2865 benchmarking_option_set = true;
2866 nclients = atoi(optarg);
2867 if (nclients <= 0 || nclients > MAXCLIENTS)
2869 fprintf(stderr, "invalid number of clients: %d\n", nclients);
2872 #ifdef HAVE_GETRLIMIT
2873 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2874 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2875 #else /* but BSD doesn't ... */
2876 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2877 #endif /* RLIMIT_NOFILE */
2879 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2882 if (rlim.rlim_cur <= (nclients + 2))
2884 fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
2885 fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
2888 #endif /* HAVE_GETRLIMIT */
2890 case 'j': /* jobs */
2891 benchmarking_option_set = true;
2892 nthreads = atoi(optarg);
2895 fprintf(stderr, "invalid number of threads: %d\n", nthreads);
2900 benchmarking_option_set = true;
2904 benchmarking_option_set = true;
2905 is_latencies = true;
2909 scale = atoi(optarg);
2912 fprintf(stderr, "invalid scaling factor: %d\n", scale);
2917 benchmarking_option_set = true;
2920 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2923 nxacts = atoi(optarg);
2926 fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
2931 benchmarking_option_set = true;
2934 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2937 duration = atoi(optarg);
2940 fprintf(stderr, "invalid duration: %d\n", duration);
2945 login = pg_strdup(optarg);
2948 benchmarking_option_set = true;
2952 initialization_option_set = true;
2956 benchmarking_option_set = true;
2958 filename = pg_strdup(optarg);
2959 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2966 benchmarking_option_set = true;
2968 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2970 fprintf(stderr, "invalid variable definition: %s\n", optarg);
2975 if (!putVariable(&state[0], "option", optarg, p))
2980 initialization_option_set = true;
2981 fillfactor = atoi(optarg);
2982 if ((fillfactor < 10) || (fillfactor > 100))
2984 fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
2989 benchmarking_option_set = true;
2992 fprintf(stderr, "query mode (-M) should be specified before transaction scripts (-f)\n");
2995 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
2996 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
2998 if (querymode >= NUM_QUERYMODE)
3000 fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
3005 benchmarking_option_set = true;
3006 progress = atoi(optarg);
3010 "thread progress delay (-P) must be positive (%s)\n",
3017 /* get a double from the beginning of option value */
3018 double throttle_value = atof(optarg);
3020 benchmarking_option_set = true;
3022 if (throttle_value <= 0.0)
3024 fprintf(stderr, "invalid rate limit: %s\n", optarg);
3027 /* Invert rate limit into a time offset */
3028 throttle_delay = (int64) (1000000.0 / throttle_value);
3033 double limit_ms = atof(optarg);
3034 if (limit_ms <= 0.0)
3036 fprintf(stderr, "invalid latency limit: %s\n", optarg);
3039 benchmarking_option_set = true;
3040 latency_limit = (int64) (limit_ms * 1000);
3044 /* This covers long options which take no argument. */
3045 if (foreign_keys || unlogged_tables)
3046 initialization_option_set = true;
3048 case 2: /* tablespace */
3049 initialization_option_set = true;
3050 tablespace = pg_strdup(optarg);
3052 case 3: /* index-tablespace */
3053 initialization_option_set = true;
3054 index_tablespace = pg_strdup(optarg);
3057 benchmarking_option_set = true;
3058 sample_rate = atof(optarg);
3059 if (sample_rate <= 0.0 || sample_rate > 1.0)
3061 fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
3067 fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
3070 benchmarking_option_set = true;
3071 agg_interval = atoi(optarg);
3072 if (agg_interval <= 0)
3074 fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
3080 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3086 /* compute a per thread delay */
3087 throttle_delay *= nthreads;
3090 dbName = argv[optind];
3093 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3095 else if (login != NULL && *login != '\0')
3103 if (benchmarking_option_set)
3105 fprintf(stderr, "some options cannot be used in initialization (-i) mode\n");
3114 if (initialization_option_set)
3116 fprintf(stderr, "some options cannot be used in benchmarking mode\n");
3121 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3122 if (nxacts <= 0 && duration <= 0)
3123 nxacts = DEFAULT_NXACTS;
3125 if (nclients % nthreads != 0)
3127 fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
3131 /* --sampling-rate may be used only with -l */
3132 if (sample_rate > 0.0 && !use_log)
3134 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
3138 /* --sampling-rate must not be used with --aggregate-interval */
3139 if (sample_rate > 0.0 && agg_interval > 0)
3141 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
3145 if (agg_interval > 0 && (!use_log))
3147 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3151 if ((duration > 0) && (agg_interval > duration))
3153 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
3157 if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0))
3159 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3164 * is_latencies only works with multiple threads in thread-based
3165 * implementations, not fork-based ones, because it supposes that the
3166 * parent can see changes made to the per-thread execution stats by child
3167 * threads. It seems useful enough to accept despite this limitation, but
3168 * perhaps we should FIXME someday (by passing the stats data back up
3169 * through the parent-to-child pipes).
3171 #ifndef ENABLE_THREAD_SAFETY
3172 if (is_latencies && nthreads > 1)
3174 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
3180 * save main process id in the global variable because process id will be
3181 * changed after fork.
3183 main_pid = (int) getpid();
3184 progress_nclients = nclients;
3185 progress_nthreads = nthreads;
3189 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3190 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3192 /* copy any -D switch values to all clients */
3193 for (i = 1; i < nclients; i++)
3198 for (j = 0; j < state[0].nvariables; j++)
3200 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
3209 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3210 pghost, pgport, nclients, nxacts, dbName);
3212 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3213 pghost, pgport, nclients, duration, dbName);
3216 /* opening connection... */
3221 if (PQstatus(con) == CONNECTION_BAD)
3223 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
3224 fprintf(stderr, "%s", PQerrorMessage(con));
3231 * get the scaling factor that should be same as count(*) from
3232 * pgbench_branches if this is not a custom query
3234 res = PQexec(con, "select count(*) from pgbench_branches");
3235 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3237 fprintf(stderr, "%s", PQerrorMessage(con));
3240 scale = atoi(PQgetvalue(res, 0, 0));
3243 fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
3248 /* warn if we override user-given -s switch */
3251 "Scale option ignored, using pgbench_branches table count = %d\n",
3256 * :scale variables normally get -s or database scale, but don't override
3257 * an explicit -D switch
3259 if (getVariable(&state[0], "scale") == NULL)
3261 snprintf(val, sizeof(val), "%d", scale);
3262 for (i = 0; i < nclients; i++)
3264 if (!putVariable(&state[i], "startup", "scale", val))
3270 * Define a :client_id variable that is unique per connection. But don't
3271 * override an explicit -D switch.
3273 if (getVariable(&state[0], "client_id") == NULL)
3275 for (i = 0; i < nclients; i++)
3277 snprintf(val, sizeof(val), "%d", i);
3278 if (!putVariable(&state[i], "startup", "client_id", val))
3285 fprintf(stderr, "starting vacuum...");
3286 executeStatement(con, "vacuum pgbench_branches");
3287 executeStatement(con, "vacuum pgbench_tellers");
3288 executeStatement(con, "truncate pgbench_history");
3289 fprintf(stderr, "end.\n");
3291 if (do_vacuum_accounts)
3293 fprintf(stderr, "starting vacuum pgbench_accounts...");
3294 executeStatement(con, "vacuum analyze pgbench_accounts");
3295 fprintf(stderr, "end.\n");
3300 /* set random seed */
3301 INSTR_TIME_SET_CURRENT(start_time);
3302 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3304 /* process builtin SQL scripts */
3308 sql_files[0] = process_builtin(tpc_b,
3309 "<builtin: TPC-B (sort of)>");
3314 sql_files[0] = process_builtin(select_only,
3315 "<builtin: select only>");
3320 sql_files[0] = process_builtin(simple_update,
3321 "<builtin: simple update>");
3329 /* set up thread data structures */
3330 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3331 for (i = 0; i < nthreads; i++)
3333 TState *thread = &threads[i];
3336 thread->state = &state[nclients / nthreads * i];
3337 thread->nstate = nclients / nthreads;
3338 thread->random_state[0] = random();
3339 thread->random_state[1] = random();
3340 thread->random_state[2] = random();
3341 thread->throttle_latency_skipped = 0;
3342 thread->latency_late = 0;
3346 /* Reserve memory for the thread to store per-command latencies */
3349 thread->exec_elapsed = (instr_time *)
3350 pg_malloc(sizeof(instr_time) * num_commands);
3351 thread->exec_count = (int *)
3352 pg_malloc(sizeof(int) * num_commands);
3354 for (t = 0; t < num_commands; t++)
3356 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
3357 thread->exec_count[t] = 0;
3362 thread->exec_elapsed = NULL;
3363 thread->exec_count = NULL;
3367 /* get start up time */
3368 INSTR_TIME_SET_CURRENT(start_time);
3370 /* set alarm if duration is specified. */
3375 for (i = 0; i < nthreads; i++)
3377 TState *thread = &threads[i];
3379 INSTR_TIME_SET_CURRENT(thread->start_time);
3381 /* the first thread (i = 0) is executed by main thread */
3384 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
3386 if (err != 0 || thread->thread == INVALID_THREAD)
3388 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
3394 thread->thread = INVALID_THREAD;
3398 /* wait for threads and accumulate results */
3399 INSTR_TIME_SET_ZERO(conn_total_time);
3400 for (i = 0; i < nthreads; i++)
3404 if (threads[i].thread == INVALID_THREAD)
3405 ret = threadRun(&threads[i]);
3407 pthread_join(threads[i].thread, &ret);
3411 TResult *r = (TResult *) ret;
3413 total_xacts += r->xacts;
3414 total_latencies += r->latencies;
3415 total_sqlats += r->sqlats;
3416 throttle_lag += r->throttle_lag;
3417 throttle_latency_skipped += r->throttle_latency_skipped;
3418 latency_late += r->latency_late;
3419 if (r->throttle_lag_max > throttle_lag_max)
3420 throttle_lag_max = r->throttle_lag_max;
3421 INSTR_TIME_ADD(conn_total_time, r->conn_time);
3425 disconnect_all(state, nclients);
3428 * XXX We compute results as though every client of every thread started
3429 * and finished at the same time. That model can diverge noticeably from
3430 * reality for a short benchmark run involving relatively many threads.
3431 * The first thread may process notably many transactions before the last
3432 * thread begins. Improving the model alone would bring limited benefit,
3433 * because performance during those periods of partial thread count can
3434 * easily exceed steady state performance. This is one of the many ways
3435 * short runs convey deceptive performance figures.
3437 INSTR_TIME_SET_CURRENT(total_time);
3438 INSTR_TIME_SUBTRACT(total_time, start_time);
3439 printResults(ttype, total_xacts, nclients, threads, nthreads,
3440 total_time, conn_total_time, total_latencies, total_sqlats,
3441 throttle_lag, throttle_lag_max, throttle_latency_skipped,
/*
 * threadRun: work loop for one TState.  main() either calls it directly or
 * hands it to pthread_create() as the start routine; arg is the TState.
 *
 * In the lines visible here it allocates a TResult, opens a per-thread log
 * file when requested, connects each of the thread's nstate clients with
 * doConnect(), drives them with doCustom() around select(2), emits progress
 * reports, and finally disconnects and sums the per-client counters into
 * the TResult.  The callers cast the returned pointer to TResult *.
 *
 * NOTE(review): comments added in this function describe only the
 * statements visible in this listing.
 */
3448 threadRun(void *arg)
3450 TState *thread = (TState *) arg;
3451 CState *state = thread->state;
3453 FILE *logfile = NULL; /* per-thread log file */
3456 int nstate = thread->nstate;
3457 int remains = nstate; /* number of remaining clients */
3460 /* for reporting progress: */
3461 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
3462 int64 last_report = thread_start;
3463 int64 next_report = last_report + (int64) progress * 1000000;
3464 int64 last_count = 0,
3473 * Initialize throttling rate target for all of the thread's clients. It
3474 * might be a little more accurate to reset thread->start_time here too.
3475 * The possible drift seems too small relative to typical throttle delay
3476 * times to worry about it.
3478 INSTR_TIME_SET_CURRENT(start);
3479 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3480 thread->throttle_lag = 0;
3481 thread->throttle_lag_max = 0;
3483 result = pg_malloc(sizeof(TResult));
3485 INSTR_TIME_SET_ZERO(result->conn_time);
3487 /* open log file if requested */
/*
 * Thread 0 writes to "pgbench_log.<main_pid>"; the second snprintf() appends
 * the thread id as an extra suffix.
 */
3492 if (thread->tid == 0)
3493 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3495 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3496 logfile = fopen(logpath, "w");
3498 if (logfile == NULL)
/*
 * NOTE(review): this message has no trailing newline, unlike the other
 * error messages printed by this function.
 */
3500 fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
3507 /* make connections to the database */
3508 for (i = 0; i < nstate; i++)
3510 if ((state[i].con = doConnect()) == NULL)
3515 /* time after thread and connections set up */
3516 INSTR_TIME_SET_CURRENT(result->conn_time);
3517 INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
3519 agg_vals_init(&aggs, thread->start_time);
3521 /* send start up queries in async manner */
3522 for (i = 0; i < nstate; i++)
3524 CState *st = &state[i];
/*
 * NOTE(review): commands is looked up with the value st->use_file holds on
 * entry; use_file is only re-drawn just below, so the META_COMMAND test
 * after doCustom() may index a different script's command array than the
 * one that was actually run -- confirm.
 */
3525 Command **commands = sql_files[st->use_file];
3526 int prev_ecnt = st->ecnt;
/* choose this client's script index with getrand() over 0 .. num_files - 1 */
3528 st->use_file = getrand(thread, 0, num_files - 1);
3529 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
3530 remains--; /* I've aborted */
/* an increased error count on a meta-command is reported as a client abort */
3532 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3534 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
3535 remains--; /* I've aborted */
3544 int maxsock; /* max socket number to be waited */
3548 FD_ZERO(&input_mask);
/*
 * min_usec tracks the shortest wait needed by any client; PG_INT64_MAX means
 * no client has imposed a deadline yet.
 */
3551 min_usec = PG_INT64_MAX;
3552 for (i = 0; i < nstate; i++)
3554 CState *st = &state[i];
3555 Command **commands = sql_files[st->use_file];
3558 if (st->con == NULL)
3562 else if (st->sleeping)
3564 if (st->throttling && timer_exceeded)
3566 /* interrupt client which has not started a transaction */
3569 st->throttling = false;
3574 else /* just a nap from the script */
/* the current time is fetched only while no deadline has been recorded yet */
3578 if (min_usec == PG_INT64_MAX)
3582 INSTR_TIME_SET_CURRENT(now);
3583 now_usec = INSTR_TIME_GET_MICROSEC(now);
/* time left until this client's txn_scheduled; keep the smallest value */
3586 this_usec = st->txn_scheduled - now_usec;
3587 if (min_usec > this_usec)
3588 min_usec = this_usec;
3591 else if (commands[st->state]->type == META_COMMAND)
3593 min_usec = 0; /* the connection is ready to run */
3597 sock = PQsocket(st->con);
3600 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3604 FD_SET(sock, &input_mask);
/* only wait in select() when nothing is runnable now and a socket was registered */
3610 if (min_usec > 0 && maxsock != -1)
3612 int nsocks; /* return from select(2) */
3614 if (min_usec != PG_INT64_MAX)
3616 struct timeval timeout;
/* split the microsecond wait into the seconds/microseconds pair select() takes */
3618 timeout.tv_sec = min_usec / 1000000;
3619 timeout.tv_usec = min_usec % 1000000;
3620 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
/* this call passes a NULL timeout, i.e. no time limit on the wait */
3623 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3628 /* must be something wrong */
3629 fprintf(stderr, "select failed: %s\n", strerror(errno));
3634 /* ok, backend returns reply */
3635 for (i = 0; i < nstate; i++)
3637 CState *st = &state[i];
3638 Command **commands = sql_files[st->use_file];
3639 int prev_ecnt = st->ecnt;
/*
 * Advance a connected client when its socket is flagged in input_mask or its
 * next command is a meta-command.
 */
3641 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3642 || commands[st->state]->type == META_COMMAND))
3644 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
3645 remains--; /* I've aborted */
3648 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3650 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
3651 remains--; /* I've aborted */
/*
 * Two progress-report variants follow; PTHREAD_FORK_EMULATION selects the
 * per-process one, otherwise thread 0 reports for all threads.
 */
3657 #ifdef PTHREAD_FORK_EMULATION
3658 /* each process reports its own progression */
3661 instr_time now_time;
3664 INSTR_TIME_SET_CURRENT(now_time);
3665 now = INSTR_TIME_GET_MICROSEC(now_time);
3666 if (now >= next_report)
3668 /* generate and show report */
3673 int64 lags = thread->throttle_lag;
3674 int64 run = now - last_report;
3682 for (i = 0; i < nstate; i++)
3684 count += state[i].cnt;
3685 lats += state[i].txn_latencies;
3686 sqlats += state[i].txn_sqlats;
/*
 * Figures cover the interval since last_report.  latency, sqlat and lag are
 * divided by the number of transactions counted in that interval; the 0.001
 * factors scale to the "ms" values printed below.
 * NOTE(review): count - last_count is zero when no transaction finished in
 * the interval, which makes these divisions by zero -- confirm that the
 * resulting output is acceptable.
 */
3689 total_run = (now - thread_start) / 1000000.0;
3690 tps = 1000000.0 * (count - last_count) / run;
3691 latency = 0.001 * (lats - last_lats) / (count - last_count);
3692 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
/* mean of squares minus squared mean (latency rescaled by 1000000.0 to match sqlat) */
3693 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3694 lag = 0.001 * (lags - last_lags) / (count - last_count);
3695 skipped = thread->throttle_latency_skipped - last_skipped;
3698 "progress %d: %.1f s, %.1f tps, "
3699 "lat %.3f ms stddev %.3f",
3700 thread->tid, total_run, tps, latency, stdev);
3703 fprintf(stderr, ", lag %.3f ms", lag);
3705 fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
3707 fprintf(stderr, "\n");
3711 last_sqlats = sqlats;
3714 last_skipped = thread->throttle_latency_skipped;
3715 next_report += (int64) progress *1000000;
3719 /* progress report by thread 0 for all threads */
3720 if (progress && thread->tid == 0)
3722 instr_time now_time;
3725 INSTR_TIME_SET_CURRENT(now_time);
3726 now = INSTR_TIME_GET_MICROSEC(now_time);
3727 if (now >= next_report)
3729 /* generate and show report */
3735 int64 run = now - last_report;
/*
 * These loops index state[] and thread[] beyond this thread's own slice.
 * main() sets thread 0's state pointer to &state[0] and thread 0 itself is
 * &threads[0], so the indexes cover every client and every thread.
 */
3743 for (i = 0; i < progress_nclients; i++)
3745 count += state[i].cnt;
3746 lats += state[i].txn_latencies;
3747 sqlats += state[i].txn_sqlats;
3750 for (i = 0; i < progress_nthreads; i++)
3751 lags += thread[i].throttle_lag;
/*
 * Same interval arithmetic as the per-process variant above, including the
 * division by count - last_count.
 */
3753 total_run = (now - thread_start) / 1000000.0;
3754 tps = 1000000.0 * (count - last_count) / run;
3755 latency = 0.001 * (lats - last_lats) / (count - last_count);
3756 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3757 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3758 lag = 0.001 * (lags - last_lags) / (count - last_count);
/*
 * NOTE(review): skipped reads only this thread's throttle_latency_skipped,
 * whereas lags above is summed over all threads -- confirm whether the
 * skipped count should be summed across threads as well.
 */
3759 skipped = thread->throttle_latency_skipped - last_skipped;
3762 "progress: %.1f s, %.1f tps, "
3763 "lat %.3f ms stddev %.3f",
3764 total_run, tps, latency, stdev);
3767 fprintf(stderr, ", lag %.3f ms", lag);
3769 fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
3771 fprintf(stderr, "\n");
3775 last_sqlats = sqlats;
3778 last_skipped = thread->throttle_latency_skipped;
3779 next_report += (int64) progress *1000000;
3782 #endif /* PTHREAD_FORK_EMULATION */
/*
 * start/end bracket the disconnect and result summation; the difference is
 * folded into result->conn_time by INSTR_TIME_ACCUM_DIFF at the end.
 */
3786 INSTR_TIME_SET_CURRENT(start);
3787 disconnect_all(state, nstate);
3789 result->latencies = 0;
/* fold the per-client counters into this thread's result */
3791 for (i = 0; i < nstate; i++)
3793 result->xacts += state[i].cnt;
3794 result->latencies += state[i].txn_latencies;
3795 result->sqlats += state[i].txn_sqlats;
/* copy the thread-level throttling statistics into the result */
3797 result->throttle_lag = thread->throttle_lag;
3798 result->throttle_lag_max = thread->throttle_lag_max;
3799 result->throttle_latency_skipped = thread->throttle_latency_skipped;
3800 result->latency_late = thread->latency_late;
3802 INSTR_TIME_SET_CURRENT(end);
3803 INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
3810 * Support for duration option: set timer_exceeded after so many seconds.
/*
 * Signal handler that setalarm() registers for SIGALRM.  Its only visible
 * action is to set the timer_exceeded flag, which threadRun() consults.
 */
3816 handle_sig_alarm(SIGNAL_ARGS)
3818 timer_exceeded = true;
/*
 * Signal-based setalarm(): registers handle_sig_alarm as the SIGALRM
 * handler via pqsignal().
 * NOTE(review): the statement that arms the alarm for `seconds` is not
 * visible in this listing; presumably it follows the pqsignal() call.
 */
3822 setalarm(int seconds)
3824 pqsignal(SIGALRM, handle_sig_alarm);
3828 #ifndef ENABLE_THREAD_SAFETY
3831 * implements pthread using fork.
/*
 * Handle for a "thread" emulated with a child process.  The functions below
 * use its pid member and its two-element pipes array.
 */
3834 typedef struct fork_pthread
/*
 * pthread_create() stand-in used when ENABLE_THREAD_SAFETY is not defined.
 *
 * Allocates a fork_pthread and creates a pipe.  Per the branch comments
 * below, the parent closes the write end (pipes[1]), while the child closes
 * the read end (pipes[0]), runs start_routine(arg) and writes
 * sizeof(TResult) bytes of the returned object to pipes[1].
 *
 * attr is not referenced in the visible lines.
 */
3841 pthread_create(pthread_t *thread,
3842 pthread_attr_t *attr,
3843 void *(*start_routine) (void *),
3850 th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
3851 if (pipe(th->pipes) < 0)
/*
 * NOTE(review): the handling of this failure branch is not visible here;
 * check that both pipe descriptors are closed when the child cannot be
 * created, otherwise they leak.
 */
3858 if (th->pid == -1) /* error */
3863 if (th->pid != 0) /* in parent process */
3865 close(th->pipes[1]);
3870 /* in child process */
3871 close(th->pipes[0]);
3873 /* set alarm again because the child does not inherit timers */
3877 ret = start_routine(arg);
/*
 * The result is shipped to the parent as raw bytes; pthread_join() reads the
 * same sizeof(TResult) from pipes[0].
 */
3878 rc = write(th->pipes[1], ret, sizeof(TResult));
3880 close(th->pipes[1]);
/*
 * pthread_join() stand-in for the fork emulation: waits for the child
 * process identified by th->pid and, when thread_return is non-NULL, reads
 * the child's TResult from the pipe into a freshly allocated buffer.
 */
3886 pthread_join(pthread_t th, void **thread_return)
/*
 * Repeat waitpid() until it reports the expected child.
 * NOTE(review): the child is reaped before the pipe is read, so the child's
 * write() of sizeof(TResult) bytes must complete without a reader --
 * confirm this always fits in the pipe buffer.
 */
3890 while (waitpid(th->pid, &status, 0) != th->pid)
3896 if (thread_return != NULL)
3898 /* assume result is TResult */
3899 *thread_return = pg_malloc(sizeof(TResult));
/* a short or failed read discards the buffer and hands NULL to the caller */
3900 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
3902 free(*thread_return);
3903 *thread_return = NULL;
3906 close(th->pipes[0]);
/*
 * Timer-queue callback registered by the Windows setalarm().  Both
 * parameters are unused in the visible body; it only sets timer_exceeded.
 */
3914 static VOID CALLBACK
3915 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3917 timer_exceeded = true;
/*
 * Windows setalarm(): creates a timer queue and a timer on it that invokes
 * win32_timer_callback after seconds * 1000 milliseconds.
 */
3921 setalarm(int seconds)
3926 /* This function will be called at most once, so we can cheat a bit. */
/*
 * NOTE(review): the value returned by CreateTimerQueue() is not checked in
 * the visible lines before it is passed on.
 */
3927 queue = CreateTimerQueue();
/*
 * Fail if seconds * 1000 would not fit in a DWORD or if the timer cannot be
 * created.  The timer is created with a period of 0 and WT_EXECUTEONLYONCE.
 */
3928 if (seconds > ((DWORD) -1) / 1000 ||
3929 !CreateTimerQueueTimer(&timer, queue,
3930 win32_timer_callback, NULL, seconds * 1000, 0,
3931 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3933 fprintf(stderr, "Failed to set timer\n");
3938 /* partial pthread implementation for Windows */
/*
 * Bookkeeping for one thread of the Windows pthread shim.  Besides routine,
 * the functions below use its handle, arg and result members.
 */
3940 typedef struct win32_pthread
/* start routine supplied by the caller of pthread_create() */
3943 void *(*routine) (void *);
/*
 * Entry point handed to _beginthreadex() by pthread_create().  arg is the
 * win32_pthread for the new thread; the routine's return value is stored in
 * th->result, where pthread_join() picks it up.
 */
3948 static unsigned __stdcall
3949 win32_pthread_run(void *arg)
3951 win32_pthread *th = (win32_pthread *) arg;
3953 th->result = th->routine(th->arg);
/*
 * pthread_create() for Windows: allocates a win32_pthread, records
 * start_routine in it and starts a native thread running
 * win32_pthread_run() with that record as its argument.
 *
 * attr is not referenced in the visible lines.
 */
3959 pthread_create(pthread_t *thread,
3960 pthread_attr_t *attr,
3961 void *(*start_routine) (void *),
3967 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3968 th->routine = start_routine;
3972 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
/* a NULL handle means the thread could not be started */
3973 if (th->handle == NULL)
/*
 * pthread_join() for Windows: waits for the thread behind th->handle to
 * finish, passes back the value saved by win32_pthread_run(), and closes
 * the handle.
 */
3985 pthread_join(pthread_t th, void **thread_return)
/* reject a missing handle; errno is set and the same value is returned */
3987 if (th == NULL || th->handle == NULL)
3988 return errno = EINVAL;
3990 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
/* hand the Windows error code to _dosmaperr() */
3992 _dosmaperr(GetLastError());
3997 *thread_return = th->result;
3999 CloseHandle(th->handle);