4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2015, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "portability/instr_time.h"
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h> /* for getrlimit */
53 #define M_PI 3.14159265358979323846
59 * Multi-platform pthread implementations
63 /* Use native win32 threads on Windows */
64 typedef struct win32_pthread *pthread_t;
65 typedef int pthread_attr_t;
67 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
68 static int pthread_join(pthread_t th, void **thread_return);
69 #elif defined(ENABLE_THREAD_SAFETY)
70 /* Use platform-dependent pthread capability */
73 /* No threads implementation, use none (-j 1) */
74 #define pthread_t void *
78 /********************************************************************
79 * some configurable parameters */
81 /* max number of clients allowed */
83 #define MAXCLIENTS (FD_SETSIZE - 10)
85 #define MAXCLIENTS 1024
88 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
89 #define DEFAULT_NXACTS 10 /* default nxacts */
91 #define MIN_GAUSSIAN_THRESHOLD 2.0 /* minimum threshold for gauss */
93 int nxacts = 0; /* number of transactions per client */
94 int duration = 0; /* duration in seconds */
97 * scaling factor. for example, scale = 10 will make 1000000 tuples in
98 * pgbench_accounts table.
103 * fillfactor. for example, fillfactor = 90 will use only 90 percent
104 * space during inserts and leave 10 percent free.
106 int fillfactor = 100;
109 * create foreign key constraints on the tables?
111 int foreign_keys = 0;
114 * use unlogged tables?
116 int unlogged_tables = 0;
119 * log sampling rate (1.0 = log everything, 0.0 = option not given)
121 double sample_rate = 0.0;
124 * When threads are throttled to a given rate limit, this is the target delay
125 * to reach that rate in usec. 0 is the default and means no throttling.
127 int64 throttle_delay = 0;
130 * Transactions which take longer than this limit (in usec) are counted as
131 * late, and reported as such, although they are completed anyway. When
132 * throttling is enabled, execution time slots that are more than this late
133 * are skipped altogether, and counted separately.
135 int64 latency_limit = 0;
138 * tablespace selection
140 char *tablespace = NULL;
141 char *index_tablespace = NULL;
144 * end of configurable parameters
145 *********************************************************************/
147 #define nbranches 1 /* Makes little sense to change this. Change
150 #define naccounts 100000
153 * The scale factor at/beyond which 32bit integers are incapable of storing
156 * Although the actual threshold is 21474, we use 20000 because it is easier to
157 * document and remember, and isn't that far away from the real threshold.
159 #define SCALE_32BIT_THRESHOLD 20000
161 bool use_log; /* log transaction latencies to a file */
162 bool use_quiet; /* quiet logging onto stderr */
163 int agg_interval; /* log aggregates instead of individual
165 int progress = 0; /* thread progress report every this seconds */
166 int progress_nclients = 0; /* number of clients for progress
168 int progress_nthreads = 0; /* number of threads for progress
170 bool is_connect; /* establish connection for each transaction */
171 bool is_latencies; /* report per-command latencies */
172 int main_pid; /* main process id used in log filename */
178 const char *progname;
180 volatile bool timer_exceeded = false; /* flag from signal handler */
182 /* variable definitions */
185 char *name; /* variable name */
186 char *value; /* its value */
189 #define MAX_FILES 128 /* max number of SQL script files allowed */
190 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
193 * structures used in custom query mode
198 PGconn *con; /* connection handle to DB */
199 int id; /* client No. */
200 int state; /* state No. */
201 int listen; /* 0 indicates that an async query has been
203 int sleeping; /* 1 indicates that the client is napping */
204 bool throttling; /* whether nap is for throttling */
205 Variable *variables; /* array of variable definitions */
207 int64 txn_scheduled; /* scheduled start time of transaction (usec) */
208 instr_time txn_begin; /* used for measuring schedule lag times */
209 instr_time stmt_begin; /* used for measuring statement latencies */
210 bool is_throttled; /* whether transaction throttling is done */
211 int use_file; /* index in sql_files for this client */
212 bool prepared[MAX_FILES];
214 /* per client collected stats */
215 int cnt; /* xacts count */
216 int ecnt; /* error count */
217 int64 txn_latencies; /* cumulated latencies */
218 int64 txn_sqlats; /* cumulated square latencies */
226 int tid; /* thread id */
227 pthread_t thread; /* thread handle */
228 CState *state; /* array of CState */
229 int nstate; /* length of state[] */
230 instr_time start_time; /* thread start time */
231 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
232 int *exec_count; /* number of cmd executions (per Command) */
233 unsigned short random_state[3]; /* separate randomness for each thread */
234 int64 throttle_trigger; /* previous/next throttling (us) */
236 /* per thread collected stats */
237 instr_time conn_time;
238 int64 throttle_lag; /* total transaction lag behind throttling */
239 int64 throttle_lag_max; /* max transaction lag */
240 int64 throttle_latency_skipped; /* lagging transactions
242 int64 latency_late; /* late transactions */
245 #define INVALID_THREAD ((pthread_t) 0)
248 * queries read from files
250 #define SQL_COMMAND 1
251 #define META_COMMAND 2
254 typedef enum QueryMode
256 QUERY_SIMPLE, /* simple query */
257 QUERY_EXTENDED, /* extended query */
258 QUERY_PREPARED, /* extended query with prepared statements */
262 static QueryMode querymode = QUERY_SIMPLE;
263 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
267 char *line; /* full text of command line */
268 int command_num; /* unique index of this Command struct */
269 int type; /* command type (SQL_COMMAND or META_COMMAND) */
270 int argc; /* number of command words */
271 char *argv[MAX_ARGS]; /* command word list */
272 int cols[MAX_ARGS]; /* corresponding column starting from 1 */
273 PgBenchExpr *expr; /* parsed expression */
279 long start_time; /* when does the interval start */
280 int cnt; /* number of transactions */
281 int skipped; /* number of transactions skipped under --rate
282 * and --latency-limit */
284 double min_latency; /* min/max latencies */
286 double sum_latency; /* sum(latency), sum(latency^2) - for
292 double sum_lag; /* sum(lag) */
293 double sum2_lag; /* sum(lag*lag) */
296 static Command **sql_files[MAX_FILES]; /* SQL script files */
297 static int num_files; /* number of script files */
298 static int num_commands = 0; /* total number of Command structs */
299 static int debug = 0; /* debug flag */
301 /* default scenario */
302 static char *tpc_b = {
303 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
304 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
305 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
306 "\\setrandom aid 1 :naccounts\n"
307 "\\setrandom bid 1 :nbranches\n"
308 "\\setrandom tid 1 :ntellers\n"
309 "\\setrandom delta -5000 5000\n"
311 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
312 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
313 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
314 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
315 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
320 static char *simple_update = {
321 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
322 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
323 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
324 "\\setrandom aid 1 :naccounts\n"
325 "\\setrandom bid 1 :nbranches\n"
326 "\\setrandom tid 1 :ntellers\n"
327 "\\setrandom delta -5000 5000\n"
329 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
330 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
331 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
336 static char *select_only = {
337 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
338 "\\setrandom aid 1 :naccounts\n"
339 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
342 /* Function prototypes */
343 static void setalarm(int seconds);
344 static void *threadRun(void *arg);
346 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
347 AggVals *agg, bool skipped);
352 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
354 " %s [OPTION]... [DBNAME]\n"
355 "\nInitialization options:\n"
356 " -i, --initialize invokes initialization mode\n"
357 " -F, --fillfactor=NUM set fill factor\n"
358 " -n, --no-vacuum do not run VACUUM after initialization\n"
359 " -q, --quiet quiet logging (one message each 5 seconds)\n"
360 " -s, --scale=NUM scaling factor\n"
361 " --foreign-keys create foreign key constraints between tables\n"
362 " --index-tablespace=TABLESPACE\n"
363 " create indexes in the specified tablespace\n"
364 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
365 " --unlogged-tables create tables as unlogged tables\n"
366 "\nBenchmarking options:\n"
367 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
368 " -C, --connect establish new connection for each transaction\n"
369 " -D, --define=VARNAME=VALUE\n"
370 " define variable for use by custom script\n"
371 " -f, --file=FILENAME read transaction script from FILENAME\n"
372 " -j, --jobs=NUM number of threads (default: 1)\n"
373 " -l, --log write transaction times to log file\n"
374 " -L, --latency-limit=NUM count transactions lasting more than NUM ms\n"
376 " -M, --protocol=simple|extended|prepared\n"
377 " protocol for submitting queries (default: simple)\n"
378 " -n, --no-vacuum do not run VACUUM before tests\n"
379 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
380 " -P, --progress=NUM show thread progress report every NUM seconds\n"
381 " -r, --report-latencies report average latency per command\n"
382 " -R, --rate=NUM target rate in transactions per second\n"
383 " -s, --scale=NUM report this scale factor in output\n"
384 " -S, --select-only perform SELECT-only transactions\n"
385 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
386 " -T, --time=NUM duration of benchmark test in seconds\n"
387 " -v, --vacuum-all vacuum all four standard tables before tests\n"
388 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
389 " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
390 "\nCommon options:\n"
391 " -d, --debug print debugging output\n"
392 " -h, --host=HOSTNAME database server host or socket directory\n"
393 " -p, --port=PORT database server port number\n"
394 " -U, --username=USERNAME connect as specified database user\n"
395 " -V, --version output version information, then exit\n"
396 " -?, --help show this help, then exit\n"
398 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
403 * strtoint64 -- convert a string to 64-bit integer
405 * This function is a modified version of scanint8() from
406 * src/backend/utils/adt/int8.c.
409 strtoint64(const char *str)
411 const char *ptr = str;
416 * Do our own scan, rather than relying on sscanf which might be broken
420 /* skip leading spaces */
421 while (*ptr && isspace((unsigned char) *ptr))
430 * Do an explicit check for INT64_MIN. Ugly though this is, it's
431 * cleaner than trying to get the loop below to handle it portably.
433 if (strncmp(ptr, "9223372036854775808", 19) == 0)
435 result = PG_INT64_MIN;
441 else if (*ptr == '+')
444 /* require at least one digit */
445 if (!isdigit((unsigned char) *ptr))
446 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
449 while (*ptr && isdigit((unsigned char) *ptr))
451 int64 tmp = result * 10 + (*ptr++ - '0');
453 if ((tmp / 10) != result) /* overflow? */
454 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
460 /* allow trailing whitespace, but not other trailing chars */
461 while (*ptr != '\0' && isspace((unsigned char) *ptr))
465 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
467 return ((sign < 0) ? -result : result);
470 /* random number generator: uniform distribution from min to max inclusive */
472 getrand(TState *thread, int64 min, int64 max)
475 * Odd coding is so that min and max have approximately the same chance of
476 * being selected as do numbers between them.
478 * pg_erand48() is thread-safe and concurrent, which is why we use it
479 * rather than random(), which in glibc is non-reentrant, and therefore
480 * protected by a mutex, and therefore a bottleneck on machines with many
483 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
487 * random number generator: exponential distribution from min to max inclusive.
488 * the threshold is so that the density of probability for the last cut-off max
489 * value is exp(-threshold).
492 getExponentialRand(TState *thread, int64 min, int64 max, double threshold)
498 Assert(threshold > 0.0);
499 cut = exp(-threshold);
500 /* erand in [0, 1), uniform in (0, 1] */
501 uniform = 1.0 - pg_erand48(thread->random_state);
504 * inner expresion in (cut, 1] (if threshold > 0), rand in [0, 1)
506 Assert((1.0 - cut) != 0.0);
507 rand = -log(cut + (1.0 - cut) * uniform) / threshold;
508 /* return int64 random number within between min and max */
509 return min + (int64) ((max - min + 1) * rand);
512 /* random number generator: gaussian distribution from min to max inclusive */
514 getGaussianRand(TState *thread, int64 min, int64 max, double threshold)
520 * Get user specified random number from this loop, with -threshold <
523 * This loop is executed until the number is in the expected range.
525 * As the minimum threshold is 2.0, the probability of looping is low:
526 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
527 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
528 * the worst case. For a 5.0 threshold value, the looping probability is
529 * about e^{-5} * 2 / pi ~ 0.43%.
534 * pg_erand48 generates [0,1), but for the basic version of the
535 * Box-Muller transform the two uniformly distributed random numbers
536 * are expected in (0, 1] (see
537 * http://en.wikipedia.org/wiki/Box_muller)
539 double rand1 = 1.0 - pg_erand48(thread->random_state);
540 double rand2 = 1.0 - pg_erand48(thread->random_state);
542 /* Box-Muller basic form transform */
543 double var_sqrt = sqrt(-2.0 * log(rand1));
545 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
548 * we may try with cos, but there may be a bias induced if the
549 * previous value fails the test. To be on the safe side, let us try
553 while (stdev < -threshold || stdev >= threshold);
555 /* stdev is in [-threshold, threshold), normalization to [0,1) */
556 rand = (stdev + threshold) / (threshold * 2.0);
558 /* return int64 random number within between min and max */
559 return min + (int64) ((max - min + 1) * rand);
563 * random number generator: generate a value, such that the series of values
564 * will approximate a Poisson distribution centered on the given value.
567 getPoissonRand(TState *thread, int64 center)
570 * Use inverse transform sampling to generate a value > 0, such that the
571 * expected (i.e. average) value is the given argument.
575 /* erand in [0, 1), uniform in (0, 1] */
576 uniform = 1.0 - pg_erand48(thread->random_state);
578 return (int64) (-log(uniform) * ((double) center) + 0.5);
581 /* call PQexec() and exit() on failure */
583 executeStatement(PGconn *con, const char *sql)
587 res = PQexec(con, sql);
588 if (PQresultStatus(res) != PGRES_COMMAND_OK)
590 fprintf(stderr, "%s", PQerrorMessage(con));
596 /* call PQexec() and complain, but without exiting, on failure */
598 tryExecuteStatement(PGconn *con, const char *sql)
602 res = PQexec(con, sql);
603 if (PQresultStatus(res) != PGRES_COMMAND_OK)
605 fprintf(stderr, "%s", PQerrorMessage(con));
606 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
611 /* set up a connection to the backend */
616 static char *password = NULL;
620 * Start the connection. Loop until we have a password if requested by
625 #define PARAMS_ARRAY_SIZE 7
627 const char *keywords[PARAMS_ARRAY_SIZE];
628 const char *values[PARAMS_ARRAY_SIZE];
630 keywords[0] = "host";
632 keywords[1] = "port";
634 keywords[2] = "user";
636 keywords[3] = "password";
637 values[3] = password;
638 keywords[4] = "dbname";
640 keywords[5] = "fallback_application_name";
641 values[5] = progname;
647 conn = PQconnectdbParams(keywords, values, true);
651 fprintf(stderr, "connection to database \"%s\" failed\n",
656 if (PQstatus(conn) == CONNECTION_BAD &&
657 PQconnectionNeedsPassword(conn) &&
661 password = simple_prompt("Password: ", 100, false);
666 /* check to see that the backend connection was successfully made */
667 if (PQstatus(conn) == CONNECTION_BAD)
669 fprintf(stderr, "connection to database \"%s\" failed:\n%s",
670 dbName, PQerrorMessage(conn));
678 /* throw away response from backend */
680 discard_response(CState *state)
686 res = PQgetResult(state->con);
693 compareVariables(const void *v1, const void *v2)
695 return strcmp(((const Variable *) v1)->name,
696 ((const Variable *) v2)->name);
700 getVariable(CState *st, char *name)
705 /* On some versions of Solaris, bsearch of zero items dumps core */
706 if (st->nvariables <= 0)
710 var = (Variable *) bsearch((void *) &key,
711 (void *) st->variables,
/*
 * Check whether name is a legal variable name: it must be non-empty and
 * consist only of letters, digits and underscores.
 *
 * Returns true for a legal name, false otherwise.
 */
static bool
isLegalVariableName(const char *name)
{
	int			i;

	for (i = 0; name[i] != '\0'; i++)
	{
		/* the cast keeps isalnum() well-defined for high-bit characters */
		if (!isalnum((unsigned char) name[i]) && name[i] != '_')
			return false;
	}

	/* an empty string passes the loop vacuously but is not a usable name */
	return (i > 0);
}
737 putVariable(CState *st, const char *context, char *name, char *value)
743 /* On some versions of Solaris, bsearch of zero items dumps core */
744 if (st->nvariables > 0)
745 var = (Variable *) bsearch((void *) &key,
746 (void *) st->variables,
758 * Check for the name only when declaring a new variable to avoid
761 if (!isLegalVariableName(name))
763 fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
769 newvars = (Variable *) pg_realloc(st->variables,
770 (st->nvariables + 1) * sizeof(Variable));
772 newvars = (Variable *) pg_malloc(sizeof(Variable));
774 st->variables = newvars;
776 var = &newvars[st->nvariables];
778 var->name = pg_strdup(name);
779 var->value = pg_strdup(value);
783 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
790 /* dup then free, in case value is pointing at this variable */
791 val = pg_strdup(value);
801 parseVariable(const char *sql, int *eaten)
809 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
814 memcpy(name, &sql[1], i - 1);
822 replaceVariable(char **sql, char *param, int len, char *value)
824 int valueln = strlen(value);
828 size_t offset = param - *sql;
830 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
831 param = *sql + offset;
835 memmove(param + valueln, param + len, strlen(param + len) + 1);
836 memcpy(param, value, valueln);
838 return param + valueln;
842 assignVariables(CState *st, char *sql)
849 while ((p = strchr(p, ':')) != NULL)
853 name = parseVariable(p, &eaten);
863 val = getVariable(st, name);
871 p = replaceVariable(&sql, p, eaten, val);
878 getQueryParams(CState *st, const Command *command, const char **params)
882 for (i = 0; i < command->argc - 1; i++)
883 params[i] = getVariable(st, command->argv[i + 1]);
887 * Recursive evaluation of an expression in a pgbench script
888 * using the current state of variables.
889 * Returns whether the evaluation was ok,
890 * the value itself is returned through the retval pointer.
893 evaluateExpr(CState *st, PgBenchExpr *expr, int64 *retval)
897 case ENODE_INTEGER_CONSTANT:
899 *retval = expr->u.integer_constant.ival;
907 if ((var = getVariable(st, expr->u.variable.varname)) == NULL)
909 fprintf(stderr, "undefined variable \"%s\"\n",
910 expr->u.variable.varname);
913 *retval = strtoint64(var);
922 if (!evaluateExpr(st, expr->u.operator.lexpr, &lval))
924 if (!evaluateExpr(st, expr->u.operator.rexpr, &rval))
926 switch (expr->u.operator.operator)
929 *retval = lval + rval;
933 *retval = lval - rval;
937 *retval = lval * rval;
943 fprintf(stderr, "division by zero\n");
946 *retval = lval / rval;
952 fprintf(stderr, "division by zero\n");
955 *retval = lval % rval;
959 fprintf(stderr, "bad operator\n");
967 fprintf(stderr, "bad expression\n");
972 * Run a shell command. The result is assigned to the variable if not NULL.
973 * Return true if succeeded, or false on error.
976 runShellCommand(CState *st, char *variable, char **argv, int argc)
978 char command[SHELL_COMMAND_SIZE];
987 * Join arguments with whitespace separators. Arguments starting with
988 * exactly one colon are treated as variables:
989 * name - append a string "name"
990 * :var - append a variable named 'var'
991 * ::name - append a string ":name"
994 for (i = 0; i < argc; i++)
999 if (argv[i][0] != ':')
1001 arg = argv[i]; /* a string literal */
1003 else if (argv[i][1] == ':')
1005 arg = argv[i] + 1; /* a string literal starting with colons */
1007 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1009 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1014 arglen = strlen(arg);
1015 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1017 fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1022 command[len++] = ' ';
1023 memcpy(command + len, arg, arglen);
1027 command[len] = '\0';
1029 /* Fast path for non-assignment case */
1030 if (variable == NULL)
1032 if (system(command))
1034 if (!timer_exceeded)
1035 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1041 /* Execute the command with pipe and read the standard output. */
1042 if ((fp = popen(command, "r")) == NULL)
1044 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1047 if (fgets(res, sizeof(res), fp) == NULL)
1049 if (!timer_exceeded)
1050 fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1056 fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1060 /* Check whether the result is an integer and assign it to the variable */
1061 retval = (int) strtol(res, &endptr, 10);
1062 while (*endptr != '\0' && isspace((unsigned char) *endptr))
1064 if (*res == '\0' || *endptr != '\0')
1066 fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1070 snprintf(res, sizeof(res), "%d", retval);
1071 if (!putVariable(st, "setshell", variable, res))
1075 printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
#define MAX_PREPARE_NAME		32

/*
 * Write the prepared-statement name for the given script file index and
 * command index into buffer, in the form "P<file>_<state>".
 *
 * buffer must provide at least MAX_PREPARE_NAME bytes.  The write is bounded
 * by that size, so the result is always NUL-terminated and can never run
 * past a correctly sized buffer.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
1088 clientDone(CState *st, bool ok)
1090 (void) ok; /* unused */
1092 if (st->con != NULL)
1097 return false; /* always false */
1101 agg_vals_init(AggVals *aggs, instr_time start)
1103 /* basic counters */
1104 aggs->cnt = 0; /* number of transactions (includes skipped) */
1105 aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
1107 aggs->sum_latency = 0; /* SUM(latency) */
1108 aggs->sum2_latency = 0; /* SUM(latency*latency) */
1110 /* min and max transaction duration */
1111 aggs->min_latency = 0;
1112 aggs->max_latency = 0;
1114 /* schedule lag counters */
1120 /* start of the current interval */
1121 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
1124 /* return false iff client should be disconnected */
1126 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
1130 bool trans_needs_throttle = false;
1134 * gettimeofday() isn't free, so we get the current timestamp lazily the
1135 * first time it's needed, and reuse the same value throughout this
1136 * function after that. This also ensures that e.g. the calculated latency
1137 * reported in the log file and in the totals are the same. Zero means
1140 INSTR_TIME_SET_ZERO(now);
1143 commands = sql_files[st->use_file];
1146 * Handle throttling once per transaction by sleeping. It is simpler to
1147 * do this here rather than at the end, because so much complicated logic
1148 * happens below when statements finish.
1150 if (throttle_delay && !st->is_throttled)
1153 * Generate a delay such that the series of delays will approximate a
1154 * Poisson distribution centered on the throttle_delay time.
1156 * If transactions are too slow or a given wait is shorter than a
1157 * transaction, the next transaction will start right away.
1159 int64 wait = getPoissonRand(thread, throttle_delay);
1161 thread->throttle_trigger += wait;
1162 st->txn_scheduled = thread->throttle_trigger;
1165 * If --latency-limit is used, and this slot is already late so
1166 * that the transaction will miss the latency limit even if it
1167 * completed immediately, we skip this time slot and iterate till the
1168 * next slot that isn't late yet.
1174 if (INSTR_TIME_IS_ZERO(now))
1175 INSTR_TIME_SET_CURRENT(now);
1176 now_us = INSTR_TIME_GET_MICROSEC(now);
1177 while (thread->throttle_trigger < now_us - latency_limit)
1179 thread->throttle_latency_skipped++;
1182 doLog(thread, st, logfile, &now, agg, true);
1184 wait = getPoissonRand(thread, throttle_delay);
1185 thread->throttle_trigger += wait;
1186 st->txn_scheduled = thread->throttle_trigger;
1191 st->throttling = true;
1192 st->is_throttled = true;
1194 fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
1199 { /* are we sleeping? */
1202 if (INSTR_TIME_IS_ZERO(now))
1203 INSTR_TIME_SET_CURRENT(now);
1204 now_us = INSTR_TIME_GET_MICROSEC(now);
1205 if (st->txn_scheduled <= now_us)
1207 st->sleeping = 0; /* Done sleeping, go ahead with next command */
1210 /* Measure lag of throttled transaction relative to target */
1211 int64 lag = now_us - st->txn_scheduled;
1213 thread->throttle_lag += lag;
1214 if (lag > thread->throttle_lag_max)
1215 thread->throttle_lag_max = lag;
1216 st->throttling = false;
1220 return true; /* Still sleeping, nothing to do here */
1224 { /* are we receiver? */
1225 if (commands[st->state]->type == SQL_COMMAND)
1228 fprintf(stderr, "client %d receiving\n", st->id);
1229 if (!PQconsumeInput(st->con))
1230 { /* there's something wrong */
1231 fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
1232 return clientDone(st, false);
1234 if (PQisBusy(st->con))
1235 return true; /* don't have the whole result yet */
1239 * command finished: accumulate per-command execution times in
1240 * thread-local data structure, if per-command latencies are requested
1244 int cnum = commands[st->state]->command_num;
1246 if (INSTR_TIME_IS_ZERO(now))
1247 INSTR_TIME_SET_CURRENT(now);
1248 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1249 now, st->stmt_begin);
1250 thread->exec_count[cnum]++;
1253 /* transaction finished: calculate latency and log the transaction */
1254 if (commands[st->state + 1] == NULL)
1256 /* only calculate latency if an option is used that needs it */
1257 if (progress || throttle_delay || latency_limit)
1261 if (INSTR_TIME_IS_ZERO(now))
1262 INSTR_TIME_SET_CURRENT(now);
1264 latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
1266 st->txn_latencies += latency;
1269 * XXX In a long benchmark run of high-latency transactions,
1270 * this int64 addition eventually overflows. For example, 100
1271 * threads running 10s transactions will overflow it in 2.56
1272 * hours. With a more-typical OLTP workload of .1s
1273 * transactions, overflow would take 256 hours.
1275 st->txn_sqlats += latency * latency;
1277 /* record over the limit transactions if needed. */
1278 if (latency_limit && latency > latency_limit)
1279 thread->latency_late++;
1282 /* record the time it took in the log */
1284 doLog(thread, st, logfile, &now, agg, false);
1287 if (commands[st->state]->type == SQL_COMMAND)
1290 * Read and discard the query result; note this is not included in
1291 * the statement latency numbers.
1293 res = PQgetResult(st->con);
1294 switch (PQresultStatus(res))
1296 case PGRES_COMMAND_OK:
1297 case PGRES_TUPLES_OK:
1300 fprintf(stderr, "client %d aborted in state %d: %s",
1301 st->id, st->state, PQerrorMessage(st->con));
1303 return clientDone(st, false);
1306 discard_response(st);
1309 if (commands[st->state + 1] == NULL)
1318 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1319 return clientDone(st, true); /* exit success */
1322 /* increment state counter */
1324 if (commands[st->state] == NULL)
1327 st->use_file = (int) getrand(thread, 0, num_files - 1);
1328 commands = sql_files[st->use_file];
1329 st->is_throttled = false;
1332 * No transaction is underway anymore, which means there is
1333 * nothing to listen to right now. When throttling rate limits
1334 * are active, a sleep will happen next, as the next transaction
1335 * starts. And then in any case the next SQL command will set
1339 trans_needs_throttle = (throttle_delay > 0);
1343 if (st->con == NULL)
1348 INSTR_TIME_SET_CURRENT(start);
1349 if ((st->con = doConnect()) == NULL)
1351 fprintf(stderr, "client %d aborted while establishing connection\n",
1353 return clientDone(st, false);
1355 INSTR_TIME_SET_CURRENT(end);
1356 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1360 * This ensures that a throttling delay is inserted before proceeding with
1361 * sql commands, after the first transaction. The first transaction
1362 * throttling is performed when first entering doCustom.
1364 if (trans_needs_throttle)
1366 trans_needs_throttle = false;
1370 /* Record transaction start time under logging, progress or throttling */
1371 if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
1373 INSTR_TIME_SET_CURRENT(st->txn_begin);
1376 * When not throttling, this is also the transaction's scheduled start
1379 if (!throttle_delay)
1380 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
1383 /* Record statement start time if per-command latencies are requested */
1385 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1387 if (commands[st->state]->type == SQL_COMMAND)
1389 const Command *command = commands[st->state];
1392 if (querymode == QUERY_SIMPLE)
1396 sql = pg_strdup(command->argv[0]);
1397 sql = assignVariables(st, sql);
1400 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1401 r = PQsendQuery(st->con, sql);
1404 else if (querymode == QUERY_EXTENDED)
1406 const char *sql = command->argv[0];
1407 const char *params[MAX_ARGS];
1409 getQueryParams(st, command, params);
1412 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1413 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1414 NULL, params, NULL, NULL, 0);
1416 else if (querymode == QUERY_PREPARED)
1418 char name[MAX_PREPARE_NAME];
1419 const char *params[MAX_ARGS];
1421 if (!st->prepared[st->use_file])
1425 for (j = 0; commands[j] != NULL; j++)
1428 char name[MAX_PREPARE_NAME];
1430 if (commands[j]->type != SQL_COMMAND)
1432 preparedStatementName(name, st->use_file, j);
1433 res = PQprepare(st->con, name,
1434 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1435 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1436 fprintf(stderr, "%s", PQerrorMessage(st->con));
1439 st->prepared[st->use_file] = true;
1442 getQueryParams(st, command, params);
1443 preparedStatementName(name, st->use_file, st->state);
1446 fprintf(stderr, "client %d sending %s\n", st->id, name);
1447 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1448 params, NULL, NULL, 0);
1450 else /* unknown sql mode */
1456 fprintf(stderr, "client %d could not send %s\n",
1457 st->id, command->argv[0]);
1461 st->listen = 1; /* flags that should be listened */
1463 else if (commands[st->state]->type == META_COMMAND)
1465 int argc = commands[st->state]->argc,
1467 char **argv = commands[st->state]->argv;
1471 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1472 for (i = 1; i < argc; i++)
1473 fprintf(stderr, " %s", argv[i]);
1474 fprintf(stderr, "\n");
1477 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1482 double threshold = 0;
1485 if (*argv[2] == ':')
1487 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1489 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1494 min = strtoint64(var);
1497 min = strtoint64(argv[2]);
1499 if (*argv[3] == ':')
1501 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1503 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1508 max = strtoint64(var);
1511 max = strtoint64(argv[3]);
1515 fprintf(stderr, "%s: \\setrandom maximum is less than minimum\n",
1522 * Generate random number functions need to be able to subtract
1523 * max from min and add one to the result without overflowing.
1524 * Since we know max > min, we can detect overflow just by
1525 * checking for a negative result. But we must check both that the
1526 * subtraction doesn't overflow, and that adding one to the result
1527 * doesn't overflow either.
1529 if (max - min < 0 || (max - min) + 1 < 0)
1531 fprintf(stderr, "%s: \\setrandom range is too large\n",
1537 if (argc == 4 || /* uniform without or with "uniform" keyword */
1538 (argc == 5 && pg_strcasecmp(argv[4], "uniform") == 0))
1541 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1543 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1545 else if (argc == 6 &&
1546 ((pg_strcasecmp(argv[4], "gaussian") == 0) ||
1547 (pg_strcasecmp(argv[4], "exponential") == 0)))
1549 if (*argv[5] == ':')
1551 if ((var = getVariable(st, argv[5] + 1)) == NULL)
1553 fprintf(stderr, "%s: invalid threshold number: \"%s\"\n",
1558 threshold = strtod(var, NULL);
1561 threshold = strtod(argv[5], NULL);
1563 if (pg_strcasecmp(argv[4], "gaussian") == 0)
1565 if (threshold < MIN_GAUSSIAN_THRESHOLD)
1567 fprintf(stderr, "gaussian threshold must be at least %f (not \"%s\")\n", MIN_GAUSSIAN_THRESHOLD, argv[5]);
1572 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getGaussianRand(thread, min, max, threshold));
1574 snprintf(res, sizeof(res), INT64_FORMAT, getGaussianRand(thread, min, max, threshold));
1576 else if (pg_strcasecmp(argv[4], "exponential") == 0)
1578 if (threshold <= 0.0)
1580 fprintf(stderr, "exponential threshold must be greater than zero (not \"%s\")\n", argv[5]);
1585 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getExponentialRand(thread, min, max, threshold));
1587 snprintf(res, sizeof(res), INT64_FORMAT, getExponentialRand(thread, min, max, threshold));
1590 else /* this means an error somewhere in the parsing phase... */
1592 fprintf(stderr, "%s: invalid arguments for \\setrandom\n",
1598 if (!putVariable(st, argv[0], argv[1], res))
1606 else if (pg_strcasecmp(argv[0], "set") == 0)
1609 PgBenchExpr *expr = commands[st->state]->expr;
1612 if (!evaluateExpr(st, expr, &result))
1617 sprintf(res, INT64_FORMAT, result);
1619 if (!putVariable(st, argv[0], argv[1], res))
1627 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1633 if (*argv[1] == ':')
1635 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1637 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1645 usec = atoi(argv[1]);
1649 if (pg_strcasecmp(argv[2], "ms") == 0)
1651 else if (pg_strcasecmp(argv[2], "s") == 0)
1657 INSTR_TIME_SET_CURRENT(now);
1658 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
1663 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1665 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1667 if (timer_exceeded) /* timeout */
1668 return clientDone(st, true);
1669 else if (!ret) /* on error */
1674 else /* succeeded */
1677 else if (pg_strcasecmp(argv[0], "shell") == 0)
1679 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1681 if (timer_exceeded) /* timeout */
1682 return clientDone(st, true);
1683 else if (!ret) /* on error */
1688 else /* succeeded */
1698 * print log entry after completing one transaction.
/*
 * Write the log output for one finished transaction of client "st".
 *
 * When agg_interval > 0 the sample is folded into *agg and a summary line is
 * printed whenever an interval boundary has been crossed; otherwise one raw
 * line per transaction goes to "logfile".
 *
 * NOTE(review): the tail of the parameter list, the local declarations and
 * the brace-only/short statements of this function are not visible in this
 * view (e.g. where "skipped" comes from); the comments below cover only the
 * statements that are.
 */
1701 doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
1708 * Skip the log entry if sampling is enabled and this row doesn't belong
1709 * to the random sample.
1711 if (sample_rate != 0.0 &&
1712 pg_erand48(thread->random_state) > sample_rate)
/* Read the clock only when the caller has not already filled in *now. */
1715 if (INSTR_TIME_IS_ZERO(*now))
1716 INSTR_TIME_SET_CURRENT(*now);
/* Latency is counted from the scheduled start time, in microseconds. */
1718 latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
/* Lag is the gap between the scheduled start and the recorded txn_begin. */
1722 lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
1724 /* should we aggregate the results or not? */
1725 if (agg_interval > 0)
1728 * Are we still in the same interval? If yes, accumulate the values
1729 * (print them otherwise)
1731 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
1737 * there is no latency to record if the transaction was
/* Keep both the sum and the sum of squares of the latency samples. */
1744 agg->sum_latency += latency;
1745 agg->sum2_latency += latency * latency;
1747 /* first in this aggregation interval */
/* agg->cnt == 1 forces the first sample of an interval to seed min and max. */
1748 if ((agg->cnt == 1) || (latency < agg->min_latency))
1749 agg->min_latency = latency;
1751 if ((agg->cnt == 1) || (latency > agg->max_latency))
1752 agg->max_latency = latency;
1754 /* and the same for schedule lag */
1757 agg->sum_lag += lag;
1758 agg->sum2_lag += lag * lag;
1760 if ((agg->cnt == 1) || (lag < agg->min_lag))
1762 if ((agg->cnt == 1) || (lag > agg->max_lag))
1770 * Loop until we reach the interval of the current transaction
1771 * (and print all the empty intervals in between).
1773 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
1776 * This is a non-Windows branch (thanks to the ifdef in
1777 * usage), so we don't need to handle this in a special way
1780 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
1789 fprintf(logfile, " %.0f %.0f %.0f %.0f",
1795 fprintf(logfile, " %d", agg->skipped);
1797 fputc('\n', logfile);
1799 /* move to the next interval */
1800 agg->start_time = agg->start_time + agg_interval;
1802 /* reset for "no transaction" intervals */
1805 agg->min_latency = 0;
1806 agg->max_latency = 0;
1807 agg->sum_latency = 0;
1808 agg->sum2_latency = 0;
1815 /* reset the values to include only the current transaction. */
/*
 * A skipped transaction is counted in agg->skipped and adds nothing to the
 * latency sums; min/max are still seeded from "latency" either way.
 */
1817 agg->skipped = skipped ? 1 : 0;
1818 agg->min_latency = latency;
1819 agg->max_latency = latency;
1820 agg->sum_latency = skipped ? 0.0 : latency;
1821 agg->sum2_latency = skipped ? 0.0 : latency * latency;
1825 agg->sum2_lag = lag * lag;
1830 /* no, print raw transactions */
1833 /* This is more than we really ought to know about instr_time */
/*
 * Raw output comes in two shapes: a "skipped" line without a latency value
 * and a normal line with the latency; both carry client id, transaction
 * count, script file number and the tv_sec/tv_usec fields of *now.
 */
1835 fprintf(logfile, "%d %d skipped %d %ld %ld",
1836 st->id, st->cnt, st->use_file,
1837 (long) now->tv_sec, (long) now->tv_usec);
1839 fprintf(logfile, "%d %d %.0f %d %ld %ld",
1840 st->id, st->cnt, latency, st->use_file,
1841 (long) now->tv_sec, (long) now->tv_usec);
1844 /* On Windows, instr_time doesn't provide a timestamp anyway */
/* Same two shapes, with the timestamp columns written as literal "0 0". */
1846 fprintf(logfile, "%d %d skipped %d 0 0",
1847 st->id, st->cnt, st->use_file);
1849 fprintf(logfile, "%d %d %.0f %d 0 0",
1850 st->id, st->cnt, latency, st->use_file);
/* Append the schedule lag as a trailing column, then finish the line. */
1853 fprintf(logfile, " %.0f", lag);
1854 fputc('\n', logfile);
1858 /* discard connections */
/*
 * Walks the first "length" entries of state[].  The visible statements hand
 * each entry's connection to PQfinish() and then reset the pointer to NULL,
 * so no finished handle is left behind in the client state.
 *
 * NOTE(review): the return type, the declaration of "i" and any guard around
 * the PQfinish() call are not visible in this view.
 */
1860 disconnect_all(CState *state, int length)
1864 for (i = 0; i < length; i++)
1868 PQfinish(state[i].con);
1869 state[i].con = NULL;
1874 /* create tables and setup data */
/*
 * Visible phases, in order: connect; drop and re-create each table listed in
 * DDLs; insert the branch and teller rows in one transaction; bulk-load
 * pgbench_accounts through COPY with progress reporting; run "vacuum
 * analyze" on the four tables; add primary keys; add foreign keys.
 *
 * NOTE(review): local declarations, braces and short statements (including
 * whatever conditions guard the vacuum and foreign-key phases) are not
 * visible in this view.
 */
1876 init(bool is_no_vacuum)
1879 * The scale factor at/beyond which 32-bit integers are insufficient for
1880 * storing TPC-B account IDs.
1882 * Although the actual threshold is 21474, we use 20000 because it is easier to
1883 * document and remember, and isn't that far away from the real threshold.
1885 #define SCALE_32BIT_THRESHOLD 20000
1888 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1889 * fields in these table declarations were intended to comply with that.
1890 * The pgbench_accounts table complies with that because the "filler"
1891 * column is set to blank-padded empty string. But for all other tables
1892 * the columns default to NULL and so don't actually take any space. We
1893 * could fix that by giving them non-null default values. However, that
1894 * would completely break comparability of pgbench results with prior
1895 * versions. Since pgbench has never pretended to be fully TPC-B compliant
1896 * anyway, we stick with the historical behavior.
/* One entry per table: its name, two alternative column lists, and a flag. */
1900 const char *table; /* table name */
1901 const char *smcols; /* column decls if accountIDs are 32 bits */
1902 const char *bigcols; /* column decls if accountIDs are 64 bits */
1903 int declare_fillfactor;
/*
 * In each pair below the first string is the 32-bit column list and the
 * second the 64-bit one; they differ only where an "aid" column is present.
 */
1905 static const struct ddlinfo DDLs[] = {
1908 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1909 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1914 "tid int not null,bid int,tbalance int,filler char(84)",
1915 "tid int not null,bid int,tbalance int,filler char(84)",
1920 "aid int not null,bid int,abalance int,filler char(84)",
1921 "aid bigint not null,bid int,abalance int,filler char(84)",
1926 "bid int not null,bbalance int,filler char(88)",
1927 "bid int not null,bbalance int,filler char(88)",
/* Primary-key statements; a tablespace clause may be appended further down. */
1931 static const char *const DDLINDEXes[] = {
1932 "alter table pgbench_branches add primary key (bid)",
1933 "alter table pgbench_tellers add primary key (tid)",
1934 "alter table pgbench_accounts add primary key (aid)"
/* Foreign-key statements, executed verbatim in the last phase. */
1936 static const char *const DDLKEYs[] = {
1937 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1938 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1939 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1940 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1941 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1950 /* used to track elapsed time and estimate of the remaining time */
1955 int log_interval = 1;
1957 if ((con = doConnect()) == NULL)
/* Phase 1: (re)create every table described in DDLs. */
1960 for (i = 0; i < lengthof(DDLs); i++)
1964 const struct ddlinfo *ddl = &DDLs[i];
1967 /* Remove old table, if it exists. */
1968 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1969 executeStatement(con, buffer);
1971 /* Construct new create table statement. */
/* Options are appended to "opts" in place, bounded by the space left. */
1973 if (ddl->declare_fillfactor)
1974 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1975 " with (fillfactor=%d)", fillfactor);
1976 if (tablespace != NULL)
1978 char *escape_tablespace;
/* Quote the user-supplied name as an identifier before embedding it in SQL. */
1980 escape_tablespace = PQescapeIdentifier(con, tablespace,
1981 strlen(tablespace));
1982 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1983 " tablespace %s", escape_tablespace);
1984 PQfreemem(escape_tablespace);
/* At or above the threshold, switch to the column list with bigint ids. */
1987 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
1989 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
1990 unlogged_tables ? " unlogged" : "",
1991 ddl->table, cols, opts);
1993 executeStatement(con, buffer);
/* Phase 2: branch and teller rows, inserted one by one in one transaction. */
1996 executeStatement(con, "begin");
1998 for (i = 0; i < nbranches * scale; i++)
2000 /* "filler" column defaults to NULL */
2001 snprintf(sql, sizeof(sql),
2002 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2004 executeStatement(con, sql);
2007 for (i = 0; i < ntellers * scale; i++)
2009 /* "filler" column defaults to NULL */
/* tid is 1-based; i / ntellers + 1 assigns ntellers tellers to each branch. */
2010 snprintf(sql, sizeof(sql),
2011 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2012 i + 1, i / ntellers + 1);
2013 executeStatement(con, sql);
2016 executeStatement(con, "commit");
2019 * fill the pgbench_accounts table with some data
2021 fprintf(stderr, "creating tables...\n");
/* Phase 3: truncate and COPY into pgbench_accounts inside one transaction. */
2023 executeStatement(con, "begin");
2024 executeStatement(con, "truncate pgbench_accounts");
2026 res = PQexec(con, "copy pgbench_accounts from stdin");
2027 if (PQresultStatus(res) != PGRES_COPY_IN)
2029 fprintf(stderr, "%s", PQerrorMessage(con));
/* Reference point for the elapsed/remaining estimates printed below. */
2034 INSTR_TIME_SET_CURRENT(start);
/* The row count is computed in int64 so large scale factors do not overflow. */
2036 for (k = 0; k < (int64) naccounts * scale; k++)
2040 /* "filler" column defaults to blank padded empty string */
/*
 * One tab-separated COPY row: j, k / naccounts + 1, 0 and an empty last
 * field.  NOTE(review): the definition of "j" is not visible in this view.
 */
2041 snprintf(sql, sizeof(sql),
2042 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2043 j, k / naccounts + 1, 0);
2044 if (PQputline(con, sql))
2046 fprintf(stderr, "PQputline failed\n");
2051 * If we want to stick with the original logging, print a message each
2052 * 100k inserted rows.
2054 if ((!use_quiet) && (j % 100000 == 0))
2056 INSTR_TIME_SET_CURRENT(diff);
2057 INSTR_TIME_SUBTRACT(diff, start);
/* Remaining time: rows still to load times the average time per row so far. */
2059 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2060 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2062 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2063 j, (int64) naccounts * scale,
2064 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2065 elapsed_sec, remaining_sec);
2067 /* let's not call the timing for each row, but only each 100 rows */
2068 else if (use_quiet && (j % 100 == 0))
2070 INSTR_TIME_SET_CURRENT(diff);
2071 INSTR_TIME_SUBTRACT(diff, start);
2073 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2074 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2076 /* have we reached the next interval (or end)? */
/*
 * NOTE(review): unlike the neighbouring expressions, scale * naccounts is
 * not widened to int64 here -- confirm it cannot overflow for large scale.
 */
2077 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2079 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2080 j, (int64) naccounts * scale,
2081 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2083 /* skip to the next interval */
2084 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
/* Send the "\." end-of-data line to finish the COPY. */
2089 if (PQputline(con, "\\.\n"))
2091 fprintf(stderr, "very last PQputline failed\n");
2096 fprintf(stderr, "PQendcopy failed\n");
2099 executeStatement(con, "commit");
/* Phase 4: vacuum and analyze each of the four tables. */
2104 fprintf(stderr, "vacuum...\n");
2105 executeStatement(con, "vacuum analyze pgbench_branches");
2106 executeStatement(con, "vacuum analyze pgbench_tellers");
2107 executeStatement(con, "vacuum analyze pgbench_accounts");
2108 executeStatement(con, "vacuum analyze pgbench_history");
/* Phase 5: primary keys, optionally placed in index_tablespace. */
2114 fprintf(stderr, "set primary keys...\n");
2115 for (i = 0; i < lengthof(DDLINDEXes); i++)
2119 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2121 if (index_tablespace != NULL)
2123 char *escape_tablespace;
2125 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2126 strlen(index_tablespace));
2127 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2128 " using index tablespace %s", escape_tablespace);
2129 PQfreemem(escape_tablespace);
2132 executeStatement(con, buffer);
2136 * create foreign keys
2140 fprintf(stderr, "set foreign keys...\n");
2141 for (i = 0; i < lengthof(DDLKEYs); i++)
2143 executeStatement(con, DDLKEYs[i]);
2147 fprintf(stderr, "done.\n");
2152 * Parse the raw sql and replace :param to $n.
/*
 * Turn the :name references of raw_sql into numbered $n placeholders and
 * record each variable name in cmd->argv[] at the index matching its number.
 * Gives up with a message on stderr once MAX_ARGS slots are in use.
 *
 * NOTE(review): the return statements, the handling of a ':' that does not
 * start a variable, and the final store of the rewritten text are not
 * visible in this view.
 */
2155 parseQuery(Command *cmd, const char *raw_sql)
/* Rewrite a private copy; raw_sql itself is never modified. */
2160 sql = pg_strdup(raw_sql);
/* Every ':' is a candidate variable reference. */
2164 while ((p = strchr(p, ':')) != NULL)
/* "eaten" is filled in by parseVariable -- presumably the reference length. */
2170 name = parseVariable(p, &eaten);
2180 if (cmd->argc >= MAX_ARGS)
2182 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
/* The placeholder number is the argv slot that receives the variable name. */
2187 sprintf(var, "$%d", cmd->argc);
2188 p = replaceVariable(&sql, p, eaten, var);
2190 cmd->argv[cmd->argc] = name;
/*
 * Report a syntax error found while parsing a script, then terminate.
 *
 *	source, lineno	where the offending command was read from
 *	line			full text of the offending line, or NULL if unavailable
 *	command			name of the command being parsed
 *	msg				short description of the problem
 *	more			optional detail shown in parentheses, or NULL for none
 *	column			1-based column of the error, or -1 when not known
 *
 * The optional pieces are guarded because callers in this file pass NULL
 * for "more" and -1 for "column" when they have nothing to report.
 * This function does not return.
 */
void
syntax_error(const char *source, const int lineno,
			 const char *line, const char *command,
			 const char *msg, const char *more, const int column)
{
	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
	if (more != NULL)
		fprintf(stderr, " (%s)", more);
	if (column != -1)
		fprintf(stderr, " at column %d", column);
	fprintf(stderr, " in command \"%s\"\n", command);

	if (line != NULL)
	{
		fprintf(stderr, "%s\n", line);
		if (column != -1)
		{
			int			i;

			/* pad so that the caret lands under the offending column */
			for (i = 0; i < column - 1; i++)
				fprintf(stderr, " ");
			fprintf(stderr, "^ error found here\n");
		}
	}

	exit(1);
}
2224 /* Parse a command; return a Command struct, or NULL if it's a comment */
/*
 * buf holds one script line; source and lineno are only passed on to
 * syntax_error() for diagnostics.  Meta-commands are split into argv[]
 * tokens and validated per command; anything else is stored as an SQL
 * command, either verbatim or after parseQuery(), depending on the query
 * mode.
 *
 * NOTE(review): the tests that select the meta-command and SQL branches,
 * several local declarations and the return statements are not visible in
 * this view.
 */
2226 process_commands(char *buf, const char *source, const int lineno)
/* Whitespace characters that separate meta-command tokens. */
2228 const char delim[] = " \f\n\r\t\v";
2230 Command *my_commands;
2235 /* Make the string buf end at the next newline */
2236 if ((p = strchr(buf, '\n')) != NULL)
2239 /* Skip leading whitespace */
2241 while (isspace((unsigned char) *p))
2244 /* If the line is empty or actually a comment, we're done */
2245 if (*p == '\0' || strncmp(p, "--", 2) == 0)
2248 /* Allocate and initialize Command structure */
2249 my_commands = (Command *) pg_malloc(sizeof(Command));
2250 my_commands->line = pg_strdup(buf);
/* Each command takes the next value of the num_commands counter. */
2251 my_commands->command_num = num_commands++;
2252 my_commands->type = 0; /* until set */
2253 my_commands->argc = 0;
2259 my_commands->type = META_COMMAND;
/* Tokenize starting one character past p. */
2262 tok = strtok(++p, delim);
2264 if (tok != NULL && pg_strcasecmp(tok, "set") == 0)
/* cols[] records the 1-based position of each token within buf. */
2269 my_commands->cols[j] = tok - buf + 1;
2270 my_commands->argv[j++] = pg_strdup(tok);
2271 my_commands->argc++;
/*
 * Once max_args tokens have been taken, an empty delimiter set makes strtok
 * return the whole remainder of the line as one final token.
 */
2272 if (max_args >= 0 && my_commands->argc >= max_args)
2273 tok = strtok(NULL, "");
2275 tok = strtok(NULL, delim);
2278 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
2281 * parsing: \setrandom variable min max [uniform] \setrandom
2282 * variable min max (gaussian|exponential) threshold
2285 if (my_commands->argc < 4)
2287 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2288 "missing arguments", NULL, -1);
2293 if (my_commands->argc == 4 || /* uniform without/with
2294 * "uniform" keyword */
2295 (my_commands->argc == 5 &&
2296 pg_strcasecmp(my_commands->argv[4], "uniform") == 0))
2300 else if ( /* argc >= 5 */
2301 (pg_strcasecmp(my_commands->argv[4], "gaussian") == 0) ||
2302 (pg_strcasecmp(my_commands->argv[4], "exponential") == 0))
/* These two distributions need exactly one threshold argument (argc == 6). */
2304 if (my_commands->argc < 6)
2306 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2307 "missing threshold argument", my_commands->argv[4], -1);
2309 else if (my_commands->argc > 6)
2311 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2312 "too many arguments", my_commands->argv[4],
2313 my_commands->cols[6]);
2316 else /* cannot parse, unexpected arguments */
2318 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2319 "unexpected argument", my_commands->argv[4],
2320 my_commands->cols[4]);
2323 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
2325 if (my_commands->argc < 3)
2327 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2328 "missing argument", NULL, -1);
/*
 * argv[2] holds the expression text; cols[2] - 1 converts its 1-based
 * column back to a 0-based value for the scanner.
 */
2331 expr_scanner_init(my_commands->argv[2], source, lineno,
2332 my_commands->line, my_commands->argv[0],
2333 my_commands->cols[2] - 1);
2335 if (expr_yyparse() != 0)
2337 /* dead code: exit done from syntax_error called by yyerror */
/* Keep the parsed expression with the command. */
2341 my_commands->expr = expr_parse_result;
2343 expr_scanner_finish();
2345 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
2347 if (my_commands->argc < 2)
2349 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2350 "missing argument", NULL, -1);
2354 * Split argument into number and unit to allow "sleep 1ms" etc.
2355 * We don't have to terminate the number argument with null
2356 * because it will be parsed with atoi, which ignores trailing
2357 * non-digit characters.
/* A leading ':' marks a variable reference, which is left unsplit here. */
2359 if (my_commands->argv[1][0] != ':')
2361 char *c = my_commands->argv[1];
2363 while (isdigit((unsigned char) *c))
/*
 * argv[2] now points into the string owned by argv[1], not at a separate
 * allocation.
 */
2367 my_commands->argv[2] = c;
2368 if (my_commands->argc < 3)
2369 my_commands->argc = 3;
2373 if (my_commands->argc >= 3)
/* Only the units us, ms and s are accepted, compared case-insensitively. */
2375 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
2376 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
2377 pg_strcasecmp(my_commands->argv[2], "s") != 0)
2379 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2380 "unknown time unit, must be us, ms or s",
2381 my_commands->argv[2], my_commands->cols[2]);
2385 /* this should be an error?! */
2386 for (j = 3; j < my_commands->argc; j++)
2387 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
2388 my_commands->argv[0], my_commands->argv[j]);
2390 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
2392 if (my_commands->argc < 3)
2394 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2395 "missing argument", NULL, -1);
2398 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
/*
 * NOTE(review): argc already counts argv[0] (the tokenizing loop above bumps
 * it for every token), so "argc < 1" looks unreachable; "< 2" may have been
 * intended -- confirm.
 */
2400 if (my_commands->argc < 1)
2402 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2403 "missing command", NULL, -1);
2408 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2409 "invalid command", NULL, -1);
2414 my_commands->type = SQL_COMMAND;
/* Here the statement text is stored unchanged as argv[0]. */
2419 my_commands->argv[0] = pg_strdup(p);
2420 my_commands->argc++;
/* Both of these modes send the statement through parseQuery(). */
2422 case QUERY_EXTENDED:
2423 case QUERY_PREPARED:
2424 if (!parseQuery(my_commands, p))
2436 * Read a line from fd, and return it in a malloc'd buffer.
2437 * Return NULL at EOF.
2439 * The buffer will typically be larger than necessary, but we don't care
2440 * in this program, because we'll free it as soon as we've parsed the line.
/*
 * Collects one line from fd by reading BUFSIZ-sized pieces with fgets() and
 * appending them to a growing heap buffer until a piece ends in '\n' or
 * fgets() returns NULL.
 *
 * NOTE(review): the bookkeeping of "used"/"buflen" and the return statements
 * are not visible in this view.  The buffer is obtained with palloc() but
 * grown with pg_realloc() -- confirm the two are interchangeable in this
 * build.
 */
2443 read_line_from_file(FILE *fd)
2445 char tmpbuf[BUFSIZ];
2447 size_t buflen = BUFSIZ;
2450 buf = (char *) palloc(buflen);
2453 while (fgets(tmpbuf, BUFSIZ, fd) != NULL)
2455 size_t thislen = strlen(tmpbuf);
2457 /* Append tmpbuf to whatever we had already */
/* thislen + 1 copies the terminating NUL too, keeping buf a valid string. */
2458 memcpy(buf + used, tmpbuf, thislen + 1);
2461 /* Done if we collected a newline */
2462 if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
2465 /* Else, enlarge buf to ensure we can append next bufferload */
2467 buf = (char *) pg_realloc(buf, buflen);
/*
 * Reads the script named by filename line by line, turns each line into a
 * Command via process_commands(), and registers the resulting
 * NULL-terminated array as the next entry of sql_files[].  At most
 * MAX_FILES scripts are accepted.
 *
 * NOTE(review): the return statements, the handling of the "-" filename and
 * the index/lineno updates are not visible in this view.
 */
2479 process_file(char *filename)
/* Growth step (and initial size) of the command array. */
2481 #define COMMANDS_ALLOC_NUM 128
2483 Command **my_commands;
2490 if (num_files >= MAX_FILES)
2492 fprintf(stderr, "at most %d SQL files are allowed\n", MAX_FILES);
2496 alloc_num = COMMANDS_ALLOC_NUM;
2497 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
/* "-" is special-cased; any other name is opened for reading. */
2499 if (strcmp(filename, "-") == 0)
2501 else if ((fd = fopen(filename, "r")) == NULL)
2503 fprintf(stderr, "could not open file \"%s\": %s\n",
2504 filename, strerror(errno));
/* Release the array allocated above before giving up. */
2505 pg_free(my_commands);
2512 while ((buf = read_line_from_file(fd)) != NULL)
2518 command = process_commands(buf, filename, lineno);
/* process_commands() yields NULL for blank and comment lines. */
2522 if (command == NULL)
2525 my_commands[index] = command;
/* Enlarge the array by another COMMANDS_ALLOC_NUM slots when it is full. */
2528 if (index >= alloc_num)
2530 alloc_num += COMMANDS_ALLOC_NUM;
2531 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
/* NULL marks the end of the command list. */
2536 my_commands[index] = NULL;
2538 sql_files[num_files++] = my_commands;
/*
 * Builds a NULL-terminated Command array from the script text in tb,
 * cutting it at '\n' characters and parsing each piece with
 * process_commands(); source is passed on for diagnostics.
 *
 * NOTE(review): the line-copy loop body, the index/lineno updates and the
 * return statement are not visible in this view.  None of the visible
 * statements registers the array in sql_files[].
 */
2544 process_builtin(char *tb, const char *source)
/* Same value as the definition in process_file(). */
2546 #define COMMANDS_ALLOC_NUM 128
2548 Command **my_commands;
2554 alloc_num = COMMANDS_ALLOC_NUM;
2555 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
/* Consume characters up to the end of the current line of tb. */
2566 while (*tb && *tb != '\n')
2579 command = process_commands(buf, source, lineno);
/* process_commands() yields NULL for blank and comment lines. */
2580 if (command == NULL)
2583 my_commands[index] = command;
/* Enlarge the array by another COMMANDS_ALLOC_NUM slots when it is full. */
2586 if (index >= alloc_num)
2588 alloc_num += COMMANDS_ALLOC_NUM;
2589 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
/* NULL marks the end of the command list. */
2593 my_commands[index] = NULL;
2598 /* print out results */
/*
 * Prints the end-of-run summary on stdout: run parameters, transaction
 * counts, optional skipped/late counts, latency average (and stddev when
 * per-transaction data was collected), throttling lag, the two tps figures
 * and, finally, the per-command latencies of every script.
 *
 * NOTE(review): several conditions, local declarations and short statements
 * are not visible in this view.
 */
2600 printResults(int ttype, int64 normal_xacts, int nclients,
2601 TState *threads, int nthreads,
2602 instr_time total_time, instr_time conn_total_time,
2603 int64 total_latencies, int64 total_sqlats,
2604 int64 throttle_lag, int64 throttle_lag_max,
2605 int64 throttle_latency_skipped, int64 latency_late)
2607 double time_include,
/*
 * tps_exclude discounts connection setup: the summed connection time is
 * averaged over the threads and subtracted from the total run time.
 */
2612 time_include = INSTR_TIME_GET_DOUBLE(total_time);
2613 tps_include = normal_xacts / time_include;
2614 tps_exclude = normal_xacts / (time_include -
2615 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
/* Pick the label printed as "transaction type" from ttype. */
2618 s = "TPC-B (sort of)";
2619 else if (ttype == 2)
2620 s = "Update only pgbench_accounts";
2621 else if (ttype == 1)
2626 printf("transaction type: %s\n", s);
2627 printf("scaling factor: %d\n", scale);
2628 printf("query mode: %s\n", QUERYMODE[querymode]);
2629 printf("number of clients: %d\n", nclients);
2630 printf("number of threads: %d\n", nthreads);
/* Two report shapes follow: one with the expected total, one with the duration. */
2633 printf("number of transactions per client: %d\n", nxacts);
2634 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2635 normal_xacts, (int64) nxacts * nclients);
2639 printf("duration: %d s\n", duration);
2640 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2644 /* Remaining stats are nonsensical if we failed to execute any xacts */
2645 if (normal_xacts <= 0)
/* Percentages below are relative to skipped plus executed transactions. */
2648 if (throttle_delay && latency_limit)
2649 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
2650 throttle_latency_skipped,
2651 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
2654 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
2655 latency_limit / 1000.0, latency_late,
2656 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
2658 if (throttle_delay || progress || latency_limit)
2660 /* compute and show latency average and standard deviation */
/*
 * "latency" is the mean scaled by 0.001 for display in ms, "sqlat" the mean
 * of the squares in the original units.  The variance is sqlat - mean^2, so
 * latency^2 is multiplied by 1e6 to undo the scaling, and the square root is
 * scaled by 0.001 again for display.
 */
2661 double latency = 0.001 * total_latencies / normal_xacts;
2662 double sqlat = (double) total_sqlats / normal_xacts;
2664 printf("latency average: %.3f ms\n"
2665 "latency stddev: %.3f ms\n",
2666 latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2670 /* only an average latency computed from the duration is available */
2671 printf("latency average: %.3f ms\n",
2672 1000.0 * duration * nclients / normal_xacts);
2678 * Report average transaction lag under rate limit throttling. This
2679 * is the delay between scheduled and actual start times for the
2680 * transaction. The measured lag may be caused by thread/client load,
2681 * the database load, or the Poisson throttling process.
2683 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2684 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2687 printf("tps = %f (including connections establishing)\n", tps_include);
2688 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2690 /* Report per-command latencies */
2695 for (i = 0; i < num_files; i++)
/* Two heading variants: with and without the 1-based file number. */
2700 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2702 printf("statement latencies in milliseconds:\n");
/* Each script is a NULL-terminated array of Command pointers. */
2704 for (commands = sql_files[i]; *commands != NULL; commands++)
2706 Command *command = *commands;
/* command_num indexes the per-thread exec_elapsed[] and exec_count[] arrays. */
2707 int cnum = command->command_num;
2709 instr_time total_exec_elapsed;
2710 int total_exec_count;
2713 /* Accumulate per-thread data for command */
2714 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2715 total_exec_count = 0;
2716 for (t = 0; t < nthreads; t++)
2718 TState *thread = &threads[t];
2720 INSTR_TIME_ADD(total_exec_elapsed,
2721 thread->exec_elapsed[cnum]);
2722 total_exec_count += thread->exec_count[cnum];
/*
 * Average time per execution in milliseconds.  NOTE(review): total_time is
 * assigned a double here, so it must be a local shadowing the instr_time
 * parameter of the same name; its declaration is not visible -- confirm.
 */
2725 if (total_exec_count > 0)
2726 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2730 printf("\t%f\t%s\n", total_time, command->line);
2738 main(int argc, char **argv)
2740 static struct option long_options[] = {
2741 /* systematic long/short named options */
2742 {"client", required_argument, NULL, 'c'},
2743 {"connect", no_argument, NULL, 'C'},
2744 {"debug", no_argument, NULL, 'd'},
2745 {"define", required_argument, NULL, 'D'},
2746 {"file", required_argument, NULL, 'f'},
2747 {"fillfactor", required_argument, NULL, 'F'},
2748 {"host", required_argument, NULL, 'h'},
2749 {"initialize", no_argument, NULL, 'i'},
2750 {"jobs", required_argument, NULL, 'j'},
2751 {"log", no_argument, NULL, 'l'},
2752 {"no-vacuum", no_argument, NULL, 'n'},
2753 {"port", required_argument, NULL, 'p'},
2754 {"progress", required_argument, NULL, 'P'},
2755 {"protocol", required_argument, NULL, 'M'},
2756 {"quiet", no_argument, NULL, 'q'},
2757 {"report-latencies", no_argument, NULL, 'r'},
2758 {"scale", required_argument, NULL, 's'},
2759 {"select-only", no_argument, NULL, 'S'},
2760 {"skip-some-updates", no_argument, NULL, 'N'},
2761 {"time", required_argument, NULL, 'T'},
2762 {"transactions", required_argument, NULL, 't'},
2763 {"username", required_argument, NULL, 'U'},
2764 {"vacuum-all", no_argument, NULL, 'v'},
2765 /* long-named only options */
2766 {"foreign-keys", no_argument, &foreign_keys, 1},
2767 {"index-tablespace", required_argument, NULL, 3},
2768 {"tablespace", required_argument, NULL, 2},
2769 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2770 {"sampling-rate", required_argument, NULL, 4},
2771 {"aggregate-interval", required_argument, NULL, 5},
2772 {"rate", required_argument, NULL, 'R'},
2773 {"latency-limit", required_argument, NULL, 'L'},
2778 int nclients = 1; /* default number of simulated clients */
2779 int nthreads = 1; /* default number of threads */
2780 int is_init_mode = 0; /* initialize mode? */
2781 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2782 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2783 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2784 * 2: skip update of branches and tellers */
2786 char *filename = NULL;
2787 bool scale_given = false;
2789 bool benchmarking_option_set = false;
2790 bool initialization_option_set = false;
2792 CState *state; /* status of clients */
2793 TState *threads; /* array of thread */
2795 instr_time start_time; /* start up time */
2796 instr_time total_time;
2797 instr_time conn_total_time;
2798 int64 total_xacts = 0;
2799 int64 total_latencies = 0;
2800 int64 total_sqlats = 0;
2801 int64 throttle_lag = 0;
2802 int64 throttle_lag_max = 0;
2803 int64 throttle_latency_skipped = 0;
2804 int64 latency_late = 0;
2809 #ifdef HAVE_GETRLIMIT
2819 progname = get_progname(argv[0]);
2823 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2828 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2830 puts("pgbench (PostgreSQL) " PG_VERSION);
2836 /* stderr is buffered on Win32. */
2837 setvbuf(stderr, NULL, _IONBF, 0);
2840 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2842 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2844 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2847 state = (CState *) pg_malloc(sizeof(CState));
2848 memset(state, 0, sizeof(CState));
2850 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
2858 pghost = pg_strdup(optarg);
2864 do_vacuum_accounts++;
2867 pgport = pg_strdup(optarg);
2874 benchmarking_option_set = true;
2878 benchmarking_option_set = true;
2881 benchmarking_option_set = true;
2882 nclients = atoi(optarg);
2883 if (nclients <= 0 || nclients > MAXCLIENTS)
2885 fprintf(stderr, "invalid number of clients: \"%s\"\n",
2889 #ifdef HAVE_GETRLIMIT
2890 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2891 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2892 #else /* but BSD doesn't ... */
2893 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2894 #endif /* RLIMIT_NOFILE */
2896 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2899 if (rlim.rlim_cur < nclients + 3)
2901 fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
2902 nclients + 3, (long) rlim.rlim_cur);
2903 fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
2906 #endif /* HAVE_GETRLIMIT */
2908 case 'j': /* jobs */
2909 benchmarking_option_set = true;
2910 nthreads = atoi(optarg);
2913 fprintf(stderr, "invalid number of threads: \"%s\"\n",
2917 #ifndef ENABLE_THREAD_SAFETY
2920 fprintf(stderr, "threads are not supported on this platform; use -j1\n");
2923 #endif /* !ENABLE_THREAD_SAFETY */
2926 benchmarking_option_set = true;
2930 benchmarking_option_set = true;
2931 is_latencies = true;
2935 scale = atoi(optarg);
2938 fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
2943 benchmarking_option_set = true;
2946 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2949 nxacts = atoi(optarg);
2952 fprintf(stderr, "invalid number of transactions: \"%s\"\n",
2958 benchmarking_option_set = true;
2961 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2964 duration = atoi(optarg);
2967 fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
2972 login = pg_strdup(optarg);
2975 benchmarking_option_set = true;
2979 initialization_option_set = true;
2983 benchmarking_option_set = true;
2985 filename = pg_strdup(optarg);
2986 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2993 benchmarking_option_set = true;
2995 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2997 fprintf(stderr, "invalid variable definition: \"%s\"\n",
3003 if (!putVariable(&state[0], "option", optarg, p))
3008 initialization_option_set = true;
3009 fillfactor = atoi(optarg);
3010 if (fillfactor < 10 || fillfactor > 100)
3012 fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3017 benchmarking_option_set = true;
3020 fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f)\n");
3023 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3024 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3026 if (querymode >= NUM_QUERYMODE)
3028 fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3034 benchmarking_option_set = true;
3035 progress = atoi(optarg);
3038 fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3045 /* get a double from the beginning of option value */
3046 double throttle_value = atof(optarg);
3048 benchmarking_option_set = true;
3050 if (throttle_value <= 0.0)
3052 fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3055 /* Invert rate limit into a time offset */
3056 throttle_delay = (int64) (1000000.0 / throttle_value);
3061 double limit_ms = atof(optarg);
3063 if (limit_ms <= 0.0)
3065 fprintf(stderr, "invalid latency limit: \"%s\"\n",
3069 benchmarking_option_set = true;
3070 latency_limit = (int64) (limit_ms * 1000);
3074 /* This covers long options which take no argument. */
3075 if (foreign_keys || unlogged_tables)
3076 initialization_option_set = true;
3078 case 2: /* tablespace */
3079 initialization_option_set = true;
3080 tablespace = pg_strdup(optarg);
3082 case 3: /* index-tablespace */
3083 initialization_option_set = true;
3084 index_tablespace = pg_strdup(optarg);
3087 benchmarking_option_set = true;
3088 sample_rate = atof(optarg);
3089 if (sample_rate <= 0.0 || sample_rate > 1.0)
3091 fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3097 fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
3100 benchmarking_option_set = true;
3101 agg_interval = atoi(optarg);
3102 if (agg_interval <= 0)
3104 fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
3111 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3118 * Don't need more threads than there are clients. (This is not merely an
3119 * optimization; throttle_delay is calculated incorrectly below if some
3120 * threads have no clients assigned to them.)
3122 if (nthreads > nclients)
3123 nthreads = nclients;
3125 /* compute a per thread delay */
3126 throttle_delay *= nthreads;
3129 dbName = argv[optind];
3132 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3134 else if (login != NULL && *login != '\0')
3142 if (benchmarking_option_set)
3144 fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
3153 if (initialization_option_set)
3155 fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
3160 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3161 if (nxacts <= 0 && duration <= 0)
3162 nxacts = DEFAULT_NXACTS;
3164 /* --sampling-rate may be used only with -l */
3165 if (sample_rate > 0.0 && !use_log)
3167 fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
3171 /* --sampling-rate must not be used with --aggregate-interval */
3172 if (sample_rate > 0.0 && agg_interval > 0)
3174 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
3178 if (agg_interval > 0 && !use_log)
3180 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3184 if (duration > 0 && agg_interval > duration)
3186 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
3190 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
3192 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3197 * save main process id in the global variable because process id will be
3198 * changed after fork.
3200 main_pid = (int) getpid();
3201 progress_nclients = nclients;
3202 progress_nthreads = nthreads;
3206 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3207 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3209 /* copy any -D switch values to all clients */
3210 for (i = 1; i < nclients; i++)
3215 for (j = 0; j < state[0].nvariables; j++)
3217 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
3226 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3227 pghost, pgport, nclients, nxacts, dbName);
3229 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3230 pghost, pgport, nclients, duration, dbName);
3233 /* opening connection... */
3238 if (PQstatus(con) == CONNECTION_BAD)
3240 fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
3241 fprintf(stderr, "%s", PQerrorMessage(con));
3248 * get the scaling factor that should be same as count(*) from
3249 * pgbench_branches if this is not a custom query
3251 res = PQexec(con, "select count(*) from pgbench_branches");
3252 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3254 fprintf(stderr, "%s", PQerrorMessage(con));
3257 scale = atoi(PQgetvalue(res, 0, 0));
3260 fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
3261 PQgetvalue(res, 0, 0));
3266 /* warn if we override user-given -s switch */
3269 "scale option ignored, using count from pgbench_branches table (%d)\n",
3274 * :scale variables normally get -s or database scale, but don't override
3275 * an explicit -D switch
3277 if (getVariable(&state[0], "scale") == NULL)
3279 snprintf(val, sizeof(val), "%d", scale);
3280 for (i = 0; i < nclients; i++)
3282 if (!putVariable(&state[i], "startup", "scale", val))
3288 * Define a :client_id variable that is unique per connection. But don't
3289 * override an explicit -D switch.
3291 if (getVariable(&state[0], "client_id") == NULL)
3293 for (i = 0; i < nclients; i++)
3295 snprintf(val, sizeof(val), "%d", i);
3296 if (!putVariable(&state[i], "startup", "client_id", val))
3303 fprintf(stderr, "starting vacuum...");
3304 tryExecuteStatement(con, "vacuum pgbench_branches");
3305 tryExecuteStatement(con, "vacuum pgbench_tellers");
3306 tryExecuteStatement(con, "truncate pgbench_history");
3307 fprintf(stderr, "end.\n");
3309 if (do_vacuum_accounts)
3311 fprintf(stderr, "starting vacuum pgbench_accounts...");
3312 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
3313 fprintf(stderr, "end.\n");
3318 /* set random seed */
3319 INSTR_TIME_SET_CURRENT(start_time);
3320 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3322 /* process builtin SQL scripts */
3326 sql_files[0] = process_builtin(tpc_b,
3327 "<builtin: TPC-B (sort of)>");
3332 sql_files[0] = process_builtin(select_only,
3333 "<builtin: select only>");
3338 sql_files[0] = process_builtin(simple_update,
3339 "<builtin: simple update>");
3347 /* set up thread data structures */
3348 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3351 for (i = 0; i < nthreads; i++)
3353 TState *thread = &threads[i];
3356 thread->state = &state[nclients_dealt];
3358 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
3359 thread->random_state[0] = random();
3360 thread->random_state[1] = random();
3361 thread->random_state[2] = random();
3362 thread->throttle_latency_skipped = 0;
3363 thread->latency_late = 0;
3365 nclients_dealt += thread->nstate;
3369 /* Reserve memory for the thread to store per-command latencies */
3372 thread->exec_elapsed = (instr_time *)
3373 pg_malloc(sizeof(instr_time) * num_commands);
3374 thread->exec_count = (int *)
3375 pg_malloc(sizeof(int) * num_commands);
3377 for (t = 0; t < num_commands; t++)
3379 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
3380 thread->exec_count[t] = 0;
3385 thread->exec_elapsed = NULL;
3386 thread->exec_count = NULL;
3390 /* all clients must be assigned to a thread */
3391 Assert(nclients_dealt == nclients);
3393 /* get start up time */
3394 INSTR_TIME_SET_CURRENT(start_time);
3396 /* set alarm if duration is specified. */
3401 #ifdef ENABLE_THREAD_SAFETY
3402 for (i = 0; i < nthreads; i++)
3404 TState *thread = &threads[i];
3406 INSTR_TIME_SET_CURRENT(thread->start_time);
3408 /* the first thread (i = 0) is executed by main thread */
3411 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
3413 if (err != 0 || thread->thread == INVALID_THREAD)
3415 fprintf(stderr, "could not create thread: %s\n", strerror(err));
3421 thread->thread = INVALID_THREAD;
3425 INSTR_TIME_SET_CURRENT(threads[0].start_time);
3426 threads[0].thread = INVALID_THREAD;
3427 #endif /* ENABLE_THREAD_SAFETY */
3429 /* wait for threads and accumulate results */
3430 INSTR_TIME_SET_ZERO(conn_total_time);
3431 for (i = 0; i < nthreads; i++)
3433 TState *thread = &threads[i];
3436 #ifdef ENABLE_THREAD_SAFETY
3437 if (threads[i].thread == INVALID_THREAD)
3438 /* actually run this thread directly in the main thread */
3439 (void) threadRun(thread);
3441 /* wait for other threads. should check that 0 is returned? */
3442 pthread_join(thread->thread, NULL);
3444 (void) threadRun(thread);
3445 #endif /* ENABLE_THREAD_SAFETY */
3447 /* thread level stats: sum the counters of every thread, keep the largest lag */
3448 throttle_lag += thread->throttle_lag;
3449 throttle_latency_skipped += thread->throttle_latency_skipped; /* accumulate this thread, not threads[0] */
3450 latency_late += thread->latency_late; /* accumulate rather than overwrite */
3451 if (thread->throttle_lag_max > throttle_lag_max) /* running maximum over threads */
3452 throttle_lag_max = thread->throttle_lag_max;
3453 INSTR_TIME_ADD(conn_total_time, thread->conn_time);
3455 /* client-level stats: j walks this thread's own clients */
3456 for (j = 0; j < thread->nstate; j++)
3458 total_xacts += thread->state[j].cnt;
3459 total_latencies += thread->state[j].txn_latencies; /* index by client j, not thread i */
3460 total_sqlats += thread->state[j].txn_sqlats; /* index by client j, not thread i */
3463 disconnect_all(state, nclients);
3466 * XXX We compute results as though every client of every thread started
3467 * and finished at the same time. That model can diverge noticeably from
3468 * reality for a short benchmark run involving relatively many threads.
3469 * The first thread may process notably many transactions before the last
3470 * thread begins. Improving the model alone would bring limited benefit,
3471 * because performance during those periods of partial thread count can
3472 * easily exceed steady state performance. This is one of the many ways
3473 * short runs convey deceptive performance figures.
3475 INSTR_TIME_SET_CURRENT(total_time);
3476 INSTR_TIME_SUBTRACT(total_time, start_time);
3477 printResults(ttype, total_xacts, nclients, threads, nthreads,
3478 total_time, conn_total_time, total_latencies, total_sqlats,
3479 throttle_lag, throttle_lag_max, throttle_latency_skipped,
/*
 * threadRun -- body of one benchmark thread.
 *
 * arg is the thread's TState.  The routine works on the thread's own slice
 * of clients (thread->state, thread->nstate entries):
 *   - resets the thread's throttle bookkeeping and conn_time;
 *   - in the log-file section, writes to "pgbench_log.<main_pid>" for tid 0
 *     and "pgbench_log.<main_pid>.<tid>" for other threads;
 *   - connects every client with doConnect() and records in conn_time the
 *     time elapsed since thread->start_time;
 *   - picks a script for each client with getrand() and starts it through
 *     doCustom();
 *   - then repeatedly builds an fd_set of client sockets, works out how long
 *     it may sleep (min_usec), waits in select(), and calls doCustom() for
 *     each connected client whose socket is readable or whose current
 *     command is a META_COMMAND;
 *   - when progress is set and tid is 0, prints progress lines to stderr;
 *   - finally disconnects the clients and adds the time that took to
 *     conn_time.
 *
 * "remains" is decremented whenever doCustom() returns false or a
 * meta-command failure is detected for a client.
 */
3486 threadRun(void *arg)
3488 TState *thread = (TState *) arg;
3489 CState *state = thread->state;
3490 FILE *logfile = NULL; /* per-thread log file */
3493 int nstate = thread->nstate;
3494 int remains = nstate; /* number of remaining clients */
3497 /* for reporting progress: */
3498 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
3499 int64 last_report = thread_start;
3500 int64 next_report = last_report + (int64) progress * 1000000;
3501 int64 last_count = 0,
3510 * Initialize throttling rate target for all of the thread's clients. It
3511 * might be a little more accurate to reset thread->start_time here too.
3512 * The possible drift seems too small relative to typical throttle delay
3513 * times to worry about it.
3515 INSTR_TIME_SET_CURRENT(start);
3516 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3517 thread->throttle_lag = 0;
3518 thread->throttle_lag_max = 0;
3520 INSTR_TIME_SET_ZERO(thread->conn_time);
3522 /* open log file if requested */
3527 if (thread->tid == 0)
3528 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3530 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3531 logfile = fopen(logpath, "w");
3533 if (logfile == NULL)
3535 fprintf(stderr, "could not open logfile \"%s\": %s\n",
3536 logpath, strerror(errno));
3543 /* make connections to the database */
3544 for (i = 0; i < nstate; i++)
3546 if ((state[i].con = doConnect()) == NULL)
3551 /* time after thread and connections set up */
3552 INSTR_TIME_SET_CURRENT(thread->conn_time);
3553 INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
3555 agg_vals_init(&aggs, thread->start_time);
3557 /* send start up queries in async manner */
3558 for (i = 0; i < nstate; i++)
3560 CState *st = &state[i];
/*
 * NOTE(review): commands is taken from sql_files[st->use_file] before
 * use_file is reassigned just below, so the later commands[st->state]
 * check may look at a different script than the one doCustom() ran --
 * confirm this is intended.
 */
3561 Command **commands = sql_files[st->use_file];
3562 int prev_ecnt = st->ecnt;
3564 st->use_file = getrand(thread, 0, num_files - 1);
3565 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3566 remains--; /* I've aborted */
3568 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3570 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3572 remains--; /* I've aborted */
3581 int maxsock; /* max socket number to be waited */
3585 FD_ZERO(&input_mask);
3588 min_usec = PG_INT64_MAX;
3589 for (i = 0; i < nstate; i++)
3591 CState *st = &state[i];
3592 Command **commands = sql_files[st->use_file];
3595 if (st->con == NULL)
3599 else if (st->sleeping)
3601 if (st->throttling && timer_exceeded)
3603 /* interrupt client which has not started a transaction */
3606 st->throttling = false;
3611 else /* just a nap from the script */
3615 if (min_usec == PG_INT64_MAX)
3619 INSTR_TIME_SET_CURRENT(now);
3620 now_usec = INSTR_TIME_GET_MICROSEC(now);
3623 this_usec = st->txn_scheduled - now_usec;
3624 if (min_usec > this_usec)
3625 min_usec = this_usec;
3628 else if (commands[st->state]->type == META_COMMAND)
3630 min_usec = 0; /* the connection is ready to run */
3634 sock = PQsocket(st->con);
3637 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3641 FD_SET(sock, &input_mask);
3647 /* also wake up to print the next progress report on time */
3648 if (progress && min_usec > 0)
3650 /* get current time if needed */
3655 INSTR_TIME_SET_CURRENT(now);
3656 now_usec = INSTR_TIME_GET_MICROSEC(now);
3659 if (now_usec >= next_report)
3661 else if ((next_report - now_usec) < min_usec)
3662 min_usec = next_report - now_usec;
3666 * Sleep until we receive data from the server, or a nap-time
3667 * specified in the script ends, or it's time to print a progress
3670 if (min_usec > 0 && maxsock != -1)
3672 int nsocks; /* return from select(2) */
3674 if (min_usec != PG_INT64_MAX)
3676 struct timeval timeout;
3678 timeout.tv_sec = min_usec / 1000000;
3679 timeout.tv_usec = min_usec % 1000000;
3680 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
3683 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3688 /* must be something wrong */
3689 fprintf(stderr, "select() failed: %s\n", strerror(errno));
3694 /* ok, backend returns reply */
3695 for (i = 0; i < nstate; i++)
3697 CState *st = &state[i];
3698 Command **commands = sql_files[st->use_file];
3699 int prev_ecnt = st->ecnt;
3701 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3702 || commands[st->state]->type == META_COMMAND))
3704 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3705 remains--; /* I've aborted */
3708 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3710 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3712 remains--; /* I've aborted */
3718 /* progress report by thread 0 for all threads */
3719 if (progress && thread->tid == 0)
3721 instr_time now_time;
3724 INSTR_TIME_SET_CURRENT(now_time);
3725 now = INSTR_TIME_GET_MICROSEC(now_time);
3726 if (now >= next_report)
3728 /* generate and show report */
3734 int64 run = now - last_report;
3743 * Add up the statistics of all threads.
3745 * XXX: No locking. There is no guarantee that we get an
3746 * atomic snapshot of the transaction count and latencies, so
3747 * these figures can well be off by a small amount. The
3748 * progress is report's purpose is to give a quick overview of
3749 * how the test is going, so that shouldn't matter too much.
3750 * (If a read from a 64-bit integer is not atomic, you might
3751 * get a "torn" read and completely bogus latencies though!)
3753 for (i = 0; i < progress_nclients; i++)
3755 count += state[i].cnt;
3756 lats += state[i].txn_latencies;
3757 sqlats += state[i].txn_sqlats;
/*
 * Indexes past this thread's own TState; presumably the thread with tid 0
 * is the first element of the threads array, so thread[i] reaches every
 * thread -- TODO confirm.
 */
3760 for (i = 0; i < progress_nthreads; i++)
3761 lags += thread[i].throttle_lag;
/*
 * NOTE(review): several ratios below divide by (count - last_count), which
 * is zero if count did not change since the previous report; the numerators
 * are doubles, so this yields a non-finite value rather than a trap.
 */
3763 total_run = (now - thread_start) / 1000000.0;
3764 tps = 1000000.0 * (count - last_count) / run;
3765 latency = 0.001 * (lats - last_lats) / (count - last_count);
3766 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3767 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3768 lag = 0.001 * (lags - last_lags) / (count - last_count);
/*
 * NOTE(review): unlike lags, skipped is read from this thread only, not
 * summed over all threads -- confirm whether that is intended.
 */
3769 skipped = thread->throttle_latency_skipped - last_skipped;
3772 "progress: %.1f s, %.1f tps, "
3773 "lat %.3f ms stddev %.3f",
3774 total_run, tps, latency, stdev);
3777 fprintf(stderr, ", lag %.3f ms", lag);
3779 fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
3781 fprintf(stderr, "\n");
3785 last_sqlats = sqlats;
3788 last_skipped = thread->throttle_latency_skipped;
3791 * Ensure that the next report is in the future, in case
3792 * pgbench/postgres got stuck somewhere.
3796 next_report += (int64) progress *1000000;
3797 } while (now >= next_report);
/* time the teardown so it can be added to the thread's connection time */
3803 INSTR_TIME_SET_CURRENT(start);
3804 disconnect_all(state, nstate);
3805 INSTR_TIME_SET_CURRENT(end);
3806 INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
3813 * Support for duration option: set timer_exceeded after so many seconds.
/*
 * Handler registered for SIGALRM by setalarm(): records that the requested
 * duration has elapsed by setting timer_exceeded.
 */
3819 handle_sig_alarm(SIGNAL_ARGS)
3821 timer_exceeded = true;
/*
 * Install handle_sig_alarm as the SIGALRM handler for the duration timer.
 *
 * NOTE(review): the call that arms the alarm for `seconds` is not visible in
 * this excerpt -- confirm it follows the pqsignal() call.
 */
3825 setalarm(int seconds)
3827 pqsignal(SIGALRM, handle_sig_alarm);
/*
 * Callback handed to CreateTimerQueueTimer() by setalarm(): marks the run
 * duration as expired by setting timer_exceeded.  Both parameters are unused
 * in the visible body.
 */
3833 static VOID CALLBACK
3834 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3836 timer_exceeded = true;
/*
 * Timer-queue based setalarm(): creates a timer queue and a one-shot timer
 * (WT_EXECUTEONLYONCE, period 0) that invokes win32_timer_callback after
 * seconds * 1000 milliseconds.  Prints "failed to set timer" when the
 * duration is too large or the timer cannot be created.
 */
3840 setalarm(int seconds)
3845 /* This function will be called at most once, so we can cheat a bit. */
3846 queue = CreateTimerQueue();
/* refuse durations whose millisecond value would not fit in a DWORD */
3847 if (seconds > ((DWORD) -1) / 1000 ||
3848 !CreateTimerQueueTimer(&timer, queue,
3849 win32_timer_callback, NULL, seconds * 1000, 0,
3850 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3852 fprintf(stderr, "failed to set timer\n");
3857 /* partial pthread implementation for Windows */
/*
 * Per-thread record used by the pthread emulation.  Only the routine member
 * is visible in this excerpt; the emulation functions also reference
 * handle, arg and result members.
 */
3859 typedef struct win32_pthread
3862 void *(*routine) (void *); /* start routine to run in the new thread */
/*
 * Entry point given to _beginthreadex() by pthread_create(): arg is the
 * thread's win32_pthread record.  Calls th->routine(th->arg) and stores the
 * returned pointer in th->result.
 */
3867 static unsigned __stdcall
3868 win32_pthread_run(void *arg)
3870 win32_pthread *th = (win32_pthread *) arg;
3872 th->result = th->routine(th->arg);
/*
 * Minimal pthread_create() replacement: allocates a win32_pthread record,
 * stores start_routine in it and starts a thread running win32_pthread_run
 * on that record via _beginthreadex().  A NULL handle is treated as a
 * failure; the handling itself is not visible in this excerpt.  attr is not
 * referenced in the visible lines.
 */
3878 pthread_create(pthread_t *thread,
3879 pthread_attr_t *attr,
3880 void *(*start_routine) (void *),
3886 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3887 th->routine = start_routine;
3891 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3892 if (th->handle == NULL)
3904 pthread_join(pthread_t th, void **thread_return)
3906 if (th == NULL || th->handle == NULL)
3907 return errno = EINVAL;
3909 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3911 _dosmaperr(GetLastError());
3916 *thread_return = th->result;
3918 CloseHandle(th->handle);