4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2015, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "portability/instr_time.h"
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h> /* for getrlimit */
53 #define M_PI 3.14159265358979323846
59 * Multi-platform pthread implementations
63 /* Use native win32 threads on Windows */
64 typedef struct win32_pthread *pthread_t;
65 typedef int pthread_attr_t;
67 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
68 static int pthread_join(pthread_t th, void **thread_return);
69 #elif defined(ENABLE_THREAD_SAFETY)
70 /* Use platform-dependent pthread capability */
73 /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
74 #define PTHREAD_FORK_EMULATION
77 #define pthread_t pg_pthread_t
78 #define pthread_attr_t pg_pthread_attr_t
79 #define pthread_create pg_pthread_create
80 #define pthread_join pg_pthread_join
82 typedef struct fork_pthread *pthread_t;
83 typedef int pthread_attr_t;
85 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
86 static int pthread_join(pthread_t th, void **thread_return);
90 /********************************************************************
91 * some configurable parameters */
93 /* max number of clients allowed */
95 #define MAXCLIENTS (FD_SETSIZE - 10)
97 #define MAXCLIENTS 1024
100 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
101 #define DEFAULT_NXACTS 10 /* default nxacts */
103 #define MIN_GAUSSIAN_THRESHOLD 2.0 /* minimum threshold for gauss */
105 int nxacts = 0; /* number of transactions per client */
106 int duration = 0; /* duration in seconds */
109 * scaling factor. for example, scale = 10 will make 1000000 tuples in
110 * pgbench_accounts table.
115 * fillfactor. for example, fillfactor = 90 will use only 90 percent
116 * space during inserts and leave 10 percent free.
118 int fillfactor = 100;
121 * create foreign key constraints on the tables?
123 int foreign_keys = 0;
126 * use unlogged tables?
128 int unlogged_tables = 0;
131 * log sampling rate (1.0 = log everything, 0.0 = option not given)
133 double sample_rate = 0.0;
136 * When threads are throttled to a given rate limit, this is the target delay
137 * to reach that rate in usec. 0 is the default and means no throttling.
139 int64 throttle_delay = 0;
142 * Transactions which take longer than this limit (in usec) are counted as
143 * late, and reported as such, although they are completed anyway. When
144 * throttling is enabled, execution time slots that are more than this late
145 * are skipped altogether, and counted separately.
147 int64 latency_limit = 0;
150 * tablespace selection
152 char *tablespace = NULL;
153 char *index_tablespace = NULL;
156 * end of configurable parameters
157 *********************************************************************/
159 #define nbranches 1 /* Makes little sense to change this. Change
162 #define naccounts 100000
165 * The scale factor at/beyond which 32bit integers are incapable of storing
168 * Although the actual threshold is 21474, we use 20000 because it is easier to
169 * document and remember, and isn't that far away from the real threshold.
171 #define SCALE_32BIT_THRESHOLD 20000
173 bool use_log; /* log transaction latencies to a file */
174 bool use_quiet; /* quiet logging onto stderr */
175 int agg_interval; /* log aggregates instead of individual
177 int progress = 0; /* thread progress report every this seconds */
178 int progress_nclients = 0; /* number of clients for progress
180 int progress_nthreads = 0; /* number of threads for progress
182 bool is_connect; /* establish connection for each transaction */
183 bool is_latencies; /* report per-command latencies */
184 int main_pid; /* main process id used in log filename */
190 const char *progname;
192 volatile bool timer_exceeded = false; /* flag from signal handler */
194 /* variable definitions */
197 char *name; /* variable name */
198 char *value; /* its value */
201 #define MAX_FILES 128 /* max number of SQL script files allowed */
202 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
205 * structures used in custom query mode
210 PGconn *con; /* connection handle to DB */
211 int id; /* client No. */
212 int state; /* state No. */
213 int cnt; /* xacts count */
214 int ecnt; /* error count */
215 int listen; /* 0 indicates that an async query has been
217 int sleeping; /* 1 indicates that the client is napping */
218 bool throttling; /* whether nap is for throttling */
219 Variable *variables; /* array of variable definitions */
221 int64 txn_scheduled; /* scheduled start time of transaction (usec) */
222 instr_time txn_begin; /* used for measuring schedule lag times */
223 instr_time stmt_begin; /* used for measuring statement latencies */
224 int64 txn_latencies; /* cumulated latencies */
225 int64 txn_sqlats; /* cumulated square latencies */
226 bool is_throttled; /* whether transaction throttling is done */
227 int use_file; /* index in sql_files for this client */
228 bool prepared[MAX_FILES];
232 * Thread state and result
236 int tid; /* thread id */
237 pthread_t thread; /* thread handle */
238 CState *state; /* array of CState */
239 int nstate; /* length of state[] */
240 instr_time start_time; /* thread start time */
241 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
242 int *exec_count; /* number of cmd executions (per Command) */
243 unsigned short random_state[3]; /* separate randomness for each thread */
244 int64 throttle_trigger; /* previous/next throttling (us) */
245 int64 throttle_lag; /* total transaction lag behind throttling */
246 int64 throttle_lag_max; /* max transaction lag */
247 int64 throttle_latency_skipped; /* lagging transactions
249 int64 latency_late; /* late transactions */
252 #define INVALID_THREAD ((pthread_t) 0)
256 instr_time conn_time;
261 int64 throttle_lag_max;
262 int64 throttle_latency_skipped;
267 * queries read from files
269 #define SQL_COMMAND 1
270 #define META_COMMAND 2
273 typedef enum QueryMode
275 QUERY_SIMPLE, /* simple query */
276 QUERY_EXTENDED, /* extended query */
277 QUERY_PREPARED, /* extended query with prepared statements */
281 static QueryMode querymode = QUERY_SIMPLE;
282 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
286 char *line; /* full text of command line */
287 int command_num; /* unique index of this Command struct */
288 int type; /* command type (SQL_COMMAND or META_COMMAND) */
289 int argc; /* number of command words */
290 char *argv[MAX_ARGS]; /* command word list */
291 int cols[MAX_ARGS]; /* corresponding column starting from 1 */
292 PgBenchExpr *expr; /* parsed expression */
298 long start_time; /* when does the interval start */
299 int cnt; /* number of transactions */
300 int skipped; /* number of transactions skipped under --rate
301 * and --latency-limit */
303 double min_latency; /* min/max latencies */
305 double sum_latency; /* sum(latency), sum(latency^2) - for
311 double sum_lag; /* sum(lag) */
312 double sum2_lag; /* sum(lag*lag) */
315 static Command **sql_files[MAX_FILES]; /* SQL script files */
316 static int num_files; /* number of script files */
317 static int num_commands = 0; /* total number of Command structs */
318 static int debug = 0; /* debug flag */
320 /* default scenario */
321 static char *tpc_b = {
322 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
323 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
324 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
325 "\\setrandom aid 1 :naccounts\n"
326 "\\setrandom bid 1 :nbranches\n"
327 "\\setrandom tid 1 :ntellers\n"
328 "\\setrandom delta -5000 5000\n"
330 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
331 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
332 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
333 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
334 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
339 static char *simple_update = {
340 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
341 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
342 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
343 "\\setrandom aid 1 :naccounts\n"
344 "\\setrandom bid 1 :nbranches\n"
345 "\\setrandom tid 1 :ntellers\n"
346 "\\setrandom delta -5000 5000\n"
348 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
349 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
350 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
355 static char *select_only = {
356 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
357 "\\setrandom aid 1 :naccounts\n"
358 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
361 /* Function prototypes */
362 static void setalarm(int seconds);
363 static void *threadRun(void *arg);
365 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
366 AggVals *agg, bool skipped);
371 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
373 " %s [OPTION]... [DBNAME]\n"
374 "\nInitialization options:\n"
375 " -i, --initialize invokes initialization mode\n"
376 " -F, --fillfactor=NUM set fill factor\n"
377 " -n, --no-vacuum do not run VACUUM after initialization\n"
378 " -q, --quiet quiet logging (one message each 5 seconds)\n"
379 " -s, --scale=NUM scaling factor\n"
380 " --foreign-keys create foreign key constraints between tables\n"
381 " --index-tablespace=TABLESPACE\n"
382 " create indexes in the specified tablespace\n"
383 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
384 " --unlogged-tables create tables as unlogged tables\n"
385 "\nBenchmarking options:\n"
386 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
387 " -C, --connect establish new connection for each transaction\n"
388 " -D, --define=VARNAME=VALUE\n"
389 " define variable for use by custom script\n"
390 " -f, --file=FILENAME read transaction script from FILENAME\n"
391 " -j, --jobs=NUM number of threads (default: 1)\n"
392 " -l, --log write transaction times to log file\n"
393 " -L, --latency-limit=NUM count transactions lasting more than NUM ms\n"
395 " -M, --protocol=simple|extended|prepared\n"
396 " protocol for submitting queries (default: simple)\n"
397 " -n, --no-vacuum do not run VACUUM before tests\n"
398 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
399 " -P, --progress=NUM show thread progress report every NUM seconds\n"
400 " -r, --report-latencies report average latency per command\n"
401 " -R, --rate=NUM target rate in transactions per second\n"
402 " -s, --scale=NUM report this scale factor in output\n"
403 " -S, --select-only perform SELECT-only transactions\n"
404 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
405 " -T, --time=NUM duration of benchmark test in seconds\n"
406 " -v, --vacuum-all vacuum all four standard tables before tests\n"
407 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
408 " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
409 "\nCommon options:\n"
410 " -d, --debug print debugging output\n"
411 " -h, --host=HOSTNAME database server host or socket directory\n"
412 " -p, --port=PORT database server port number\n"
413 " -U, --username=USERNAME connect as specified database user\n"
414 " -V, --version output version information, then exit\n"
415 " -?, --help show this help, then exit\n"
417 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
422 * strtoint64 -- convert a string to 64-bit integer
424 * This function is a modified version of scanint8() from
425 * src/backend/utils/adt/int8.c.
428 strtoint64(const char *str)
430 const char *ptr = str;
435 * Do our own scan, rather than relying on sscanf which might be broken
439 /* skip leading spaces */
440 while (*ptr && isspace((unsigned char) *ptr))
449 * Do an explicit check for INT64_MIN. Ugly though this is, it's
450 * cleaner than trying to get the loop below to handle it portably.
452 if (strncmp(ptr, "9223372036854775808", 19) == 0)
454 result = PG_INT64_MIN;
460 else if (*ptr == '+')
463 /* require at least one digit */
464 if (!isdigit((unsigned char) *ptr))
465 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
468 while (*ptr && isdigit((unsigned char) *ptr))
470 int64 tmp = result * 10 + (*ptr++ - '0');
472 if ((tmp / 10) != result) /* overflow? */
473 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
479 /* allow trailing whitespace, but not other trailing chars */
480 while (*ptr != '\0' && isspace((unsigned char) *ptr))
484 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
486 return ((sign < 0) ? -result : result);
489 /* random number generator: uniform distribution from min to max inclusive */
491 getrand(TState *thread, int64 min, int64 max)
494 * Odd coding is so that min and max have approximately the same chance of
495 * being selected as do numbers between them.
497 * pg_erand48() is thread-safe and concurrent, which is why we use it
498 * rather than random(), which in glibc is non-reentrant, and therefore
499 * protected by a mutex, and therefore a bottleneck on machines with many
502 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
506 * random number generator: exponential distribution from min to max inclusive.
507 * the threshold is so that the density of probability for the last cut-off max
508 * value is exp(-threshold).
511 getExponentialRand(TState *thread, int64 min, int64 max, double threshold)
517 Assert(threshold > 0.0);
518 cut = exp(-threshold);
519 /* erand in [0, 1), uniform in (0, 1] */
520 uniform = 1.0 - pg_erand48(thread->random_state);
523 * inner expresion in (cut, 1] (if threshold > 0), rand in [0, 1)
525 Assert((1.0 - cut) != 0.0);
526 rand = -log(cut + (1.0 - cut) * uniform) / threshold;
527 /* return int64 random number within between min and max */
528 return min + (int64) ((max - min + 1) * rand);
531 /* random number generator: gaussian distribution from min to max inclusive */
533 getGaussianRand(TState *thread, int64 min, int64 max, double threshold)
539 * Get user specified random number from this loop, with -threshold <
542 * This loop is executed until the number is in the expected range.
544 * As the minimum threshold is 2.0, the probability of looping is low:
545 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
546 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
547 * the worst case. For a 5.0 threshold value, the looping probability is
548 * about e^{-5} * 2 / pi ~ 0.43%.
553 * pg_erand48 generates [0,1), but for the basic version of the
554 * Box-Muller transform the two uniformly distributed random numbers
555 * are expected in (0, 1] (see
556 * http://en.wikipedia.org/wiki/Box_muller)
558 double rand1 = 1.0 - pg_erand48(thread->random_state);
559 double rand2 = 1.0 - pg_erand48(thread->random_state);
561 /* Box-Muller basic form transform */
562 double var_sqrt = sqrt(-2.0 * log(rand1));
564 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
567 * we may try with cos, but there may be a bias induced if the
568 * previous value fails the test. To be on the safe side, let us try
572 while (stdev < -threshold || stdev >= threshold);
574 /* stdev is in [-threshold, threshold), normalization to [0,1) */
575 rand = (stdev + threshold) / (threshold * 2.0);
577 /* return int64 random number within between min and max */
578 return min + (int64) ((max - min + 1) * rand);
582 * random number generator: generate a value, such that the series of values
583 * will approximate a Poisson distribution centered on the given value.
586 getPoissonRand(TState *thread, int64 center)
589 * Use inverse transform sampling to generate a value > 0, such that the
590 * expected (i.e. average) value is the given argument.
594 /* erand in [0, 1), uniform in (0, 1] */
595 uniform = 1.0 - pg_erand48(thread->random_state);
597 return (int64) (-log(uniform) * ((double) center) + 0.5);
600 /* call PQexec() and exit() on failure */
602 executeStatement(PGconn *con, const char *sql)
606 res = PQexec(con, sql);
607 if (PQresultStatus(res) != PGRES_COMMAND_OK)
609 fprintf(stderr, "%s", PQerrorMessage(con));
615 /* call PQexec() and complain, but without exiting, on failure */
617 tryExecuteStatement(PGconn *con, const char *sql)
621 res = PQexec(con, sql);
622 if (PQresultStatus(res) != PGRES_COMMAND_OK)
624 fprintf(stderr, "%s", PQerrorMessage(con));
625 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
630 /* set up a connection to the backend */
635 static char *password = NULL;
639 * Start the connection. Loop until we have a password if requested by
644 #define PARAMS_ARRAY_SIZE 7
646 const char *keywords[PARAMS_ARRAY_SIZE];
647 const char *values[PARAMS_ARRAY_SIZE];
649 keywords[0] = "host";
651 keywords[1] = "port";
653 keywords[2] = "user";
655 keywords[3] = "password";
656 values[3] = password;
657 keywords[4] = "dbname";
659 keywords[5] = "fallback_application_name";
660 values[5] = progname;
666 conn = PQconnectdbParams(keywords, values, true);
670 fprintf(stderr, "Connection to database \"%s\" failed\n",
675 if (PQstatus(conn) == CONNECTION_BAD &&
676 PQconnectionNeedsPassword(conn) &&
680 password = simple_prompt("Password: ", 100, false);
685 /* check to see that the backend connection was successfully made */
686 if (PQstatus(conn) == CONNECTION_BAD)
688 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
689 dbName, PQerrorMessage(conn));
697 /* throw away response from backend */
699 discard_response(CState *state)
705 res = PQgetResult(state->con);
712 compareVariables(const void *v1, const void *v2)
714 return strcmp(((const Variable *) v1)->name,
715 ((const Variable *) v2)->name);
719 getVariable(CState *st, char *name)
724 /* On some versions of Solaris, bsearch of zero items dumps core */
725 if (st->nvariables <= 0)
729 var = (Variable *) bsearch((void *) &key,
730 (void *) st->variables,
740 /* check whether the name consists of alphabets, numerals and underscores. */
742 isLegalVariableName(const char *name)
746 for (i = 0; name[i] != '\0'; i++)
748 if (!isalnum((unsigned char) name[i]) && name[i] != '_')
756 putVariable(CState *st, const char *context, char *name, char *value)
762 /* On some versions of Solaris, bsearch of zero items dumps core */
763 if (st->nvariables > 0)
764 var = (Variable *) bsearch((void *) &key,
765 (void *) st->variables,
777 * Check for the name only when declaring a new variable to avoid
780 if (!isLegalVariableName(name))
782 fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
787 newvars = (Variable *) pg_realloc(st->variables,
788 (st->nvariables + 1) * sizeof(Variable));
790 newvars = (Variable *) pg_malloc(sizeof(Variable));
792 st->variables = newvars;
794 var = &newvars[st->nvariables];
796 var->name = pg_strdup(name);
797 var->value = pg_strdup(value);
801 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
808 /* dup then free, in case value is pointing at this variable */
809 val = pg_strdup(value);
819 parseVariable(const char *sql, int *eaten)
827 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
832 memcpy(name, &sql[1], i - 1);
840 replaceVariable(char **sql, char *param, int len, char *value)
842 int valueln = strlen(value);
846 size_t offset = param - *sql;
848 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
849 param = *sql + offset;
853 memmove(param + valueln, param + len, strlen(param + len) + 1);
854 memcpy(param, value, valueln);
856 return param + valueln;
860 assignVariables(CState *st, char *sql)
867 while ((p = strchr(p, ':')) != NULL)
871 name = parseVariable(p, &eaten);
881 val = getVariable(st, name);
889 p = replaceVariable(&sql, p, eaten, val);
896 getQueryParams(CState *st, const Command *command, const char **params)
900 for (i = 0; i < command->argc - 1; i++)
901 params[i] = getVariable(st, command->argv[i + 1]);
905 * Recursive evaluation of an expression in a pgbench script
906 * using the current state of variables.
907 * Returns whether the evaluation was ok,
908 * the value itself is returned through the retval pointer.
911 evaluateExpr(CState *st, PgBenchExpr *expr, int64 *retval)
915 case ENODE_INTEGER_CONSTANT:
917 *retval = expr->u.integer_constant.ival;
925 if ((var = getVariable(st, expr->u.variable.varname)) == NULL)
927 fprintf(stderr, "undefined variable %s\n",
928 expr->u.variable.varname);
931 *retval = strtoint64(var);
940 if (!evaluateExpr(st, expr->u.operator.lexpr, &lval))
942 if (!evaluateExpr(st, expr->u.operator.rexpr, &rval))
944 switch (expr->u.operator.operator)
947 *retval = lval + rval;
951 *retval = lval - rval;
955 *retval = lval * rval;
961 fprintf(stderr, "division by zero\n");
964 *retval = lval / rval;
970 fprintf(stderr, "division by zero\n");
973 *retval = lval % rval;
977 fprintf(stderr, "bad operator\n");
985 fprintf(stderr, "bad expression\n");
990 * Run a shell command. The result is assigned to the variable if not NULL.
991 * Return true if succeeded, or false on error.
994 runShellCommand(CState *st, char *variable, char **argv, int argc)
996 char command[SHELL_COMMAND_SIZE];
1005 * Join arguments with whitespace separators. Arguments starting with
1006 * exactly one colon are treated as variables:
1007 * name - append a string "name"
1008 * :var - append a variable named 'var'
1009 * ::name - append a string ":name"
1012 for (i = 0; i < argc; i++)
1017 if (argv[i][0] != ':')
1019 arg = argv[i]; /* a string literal */
1021 else if (argv[i][1] == ':')
1023 arg = argv[i] + 1; /* a string literal starting with colons */
1025 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1027 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
1031 arglen = strlen(arg);
1032 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1034 fprintf(stderr, "%s: too long shell command\n", argv[0]);
1039 command[len++] = ' ';
1040 memcpy(command + len, arg, arglen);
1044 command[len] = '\0';
1046 /* Fast path for non-assignment case */
1047 if (variable == NULL)
1049 if (system(command))
1051 if (!timer_exceeded)
1052 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
1058 /* Execute the command with pipe and read the standard output. */
1059 if ((fp = popen(command, "r")) == NULL)
1061 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
1064 if (fgets(res, sizeof(res), fp) == NULL)
1066 if (!timer_exceeded)
1067 fprintf(stderr, "%s: cannot read the result\n", argv[0]);
1073 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
1077 /* Check whether the result is an integer and assign it to the variable */
1078 retval = (int) strtol(res, &endptr, 10);
1079 while (*endptr != '\0' && isspace((unsigned char) *endptr))
1081 if (*res == '\0' || *endptr != '\0')
1083 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
1086 snprintf(res, sizeof(res), "%d", retval);
1087 if (!putVariable(st, "setshell", variable, res))
1091 printf("shell parameter name: %s, value: %s\n", argv[1], res);
1096 #define MAX_PREPARE_NAME 32
1098 preparedStatementName(char *buffer, int file, int state)
1100 sprintf(buffer, "P%d_%d", file, state);
1104 clientDone(CState *st, bool ok)
1106 (void) ok; /* unused */
1108 if (st->con != NULL)
1113 return false; /* always false */
1117 agg_vals_init(AggVals *aggs, instr_time start)
1119 /* basic counters */
1120 aggs->cnt = 0; /* number of transactions (includes skipped) */
1121 aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
1123 aggs->sum_latency = 0; /* SUM(latency) */
1124 aggs->sum2_latency = 0; /* SUM(latency*latency) */
1126 /* min and max transaction duration */
1127 aggs->min_latency = 0;
1128 aggs->max_latency = 0;
1130 /* schedule lag counters */
1136 /* start of the current interval */
1137 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
1140 /* return false iff client should be disconnected */
1142 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
1146 bool trans_needs_throttle = false;
1150 * gettimeofday() isn't free, so we get the current timestamp lazily the
1151 * first time it's needed, and reuse the same value throughout this
1152 * function after that. This also ensures that e.g. the calculated latency
1153 * reported in the log file and in the totals are the same. Zero means
1156 INSTR_TIME_SET_ZERO(now);
1159 commands = sql_files[st->use_file];
1162 * Handle throttling once per transaction by sleeping. It is simpler to
1163 * do this here rather than at the end, because so much complicated logic
1164 * happens below when statements finish.
1166 if (throttle_delay && !st->is_throttled)
1169 * Generate a delay such that the series of delays will approximate a
1170 * Poisson distribution centered on the throttle_delay time.
1172 * If transactions are too slow or a given wait is shorter than a
1173 * transaction, the next transaction will start right away.
1175 int64 wait = getPoissonRand(thread, throttle_delay);
1177 thread->throttle_trigger += wait;
1178 st->txn_scheduled = thread->throttle_trigger;
1181 * If this --latency-limit is used, and this slot is already late so
1182 * that the transaction will miss the latency limit even if it
1183 * completed immediately, we skip this time slot and iterate till the
1184 * next slot that isn't late yet.
1190 if (INSTR_TIME_IS_ZERO(now))
1191 INSTR_TIME_SET_CURRENT(now);
1192 now_us = INSTR_TIME_GET_MICROSEC(now);
1193 while (thread->throttle_trigger < now_us - latency_limit)
1195 thread->throttle_latency_skipped++;
1198 doLog(thread, st, logfile, &now, agg, true);
1200 wait = getPoissonRand(thread, throttle_delay);
1201 thread->throttle_trigger += wait;
1202 st->txn_scheduled = thread->throttle_trigger;
1207 st->throttling = true;
1208 st->is_throttled = true;
1210 fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
1215 { /* are we sleeping? */
1218 if (INSTR_TIME_IS_ZERO(now))
1219 INSTR_TIME_SET_CURRENT(now);
1220 now_us = INSTR_TIME_GET_MICROSEC(now);
1221 if (st->txn_scheduled <= now_us)
1223 st->sleeping = 0; /* Done sleeping, go ahead with next command */
1226 /* Measure lag of throttled transaction relative to target */
1227 int64 lag = now_us - st->txn_scheduled;
1229 thread->throttle_lag += lag;
1230 if (lag > thread->throttle_lag_max)
1231 thread->throttle_lag_max = lag;
1232 st->throttling = false;
1236 return true; /* Still sleeping, nothing to do here */
1240 { /* are we receiver? */
1241 if (commands[st->state]->type == SQL_COMMAND)
1244 fprintf(stderr, "client %d receiving\n", st->id);
1245 if (!PQconsumeInput(st->con))
1246 { /* there's something wrong */
1247 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
1248 return clientDone(st, false);
1250 if (PQisBusy(st->con))
1251 return true; /* don't have the whole result yet */
1255 * command finished: accumulate per-command execution times in
1256 * thread-local data structure, if per-command latencies are requested
1260 int cnum = commands[st->state]->command_num;
1262 if (INSTR_TIME_IS_ZERO(now))
1263 INSTR_TIME_SET_CURRENT(now);
1264 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1265 now, st->stmt_begin);
1266 thread->exec_count[cnum]++;
1269 /* transaction finished: calculate latency and log the transaction */
1270 if (commands[st->state + 1] == NULL)
1272 /* only calculate latency if an option is used that needs it */
1273 if (progress || throttle_delay || latency_limit)
1277 if (INSTR_TIME_IS_ZERO(now))
1278 INSTR_TIME_SET_CURRENT(now);
1280 latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
1282 st->txn_latencies += latency;
1285 * XXX In a long benchmark run of high-latency transactions,
1286 * this int64 addition eventually overflows. For example, 100
1287 * threads running 10s transactions will overflow it in 2.56
1288 * hours. With a more-typical OLTP workload of .1s
1289 * transactions, overflow would take 256 hours.
1291 st->txn_sqlats += latency * latency;
1293 /* record over the limit transactions if needed. */
1294 if (latency_limit && latency > latency_limit)
1295 thread->latency_late++;
1298 /* record the time it took in the log */
1300 doLog(thread, st, logfile, &now, agg, false);
1303 if (commands[st->state]->type == SQL_COMMAND)
1306 * Read and discard the query result; note this is not included in
1307 * the statement latency numbers.
1309 res = PQgetResult(st->con);
1310 switch (PQresultStatus(res))
1312 case PGRES_COMMAND_OK:
1313 case PGRES_TUPLES_OK:
1316 fprintf(stderr, "Client %d aborted in state %d: %s",
1317 st->id, st->state, PQerrorMessage(st->con));
1319 return clientDone(st, false);
1322 discard_response(st);
1325 if (commands[st->state + 1] == NULL)
1334 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1335 return clientDone(st, true); /* exit success */
1338 /* increment state counter */
1340 if (commands[st->state] == NULL)
1343 st->use_file = (int) getrand(thread, 0, num_files - 1);
1344 commands = sql_files[st->use_file];
1345 st->is_throttled = false;
1348 * No transaction is underway anymore, which means there is
1349 * nothing to listen to right now. When throttling rate limits
1350 * are active, a sleep will happen next, as the next transaction
1351 * starts. And then in any case the next SQL command will set
1355 trans_needs_throttle = (throttle_delay > 0);
1359 if (st->con == NULL)
1364 INSTR_TIME_SET_CURRENT(start);
1365 if ((st->con = doConnect()) == NULL)
1367 fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
1368 return clientDone(st, false);
1370 INSTR_TIME_SET_CURRENT(end);
1371 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1375 * This ensures that a throttling delay is inserted before proceeding with
1376 * sql commands, after the first transaction. The first transaction
1377 * throttling is performed when first entering doCustom.
1379 if (trans_needs_throttle)
1381 trans_needs_throttle = false;
1385 /* Record transaction start time under logging, progress or throttling */
1386 if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
1388 INSTR_TIME_SET_CURRENT(st->txn_begin);
1391 * When not throttling, this is also the transaction's scheduled start
1394 if (!throttle_delay)
1395 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
1398 /* Record statement start time if per-command latencies are requested */
1400 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1402 if (commands[st->state]->type == SQL_COMMAND)
1404 const Command *command = commands[st->state];
1407 if (querymode == QUERY_SIMPLE)
1411 sql = pg_strdup(command->argv[0]);
1412 sql = assignVariables(st, sql);
1415 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1416 r = PQsendQuery(st->con, sql);
1419 else if (querymode == QUERY_EXTENDED)
1421 const char *sql = command->argv[0];
1422 const char *params[MAX_ARGS];
1424 getQueryParams(st, command, params);
1427 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1428 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1429 NULL, params, NULL, NULL, 0);
1431 else if (querymode == QUERY_PREPARED)
1433 char name[MAX_PREPARE_NAME];
1434 const char *params[MAX_ARGS];
1436 if (!st->prepared[st->use_file])
1440 for (j = 0; commands[j] != NULL; j++)
1443 char name[MAX_PREPARE_NAME];
1445 if (commands[j]->type != SQL_COMMAND)
1447 preparedStatementName(name, st->use_file, j);
1448 res = PQprepare(st->con, name,
1449 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1450 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1451 fprintf(stderr, "%s", PQerrorMessage(st->con));
1454 st->prepared[st->use_file] = true;
1457 getQueryParams(st, command, params);
1458 preparedStatementName(name, st->use_file, st->state);
1461 fprintf(stderr, "client %d sending %s\n", st->id, name);
1462 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1463 params, NULL, NULL, 0);
1465 else /* unknown sql mode */
1471 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
1475 st->listen = 1; /* flags that should be listened */
1477 else if (commands[st->state]->type == META_COMMAND)
1479 int argc = commands[st->state]->argc,
1481 char **argv = commands[st->state]->argv;
1485 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1486 for (i = 1; i < argc; i++)
1487 fprintf(stderr, " %s", argv[i]);
1488 fprintf(stderr, "\n");
1491 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1496 double threshold = 0;
1499 if (*argv[2] == ':')
1501 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1503 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1507 min = strtoint64(var);
1510 min = strtoint64(argv[2]);
1515 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
1521 if (*argv[3] == ':')
1523 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1525 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
1529 max = strtoint64(var);
1532 max = strtoint64(argv[3]);
1536 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
1542 * Generate random number functions need to be able to subtract
1543 * max from min and add one to the result without overflowing.
1544 * Since we know max > min, we can detect overflow just by
1545 * checking for a negative result. But we must check both that the
1546 * subtraction doesn't overflow, and that adding one to the result
1547 * doesn't overflow either.
1549 if (max - min < 0 || (max - min) + 1 < 0)
1551 fprintf(stderr, "%s: range too large\n", argv[0]);
1556 if (argc == 4 || /* uniform without or with "uniform" keyword */
1557 (argc == 5 && pg_strcasecmp(argv[4], "uniform") == 0))
1560 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1562 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1564 else if (argc == 6 &&
1565 ((pg_strcasecmp(argv[4], "gaussian") == 0) ||
1566 (pg_strcasecmp(argv[4], "exponential") == 0)))
1568 if (*argv[5] == ':')
1570 if ((var = getVariable(st, argv[5] + 1)) == NULL)
1572 fprintf(stderr, "%s: invalid threshold number %s\n", argv[0], argv[5]);
1576 threshold = strtod(var, NULL);
1579 threshold = strtod(argv[5], NULL);
1581 if (pg_strcasecmp(argv[4], "gaussian") == 0)
1583 if (threshold < MIN_GAUSSIAN_THRESHOLD)
1585 fprintf(stderr, "%s: gaussian threshold must be at least %f\n,", argv[5], MIN_GAUSSIAN_THRESHOLD);
1590 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getGaussianRand(thread, min, max, threshold));
1592 snprintf(res, sizeof(res), INT64_FORMAT, getGaussianRand(thread, min, max, threshold));
1594 else if (pg_strcasecmp(argv[4], "exponential") == 0)
1596 if (threshold <= 0.0)
1598 fprintf(stderr, "%s: exponential threshold must be strictly positive\n,", argv[5]);
1603 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getExponentialRand(thread, min, max, threshold));
1605 snprintf(res, sizeof(res), INT64_FORMAT, getExponentialRand(thread, min, max, threshold));
1608 else /* this means an error somewhere in the parsing phase... */
1610 fprintf(stderr, "%s: unexpected arguments\n", argv[0]);
1615 if (!putVariable(st, argv[0], argv[1], res))
1623 else if (pg_strcasecmp(argv[0], "set") == 0)
1626 PgBenchExpr *expr = commands[st->state]->expr;
1629 if (!evaluateExpr(st, expr, &result))
1634 sprintf(res, INT64_FORMAT, result);
1636 if (!putVariable(st, argv[0], argv[1], res))
1644 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1650 if (*argv[1] == ':')
1652 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1654 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
1661 usec = atoi(argv[1]);
1665 if (pg_strcasecmp(argv[2], "ms") == 0)
1667 else if (pg_strcasecmp(argv[2], "s") == 0)
1673 INSTR_TIME_SET_CURRENT(now);
1674 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
1679 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1681 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1683 if (timer_exceeded) /* timeout */
1684 return clientDone(st, true);
1685 else if (!ret) /* on error */
1690 else /* succeeded */
1693 else if (pg_strcasecmp(argv[0], "shell") == 0)
1695 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1697 if (timer_exceeded) /* timeout */
1698 return clientDone(st, true);
1699 else if (!ret) /* on error */
1704 else /* succeeded */
1714 * print log entry after completing one transaction.
1717 doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
1724 * Skip the log entry if sampling is enabled and this row doesn't belong
1725 * to the random sample.
1727 if (sample_rate != 0.0 &&
1728 pg_erand48(thread->random_state) > sample_rate)
1731 if (INSTR_TIME_IS_ZERO(*now)) /* caller may pass a zero time; read the clock only when needed */
1732 INSTR_TIME_SET_CURRENT(*now);
1734 latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled); /* microseconds since the scheduled start */
1738 lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled); /* delay between scheduled and actual start */
1740 /* should we aggregate the results (agg_interval > 0) or print one line per transaction? */
1741 if (agg_interval > 0)
1744 * Are we still in the same interval? If yes, accumulate the values
1745 * (print them otherwise)
1747 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
1753 * there is no latency to record if the transaction was
1760 agg->sum_latency += latency;
1761 agg->sum2_latency += latency * latency; /* sum of squares, for the stddev */
1763 /* first in this aggregation interval */
1764 if ((agg->cnt == 1) || (latency < agg->min_latency))
1765 agg->min_latency = latency;
1767 if ((agg->cnt == 1) || (latency > agg->max_latency))
1768 agg->max_latency = latency;
1770 /* and the same for schedule lag */
1773 agg->sum_lag += lag;
1774 agg->sum2_lag += lag * lag;
1776 if ((agg->cnt == 1) || (lag < agg->min_lag))
1778 if ((agg->cnt == 1) || (lag > agg->max_lag))
1786 * Loop until we reach the interval of the current transaction
1787 * (and print all the empty intervals in between).
1789 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now)) /* flush every interval that ended before "now" */
1792 * This is a non-Windows branch (thanks to the ifdef in
1793 * usage), so we don't need to handle this in a special way
1796 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
1805 fprintf(logfile, " %.0f %.0f %.0f %.0f",
1811 fprintf(logfile, " %d", agg->skipped);
1813 fputc('\n', logfile);
1815 /* move to the next interval */
1816 agg->start_time = agg->start_time + agg_interval;
1818 /* reset for "no transaction" intervals */
1821 agg->min_latency = 0;
1822 agg->max_latency = 0;
1823 agg->sum_latency = 0;
1824 agg->sum2_latency = 0;
1831 /* reset the values to include only the current transaction. */
1833 agg->skipped = skipped ? 1 : 0;
1834 agg->min_latency = latency;
1835 agg->max_latency = latency;
1836 agg->sum_latency = skipped ? 0.0 : latency; /* skipped transactions contribute no latency */
1837 agg->sum2_latency = skipped ? 0.0 : latency * latency;
1841 agg->sum2_lag = lag * lag;
1846 /* no, print raw transactions */
1849 /* This is more than we really ought to know about instr_time */
1851 fprintf(logfile, "%d %d skipped %d %ld %ld",
1852 st->id, st->cnt, st->use_file,
1853 (long) now->tv_sec, (long) now->tv_usec);
1855 fprintf(logfile, "%d %d %.0f %d %ld %ld",
1856 st->id, st->cnt, latency, st->use_file,
1857 (long) now->tv_sec, (long) now->tv_usec);
1860 /* On Windows, instr_time doesn't provide a timestamp anyway */
1862 fprintf(logfile, "%d %d skipped %d 0 0",
1863 st->id, st->cnt, st->use_file);
1865 fprintf(logfile, "%d %d %.0f %d 0 0",
1866 st->id, st->cnt, latency, st->use_file);
1869 fprintf(logfile, " %.0f", lag); /* trailing schedule-lag column */
1870 fputc('\n', logfile);
1874 /* discard connections: PQfinish the connections of the first "length" clients and reset each handle to NULL */
1876 disconnect_all(CState *state, int length)
1880 for (i = 0; i < length; i++)
1884 PQfinish(state[i].con);
1885 state[i].con = NULL; /* leave no dangling PGconn pointer behind */
1890 /* create tables and setup data: drop/recreate the pgbench tables, load branches, tellers and accounts, then vacuum and add keys */
1892 init(bool is_no_vacuum)
1895 * The scale factor at/beyond which 32-bit integers are insufficient for
1896 * storing TPC-B account IDs.
1898 * Although the actual threshold is 21474, we use 20000 because it is easier to
1899 * document and remember, and isn't that far away from the real threshold.
1901 #define SCALE_32BIT_THRESHOLD 20000
1904 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1905 * fields in these table declarations were intended to comply with that.
1906 * The pgbench_accounts table complies with that because the "filler"
1907 * column is set to blank-padded empty string. But for all other tables
1908 * the columns default to NULL and so don't actually take any space. We
1909 * could fix that by giving them non-null default values. However, that
1910 * would completely break comparability of pgbench results with prior
1911 * versions. Since pgbench has never pretended to be fully TPC-B compliant
1912 * anyway, we stick with the historical behavior.
1916 const char *table; /* table name */
1917 const char *smcols; /* column decls if accountIDs are 32 bits */
1918 const char *bigcols; /* column decls if accountIDs are 64 bits */
1919 int declare_fillfactor;
1921 static const struct ddlinfo DDLs[] = {
1924 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1925 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1930 "tid int not null,bid int,tbalance int,filler char(84)",
1931 "tid int not null,bid int,tbalance int,filler char(84)",
1936 "aid int not null,bid int,abalance int,filler char(84)",
1937 "aid bigint not null,bid int,abalance int,filler char(84)",
1942 "bid int not null,bbalance int,filler char(88)",
1943 "bid int not null,bbalance int,filler char(88)",
1947 static const char *const DDLINDEXes[] = {
1948 "alter table pgbench_branches add primary key (bid)",
1949 "alter table pgbench_tellers add primary key (tid)",
1950 "alter table pgbench_accounts add primary key (aid)"
1952 static const char *const DDLKEYs[] = {
1953 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1954 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1955 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1956 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1957 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1966 /* used to track elapsed time and estimate of the remaining time */
1971 int log_interval = 1;
1973 if ((con = doConnect()) == NULL)
1976 for (i = 0; i < lengthof(DDLs); i++)
1980 const struct ddlinfo *ddl = &DDLs[i];
1983 /* Remove old table, if it exists. */
1984 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1985 executeStatement(con, buffer);
1987 /* Construct new create table statement. */
1989 if (ddl->declare_fillfactor)
1990 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1991 " with (fillfactor=%d)", fillfactor);
1992 if (tablespace != NULL)
1994 char *escape_tablespace;
1996 escape_tablespace = PQescapeIdentifier(con, tablespace,
1997 strlen(tablespace));
1998 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1999 " tablespace %s", escape_tablespace);
2000 PQfreemem(escape_tablespace);
2003 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols; /* bigint account ids at large scales */
2005 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
2006 unlogged_tables ? " unlogged" : "",
2007 ddl->table, cols, opts);
2009 executeStatement(con, buffer);
2012 executeStatement(con, "begin");
2014 for (i = 0; i < nbranches * scale; i++)
2016 /* "filler" column defaults to NULL */
2017 snprintf(sql, sizeof(sql),
2018 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2020 executeStatement(con, sql);
2023 for (i = 0; i < ntellers * scale; i++)
2025 /* "filler" column defaults to NULL */
2026 snprintf(sql, sizeof(sql),
2027 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2028 i + 1, i / ntellers + 1);
2029 executeStatement(con, sql);
2032 executeStatement(con, "commit");
2035 * fill the pgbench_accounts table with some data
2037 fprintf(stderr, "creating tables...\n");
2039 executeStatement(con, "begin");
2040 executeStatement(con, "truncate pgbench_accounts");
2042 res = PQexec(con, "copy pgbench_accounts from stdin");
2043 if (PQresultStatus(res) != PGRES_COPY_IN)
2045 fprintf(stderr, "%s", PQerrorMessage(con));
2050 INSTR_TIME_SET_CURRENT(start);
2052 for (k = 0; k < (int64) naccounts * scale; k++)
2056 /* "filler" column defaults to blank padded empty string */
2057 snprintf(sql, sizeof(sql),
2058 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2059 j, k / naccounts + 1, 0);
2060 if (PQputline(con, sql))
2062 fprintf(stderr, "PQputline failed\n");
2067 * If we want to stick with the original logging, print a message each
2068 * 100k inserted rows.
2070 if ((!use_quiet) && (j % 100000 == 0))
2072 INSTR_TIME_SET_CURRENT(diff);
2073 INSTR_TIME_SUBTRACT(diff, start);
2075 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2076 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2078 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2079 j, (int64) naccounts * scale,
2080 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2081 elapsed_sec, remaining_sec);
2083 /* let's not call the timing for each row, but only each 100 rows */
2084 else if (use_quiet && (j % 100 == 0))
2086 INSTR_TIME_SET_CURRENT(diff);
2087 INSTR_TIME_SUBTRACT(diff, start);
2089 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2090 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2092 /* have we reached the next interval (or end)? */
2093 if ((j == (int64) naccounts * scale) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) /* widen first: an int scale * naccounts overflows past scale 21474 */
2095 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2096 j, (int64) naccounts * scale,
2097 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2099 /* skip to the next interval */
2100 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
2105 if (PQputline(con, "\\.\n"))
2107 fprintf(stderr, "very last PQputline failed\n");
2112 fprintf(stderr, "PQendcopy failed\n");
2115 executeStatement(con, "commit");
2120 fprintf(stderr, "vacuum...\n");
2121 executeStatement(con, "vacuum analyze pgbench_branches");
2122 executeStatement(con, "vacuum analyze pgbench_tellers");
2123 executeStatement(con, "vacuum analyze pgbench_accounts");
2124 executeStatement(con, "vacuum analyze pgbench_history");
2130 fprintf(stderr, "set primary keys...\n");
2131 for (i = 0; i < lengthof(DDLINDEXes); i++)
2135 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2137 if (index_tablespace != NULL)
2139 char *escape_tablespace;
2141 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2142 strlen(index_tablespace));
2143 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2144 " using index tablespace %s", escape_tablespace);
2145 PQfreemem(escape_tablespace);
2148 executeStatement(con, buffer);
2152 * create foreign keys
2156 fprintf(stderr, "set foreign keys...\n");
2157 for (i = 0; i < lengthof(DDLKEYs); i++)
2159 executeStatement(con, DDLKEYs[i]);
2163 fprintf(stderr, "done.\n");
2168 * Parse the raw sql and replace :param to $n.
2171 parseQuery(Command *cmd, const char *raw_sql)
2176 sql = pg_strdup(raw_sql); /* edit a private copy; raw_sql is reused in the error message below */
2180 while ((p = strchr(p, ':')) != NULL)
2186 name = parseVariable(p, &eaten);
2196 if (cmd->argc >= MAX_ARGS) /* argv[0] carries the SQL text, so at most MAX_ARGS - 1 parameters */
2198 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2203 sprintf(var, "$%d", cmd->argc); /* the n-th variable becomes placeholder $n */
2204 p = replaceVariable(&sql, p, eaten, var);
2206 cmd->argv[cmd->argc] = name; /* remember which variable feeds $n */
2215 syntax_error(const char *source, const int lineno,
2216 const char *line, const char *command,
2217 const char *msg, const char *more, const int column)
2219 fprintf(stderr, "%s:%d: %s", source, lineno, msg); /* "source:lineno: msg" header */
2221 fprintf(stderr, " (%s)", more);
2223 fprintf(stderr, " at column %d", column);
2224 fprintf(stderr, " in command \"%s\"\n", command); /* ends the first line of the report */
2227 fprintf(stderr, "%s\n", line); /* echo the offending script line */
2232 for (i = 0; i < column - 1; i++) /* pad to the 1-based column so the caret points at it */
2233 fprintf(stderr, " ");
2234 fprintf(stderr, "^ error found here\n");
2240 /* Parse a command; return a Command struct, or NULL if it's a comment. Meta-commands are split into argv[] and validated (syntax_error on bad input); other lines become SQL_COMMANDs. */
2242 process_commands(char *buf, const char *source, const int lineno)
2244 const char delim[] = " \f\n\r\t\v";
2246 Command *my_commands;
2251 /* Make the string buf end at the next newline */
2252 if ((p = strchr(buf, '\n')) != NULL)
2255 /* Skip leading whitespace */
2257 while (isspace((unsigned char) *p))
2260 /* If the line is empty or actually a comment, we're done */
2261 if (*p == '\0' || strncmp(p, "--", 2) == 0)
2264 /* Allocate and initialize Command structure */
2265 my_commands = (Command *) pg_malloc(sizeof(Command));
2266 my_commands->line = pg_strdup(buf);
2267 my_commands->command_num = num_commands++;
2268 my_commands->type = 0; /* until set */
2269 my_commands->argc = 0;
2275 my_commands->type = META_COMMAND;
2278 tok = strtok(++p, delim);
2280 if (tok != NULL && pg_strcasecmp(tok, "set") == 0)
2285 my_commands->cols[j] = tok - buf + 1; /* 1-based column of this token, for error carets */
2286 my_commands->argv[j++] = pg_strdup(tok);
2287 my_commands->argc++;
2288 if (max_args >= 0 && my_commands->argc >= max_args)
2289 tok = strtok(NULL, ""); /* last allowed arg takes the rest of the line */
2291 tok = strtok(NULL, delim);
2294 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
2297 * parsing: \setrandom variable min max [uniform] \setrandom
2298 * variable min max (gaussian|exponential) threshold
2301 if (my_commands->argc < 4)
2303 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2304 "missing arguments", NULL, -1);
2309 if (my_commands->argc == 4 || /* uniform without/with
2310 * "uniform" keyword */
2311 (my_commands->argc == 5 &&
2312 pg_strcasecmp(my_commands->argv[4], "uniform") == 0))
2316 else if ( /* argc >= 5 */
2317 (pg_strcasecmp(my_commands->argv[4], "gaussian") == 0) ||
2318 (pg_strcasecmp(my_commands->argv[4], "exponential") == 0))
2320 if (my_commands->argc < 6)
2322 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2323 "missing threshold argument", my_commands->argv[4], -1);
2325 else if (my_commands->argc > 6)
2327 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2328 "too many arguments", my_commands->argv[4],
2329 my_commands->cols[6]);
2332 else /* cannot parse, unexpected arguments */
2334 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2335 "unexpected argument", my_commands->argv[4],
2336 my_commands->cols[4]);
2339 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
2341 if (my_commands->argc < 3)
2343 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2344 "missing argument", NULL, -1);
2347 expr_scanner_init(my_commands->argv[2], source, lineno,
2348 my_commands->line, my_commands->argv[0],
2349 my_commands->cols[2] - 1);
2351 if (expr_yyparse() != 0)
2353 /* dead code: exit done from syntax_error called by yyerror */
2357 my_commands->expr = expr_parse_result;
2359 expr_scanner_finish();
2361 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
2363 if (my_commands->argc < 2)
2365 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2366 "missing argument", NULL, -1);
2370 * Split argument into number and unit to allow "sleep 1ms" etc.
2371 * We don't have to terminate the number argument with null
2372 * because it will be parsed with atoi, which ignores trailing
2373 * non-digit characters.
2375 if (my_commands->argv[1][0] != ':')
2377 char *c = my_commands->argv[1];
2379 while (isdigit((unsigned char) *c))
2383 my_commands->argv[2] = c;
2384 if (my_commands->argc < 3)
2385 my_commands->argc = 3;
2389 if (my_commands->argc >= 3)
2391 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
2392 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
2393 pg_strcasecmp(my_commands->argv[2], "s") != 0)
2395 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2396 "unknown time unit, must be us, ms or s",
2397 my_commands->argv[2], my_commands->cols[2]);
2401 /* this should be an error?! */
2402 for (j = 3; j < my_commands->argc; j++)
2403 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
2404 my_commands->argv[0], my_commands->argv[j]);
2406 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
2408 if (my_commands->argc < 3)
2410 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2411 "missing argument", NULL, -1);
2414 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
2416 if (my_commands->argc < 2) /* argv[0] is "shell" itself, so a command needs argc >= 2 */
2418 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2419 "missing command", NULL, -1);
2424 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2425 "invalid command", NULL, -1);
2430 my_commands->type = SQL_COMMAND;
2435 my_commands->argv[0] = pg_strdup(p); /* simple mode: the whole line is the query */
2436 my_commands->argc++;
2438 case QUERY_EXTENDED:
2439 case QUERY_PREPARED:
2440 if (!parseQuery(my_commands, p))
2452 * Read a line from fd, and return it in a malloc'd buffer.
2453 * Return NULL at EOF.
2455 * The buffer will typically be larger than necessary, but we don't care
2456 * in this program, because we'll free it as soon as we've parsed the line.
2459 read_line_from_file(FILE *fd)
2461 char tmpbuf[BUFSIZ];
2463 size_t buflen = BUFSIZ;
2466 buf = (char *) pg_malloc(buflen); /* pg_malloc, to pair with the pg_realloc below as elsewhere in pgbench */
2469 while (fgets(tmpbuf, BUFSIZ, fd) != NULL) /* each chunk holds at most BUFSIZ - 1 chars */
2471 size_t thislen = strlen(tmpbuf);
2473 /* Append tmpbuf to whatever we had already */
2474 memcpy(buf + used, tmpbuf, thislen + 1); /* + 1 also copies the terminating NUL */
2477 /* Done if we collected a newline */
2478 if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
2481 /* Else, enlarge buf to ensure we can append next bufferload */
2483 buf = (char *) pg_realloc(buf, buflen);
2495 process_file(char *filename)
2497 #define COMMANDS_ALLOC_NUM 128
2499 Command **my_commands;
2506 if (num_files >= MAX_FILES) /* sql_files[] holds at most MAX_FILES scripts */
2508 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
2512 alloc_num = COMMANDS_ALLOC_NUM;
2513 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2515 if (strcmp(filename, "-") == 0) /* "-" is special-cased (presumably stdin; TODO confirm) */
2517 else if ((fd = fopen(filename, "r")) == NULL)
2519 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
2520 pg_free(my_commands); /* don't leak the array on open failure */
2527 while ((buf = read_line_from_file(fd)) != NULL)
2533 command = process_commands(buf, filename, lineno);
2537 if (command == NULL)
2540 my_commands[index] = command;
2543 if (index >= alloc_num) /* grow in COMMANDS_ALLOC_NUM steps; keeps a free slot for the NULL terminator */
2545 alloc_num += COMMANDS_ALLOC_NUM;
2546 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2551 my_commands[index] = NULL;
2553 sql_files[num_files++] = my_commands; /* register the NULL-terminated command array */
2559 process_builtin(char *tb, const char *source)
2561 #define COMMANDS_ALLOC_NUM 128
2563 Command **my_commands;
2569 alloc_num = COMMANDS_ALLOC_NUM;
2570 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2581 while (*tb && *tb != '\n') /* scan to the end of the current line of the built-in script */
2594 command = process_commands(buf, source, lineno); /* source labels syntax errors */
2595 if (command == NULL)
2598 my_commands[index] = command;
2601 if (index >= alloc_num) /* grow in COMMANDS_ALLOC_NUM steps; keeps a free slot for the NULL terminator */
2603 alloc_num += COMMANDS_ALLOC_NUM;
2604 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2608 my_commands[index] = NULL; /* NULL-terminate the command array */
2613 /* print out results: run configuration, transaction counts, latency/lag and tps summaries, then optional per-command latencies */
2615 printResults(int ttype, int64 normal_xacts, int nclients,
2616 TState *threads, int nthreads,
2617 instr_time total_time, instr_time conn_total_time,
2618 int64 total_latencies, int64 total_sqlats,
2619 int64 throttle_lag, int64 throttle_lag_max,
2620 int64 throttle_latency_skipped, int64 latency_late)
2622 double time_include,
2627 time_include = INSTR_TIME_GET_DOUBLE(total_time);
2628 tps_include = normal_xacts / time_include;
2629 tps_exclude = normal_xacts / (time_include -
2630 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads)); /* minus average per-thread connection time */
2633 s = "TPC-B (sort of)";
2634 else if (ttype == 2)
2635 s = "Update only pgbench_accounts";
2636 else if (ttype == 1)
2641 printf("transaction type: %s\n", s);
2642 printf("scaling factor: %d\n", scale);
2643 printf("query mode: %s\n", QUERYMODE[querymode]);
2644 printf("number of clients: %d\n", nclients);
2645 printf("number of threads: %d\n", nthreads);
2648 printf("number of transactions per client: %d\n", nxacts);
2649 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2650 normal_xacts, (int64) nxacts * nclients);
2654 printf("duration: %d s\n", duration);
2655 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2659 /* Remaining stats are nonsensical if we failed to execute any xacts */
2660 if (normal_xacts <= 0)
2663 if (throttle_delay && latency_limit)
2664 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
2665 throttle_latency_skipped,
2666 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
2669 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
2670 latency_limit / 1000.0, latency_late,
2671 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
2673 if (throttle_delay || progress || latency_limit)
2675 /* compute and show latency average and standard deviation */
2676 double latency = 0.001 * total_latencies / normal_xacts;
2677 double sqlat = (double) total_sqlats / normal_xacts;
2679 printf("latency average: %.3f ms\n"
2680 "latency stddev: %.3f ms\n",
2681 latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2685 /* only an average latency computed from the measured run time is available (duration is 0 in -t mode) */
2686 printf("latency average: %.3f ms\n",
2687 1000.0 * time_include * nclients / normal_xacts);
2693 * Report average transaction lag under rate limit throttling. This
2694 * is the delay between scheduled and actual start times for the
2695 * transaction. The measured lag may be caused by thread/client load,
2696 * the database load, or the Poisson throttling process.
2698 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2699 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2702 printf("tps = %f (including connections establishing)\n", tps_include);
2703 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2705 /* Report per-command latencies */
2710 for (i = 0; i < num_files; i++)
2715 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2717 printf("statement latencies in milliseconds:\n");
2719 for (commands = sql_files[i]; *commands != NULL; commands++)
2721 Command *command = *commands;
2722 int cnum = command->command_num;
2724 instr_time total_exec_elapsed;
2725 int total_exec_count;
2728 /* Accumulate per-thread data for command */
2729 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2730 total_exec_count = 0;
2731 for (t = 0; t < nthreads; t++)
2733 TState *thread = &threads[t];
2735 INSTR_TIME_ADD(total_exec_elapsed,
2736 thread->exec_elapsed[cnum]);
2737 total_exec_count += thread->exec_count[cnum];
2740 if (total_exec_count > 0)
2741 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2745 printf("\t%f\t%s\n", total_time, command->line);
2753 main(int argc, char **argv)
2755 static struct option long_options[] = {
2756 /* systematic long/short named options */
2757 {"client", required_argument, NULL, 'c'},
2758 {"connect", no_argument, NULL, 'C'},
2759 {"debug", no_argument, NULL, 'd'},
2760 {"define", required_argument, NULL, 'D'},
2761 {"file", required_argument, NULL, 'f'},
2762 {"fillfactor", required_argument, NULL, 'F'},
2763 {"host", required_argument, NULL, 'h'},
2764 {"initialize", no_argument, NULL, 'i'},
2765 {"jobs", required_argument, NULL, 'j'},
2766 {"log", no_argument, NULL, 'l'},
2767 {"no-vacuum", no_argument, NULL, 'n'},
2768 {"port", required_argument, NULL, 'p'},
2769 {"progress", required_argument, NULL, 'P'},
2770 {"protocol", required_argument, NULL, 'M'},
2771 {"quiet", no_argument, NULL, 'q'},
2772 {"report-latencies", no_argument, NULL, 'r'},
2773 {"scale", required_argument, NULL, 's'},
2774 {"select-only", no_argument, NULL, 'S'},
2775 {"skip-some-updates", no_argument, NULL, 'N'},
2776 {"time", required_argument, NULL, 'T'},
2777 {"transactions", required_argument, NULL, 't'},
2778 {"username", required_argument, NULL, 'U'},
2779 {"vacuum-all", no_argument, NULL, 'v'},
2780 /* long-named only options */
2781 {"foreign-keys", no_argument, &foreign_keys, 1},
2782 {"index-tablespace", required_argument, NULL, 3},
2783 {"tablespace", required_argument, NULL, 2},
2784 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2785 {"sampling-rate", required_argument, NULL, 4},
2786 {"aggregate-interval", required_argument, NULL, 5},
2787 {"rate", required_argument, NULL, 'R'},
2788 {"latency-limit", required_argument, NULL, 'L'},
2793 int nclients = 1; /* default number of simulated clients */
2794 int nthreads = 1; /* default number of threads */
2795 int is_init_mode = 0; /* initialize mode? */
2796 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2797 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2798 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2799 * 2: skip update of branches and tellers */
2801 char *filename = NULL;
2802 bool scale_given = false;
2804 bool benchmarking_option_set = false;
2805 bool initialization_option_set = false;
2807 CState *state; /* status of clients */
2808 TState *threads; /* array of thread */
2810 instr_time start_time; /* start up time */
2811 instr_time total_time;
2812 instr_time conn_total_time;
2813 int64 total_xacts = 0;
2814 int64 total_latencies = 0;
2815 int64 total_sqlats = 0;
2816 int64 throttle_lag = 0;
2817 int64 throttle_lag_max = 0;
2818 int64 throttle_latency_skipped = 0;
2819 int64 latency_late = 0;
2824 #ifdef HAVE_GETRLIMIT
2834 progname = get_progname(argv[0]);
2838 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2843 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2845 puts("pgbench (PostgreSQL) " PG_VERSION);
2851 /* stderr is buffered on Win32. */
2852 setvbuf(stderr, NULL, _IONBF, 0);
2855 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2857 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2859 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2862 state = (CState *) pg_malloc(sizeof(CState));
2863 memset(state, 0, sizeof(CState));
2865 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
2873 pghost = pg_strdup(optarg);
2879 do_vacuum_accounts++;
2882 pgport = pg_strdup(optarg);
2889 benchmarking_option_set = true;
2893 benchmarking_option_set = true;
2896 benchmarking_option_set = true;
2897 nclients = atoi(optarg);
2898 if (nclients <= 0 || nclients > MAXCLIENTS)
2900 fprintf(stderr, "invalid number of clients: %d\n", nclients);
2903 #ifdef HAVE_GETRLIMIT
2904 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2905 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2906 #else /* but BSD doesn't ... */
2907 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2908 #endif /* RLIMIT_NOFILE */
2910 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2913 if (rlim.rlim_cur <= (nclients + 2))
2915 fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
2916 fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
2919 #endif /* HAVE_GETRLIMIT */
2921 case 'j': /* jobs */
2922 benchmarking_option_set = true;
2923 nthreads = atoi(optarg);
2926 fprintf(stderr, "invalid number of threads: %d\n", nthreads);
2931 benchmarking_option_set = true;
2935 benchmarking_option_set = true;
2936 is_latencies = true;
2940 scale = atoi(optarg);
2943 fprintf(stderr, "invalid scaling factor: %d\n", scale);
2948 benchmarking_option_set = true;
2951 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2954 nxacts = atoi(optarg);
2957 fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
2962 benchmarking_option_set = true;
2965 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2968 duration = atoi(optarg);
2971 fprintf(stderr, "invalid duration: %d\n", duration);
2976 login = pg_strdup(optarg);
2979 benchmarking_option_set = true;
2983 initialization_option_set = true;
2987 benchmarking_option_set = true;
2989 filename = pg_strdup(optarg);
2990 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2997 benchmarking_option_set = true;
2999 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
3001 fprintf(stderr, "invalid variable definition: %s\n", optarg);
3006 if (!putVariable(&state[0], "option", optarg, p))
3011 initialization_option_set = true;
3012 fillfactor = atoi(optarg);
3013 if ((fillfactor < 10) || (fillfactor > 100))
3015 fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
3020 benchmarking_option_set = true;
3023 fprintf(stderr, "query mode (-M) should be specified before transaction scripts (-f)\n");
3026 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3027 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3029 if (querymode >= NUM_QUERYMODE)
3031 fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
3036 benchmarking_option_set = true;
3037 progress = atoi(optarg);
3041 "thread progress delay (-P) must be positive (%s)\n",
3048 /* get a double from the beginning of option value */
3049 double throttle_value = atof(optarg);
3051 benchmarking_option_set = true;
3053 if (throttle_value <= 0.0)
3055 fprintf(stderr, "invalid rate limit: %s\n", optarg);
3058 /* Invert rate limit into a time offset */
3059 throttle_delay = (int64) (1000000.0 / throttle_value);
3064 double limit_ms = atof(optarg);
3066 if (limit_ms <= 0.0)
3068 fprintf(stderr, "invalid latency limit: %s\n", optarg);
3071 benchmarking_option_set = true;
3072 latency_limit = (int64) (limit_ms * 1000);
3076 /* This covers long options which take no argument. */
3077 if (foreign_keys || unlogged_tables)
3078 initialization_option_set = true;
3080 case 2: /* tablespace */
3081 initialization_option_set = true;
3082 tablespace = pg_strdup(optarg);
3084 case 3: /* index-tablespace */
3085 initialization_option_set = true;
3086 index_tablespace = pg_strdup(optarg);
3089 benchmarking_option_set = true;
3090 sample_rate = atof(optarg);
3091 if (sample_rate <= 0.0 || sample_rate > 1.0)
3093 fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
3099 fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
3102 benchmarking_option_set = true;
3103 agg_interval = atoi(optarg);
3104 if (agg_interval <= 0)
3106 fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
3112 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3119 * Don't need more threads than there are clients. (This is not merely an
3120 * optimization; throttle_delay is calculated incorrectly below if some
3121 * threads have no clients assigned to them.)
3123 if (nthreads > nclients)
3124 nthreads = nclients;
3126 /* compute a per thread delay */
3127 throttle_delay *= nthreads;
3130 dbName = argv[optind];
3133 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3135 else if (login != NULL && *login != '\0')
3143 if (benchmarking_option_set)
3145 fprintf(stderr, "some options cannot be used in initialization (-i) mode\n");
3154 if (initialization_option_set)
3156 fprintf(stderr, "some options cannot be used in benchmarking mode\n");
3161 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3162 if (nxacts <= 0 && duration <= 0)
3163 nxacts = DEFAULT_NXACTS;
3165 /* --sampling-rate may be used only with -l */
3166 if (sample_rate > 0.0 && !use_log)
3168 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
3172 /* --sampling-rate may must not be used with --aggregate-interval */
3173 if (sample_rate > 0.0 && agg_interval > 0)
3175 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
3179 if (agg_interval > 0 && (!use_log))
3181 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3185 if ((duration > 0) && (agg_interval > duration))
3187 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
3191 if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0))
3193 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3198 * is_latencies only works with multiple threads in thread-based
3199 * implementations, not fork-based ones, because it supposes that the
3200 * parent can see changes made to the per-thread execution stats by child
3201 * threads. It seems useful enough to accept despite this limitation, but
3202 * perhaps we should FIXME someday (by passing the stats data back up
3203 * through the parent-to-child pipes).
3205 #ifndef ENABLE_THREAD_SAFETY
3206 if (is_latencies && nthreads > 1)
3208 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
3214 * save main process id in the global variable because process id will be
3215 * changed after fork.
3217 main_pid = (int) getpid();
3218 progress_nclients = nclients;
3219 progress_nthreads = nthreads;
3223 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3224 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3226 /* copy any -D switch values to all clients */
3227 for (i = 1; i < nclients; i++)
3232 for (j = 0; j < state[0].nvariables; j++)
3234 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
3243 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3244 pghost, pgport, nclients, nxacts, dbName);
3246 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3247 pghost, pgport, nclients, duration, dbName);
3250 /* opening connection... */
3255 if (PQstatus(con) == CONNECTION_BAD)
3257 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
3258 fprintf(stderr, "%s", PQerrorMessage(con));
3265 * get the scaling factor that should be same as count(*) from
3266 * pgbench_branches if this is not a custom query
3268 res = PQexec(con, "select count(*) from pgbench_branches");
3269 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3271 fprintf(stderr, "%s", PQerrorMessage(con));
3274 scale = atoi(PQgetvalue(res, 0, 0));
3277 fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
3282 /* warn if we override user-given -s switch */
3285 "Scale option ignored, using pgbench_branches table count = %d\n",
3290 * :scale variables normally get -s or database scale, but don't override
3291 * an explicit -D switch
3293 if (getVariable(&state[0], "scale") == NULL)
3295 snprintf(val, sizeof(val), "%d", scale);
3296 for (i = 0; i < nclients; i++)
3298 if (!putVariable(&state[i], "startup", "scale", val))
3304 * Define a :client_id variable that is unique per connection. But don't
3305 * override an explicit -D switch.
3307 if (getVariable(&state[0], "client_id") == NULL)
3309 for (i = 0; i < nclients; i++)
3311 snprintf(val, sizeof(val), "%d", i);
3312 if (!putVariable(&state[i], "startup", "client_id", val))
3319 fprintf(stderr, "starting vacuum...");
3320 tryExecuteStatement(con, "vacuum pgbench_branches");
3321 tryExecuteStatement(con, "vacuum pgbench_tellers");
3322 tryExecuteStatement(con, "truncate pgbench_history");
3323 fprintf(stderr, "end.\n");
3325 if (do_vacuum_accounts)
3327 fprintf(stderr, "starting vacuum pgbench_accounts...");
3328 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
3329 fprintf(stderr, "end.\n");
3334 /* set random seed */
3335 INSTR_TIME_SET_CURRENT(start_time);
3336 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3338 /* process builtin SQL scripts */
3342 sql_files[0] = process_builtin(tpc_b,
3343 "<builtin: TPC-B (sort of)>");
3348 sql_files[0] = process_builtin(select_only,
3349 "<builtin: select only>");
3354 sql_files[0] = process_builtin(simple_update,
3355 "<builtin: simple update>");
3363 /* set up thread data structures */
3364 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3367 for (i = 0; i < nthreads; i++)
3369 TState *thread = &threads[i];
3372 thread->state = &state[nclients_dealt];
3374 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
3375 thread->random_state[0] = random();
3376 thread->random_state[1] = random();
3377 thread->random_state[2] = random();
3378 thread->throttle_latency_skipped = 0;
3379 thread->latency_late = 0;
3381 nclients_dealt += thread->nstate;
3385 /* Reserve memory for the thread to store per-command latencies */
3388 thread->exec_elapsed = (instr_time *)
3389 pg_malloc(sizeof(instr_time) * num_commands);
3390 thread->exec_count = (int *)
3391 pg_malloc(sizeof(int) * num_commands);
3393 for (t = 0; t < num_commands; t++)
3395 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
3396 thread->exec_count[t] = 0;
3401 thread->exec_elapsed = NULL;
3402 thread->exec_count = NULL;
3406 /* all clients must be assigned to a thread */
3407 Assert(nclients_dealt == nclients);
3409 /* get start up time */
3410 INSTR_TIME_SET_CURRENT(start_time);
3412 /* set alarm if duration is specified. */
3417 for (i = 0; i < nthreads; i++)
3419 TState *thread = &threads[i];
3421 INSTR_TIME_SET_CURRENT(thread->start_time);
3423 /* the first thread (i = 0) is executed by main thread */
3426 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
3428 if (err != 0 || thread->thread == INVALID_THREAD)
3430 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
3436 thread->thread = INVALID_THREAD;
3440 /* wait for threads and accumulate results */
3441 INSTR_TIME_SET_ZERO(conn_total_time);
3442 for (i = 0; i < nthreads; i++)
3446 if (threads[i].thread == INVALID_THREAD)
3447 ret = threadRun(&threads[i]);
3449 pthread_join(threads[i].thread, &ret);
3453 TResult *r = (TResult *) ret;
3455 total_xacts += r->xacts;
3456 total_latencies += r->latencies;
3457 total_sqlats += r->sqlats;
3458 throttle_lag += r->throttle_lag;
3459 throttle_latency_skipped += r->throttle_latency_skipped;
3460 latency_late += r->latency_late;
3461 if (r->throttle_lag_max > throttle_lag_max)
3462 throttle_lag_max = r->throttle_lag_max;
3463 INSTR_TIME_ADD(conn_total_time, r->conn_time);
3467 disconnect_all(state, nclients);
3470 * XXX We compute results as though every client of every thread started
3471 * and finished at the same time. That model can diverge noticeably from
3472 * reality for a short benchmark run involving relatively many threads.
3473 * The first thread may process notably many transactions before the last
3474 * thread begins. Improving the model alone would bring limited benefit,
3475 * because performance during those periods of partial thread count can
3476 * easily exceed steady state performance. This is one of the many ways
3477 * short runs convey deceptive performance figures.
3479 INSTR_TIME_SET_CURRENT(total_time);
3480 INSTR_TIME_SUBTRACT(total_time, start_time);
3481 printResults(ttype, total_xacts, nclients, threads, nthreads,
3482 total_time, conn_total_time, total_latencies, total_sqlats,
3483 throttle_lag, throttle_lag_max, throttle_latency_skipped,
3490 threadRun(void *arg)
3492 TState *thread = (TState *) arg;
3493 CState *state = thread->state;
3495 FILE *logfile = NULL; /* per-thread log file */
3498 int nstate = thread->nstate;
3499 int remains = nstate; /* number of remaining clients */
3502 /* for reporting progress: */
3503 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
3504 int64 last_report = thread_start;
3505 int64 next_report = last_report + (int64) progress * 1000000;
3506 int64 last_count = 0,
3515 * Initialize throttling rate target for all of the thread's clients. It
3516 * might be a little more accurate to reset thread->start_time here too.
3517 * The possible drift seems too small relative to typical throttle delay
3518 * times to worry about it.
3520 INSTR_TIME_SET_CURRENT(start);
3521 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3522 thread->throttle_lag = 0;
3523 thread->throttle_lag_max = 0;
3525 result = pg_malloc(sizeof(TResult));
3527 INSTR_TIME_SET_ZERO(result->conn_time);
3529 /* open log file if requested */
3534 if (thread->tid == 0)
3535 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3537 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3538 logfile = fopen(logpath, "w");
3540 if (logfile == NULL)
3542 fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
3549 /* make connections to the database */
3550 for (i = 0; i < nstate; i++)
3552 if ((state[i].con = doConnect()) == NULL)
3557 /* time after thread and connections set up */
3558 INSTR_TIME_SET_CURRENT(result->conn_time);
3559 INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
3561 agg_vals_init(&aggs, thread->start_time);
3563 /* send start up queries in async manner */
3564 for (i = 0; i < nstate; i++)
3566 CState *st = &state[i];
3567 Command **commands = sql_files[st->use_file];
3568 int prev_ecnt = st->ecnt;
3570 st->use_file = getrand(thread, 0, num_files - 1);
3571 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
3572 remains--; /* I've aborted */
3574 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3576 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
3577 remains--; /* I've aborted */
3586 int maxsock; /* max socket number to be waited */
3590 FD_ZERO(&input_mask);
3593 min_usec = PG_INT64_MAX;
3594 for (i = 0; i < nstate; i++)
3596 CState *st = &state[i];
3597 Command **commands = sql_files[st->use_file];
3600 if (st->con == NULL)
3604 else if (st->sleeping)
3606 if (st->throttling && timer_exceeded)
3608 /* interrupt client which has not started a transaction */
3611 st->throttling = false;
3616 else /* just a nap from the script */
3620 if (min_usec == PG_INT64_MAX)
3624 INSTR_TIME_SET_CURRENT(now);
3625 now_usec = INSTR_TIME_GET_MICROSEC(now);
3628 this_usec = st->txn_scheduled - now_usec;
3629 if (min_usec > this_usec)
3630 min_usec = this_usec;
3633 else if (commands[st->state]->type == META_COMMAND)
3635 min_usec = 0; /* the connection is ready to run */
3639 sock = PQsocket(st->con);
3642 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3646 FD_SET(sock, &input_mask);
3652 if (min_usec > 0 && maxsock != -1)
3654 int nsocks; /* return from select(2) */
3656 if (min_usec != PG_INT64_MAX)
3658 struct timeval timeout;
3660 timeout.tv_sec = min_usec / 1000000;
3661 timeout.tv_usec = min_usec % 1000000;
3662 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
3665 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3670 /* must be something wrong */
3671 fprintf(stderr, "select failed: %s\n", strerror(errno));
3676 /* ok, backend returns reply */
3677 for (i = 0; i < nstate; i++)
3679 CState *st = &state[i];
3680 Command **commands = sql_files[st->use_file];
3681 int prev_ecnt = st->ecnt;
3683 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3684 || commands[st->state]->type == META_COMMAND))
3686 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
3687 remains--; /* I've aborted */
3690 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3692 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
3693 remains--; /* I've aborted */
3699 #ifdef PTHREAD_FORK_EMULATION
3700 /* each process reports its own progression */
3703 instr_time now_time;
3706 INSTR_TIME_SET_CURRENT(now_time);
3707 now = INSTR_TIME_GET_MICROSEC(now_time);
3708 if (now >= next_report)
3710 /* generate and show report */
3715 int64 lags = thread->throttle_lag;
3716 int64 run = now - last_report;
3724 for (i = 0; i < nstate; i++)
3726 count += state[i].cnt;
3727 lats += state[i].txn_latencies;
3728 sqlats += state[i].txn_sqlats;
3731 total_run = (now - thread_start) / 1000000.0;
3732 tps = 1000000.0 * (count - last_count) / run;
3733 latency = 0.001 * (lats - last_lats) / (count - last_count);
3734 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3735 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3736 lag = 0.001 * (lags - last_lags) / (count - last_count);
3737 skipped = thread->throttle_latency_skipped - last_skipped;
3740 "progress %d: %.1f s, %.1f tps, "
3741 "lat %.3f ms stddev %.3f",
3742 thread->tid, total_run, tps, latency, stdev);
3745 fprintf(stderr, ", lag %.3f ms", lag);
3747 fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
3749 fprintf(stderr, "\n");
3753 last_sqlats = sqlats;
3756 last_skipped = thread->throttle_latency_skipped;
3757 next_report += (int64) progress *1000000;
3761 /* progress report by thread 0 for all threads */
3762 if (progress && thread->tid == 0)
3764 instr_time now_time;
3767 INSTR_TIME_SET_CURRENT(now_time);
3768 now = INSTR_TIME_GET_MICROSEC(now_time);
3769 if (now >= next_report)
3771 /* generate and show report */
3777 int64 run = now - last_report;
3785 for (i = 0; i < progress_nclients; i++)
3787 count += state[i].cnt;
3788 lats += state[i].txn_latencies;
3789 sqlats += state[i].txn_sqlats;
3792 for (i = 0; i < progress_nthreads; i++)
3793 lags += thread[i].throttle_lag;
3795 total_run = (now - thread_start) / 1000000.0;
3796 tps = 1000000.0 * (count - last_count) / run;
3797 latency = 0.001 * (lats - last_lats) / (count - last_count);
3798 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3799 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3800 lag = 0.001 * (lags - last_lags) / (count - last_count);
3801 skipped = thread->throttle_latency_skipped - last_skipped;
3804 "progress: %.1f s, %.1f tps, "
3805 "lat %.3f ms stddev %.3f",
3806 total_run, tps, latency, stdev);
3809 fprintf(stderr, ", lag %.3f ms", lag);
3811 fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
3813 fprintf(stderr, "\n");
3817 last_sqlats = sqlats;
3820 last_skipped = thread->throttle_latency_skipped;
3821 next_report += (int64) progress *1000000;
3824 #endif /* PTHREAD_FORK_EMULATION */
3828 INSTR_TIME_SET_CURRENT(start);
3829 disconnect_all(state, nstate);
3831 result->latencies = 0;
3833 for (i = 0; i < nstate; i++)
3835 result->xacts += state[i].cnt;
3836 result->latencies += state[i].txn_latencies;
3837 result->sqlats += state[i].txn_sqlats;
3839 result->throttle_lag = thread->throttle_lag;
3840 result->throttle_lag_max = thread->throttle_lag_max;
3841 result->throttle_latency_skipped = thread->throttle_latency_skipped;
3842 result->latency_late = thread->latency_late;
3844 INSTR_TIME_SET_CURRENT(end);
3845 INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
3852 * Support for duration option: set timer_exceeded after so many seconds.
3858 handle_sig_alarm(SIGNAL_ARGS)
3860 timer_exceeded = true;
3864 setalarm(int seconds)
3866 pqsignal(SIGALRM, handle_sig_alarm);
3870 #ifndef ENABLE_THREAD_SAFETY
3873 * implements pthread using fork.
3876 typedef struct fork_pthread
3883 pthread_create(pthread_t *thread,
3884 pthread_attr_t *attr,
3885 void *(*start_routine) (void *),
3892 th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
3893 if (pipe(th->pipes) < 0)
3900 if (th->pid == -1) /* error */
3905 if (th->pid != 0) /* in parent process */
3907 close(th->pipes[1]);
3912 /* in child process */
3913 close(th->pipes[0]);
3915 /* set alarm again because the child does not inherit timers */
3919 ret = start_routine(arg);
3920 rc = write(th->pipes[1], ret, sizeof(TResult));
3922 close(th->pipes[1]);
3928 pthread_join(pthread_t th, void **thread_return)
3932 while (waitpid(th->pid, &status, 0) != th->pid)
3938 if (thread_return != NULL)
3940 /* assume result is TResult */
3941 *thread_return = pg_malloc(sizeof(TResult));
3942 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
3944 free(*thread_return);
3945 *thread_return = NULL;
3948 close(th->pipes[0]);
3956 static VOID CALLBACK
3957 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3959 timer_exceeded = true;
3963 setalarm(int seconds)
3968 /* This function will be called at most once, so we can cheat a bit. */
3969 queue = CreateTimerQueue();
3970 if (seconds > ((DWORD) -1) / 1000 ||
3971 !CreateTimerQueueTimer(&timer, queue,
3972 win32_timer_callback, NULL, seconds * 1000, 0,
3973 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3975 fprintf(stderr, "Failed to set timer\n");
3980 /* partial pthread implementation for Windows */
3982 typedef struct win32_pthread
3985 void *(*routine) (void *);
3990 static unsigned __stdcall
3991 win32_pthread_run(void *arg)
3993 win32_pthread *th = (win32_pthread *) arg;
3995 th->result = th->routine(th->arg);
4001 pthread_create(pthread_t *thread,
4002 pthread_attr_t *attr,
4003 void *(*start_routine) (void *),
4009 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
4010 th->routine = start_routine;
4014 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
4015 if (th->handle == NULL)
4027 pthread_join(pthread_t th, void **thread_return)
4029 if (th == NULL || th->handle == NULL)
4030 return errno = EINVAL;
4032 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
4034 _dosmaperr(GetLastError());
4039 *thread_return = th->result;
4041 CloseHandle(th->handle);