4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2015, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "portability/instr_time.h"
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h> /* for getrlimit */
53 #define M_PI 3.14159265358979323846
59 * Multi-platform pthread implementations
63 /* Use native win32 threads on Windows */
64 typedef struct win32_pthread *pthread_t;
65 typedef int pthread_attr_t;
67 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
68 static int pthread_join(pthread_t th, void **thread_return);
69 #elif defined(ENABLE_THREAD_SAFETY)
70 /* Use platform-dependent pthread capability */
73 /* No threads implementation, use none (-j 1) */
74 #define pthread_t void *
78 /********************************************************************
79 * some configurable parameters */
81 /* max number of clients allowed */
83 #define MAXCLIENTS (FD_SETSIZE - 10)
85 #define MAXCLIENTS 1024
88 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
89 #define DEFAULT_NXACTS 10 /* default nxacts */
91 #define MIN_GAUSSIAN_THRESHOLD 2.0 /* minimum threshold for gauss */
93 int nxacts = 0; /* number of transactions per client */
94 int duration = 0; /* duration in seconds */
97 * scaling factor. for example, scale = 10 will make 1000000 tuples in
98 * pgbench_accounts table.
103 * fillfactor. for example, fillfactor = 90 will use only 90 percent
104 * space during inserts and leave 10 percent free.
106 int fillfactor = 100;
109 * create foreign key constraints on the tables?
111 int foreign_keys = 0;
114 * use unlogged tables?
116 int unlogged_tables = 0;
119 * log sampling rate (1.0 = log everything, 0.0 = option not given)
121 double sample_rate = 0.0;
124 * When threads are throttled to a given rate limit, this is the target delay
125 * to reach that rate in usec. 0 is the default and means no throttling.
127 int64 throttle_delay = 0;
130 * Transactions which take longer than this limit (in usec) are counted as
131 * late, and reported as such, although they are completed anyway. When
132 * throttling is enabled, execution time slots that are more than this late
133 * are skipped altogether, and counted separately.
135 int64 latency_limit = 0;
138 * tablespace selection
140 char *tablespace = NULL;
141 char *index_tablespace = NULL;
144 * end of configurable parameters
145 *********************************************************************/
147 #define nbranches 1 /* Makes little sense to change this. Change
150 #define naccounts 100000
153 * The scale factor at/beyond which 32bit integers are incapable of storing
156 * Although the actual threshold is 21474, we use 20000 because it is easier to
157 * document and remember, and isn't that far away from the real threshold.
159 #define SCALE_32BIT_THRESHOLD 20000
161 bool use_log; /* log transaction latencies to a file */
162 bool use_quiet; /* quiet logging onto stderr */
163 int agg_interval; /* log aggregates instead of individual
165 int progress = 0; /* thread progress report every this seconds */
166 int progress_nclients = 0; /* number of clients for progress
168 int progress_nthreads = 0; /* number of threads for progress
170 bool is_connect; /* establish connection for each transaction */
171 bool is_latencies; /* report per-command latencies */
172 int main_pid; /* main process id used in log filename */
178 const char *progname;
180 volatile bool timer_exceeded = false; /* flag from signal handler */
182 /* variable definitions */
185 char *name; /* variable name */
186 char *value; /* its value */
189 #define MAX_FILES 128 /* max number of SQL script files allowed */
190 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
193 * structures used in custom query mode
198 PGconn *con; /* connection handle to DB */
199 int id; /* client No. */
200 int state; /* state No. */
201 int listen; /* 0 indicates that an async query has been
203 int sleeping; /* 1 indicates that the client is napping */
204 bool throttling; /* whether nap is for throttling */
205 Variable *variables; /* array of variable definitions */
207 int64 txn_scheduled; /* scheduled start time of transaction (usec) */
208 instr_time txn_begin; /* used for measuring schedule lag times */
209 instr_time stmt_begin; /* used for measuring statement latencies */
210 bool is_throttled; /* whether transaction throttling is done */
211 int use_file; /* index in sql_files for this client */
212 bool prepared[MAX_FILES];
214 /* per client collected stats */
215 int cnt; /* xacts count */
216 int ecnt; /* error count */
217 int64 txn_latencies; /* cumulated latencies */
218 int64 txn_sqlats; /* cumulated square latencies */
226 int tid; /* thread id */
227 pthread_t thread; /* thread handle */
228 CState *state; /* array of CState */
229 int nstate; /* length of state[] */
230 instr_time start_time; /* thread start time */
231 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
232 int *exec_count; /* number of cmd executions (per Command) */
233 unsigned short random_state[3]; /* separate randomness for each thread */
234 int64 throttle_trigger; /* previous/next throttling (us) */
236 /* per thread collected stats */
237 instr_time conn_time;
238 int64 throttle_lag; /* total transaction lag behind throttling */
239 int64 throttle_lag_max; /* max transaction lag */
240 int64 throttle_latency_skipped; /* lagging transactions
242 int64 latency_late; /* late transactions */
245 #define INVALID_THREAD ((pthread_t) 0)
248 * queries read from files
250 #define SQL_COMMAND 1
251 #define META_COMMAND 2
254 typedef enum QueryMode
256 QUERY_SIMPLE, /* simple query */
257 QUERY_EXTENDED, /* extended query */
258 QUERY_PREPARED, /* extended query with prepared statements */
262 static QueryMode querymode = QUERY_SIMPLE;
263 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
267 char *line; /* full text of command line */
268 int command_num; /* unique index of this Command struct */
269 int type; /* command type (SQL_COMMAND or META_COMMAND) */
270 int argc; /* number of command words */
271 char *argv[MAX_ARGS]; /* command word list */
272 int cols[MAX_ARGS]; /* corresponding column starting from 1 */
273 PgBenchExpr *expr; /* parsed expression */
279 long start_time; /* when does the interval start */
280 int cnt; /* number of transactions */
281 int skipped; /* number of transactions skipped under --rate
282 * and --latency-limit */
284 double min_latency; /* min/max latencies */
286 double sum_latency; /* sum(latency), sum(latency^2) - for
292 double sum_lag; /* sum(lag) */
293 double sum2_lag; /* sum(lag*lag) */
296 static Command **sql_files[MAX_FILES]; /* SQL script files */
297 static int num_files; /* number of script files */
298 static int num_commands = 0; /* total number of Command structs */
299 static int debug = 0; /* debug flag */
301 /* default scenario */
302 static char *tpc_b = {
303 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
304 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
305 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
306 "\\setrandom aid 1 :naccounts\n"
307 "\\setrandom bid 1 :nbranches\n"
308 "\\setrandom tid 1 :ntellers\n"
309 "\\setrandom delta -5000 5000\n"
311 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
312 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
313 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
314 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
315 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
320 static char *simple_update = {
321 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
322 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
323 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
324 "\\setrandom aid 1 :naccounts\n"
325 "\\setrandom bid 1 :nbranches\n"
326 "\\setrandom tid 1 :ntellers\n"
327 "\\setrandom delta -5000 5000\n"
329 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
330 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
331 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
336 static char *select_only = {
337 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
338 "\\setrandom aid 1 :naccounts\n"
339 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
342 /* Function prototypes */
343 static void setalarm(int seconds);
344 static void *threadRun(void *arg);
346 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
347 AggVals *agg, bool skipped);
352 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
354 " %s [OPTION]... [DBNAME]\n"
355 "\nInitialization options:\n"
356 " -i, --initialize invokes initialization mode\n"
357 " -F, --fillfactor=NUM set fill factor\n"
358 " -n, --no-vacuum do not run VACUUM after initialization\n"
359 " -q, --quiet quiet logging (one message each 5 seconds)\n"
360 " -s, --scale=NUM scaling factor\n"
361 " --foreign-keys create foreign key constraints between tables\n"
362 " --index-tablespace=TABLESPACE\n"
363 " create indexes in the specified tablespace\n"
364 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
365 " --unlogged-tables create tables as unlogged tables\n"
366 "\nBenchmarking options:\n"
367 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
368 " -C, --connect establish new connection for each transaction\n"
369 " -D, --define=VARNAME=VALUE\n"
370 " define variable for use by custom script\n"
371 " -f, --file=FILENAME read transaction script from FILENAME\n"
372 " -j, --jobs=NUM number of threads (default: 1)\n"
373 " -l, --log write transaction times to log file\n"
374 " -L, --latency-limit=NUM count transactions lasting more than NUM ms\n"
376 " -M, --protocol=simple|extended|prepared\n"
377 " protocol for submitting queries (default: simple)\n"
378 " -n, --no-vacuum do not run VACUUM before tests\n"
379 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
380 " -P, --progress=NUM show thread progress report every NUM seconds\n"
381 " -r, --report-latencies report average latency per command\n"
382 " -R, --rate=NUM target rate in transactions per second\n"
383 " -s, --scale=NUM report this scale factor in output\n"
384 " -S, --select-only perform SELECT-only transactions\n"
385 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
386 " -T, --time=NUM duration of benchmark test in seconds\n"
387 " -v, --vacuum-all vacuum all four standard tables before tests\n"
388 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
389 " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
390 "\nCommon options:\n"
391 " -d, --debug print debugging output\n"
392 " -h, --host=HOSTNAME database server host or socket directory\n"
393 " -p, --port=PORT database server port number\n"
394 " -U, --username=USERNAME connect as specified database user\n"
395 " -V, --version output version information, then exit\n"
396 " -?, --help show this help, then exit\n"
398 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
403 * strtoint64 -- convert a string to 64-bit integer
405 * This function is a modified version of scanint8() from
406 * src/backend/utils/adt/int8.c.
409 strtoint64(const char *str)
411 const char *ptr = str;
416 * Do our own scan, rather than relying on sscanf which might be broken
420 /* skip leading spaces */
421 while (*ptr && isspace((unsigned char) *ptr))
430 * Do an explicit check for INT64_MIN. Ugly though this is, it's
431 * cleaner than trying to get the loop below to handle it portably.
433 if (strncmp(ptr, "9223372036854775808", 19) == 0)
435 result = PG_INT64_MIN;
441 else if (*ptr == '+')
444 /* require at least one digit */
445 if (!isdigit((unsigned char) *ptr))
446 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
449 while (*ptr && isdigit((unsigned char) *ptr))
451 int64 tmp = result * 10 + (*ptr++ - '0');
453 if ((tmp / 10) != result) /* overflow? */
454 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
460 /* allow trailing whitespace, but not other trailing chars */
461 while (*ptr != '\0' && isspace((unsigned char) *ptr))
465 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
467 return ((sign < 0) ? -result : result);
470 /* random number generator: uniform distribution from min to max inclusive */
472 getrand(TState *thread, int64 min, int64 max)
475 * Odd coding is so that min and max have approximately the same chance of
476 * being selected as do numbers between them.
478 * pg_erand48() is thread-safe and concurrent, which is why we use it
479 * rather than random(), which in glibc is non-reentrant, and therefore
480 * protected by a mutex, and therefore a bottleneck on machines with many
483 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
487 * random number generator: exponential distribution from min to max inclusive.
488 * the threshold is so that the density of probability for the last cut-off max
489 * value is exp(-threshold).
492 getExponentialRand(TState *thread, int64 min, int64 max, double threshold)
498 Assert(threshold > 0.0);
499 cut = exp(-threshold);
500 /* erand in [0, 1), uniform in (0, 1] */
501 uniform = 1.0 - pg_erand48(thread->random_state);
504 * inner expresion in (cut, 1] (if threshold > 0), rand in [0, 1)
506 Assert((1.0 - cut) != 0.0);
507 rand = -log(cut + (1.0 - cut) * uniform) / threshold;
508 /* return int64 random number within between min and max */
509 return min + (int64) ((max - min + 1) * rand);
512 /* random number generator: gaussian distribution from min to max inclusive */
514 getGaussianRand(TState *thread, int64 min, int64 max, double threshold)
520 * Get user specified random number from this loop, with -threshold <
523 * This loop is executed until the number is in the expected range.
525 * As the minimum threshold is 2.0, the probability of looping is low:
526 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
527 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
528 * the worst case. For a 5.0 threshold value, the looping probability is
529 * about e^{-5} * 2 / pi ~ 0.43%.
534 * pg_erand48 generates [0,1), but for the basic version of the
535 * Box-Muller transform the two uniformly distributed random numbers
536 * are expected in (0, 1] (see
537 * http://en.wikipedia.org/wiki/Box_muller)
539 double rand1 = 1.0 - pg_erand48(thread->random_state);
540 double rand2 = 1.0 - pg_erand48(thread->random_state);
542 /* Box-Muller basic form transform */
543 double var_sqrt = sqrt(-2.0 * log(rand1));
545 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
548 * we may try with cos, but there may be a bias induced if the
549 * previous value fails the test. To be on the safe side, let us try
553 while (stdev < -threshold || stdev >= threshold);
555 /* stdev is in [-threshold, threshold), normalization to [0,1) */
556 rand = (stdev + threshold) / (threshold * 2.0);
558 /* return int64 random number within between min and max */
559 return min + (int64) ((max - min + 1) * rand);
563 * random number generator: generate a value, such that the series of values
564 * will approximate a Poisson distribution centered on the given value.
567 getPoissonRand(TState *thread, int64 center)
570 * Use inverse transform sampling to generate a value > 0, such that the
571 * expected (i.e. average) value is the given argument.
575 /* erand in [0, 1), uniform in (0, 1] */
576 uniform = 1.0 - pg_erand48(thread->random_state);
578 return (int64) (-log(uniform) * ((double) center) + 0.5);
581 /* call PQexec() and exit() on failure */
583 executeStatement(PGconn *con, const char *sql)
587 res = PQexec(con, sql);
588 if (PQresultStatus(res) != PGRES_COMMAND_OK)
590 fprintf(stderr, "%s", PQerrorMessage(con));
596 /* call PQexec() and complain, but without exiting, on failure */
598 tryExecuteStatement(PGconn *con, const char *sql)
602 res = PQexec(con, sql);
603 if (PQresultStatus(res) != PGRES_COMMAND_OK)
605 fprintf(stderr, "%s", PQerrorMessage(con));
606 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
611 /* set up a connection to the backend */
616 static char *password = NULL;
620 * Start the connection. Loop until we have a password if requested by
625 #define PARAMS_ARRAY_SIZE 7
627 const char *keywords[PARAMS_ARRAY_SIZE];
628 const char *values[PARAMS_ARRAY_SIZE];
630 keywords[0] = "host";
632 keywords[1] = "port";
634 keywords[2] = "user";
636 keywords[3] = "password";
637 values[3] = password;
638 keywords[4] = "dbname";
640 keywords[5] = "fallback_application_name";
641 values[5] = progname;
647 conn = PQconnectdbParams(keywords, values, true);
651 fprintf(stderr, "connection to database \"%s\" failed\n",
656 if (PQstatus(conn) == CONNECTION_BAD &&
657 PQconnectionNeedsPassword(conn) &&
661 password = simple_prompt("Password: ", 100, false);
666 /* check to see that the backend connection was successfully made */
667 if (PQstatus(conn) == CONNECTION_BAD)
669 fprintf(stderr, "connection to database \"%s\" failed:\n%s",
670 dbName, PQerrorMessage(conn));
678 /* throw away response from backend */
680 discard_response(CState *state)
686 res = PQgetResult(state->con);
693 compareVariables(const void *v1, const void *v2)
695 return strcmp(((const Variable *) v1)->name,
696 ((const Variable *) v2)->name);
700 getVariable(CState *st, char *name)
705 /* On some versions of Solaris, bsearch of zero items dumps core */
706 if (st->nvariables <= 0)
710 var = (Variable *) bsearch((void *) &key,
711 (void *) st->variables,
/*
 * Check whether "name" consists solely of letters, digits (as classified by
 * isalnum()) and underscores.
 *
 * Returns true if every character qualifies, false at the first one that
 * does not.  An empty string contains no offending character and so is
 * accepted.
 */
static bool
isLegalVariableName(const char *name)
{
	const char *p;

	for (p = name; *p != '\0'; p++)
	{
		/* cast avoids passing a negative char value to isalnum() */
		if (!isalnum((unsigned char) *p) && *p != '_')
			return false;
	}

	return true;
}
737 putVariable(CState *st, const char *context, char *name, char *value)
743 /* On some versions of Solaris, bsearch of zero items dumps core */
744 if (st->nvariables > 0)
745 var = (Variable *) bsearch((void *) &key,
746 (void *) st->variables,
758 * Check for the name only when declaring a new variable to avoid
761 if (!isLegalVariableName(name))
763 fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
769 newvars = (Variable *) pg_realloc(st->variables,
770 (st->nvariables + 1) * sizeof(Variable));
772 newvars = (Variable *) pg_malloc(sizeof(Variable));
774 st->variables = newvars;
776 var = &newvars[st->nvariables];
778 var->name = pg_strdup(name);
779 var->value = pg_strdup(value);
783 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
790 /* dup then free, in case value is pointing at this variable */
791 val = pg_strdup(value);
801 parseVariable(const char *sql, int *eaten)
809 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
814 memcpy(name, &sql[1], i - 1);
822 replaceVariable(char **sql, char *param, int len, char *value)
824 int valueln = strlen(value);
828 size_t offset = param - *sql;
830 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
831 param = *sql + offset;
835 memmove(param + valueln, param + len, strlen(param + len) + 1);
836 memcpy(param, value, valueln);
838 return param + valueln;
842 assignVariables(CState *st, char *sql)
849 while ((p = strchr(p, ':')) != NULL)
853 name = parseVariable(p, &eaten);
863 val = getVariable(st, name);
871 p = replaceVariable(&sql, p, eaten, val);
878 getQueryParams(CState *st, const Command *command, const char **params)
882 for (i = 0; i < command->argc - 1; i++)
883 params[i] = getVariable(st, command->argv[i + 1]);
887 * Recursive evaluation of an expression in a pgbench script
888 * using the current state of variables.
889 * Returns whether the evaluation was ok,
890 * the value itself is returned through the retval pointer.
893 evaluateExpr(CState *st, PgBenchExpr *expr, int64 *retval)
897 case ENODE_INTEGER_CONSTANT:
899 *retval = expr->u.integer_constant.ival;
907 if ((var = getVariable(st, expr->u.variable.varname)) == NULL)
909 fprintf(stderr, "undefined variable \"%s\"\n",
910 expr->u.variable.varname);
913 *retval = strtoint64(var);
922 if (!evaluateExpr(st, expr->u.operator.lexpr, &lval))
924 if (!evaluateExpr(st, expr->u.operator.rexpr, &rval))
926 switch (expr->u.operator.operator)
929 *retval = lval + rval;
933 *retval = lval - rval;
937 *retval = lval * rval;
943 fprintf(stderr, "division by zero\n");
946 *retval = lval / rval;
952 fprintf(stderr, "division by zero\n");
955 *retval = lval % rval;
959 fprintf(stderr, "bad operator\n");
967 fprintf(stderr, "bad expression\n");
972 * Run a shell command. The result is assigned to the variable if not NULL.
973 * Return true if succeeded, or false on error.
976 runShellCommand(CState *st, char *variable, char **argv, int argc)
978 char command[SHELL_COMMAND_SIZE];
987 * Join arguments with whitespace separators. Arguments starting with
988 * exactly one colon are treated as variables:
989 * name - append a string "name"
990 * :var - append a variable named 'var'
991 * ::name - append a string ":name"
994 for (i = 0; i < argc; i++)
999 if (argv[i][0] != ':')
1001 arg = argv[i]; /* a string literal */
1003 else if (argv[i][1] == ':')
1005 arg = argv[i] + 1; /* a string literal starting with colons */
1007 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1009 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1014 arglen = strlen(arg);
1015 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1017 fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1022 command[len++] = ' ';
1023 memcpy(command + len, arg, arglen);
1027 command[len] = '\0';
1029 /* Fast path for non-assignment case */
1030 if (variable == NULL)
1032 if (system(command))
1034 if (!timer_exceeded)
1035 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1041 /* Execute the command with pipe and read the standard output. */
1042 if ((fp = popen(command, "r")) == NULL)
1044 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1047 if (fgets(res, sizeof(res), fp) == NULL)
1049 if (!timer_exceeded)
1050 fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1056 fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1060 /* Check whether the result is an integer and assign it to the variable */
1061 retval = (int) strtol(res, &endptr, 10);
1062 while (*endptr != '\0' && isspace((unsigned char) *endptr))
1064 if (*res == '\0' || *endptr != '\0')
1066 fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1070 snprintf(res, sizeof(res), "%d", retval);
1071 if (!putVariable(st, "setshell", variable, res))
1075 printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
#define MAX_PREPARE_NAME		32

/*
 * Build the name of the prepared statement for command number "state" of
 * script number "file", in the form "P<file>_<state>".
 *
 * buffer must have room for MAX_PREPARE_NAME bytes; the write is bounded by
 * that size rather than trusting the formatted length.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
1088 clientDone(CState *st, bool ok)
1090 (void) ok; /* unused */
1092 if (st->con != NULL)
1097 return false; /* always false */
1101 agg_vals_init(AggVals *aggs, instr_time start)
1103 /* basic counters */
1104 aggs->cnt = 0; /* number of transactions (includes skipped) */
1105 aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
1107 aggs->sum_latency = 0; /* SUM(latency) */
1108 aggs->sum2_latency = 0; /* SUM(latency*latency) */
1110 /* min and max transaction duration */
1111 aggs->min_latency = 0;
1112 aggs->max_latency = 0;
1114 /* schedule lag counters */
1120 /* start of the current interval */
1121 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
1124 /* return false iff client should be disconnected */
1126 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
1130 bool trans_needs_throttle = false;
1134 * gettimeofday() isn't free, so we get the current timestamp lazily the
1135 * first time it's needed, and reuse the same value throughout this
1136 * function after that. This also ensures that e.g. the calculated latency
1137 * reported in the log file and in the totals are the same. Zero means
1138 * "not set yet". Reset "now" when we step to the next command with "goto
1142 INSTR_TIME_SET_ZERO(now);
1144 commands = sql_files[st->use_file];
1147 * Handle throttling once per transaction by sleeping. It is simpler to
1148 * do this here rather than at the end, because so much complicated logic
1149 * happens below when statements finish.
1151 if (throttle_delay && !st->is_throttled)
1154 * Generate a delay such that the series of delays will approximate a
1155 * Poisson distribution centered on the throttle_delay time.
1157 * If transactions are too slow or a given wait is shorter than a
1158 * transaction, the next transaction will start right away.
1160 int64 wait = getPoissonRand(thread, throttle_delay);
1162 thread->throttle_trigger += wait;
1163 st->txn_scheduled = thread->throttle_trigger;
1166 * If --latency-limit is used, and this slot is already late so
1167 * that the transaction will miss the latency limit even if it
1168 * completed immediately, we skip this time slot and iterate till the
1169 * next slot that isn't late yet.
1175 if (INSTR_TIME_IS_ZERO(now))
1176 INSTR_TIME_SET_CURRENT(now);
1177 now_us = INSTR_TIME_GET_MICROSEC(now);
1178 while (thread->throttle_trigger < now_us - latency_limit)
1180 thread->throttle_latency_skipped++;
1183 doLog(thread, st, logfile, &now, agg, true);
1185 wait = getPoissonRand(thread, throttle_delay);
1186 thread->throttle_trigger += wait;
1187 st->txn_scheduled = thread->throttle_trigger;
1192 st->throttling = true;
1193 st->is_throttled = true;
1195 fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
1200 { /* are we sleeping? */
1203 if (INSTR_TIME_IS_ZERO(now))
1204 INSTR_TIME_SET_CURRENT(now);
1205 now_us = INSTR_TIME_GET_MICROSEC(now);
1206 if (st->txn_scheduled <= now_us)
1208 st->sleeping = 0; /* Done sleeping, go ahead with next command */
1211 /* Measure lag of throttled transaction relative to target */
1212 int64 lag = now_us - st->txn_scheduled;
1214 thread->throttle_lag += lag;
1215 if (lag > thread->throttle_lag_max)
1216 thread->throttle_lag_max = lag;
1217 st->throttling = false;
1221 return true; /* Still sleeping, nothing to do here */
1225 { /* are we receiver? */
1226 if (commands[st->state]->type == SQL_COMMAND)
1229 fprintf(stderr, "client %d receiving\n", st->id);
1230 if (!PQconsumeInput(st->con))
1231 { /* there's something wrong */
1232 fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
1233 return clientDone(st, false);
1235 if (PQisBusy(st->con))
1236 return true; /* don't have the whole result yet */
1240 * command finished: accumulate per-command execution times in
1241 * thread-local data structure, if per-command latencies are requested
1245 int cnum = commands[st->state]->command_num;
1247 if (INSTR_TIME_IS_ZERO(now))
1248 INSTR_TIME_SET_CURRENT(now);
1249 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1250 now, st->stmt_begin);
1251 thread->exec_count[cnum]++;
1254 /* transaction finished: calculate latency and log the transaction */
1255 if (commands[st->state + 1] == NULL)
1257 /* only calculate latency if an option is used that needs it */
1258 if (progress || throttle_delay || latency_limit)
1262 if (INSTR_TIME_IS_ZERO(now))
1263 INSTR_TIME_SET_CURRENT(now);
1265 latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
1267 st->txn_latencies += latency;
1270 * XXX In a long benchmark run of high-latency transactions,
1271 * this int64 addition eventually overflows. For example, 100
1272 * threads running 10s transactions will overflow it in 2.56
1273 * hours. With a more-typical OLTP workload of .1s
1274 * transactions, overflow would take 256 hours.
1276 st->txn_sqlats += latency * latency;
1278 /* record over the limit transactions if needed. */
1279 if (latency_limit && latency > latency_limit)
1280 thread->latency_late++;
1283 /* record the time it took in the log */
1285 doLog(thread, st, logfile, &now, agg, false);
1288 if (commands[st->state]->type == SQL_COMMAND)
1291 * Read and discard the query result; note this is not included in
1292 * the statement latency numbers.
1294 res = PQgetResult(st->con);
1295 switch (PQresultStatus(res))
1297 case PGRES_COMMAND_OK:
1298 case PGRES_TUPLES_OK:
1301 fprintf(stderr, "client %d aborted in state %d: %s",
1302 st->id, st->state, PQerrorMessage(st->con));
1304 return clientDone(st, false);
1307 discard_response(st);
1310 if (commands[st->state + 1] == NULL)
1319 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1320 return clientDone(st, true); /* exit success */
1323 /* increment state counter */
1325 if (commands[st->state] == NULL)
1328 st->use_file = (int) getrand(thread, 0, num_files - 1);
1329 commands = sql_files[st->use_file];
1330 st->is_throttled = false;
1333 * No transaction is underway anymore, which means there is
1334 * nothing to listen to right now. When throttling rate limits
1335 * are active, a sleep will happen next, as the next transaction
1336 * starts. And then in any case the next SQL command will set
1340 trans_needs_throttle = (throttle_delay > 0);
1344 if (st->con == NULL)
1349 INSTR_TIME_SET_CURRENT(start);
1350 if ((st->con = doConnect()) == NULL)
1352 fprintf(stderr, "client %d aborted while establishing connection\n",
1354 return clientDone(st, false);
1356 INSTR_TIME_SET_CURRENT(end);
1357 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1361 * This ensures that a throttling delay is inserted before proceeding with
1362 * sql commands, after the first transaction. The first transaction
1363 * throttling is performed when first entering doCustom.
1365 if (trans_needs_throttle)
1367 trans_needs_throttle = false;
1371 /* Record transaction start time under logging, progress or throttling */
1372 if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
1374 INSTR_TIME_SET_CURRENT(st->txn_begin);
1377 * When not throttling, this is also the transaction's scheduled start
1380 if (!throttle_delay)
1381 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
1384 /* Record statement start time if per-command latencies are requested */
1386 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1388 if (commands[st->state]->type == SQL_COMMAND)
1390 const Command *command = commands[st->state];
1393 if (querymode == QUERY_SIMPLE)
1397 sql = pg_strdup(command->argv[0]);
1398 sql = assignVariables(st, sql);
1401 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1402 r = PQsendQuery(st->con, sql);
1405 else if (querymode == QUERY_EXTENDED)
1407 const char *sql = command->argv[0];
1408 const char *params[MAX_ARGS];
1410 getQueryParams(st, command, params);
1413 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1414 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1415 NULL, params, NULL, NULL, 0);
1417 else if (querymode == QUERY_PREPARED)
1419 char name[MAX_PREPARE_NAME];
1420 const char *params[MAX_ARGS];
1422 if (!st->prepared[st->use_file])
1426 for (j = 0; commands[j] != NULL; j++)
1429 char name[MAX_PREPARE_NAME];
1431 if (commands[j]->type != SQL_COMMAND)
1433 preparedStatementName(name, st->use_file, j);
1434 res = PQprepare(st->con, name,
1435 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1436 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1437 fprintf(stderr, "%s", PQerrorMessage(st->con));
1440 st->prepared[st->use_file] = true;
1443 getQueryParams(st, command, params);
1444 preparedStatementName(name, st->use_file, st->state);
1447 fprintf(stderr, "client %d sending %s\n", st->id, name);
1448 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1449 params, NULL, NULL, 0);
1451 else /* unknown sql mode */
1457 fprintf(stderr, "client %d could not send %s\n",
1458 st->id, command->argv[0]);
1462 st->listen = 1; /* flags that should be listened */
1464 else if (commands[st->state]->type == META_COMMAND)
1466 int argc = commands[st->state]->argc,
1468 char **argv = commands[st->state]->argv;
1472 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1473 for (i = 1; i < argc; i++)
1474 fprintf(stderr, " %s", argv[i]);
1475 fprintf(stderr, "\n");
1478 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1483 double threshold = 0;
1486 if (*argv[2] == ':')
1488 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1490 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1495 min = strtoint64(var);
1498 min = strtoint64(argv[2]);
1500 if (*argv[3] == ':')
1502 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1504 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1509 max = strtoint64(var);
1512 max = strtoint64(argv[3]);
1516 fprintf(stderr, "%s: \\setrandom maximum is less than minimum\n",
1523 * Generate random number functions need to be able to subtract
1524 * max from min and add one to the result without overflowing.
1525 * Since we know max > min, we can detect overflow just by
1526 * checking for a negative result. But we must check both that the
1527 * subtraction doesn't overflow, and that adding one to the result
1528 * doesn't overflow either.
1530 if (max - min < 0 || (max - min) + 1 < 0)
1532 fprintf(stderr, "%s: \\setrandom range is too large\n",
1538 if (argc == 4 || /* uniform without or with "uniform" keyword */
1539 (argc == 5 && pg_strcasecmp(argv[4], "uniform") == 0))
1542 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1544 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1546 else if (argc == 6 &&
1547 ((pg_strcasecmp(argv[4], "gaussian") == 0) ||
1548 (pg_strcasecmp(argv[4], "exponential") == 0)))
1550 if (*argv[5] == ':')
1552 if ((var = getVariable(st, argv[5] + 1)) == NULL)
1554 fprintf(stderr, "%s: invalid threshold number: \"%s\"\n",
1559 threshold = strtod(var, NULL);
1562 threshold = strtod(argv[5], NULL);
1564 if (pg_strcasecmp(argv[4], "gaussian") == 0)
1566 if (threshold < MIN_GAUSSIAN_THRESHOLD)
1568 fprintf(stderr, "gaussian threshold must be at least %f (not \"%s\")\n", MIN_GAUSSIAN_THRESHOLD, argv[5]);
1573 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getGaussianRand(thread, min, max, threshold));
1575 snprintf(res, sizeof(res), INT64_FORMAT, getGaussianRand(thread, min, max, threshold));
1577 else if (pg_strcasecmp(argv[4], "exponential") == 0)
1579 if (threshold <= 0.0)
1581 fprintf(stderr, "exponential threshold must be greater than zero (not \"%s\")\n", argv[5]);
1586 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getExponentialRand(thread, min, max, threshold));
1588 snprintf(res, sizeof(res), INT64_FORMAT, getExponentialRand(thread, min, max, threshold));
1591 else /* this means an error somewhere in the parsing phase... */
1593 fprintf(stderr, "%s: invalid arguments for \\setrandom\n",
1599 if (!putVariable(st, argv[0], argv[1], res))
1607 else if (pg_strcasecmp(argv[0], "set") == 0)
1610 PgBenchExpr *expr = commands[st->state]->expr;
1613 if (!evaluateExpr(st, expr, &result))
1618 sprintf(res, INT64_FORMAT, result);
1620 if (!putVariable(st, argv[0], argv[1], res))
1628 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1634 if (*argv[1] == ':')
1636 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1638 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1646 usec = atoi(argv[1]);
1650 if (pg_strcasecmp(argv[2], "ms") == 0)
1652 else if (pg_strcasecmp(argv[2], "s") == 0)
1658 INSTR_TIME_SET_CURRENT(now);
1659 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
1664 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1666 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1668 if (timer_exceeded) /* timeout */
1669 return clientDone(st, true);
1670 else if (!ret) /* on error */
1675 else /* succeeded */
1678 else if (pg_strcasecmp(argv[0], "shell") == 0)
1680 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1682 if (timer_exceeded) /* timeout */
1683 return clientDone(st, true);
1684 else if (!ret) /* on error */
1689 else /* succeeded */
/*
 * doLog
 *
 * Emit the log record for one finished (or skipped) transaction of client st.
 *
 * - When sample_rate is nonzero, a draw from thread->random_state decides
 *   whether this record is considered at all.
 * - *now is filled in with the current time if the caller passed it as zero.
 * - With agg_interval > 0 the values are folded into *agg and one summary
 *   line is written to logfile per elapsed interval; otherwise one raw line
 *   is written per transaction.
 */
1699 * print log entry after completing one transaction.
1702 doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
1709 * Skip the log entry if sampling is enabled and this row doesn't belong
1710 * to the random sample.
1712 if (sample_rate != 0.0 &&
1713 pg_erand48(thread->random_state) > sample_rate)
1716 if (INSTR_TIME_IS_ZERO(*now))
1717 INSTR_TIME_SET_CURRENT(*now);
/*
 * Both values below are measured from st->txn_scheduled, so the latency
 * figure includes the schedule lag.
 */
1719 latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
1723 lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
1725 /* should we aggregate the results or not? */
1726 if (agg_interval > 0)
1729 * Are we still in the same interval? If yes, accumulate the values
1730 * (print them otherwise)
1732 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
1738 * there is no latency to record if the transaction was
1745 agg->sum_latency += latency;
1746 agg->sum2_latency += latency * latency;
1748 /* first in this aggregation interval */
1749 if ((agg->cnt == 1) || (latency < agg->min_latency))
1750 agg->min_latency = latency;
1752 if ((agg->cnt == 1) || (latency > agg->max_latency))
1753 agg->max_latency = latency;
1755 /* and the same for schedule lag */
1758 agg->sum_lag += lag;
1759 agg->sum2_lag += lag * lag;
1761 if ((agg->cnt == 1) || (lag < agg->min_lag))
1763 if ((agg->cnt == 1) || (lag > agg->max_lag))
1771 * Loop until we reach the interval of the current transaction
1772 * (and print all the empty intervals in between).
1774 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
1777 * This is a non-Windows branch (thanks to the ifdef in
1778 * usage), so we don't need to handle this in a special way
1781 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
1790 fprintf(logfile, " %.0f %.0f %.0f %.0f",
1796 fprintf(logfile, " %d", agg->skipped);
1798 fputc('\n', logfile);
1800 /* move to the next interval */
1801 agg->start_time = agg->start_time + agg_interval;
1803 /* reset for "no transaction" intervals */
1806 agg->min_latency = 0;
1807 agg->max_latency = 0;
1808 agg->sum_latency = 0;
1809 agg->sum2_latency = 0;
1816 /* reset the values to include only the current transaction. */
1818 agg->skipped = skipped ? 1 : 0;
1819 agg->min_latency = latency;
1820 agg->max_latency = latency;
1821 agg->sum_latency = skipped ? 0.0 : latency;
1822 agg->sum2_latency = skipped ? 0.0 : latency * latency;
1826 agg->sum2_lag = lag * lag;
1831 /* no, print raw transactions */
1834 /* This is more than we really ought to know about instr_time */
1836 fprintf(logfile, "%d %d skipped %d %ld %ld",
1837 st->id, st->cnt, st->use_file,
1838 (long) now->tv_sec, (long) now->tv_usec);
1840 fprintf(logfile, "%d %d %.0f %d %ld %ld",
1841 st->id, st->cnt, latency, st->use_file,
1842 (long) now->tv_sec, (long) now->tv_usec);
1845 /* On Windows, instr_time doesn't provide a timestamp anyway */
1847 fprintf(logfile, "%d %d skipped %d 0 0",
1848 st->id, st->cnt, st->use_file);
1850 fprintf(logfile, "%d %d %.0f %d 0 0",
1851 st->id, st->cnt, latency, st->use_file);
1854 fprintf(logfile, " %.0f", lag);
1855 fputc('\n', logfile);
/*
 * disconnect_all
 *
 * Walk the first length entries of the state array, hand each connection
 * to PQfinish and reset the con pointer to NULL, so that a later
 * st->con == NULL test sees the client as disconnected.
 */
1859 /* discard connections */
1861 disconnect_all(CState *state, int length)
1865 for (i = 0; i < length; i++)
1869 PQfinish(state[i].con);
1870 state[i].con = NULL;
/*
 * init
 *
 * Build the benchmark schema and load its initial contents over a single
 * connection obtained from doConnect. The visible steps are, in order:
 *
 *   1. for every entry of DDLs: drop the table if it exists, then create
 *      it again (optionally unlogged, with a fillfactor and/or tablespace);
 *   2. insert nbranches * scale branch rows and ntellers * scale teller
 *      rows inside one begin/commit pair;
 *   3. truncate pgbench_accounts and bulk-load naccounts * scale rows
 *      through COPY FROM STDIN, reporting progress on stderr;
 *   4. vacuum analyze the four tables;
 *   5. add the primary keys listed in DDLINDEXes (optionally placed in
 *      index_tablespace);
 *   6. add the foreign keys listed in DDLKEYs.
 *
 * NOTE(review): steps 4 and 6 are presumably gated on is_no_vacuum and on
 * the foreign-keys option; the guarding conditions are not visible here,
 * confirm against the full function.
 */
1875 /* create tables and setup data */
1877 init(bool is_no_vacuum)
1880 * The scale factor at/beyond which 32-bit integers are insufficient for
1881 * storing TPC-B account IDs.
1883 * Although the actual threshold is 21474, we use 20000 because it is easier to
1884 * document and remember, and isn't that far away from the real threshold.
1886 #define SCALE_32BIT_THRESHOLD 20000
1889 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1890 * fields in these table declarations were intended to comply with that.
1891 * The pgbench_accounts table complies with that because the "filler"
1892 * column is set to blank-padded empty string. But for all other tables
1893 * the columns default to NULL and so don't actually take any space. We
1894 * could fix that by giving them non-null default values. However, that
1895 * would completely break comparability of pgbench results with prior
1896 * versions. Since pgbench has never pretended to be fully TPC-B compliant
1897 * anyway, we stick with the historical behavior.
1901 const char *table; /* table name */
1902 const char *smcols; /* column decls if accountIDs are 32 bits */
1903 const char *bigcols; /* column decls if accountIDs are 64 bits */
1904 int declare_fillfactor;
1906 static const struct ddlinfo DDLs[] = {
1909 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1910 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1915 "tid int not null,bid int,tbalance int,filler char(84)",
1916 "tid int not null,bid int,tbalance int,filler char(84)",
1921 "aid int not null,bid int,abalance int,filler char(84)",
1922 "aid bigint not null,bid int,abalance int,filler char(84)",
1927 "bid int not null,bbalance int,filler char(88)",
1928 "bid int not null,bbalance int,filler char(88)",
1932 static const char *const DDLINDEXes[] = {
1933 "alter table pgbench_branches add primary key (bid)",
1934 "alter table pgbench_tellers add primary key (tid)",
1935 "alter table pgbench_accounts add primary key (aid)"
1937 static const char *const DDLKEYs[] = {
1938 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1939 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1940 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1941 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1942 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1951 /* used to track elapsed time and estimate of the remaining time */
1956 int log_interval = 1;
1958 if ((con = doConnect()) == NULL)
1961 for (i = 0; i < lengthof(DDLs); i++)
1965 const struct ddlinfo *ddl = &DDLs[i];
1968 /* Remove old table, if it exists. */
1969 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1970 executeStatement(con, buffer);
1972 /* Construct new create table statement. */
1974 if (ddl->declare_fillfactor)
1975 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1976 " with (fillfactor=%d)", fillfactor);
1977 if (tablespace != NULL)
1979 char *escape_tablespace;
/* The tablespace name is user supplied, so it is quoted as an identifier. */
1981 escape_tablespace = PQescapeIdentifier(con, tablespace,
1982 strlen(tablespace));
1983 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1984 " tablespace %s", escape_tablespace);
1985 PQfreemem(escape_tablespace);
/* Pick the column list with bigint account ids once scale is large enough. */
1988 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
1990 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
1991 unlogged_tables ? " unlogged" : "",
1992 ddl->table, cols, opts);
1994 executeStatement(con, buffer);
1997 executeStatement(con, "begin");
1999 for (i = 0; i < nbranches * scale; i++)
2001 /* "filler" column defaults to NULL */
2002 snprintf(sql, sizeof(sql),
2003 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2005 executeStatement(con, sql);
2008 for (i = 0; i < ntellers * scale; i++)
2010 /* "filler" column defaults to NULL */
2011 snprintf(sql, sizeof(sql),
2012 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2013 i + 1, i / ntellers + 1);
2014 executeStatement(con, sql);
2017 executeStatement(con, "commit");
2020 * fill the pgbench_accounts table with some data
/*
 * NOTE(review): this message is printed after the create table statements
 * above have already been executed.
 */
2022 fprintf(stderr, "creating tables...\n");
2024 executeStatement(con, "begin");
2025 executeStatement(con, "truncate pgbench_accounts");
2027 res = PQexec(con, "copy pgbench_accounts from stdin");
2028 if (PQresultStatus(res) != PGRES_COPY_IN)
2030 fprintf(stderr, "%s", PQerrorMessage(con));
2035 INSTR_TIME_SET_CURRENT(start);
2037 for (k = 0; k < (int64) naccounts * scale; k++)
2041 /* "filler" column defaults to blank padded empty string */
2042 snprintf(sql, sizeof(sql),
2043 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2044 j, k / naccounts + 1, 0);
2045 if (PQputline(con, sql))
2047 fprintf(stderr, "PQputline failed\n");
2052 * If we want to stick with the original logging, print a message each
2053 * 100k inserted rows.
2055 if ((!use_quiet) && (j % 100000 == 0))
2057 INSTR_TIME_SET_CURRENT(diff);
2058 INSTR_TIME_SUBTRACT(diff, start);
2060 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2061 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2063 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2064 j, (int64) naccounts * scale,
2065 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2066 elapsed_sec, remaining_sec);
2068 /* let's not call the timing for each row, but only each 100 rows */
2069 else if (use_quiet && (j % 100 == 0))
2071 INSTR_TIME_SET_CURRENT(diff);
2072 INSTR_TIME_SUBTRACT(diff, start);
2074 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2075 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2077 /* have we reached the next interval (or end)? */
/*
 * NOTE(review): unlike the neighbouring expressions, the product
 * scale * naccounts below is not widened to int64. If both operands are
 * plain int it can overflow for large scale values - confirm the operand
 * types and consider the (int64) form used elsewhere in this loop.
 */
2078 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2080 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2081 j, (int64) naccounts * scale,
2082 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2084 /* skip to the next interval */
2085 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
/* A line holding only backslash-dot ends the COPY data stream. */
2090 if (PQputline(con, "\\.\n"))
2092 fprintf(stderr, "very last PQputline failed\n");
2097 fprintf(stderr, "PQendcopy failed\n");
2100 executeStatement(con, "commit");
2105 fprintf(stderr, "vacuum...\n");
2106 executeStatement(con, "vacuum analyze pgbench_branches");
2107 executeStatement(con, "vacuum analyze pgbench_tellers");
2108 executeStatement(con, "vacuum analyze pgbench_accounts");
2109 executeStatement(con, "vacuum analyze pgbench_history");
2115 fprintf(stderr, "set primary keys...\n");
2116 for (i = 0; i < lengthof(DDLINDEXes); i++)
2120 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2122 if (index_tablespace != NULL)
2124 char *escape_tablespace;
2126 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2127 strlen(index_tablespace));
2128 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2129 " using index tablespace %s", escape_tablespace);
2130 PQfreemem(escape_tablespace);
2133 executeStatement(con, buffer);
2137 * create foreign keys
2141 fprintf(stderr, "set foreign keys...\n");
2142 for (i = 0; i < lengthof(DDLKEYs); i++)
2144 executeStatement(con, DDLKEYs[i]);
2148 fprintf(stderr, "done.\n");
/*
 * parseQuery
 *
 * Works on a private copy of raw_sql. Every colon found by strchr is
 * offered to parseVariable; when that yields a name, the text is replaced
 * by a positional placeholder of the form $n (n being the current
 * cmd->argc) and the name is stored in cmd->argv[cmd->argc]. An error is
 * reported on stderr once cmd->argc reaches MAX_ARGS.
 *
 * The caller in process_commands treats a false result as failure.
 */
2153 * Parse the raw sql and replace :param to $n.
2156 parseQuery(Command *cmd, const char *raw_sql)
2161 sql = pg_strdup(raw_sql);
2165 while ((p = strchr(p, ':')) != NULL)
2171 name = parseVariable(p, &eaten);
2181 if (cmd->argc >= MAX_ARGS)
2183 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2188 sprintf(var, "$%d", cmd->argc);
2189 p = replaceVariable(&sql, p, eaten, var);
2191 cmd->argv[cmd->argc] = name;
/*
 * syntax_error
 *
 * Report a script parsing problem on stderr.
 *
 *   source, lineno  where the script text came from
 *   line            the offending input line, echoed back to the user
 *   command         name of the command being parsed
 *   msg             main diagnostic text
 *   more            extra detail shown in parentheses; callers pass NULL
 *                   when there is none
 *   column          1-based column of the error; callers pass -1 when it
 *                   is unknown
 *
 * NOTE(review): a comment in process_commands states that this function
 * exits the program; the exit call itself is not visible here - confirm.
 */
2200 syntax_error(const char *source, const int lineno,
2201 const char *line, const char *command,
2202 const char *msg, const char *more, const int column)
2204 fprintf(stderr, "%s:%d: %s", source, lineno, msg);
2206 fprintf(stderr, " (%s)", more);
2208 fprintf(stderr, " at column %d", column);
2209 fprintf(stderr, " in command \"%s\"\n", command);
2212 fprintf(stderr, "%s\n", line);
/* Pad with column - 1 spaces so that the caret lines up under the error. */
2217 for (i = 0; i < column - 1; i++)
2218 fprintf(stderr, " ");
2219 fprintf(stderr, "^ error found here\n");
/*
 * process_commands
 *
 * Turn one script line into a freshly allocated Command.
 *
 *   buf     the line; it is split in place by strtok
 *   source  script name, used only for diagnostics
 *   lineno  line number within source, used only for diagnostics
 *
 * Meta commands are split on whitespace into argv[], with cols[] recording
 * the 1-based offset of each token within buf. For the set command the
 * tokenizer switches to an empty delimiter set once max_args tokens have
 * been read, so the remaining text is kept as one token and handed to the
 * expression parser. Each known meta command then has its argument count
 * and keywords validated through syntax_error. Any other line becomes an
 * SQL_COMMAND; in the extended and prepared query modes it is additionally
 * passed through parseQuery.
 *
 * strtok keeps hidden static state, so this function must not be
 * interleaved with other strtok users.
 */
2225 /* Parse a command; return a Command struct, or NULL if it's a comment */
2227 process_commands(char *buf, const char *source, const int lineno)
2229 const char delim[] = " \f\n\r\t\v";
2231 Command *my_commands;
2236 /* Make the string buf end at the next newline */
2237 if ((p = strchr(buf, '\n')) != NULL)
2240 /* Skip leading whitespace */
2242 while (isspace((unsigned char) *p))
2245 /* If the line is empty or actually a comment, we're done */
2246 if (*p == '\0' || strncmp(p, "--", 2) == 0)
2249 /* Allocate and initialize Command structure */
2250 my_commands = (Command *) pg_malloc(sizeof(Command));
2251 my_commands->line = pg_strdup(buf);
2252 my_commands->command_num = num_commands++;
2253 my_commands->type = 0; /* until set */
2254 my_commands->argc = 0;
2260 my_commands->type = META_COMMAND;
2263 tok = strtok(++p, delim);
2265 if (tok != NULL && pg_strcasecmp(tok, "set") == 0)
2270 my_commands->cols[j] = tok - buf + 1;
2271 my_commands->argv[j++] = pg_strdup(tok);
2272 my_commands->argc++;
/* An empty delimiter set makes strtok return the rest of the line whole. */
2273 if (max_args >= 0 && my_commands->argc >= max_args)
2274 tok = strtok(NULL, "");
2276 tok = strtok(NULL, delim);
2279 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
2282 * parsing: \setrandom variable min max [uniform] \setrandom
2283 * variable min max (gaussian|exponential) threshold
2286 if (my_commands->argc < 4)
2288 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2289 "missing arguments", NULL, -1);
2294 if (my_commands->argc == 4 || /* uniform without/with
2295 * "uniform" keyword */
2296 (my_commands->argc == 5 &&
2297 pg_strcasecmp(my_commands->argv[4], "uniform") == 0))
2301 else if ( /* argc >= 5 */
2302 (pg_strcasecmp(my_commands->argv[4], "gaussian") == 0) ||
2303 (pg_strcasecmp(my_commands->argv[4], "exponential") == 0))
2305 if (my_commands->argc < 6)
2307 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2308 "missing threshold argument", my_commands->argv[4], -1);
2310 else if (my_commands->argc > 6)
2312 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2313 "too many arguments", my_commands->argv[4],
2314 my_commands->cols[6]);
2317 else /* cannot parse, unexpected arguments */
2319 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2320 "unexpected argument", my_commands->argv[4],
2321 my_commands->cols[4]);
2324 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
2326 if (my_commands->argc < 3)
2328 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2329 "missing argument", NULL, -1);
2332 expr_scanner_init(my_commands->argv[2], source, lineno,
2333 my_commands->line, my_commands->argv[0],
2334 my_commands->cols[2] - 1);
2336 if (expr_yyparse() != 0)
2338 /* dead code: exit done from syntax_error called by yyerror */
2342 my_commands->expr = expr_parse_result;
2344 expr_scanner_finish();
2346 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
2348 if (my_commands->argc < 2)
2350 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2351 "missing argument", NULL, -1);
2355 * Split argument into number and unit to allow "sleep 1ms" etc.
2356 * We don't have to terminate the number argument with null
2357 * because it will be parsed with atoi, which ignores trailing
2358 * non-digit characters.
2360 if (my_commands->argv[1][0] != ':')
2362 char *c = my_commands->argv[1];
2364 while (isdigit((unsigned char) *c))
/* argv[2] aliases the tail of argv[1]; it is not a separate allocation. */
2368 my_commands->argv[2] = c;
2369 if (my_commands->argc < 3)
2370 my_commands->argc = 3;
2374 if (my_commands->argc >= 3)
2376 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
2377 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
2378 pg_strcasecmp(my_commands->argv[2], "s") != 0)
2380 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2381 "unknown time unit, must be us, ms or s",
2382 my_commands->argv[2], my_commands->cols[2]);
2386 /* this should be an error?! */
2387 for (j = 3; j < my_commands->argc; j++)
2388 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
2389 my_commands->argv[0], my_commands->argv[j]);
2391 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
2393 if (my_commands->argc < 3)
2395 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2396 "missing argument", NULL, -1);
2399 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
/*
 * NOTE(review): argc is incremented for every token including the command
 * word in argv[0], so it appears to be at least 1 whenever this branch is
 * reached and the check below would never fire. A bare shell command
 * with nothing to run would then pass validation. The intended bound is
 * probably 2 - confirm.
 */
2401 if (my_commands->argc < 1)
2403 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2404 "missing command", NULL, -1);
2409 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2410 "invalid command", NULL, -1);
2415 my_commands->type = SQL_COMMAND;
2420 my_commands->argv[0] = pg_strdup(p);
2421 my_commands->argc++;
2423 case QUERY_EXTENDED:
2424 case QUERY_PREPARED:
2425 if (!parseQuery(my_commands, p))
/*
 * read_line_from_file
 *
 * Collect one complete input line from fd, however long it is. The line
 * is read in chunks of at most BUFSIZ bytes through fgets and appended to
 * a heap buffer that is enlarged whenever another chunk is needed. Reading
 * stops after a chunk that ends in a newline, or when fgets returns NULL.
 *
 * Note that the initial allocation uses palloc while growth uses
 * pg_realloc.
 */
2437 * Read a line from fd, and return it in a malloc'd buffer.
2438 * Return NULL at EOF.
2440 * The buffer will typically be larger than necessary, but we don't care
2441 * in this program, because we'll free it as soon as we've parsed the line.
2444 read_line_from_file(FILE *fd)
2446 char tmpbuf[BUFSIZ];
2448 size_t buflen = BUFSIZ;
2451 buf = (char *) palloc(buflen);
2454 while (fgets(tmpbuf, BUFSIZ, fd) != NULL)
2456 size_t thislen = strlen(tmpbuf);
2458 /* Append tmpbuf to whatever we had already */
/* thislen + 1 copies the terminating NUL as well. */
2459 memcpy(buf + used, tmpbuf, thislen + 1);
2462 /* Done if we collected a newline */
2463 if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
2466 /* Else, enlarge buf to ensure we can append next bufferload */
2468 buf = (char *) pg_realloc(buf, buflen);
/*
 * process_file
 *
 * Load a custom script. Each line returned by read_line_from_file is
 * parsed with process_commands; lines that yield NULL are not stored.
 * The resulting commands are collected in an array that grows by
 * COMMANDS_ALLOC_NUM slots at a time, is terminated with a NULL entry and
 * is finally registered as sql_files[num_files].
 *
 * At most MAX_FILES scripts are accepted. A filename consisting of a single
 * dash is handled by a separate branch instead of fopen.
 * NOTE(review): that branch presumably selects stdin - its body is not
 * visible here, confirm.
 */
2480 process_file(char *filename)
2482 #define COMMANDS_ALLOC_NUM 128
2484 Command **my_commands;
2491 if (num_files >= MAX_FILES)
2493 fprintf(stderr, "at most %d SQL files are allowed\n", MAX_FILES);
2497 alloc_num = COMMANDS_ALLOC_NUM;
2498 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2500 if (strcmp(filename, "-") == 0)
2502 else if ((fd = fopen(filename, "r")) == NULL)
2504 fprintf(stderr, "could not open file \"%s\": %s\n",
2505 filename, strerror(errno));
2506 pg_free(my_commands);
2513 while ((buf = read_line_from_file(fd)) != NULL)
2519 command = process_commands(buf, filename, lineno);
2523 if (command == NULL)
2526 my_commands[index] = command;
/* Grow before the next store so that the final NULL terminator also fits. */
2529 if (index >= alloc_num)
2531 alloc_num += COMMANDS_ALLOC_NUM;
2532 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2537 my_commands[index] = NULL;
2539 sql_files[num_files++] = my_commands;
/*
 * process_builtin
 *
 * Parse a script that is held in memory rather than in a file.
 *
 *   tb      the script text; it is consumed up to each newline in turn
 *   source  name reported in diagnostics by process_commands
 *
 * Commands are collected in an array that grows by COMMANDS_ALLOC_NUM slots
 * at a time and is terminated with a NULL entry.
 */
2545 process_builtin(char *tb, const char *source)
/* Same value as the definition in process_file, so the redefinition is legal. */
2547 #define COMMANDS_ALLOC_NUM 128
2549 Command **my_commands;
2555 alloc_num = COMMANDS_ALLOC_NUM;
2556 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2567 while (*tb && *tb != '\n')
2580 command = process_commands(buf, source, lineno);
2581 if (command == NULL)
2584 my_commands[index] = command;
2587 if (index >= alloc_num)
2589 alloc_num += COMMANDS_ALLOC_NUM;
2590 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2594 my_commands[index] = NULL;
/*
 * printResults
 *
 * Write the end-of-run summary to stdout: transaction type, scaling factor,
 * query mode, client and thread counts, the number of transactions
 * processed, skipped and late transactions, latency average and standard
 * deviation, rate-limit schedule lag, the two tps figures and, last, the
 * per-statement latencies of every script in sql_files.
 *
 * tps_include divides normal_xacts by the whole run time. tps_exclude
 * first subtracts the connection time averaged over the threads
 * (conn_total_time / nthreads).
 */
2599 /* print out results */
2601 printResults(int ttype, int64 normal_xacts, int nclients,
2602 TState *threads, int nthreads,
2603 instr_time total_time, instr_time conn_total_time,
2604 int64 total_latencies, int64 total_sqlats,
2605 int64 throttle_lag, int64 throttle_lag_max,
2606 int64 throttle_latency_skipped, int64 latency_late)
2608 double time_include,
2613 time_include = INSTR_TIME_GET_DOUBLE(total_time);
2614 tps_include = normal_xacts / time_include;
2615 tps_exclude = normal_xacts / (time_include -
2616 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
2619 s = "TPC-B (sort of)";
2620 else if (ttype == 2)
2621 s = "Update only pgbench_accounts";
2622 else if (ttype == 1)
2627 printf("transaction type: %s\n", s);
2628 printf("scaling factor: %d\n", scale);
2629 printf("query mode: %s\n", QUERYMODE[querymode]);
2630 printf("number of clients: %d\n", nclients);
2631 printf("number of threads: %d\n", nthreads);
2634 printf("number of transactions per client: %d\n", nxacts);
2635 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2636 normal_xacts, (int64) nxacts * nclients);
2640 printf("duration: %d s\n", duration);
2641 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2645 /* Remaining stats are nonsensical if we failed to execute any xacts */
2646 if (normal_xacts <= 0)
2649 if (throttle_delay && latency_limit)
2650 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
2651 throttle_latency_skipped,
2652 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
2655 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
2656 latency_limit / 1000.0, latency_late,
2657 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
2659 if (throttle_delay || progress || latency_limit)
2661 /* compute and show latency average and standard deviation */
2662 double latency = 0.001 * total_latencies / normal_xacts;
2663 double sqlat = (double) total_sqlats / normal_xacts;
/*
 * stddev = sqrt(mean of squares - square of mean). latency has already
 * been scaled by 0.001 while sqlat has not, hence the 1000000.0 factor
 * inside the root and the 0.001 factor outside it.
 */
2665 printf("latency average: %.3f ms\n"
2666 "latency stddev: %.3f ms\n",
2667 latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2671 /* only an average latency computed from the duration is available */
2672 printf("latency average: %.3f ms\n",
2673 1000.0 * duration * nclients / normal_xacts);
2679 * Report average transaction lag under rate limit throttling. This
2680 * is the delay between scheduled and actual start times for the
2681 * transaction. The measured lag may be caused by thread/client load,
2682 * the database load, or the Poisson throttling process.
2684 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2685 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2688 printf("tps = %f (including connections establishing)\n", tps_include);
2689 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2691 /* Report per-command latencies */
2696 for (i = 0; i < num_files; i++)
2701 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2703 printf("statement latencies in milliseconds:\n");
2705 for (commands = sql_files[i]; *commands != NULL; commands++)
2707 Command *command = *commands;
2708 int cnum = command->command_num;
2710 instr_time total_exec_elapsed;
2711 int total_exec_count;
2714 /* Accumulate per-thread data for command */
2715 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2716 total_exec_count = 0;
2717 for (t = 0; t < nthreads; t++)
2719 TState *thread = &threads[t];
2721 INSTR_TIME_ADD(total_exec_elapsed,
2722 thread->exec_elapsed[cnum]);
2723 total_exec_count += thread->exec_count[cnum];
/*
 * NOTE(review): total_time receives a double and is printed with %f below,
 * so it must be an inner variable that shadows the instr_time parameter of
 * the same name; the declaration is not visible here - confirm.
 */
2726 if (total_exec_count > 0)
2727 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2731 printf("\t%f\t%s\n", total_time, command->line);
2739 main(int argc, char **argv)
2741 static struct option long_options[] = {
2742 /* systematic long/short named options */
2743 {"client", required_argument, NULL, 'c'},
2744 {"connect", no_argument, NULL, 'C'},
2745 {"debug", no_argument, NULL, 'd'},
2746 {"define", required_argument, NULL, 'D'},
2747 {"file", required_argument, NULL, 'f'},
2748 {"fillfactor", required_argument, NULL, 'F'},
2749 {"host", required_argument, NULL, 'h'},
2750 {"initialize", no_argument, NULL, 'i'},
2751 {"jobs", required_argument, NULL, 'j'},
2752 {"log", no_argument, NULL, 'l'},
2753 {"no-vacuum", no_argument, NULL, 'n'},
2754 {"port", required_argument, NULL, 'p'},
2755 {"progress", required_argument, NULL, 'P'},
2756 {"protocol", required_argument, NULL, 'M'},
2757 {"quiet", no_argument, NULL, 'q'},
2758 {"report-latencies", no_argument, NULL, 'r'},
2759 {"scale", required_argument, NULL, 's'},
2760 {"select-only", no_argument, NULL, 'S'},
2761 {"skip-some-updates", no_argument, NULL, 'N'},
2762 {"time", required_argument, NULL, 'T'},
2763 {"transactions", required_argument, NULL, 't'},
2764 {"username", required_argument, NULL, 'U'},
2765 {"vacuum-all", no_argument, NULL, 'v'},
2766 /* long-named only options */
2767 {"foreign-keys", no_argument, &foreign_keys, 1},
2768 {"index-tablespace", required_argument, NULL, 3},
2769 {"tablespace", required_argument, NULL, 2},
2770 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2771 {"sampling-rate", required_argument, NULL, 4},
2772 {"aggregate-interval", required_argument, NULL, 5},
2773 {"rate", required_argument, NULL, 'R'},
2774 {"latency-limit", required_argument, NULL, 'L'},
2779 int nclients = 1; /* default number of simulated clients */
2780 int nthreads = 1; /* default number of threads */
2781 int is_init_mode = 0; /* initialize mode? */
2782 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2783 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2784 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2785 * 2: skip update of branches and tellers */
2787 char *filename = NULL;
2788 bool scale_given = false;
2790 bool benchmarking_option_set = false;
2791 bool initialization_option_set = false;
2793 CState *state; /* status of clients */
2794 TState *threads; /* array of thread */
2796 instr_time start_time; /* start up time */
2797 instr_time total_time;
2798 instr_time conn_total_time;
2799 int64 total_xacts = 0;
2800 int64 total_latencies = 0;
2801 int64 total_sqlats = 0;
2802 int64 throttle_lag = 0;
2803 int64 throttle_lag_max = 0;
2804 int64 throttle_latency_skipped = 0;
2805 int64 latency_late = 0;
2810 #ifdef HAVE_GETRLIMIT
2820 progname = get_progname(argv[0]);
2824 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2829 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2831 puts("pgbench (PostgreSQL) " PG_VERSION);
2837 /* stderr is buffered on Win32. */
2838 setvbuf(stderr, NULL, _IONBF, 0);
2841 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2843 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2845 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2848 state = (CState *) pg_malloc(sizeof(CState));
2849 memset(state, 0, sizeof(CState));
2851 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
2859 pghost = pg_strdup(optarg);
2865 do_vacuum_accounts++;
2868 pgport = pg_strdup(optarg);
2875 benchmarking_option_set = true;
2879 benchmarking_option_set = true;
2882 benchmarking_option_set = true;
2883 nclients = atoi(optarg);
2884 if (nclients <= 0 || nclients > MAXCLIENTS)
2886 fprintf(stderr, "invalid number of clients: \"%s\"\n",
2890 #ifdef HAVE_GETRLIMIT
2891 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2892 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2893 #else /* but BSD doesn't ... */
2894 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2895 #endif /* RLIMIT_NOFILE */
2897 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2900 if (rlim.rlim_cur < nclients + 3)
2902 fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
2903 nclients + 3, (long) rlim.rlim_cur);
2904 fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
2907 #endif /* HAVE_GETRLIMIT */
2909 case 'j': /* jobs */
2910 benchmarking_option_set = true;
2911 nthreads = atoi(optarg);
2914 fprintf(stderr, "invalid number of threads: \"%s\"\n",
2918 #ifndef ENABLE_THREAD_SAFETY
2921 fprintf(stderr, "threads are not supported on this platform; use -j1\n");
2924 #endif /* !ENABLE_THREAD_SAFETY */
2927 benchmarking_option_set = true;
2931 benchmarking_option_set = true;
2932 is_latencies = true;
2936 scale = atoi(optarg);
2939 fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
2944 benchmarking_option_set = true;
2947 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2950 nxacts = atoi(optarg);
2953 fprintf(stderr, "invalid number of transactions: \"%s\"\n",
2959 benchmarking_option_set = true;
2962 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2965 duration = atoi(optarg);
2968 fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
2973 login = pg_strdup(optarg);
2976 benchmarking_option_set = true;
2980 initialization_option_set = true;
2984 benchmarking_option_set = true;
2986 filename = pg_strdup(optarg);
2987 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2994 benchmarking_option_set = true;
2996 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2998 fprintf(stderr, "invalid variable definition: \"%s\"\n",
3004 if (!putVariable(&state[0], "option", optarg, p))
3009 initialization_option_set = true;
3010 fillfactor = atoi(optarg);
3011 if (fillfactor < 10 || fillfactor > 100)
3013 fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3018 benchmarking_option_set = true;
3021 fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f)\n");
3024 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3025 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3027 if (querymode >= NUM_QUERYMODE)
3029 fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3035 benchmarking_option_set = true;
3036 progress = atoi(optarg);
3039 fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3046 /* get a double from the beginning of option value */
3047 double throttle_value = atof(optarg);
3049 benchmarking_option_set = true;
3051 if (throttle_value <= 0.0)
3053 fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3056 /* Invert rate limit into a time offset */
3057 throttle_delay = (int64) (1000000.0 / throttle_value);
3062 double limit_ms = atof(optarg);
3064 if (limit_ms <= 0.0)
3066 fprintf(stderr, "invalid latency limit: \"%s\"\n",
3070 benchmarking_option_set = true;
3071 latency_limit = (int64) (limit_ms * 1000);
3075 /* This covers long options which take no argument. */
3076 if (foreign_keys || unlogged_tables)
3077 initialization_option_set = true;
3079 case 2: /* tablespace */
3080 initialization_option_set = true;
3081 tablespace = pg_strdup(optarg);
3083 case 3: /* index-tablespace */
3084 initialization_option_set = true;
3085 index_tablespace = pg_strdup(optarg);
3088 benchmarking_option_set = true;
3089 sample_rate = atof(optarg);
3090 if (sample_rate <= 0.0 || sample_rate > 1.0)
3092 fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3098 fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
3101 benchmarking_option_set = true;
3102 agg_interval = atoi(optarg);
3103 if (agg_interval <= 0)
3105 fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
3112 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3119 * Don't need more threads than there are clients. (This is not merely an
3120 * optimization; throttle_delay is calculated incorrectly below if some
3121 * threads have no clients assigned to them.)
3123 if (nthreads > nclients)
3124 nthreads = nclients;
3126 /* compute a per thread delay */
3127 throttle_delay *= nthreads;
3130 dbName = argv[optind];
3133 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3135 else if (login != NULL && *login != '\0')
3143 if (benchmarking_option_set)
3145 fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
3154 if (initialization_option_set)
3156 fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
3161 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3162 if (nxacts <= 0 && duration <= 0)
3163 nxacts = DEFAULT_NXACTS;
3165 /* --sampling-rate may be used only with -l */
3166 if (sample_rate > 0.0 && !use_log)
3168 fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
3172 /* --sampling-rate must not be used with --aggregate-interval */
3173 if (sample_rate > 0.0 && agg_interval > 0)
3175 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
3179 if (agg_interval > 0 && !use_log)
3181 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3185 if (duration > 0 && agg_interval > duration)
3187 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
3191 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
3193 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3198 * save main process id in the global variable because process id will be
3199 * changed after fork.
3201 main_pid = (int) getpid();
3202 progress_nclients = nclients;
3203 progress_nthreads = nthreads;
3207 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3208 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3210 /* copy any -D switch values to all clients */
3211 for (i = 1; i < nclients; i++)
3216 for (j = 0; j < state[0].nvariables; j++)
3218 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
3227 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3228 pghost, pgport, nclients, nxacts, dbName);
3230 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3231 pghost, pgport, nclients, duration, dbName);
3234 /* opening connection... */
3239 if (PQstatus(con) == CONNECTION_BAD)
3241 fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
3242 fprintf(stderr, "%s", PQerrorMessage(con));
3249 * get the scaling factor that should be same as count(*) from
3250 * pgbench_branches if this is not a custom query
3252 res = PQexec(con, "select count(*) from pgbench_branches");
3253 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3255 fprintf(stderr, "%s", PQerrorMessage(con));
3258 scale = atoi(PQgetvalue(res, 0, 0));
3261 fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
3262 PQgetvalue(res, 0, 0));
3267 /* warn if we override user-given -s switch */
3270 "scale option ignored, using count from pgbench_branches table (%d)\n",
3275 * :scale variables normally get -s or database scale, but don't override
3276 * an explicit -D switch
3278 if (getVariable(&state[0], "scale") == NULL)
3280 snprintf(val, sizeof(val), "%d", scale);
3281 for (i = 0; i < nclients; i++)
3283 if (!putVariable(&state[i], "startup", "scale", val))
3289 * Define a :client_id variable that is unique per connection. But don't
3290 * override an explicit -D switch.
3292 if (getVariable(&state[0], "client_id") == NULL)
3294 for (i = 0; i < nclients; i++)
3296 snprintf(val, sizeof(val), "%d", i);
3297 if (!putVariable(&state[i], "startup", "client_id", val))
3304 fprintf(stderr, "starting vacuum...");
3305 tryExecuteStatement(con, "vacuum pgbench_branches");
3306 tryExecuteStatement(con, "vacuum pgbench_tellers");
3307 tryExecuteStatement(con, "truncate pgbench_history");
3308 fprintf(stderr, "end.\n");
3310 if (do_vacuum_accounts)
3312 fprintf(stderr, "starting vacuum pgbench_accounts...");
3313 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
3314 fprintf(stderr, "end.\n");
3319 /* set random seed */
3320 INSTR_TIME_SET_CURRENT(start_time);
3321 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3323 /* process builtin SQL scripts */
3327 sql_files[0] = process_builtin(tpc_b,
3328 "<builtin: TPC-B (sort of)>");
3333 sql_files[0] = process_builtin(select_only,
3334 "<builtin: select only>");
3339 sql_files[0] = process_builtin(simple_update,
3340 "<builtin: simple update>");
3348 /* set up thread data structures */
3349 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3352 for (i = 0; i < nthreads; i++)
3354 TState *thread = &threads[i];
3357 thread->state = &state[nclients_dealt];
3359 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
3360 thread->random_state[0] = random();
3361 thread->random_state[1] = random();
3362 thread->random_state[2] = random();
3363 thread->throttle_latency_skipped = 0;
3364 thread->latency_late = 0;
3366 nclients_dealt += thread->nstate;
3370 /* Reserve memory for the thread to store per-command latencies */
3373 thread->exec_elapsed = (instr_time *)
3374 pg_malloc(sizeof(instr_time) * num_commands);
3375 thread->exec_count = (int *)
3376 pg_malloc(sizeof(int) * num_commands);
3378 for (t = 0; t < num_commands; t++)
3380 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
3381 thread->exec_count[t] = 0;
3386 thread->exec_elapsed = NULL;
3387 thread->exec_count = NULL;
3391 /* all clients must be assigned to a thread */
3392 Assert(nclients_dealt == nclients);
3394 /* get start up time */
3395 INSTR_TIME_SET_CURRENT(start_time);
3397 /* set alarm if duration is specified. */
3402 #ifdef ENABLE_THREAD_SAFETY
3403 for (i = 0; i < nthreads; i++)
3405 TState *thread = &threads[i];
3407 INSTR_TIME_SET_CURRENT(thread->start_time);
3409 /* the first thread (i = 0) is executed by main thread */
3412 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
3414 if (err != 0 || thread->thread == INVALID_THREAD)
3416 fprintf(stderr, "could not create thread: %s\n", strerror(err));
3422 thread->thread = INVALID_THREAD;
3426 INSTR_TIME_SET_CURRENT(threads[0].start_time);
3427 threads[0].thread = INVALID_THREAD;
3428 #endif /* ENABLE_THREAD_SAFETY */
3430 /* wait for threads and accumulate results */
3431 INSTR_TIME_SET_ZERO(conn_total_time);
3432 for (i = 0; i < nthreads; i++)
3434 TState *thread = &threads[i];
3437 #ifdef ENABLE_THREAD_SAFETY
3438 if (threads[i].thread == INVALID_THREAD)
3439 /* actually run this thread directly in the main thread */
3440 (void) threadRun(thread);
3442 /* wait for other threads. should check that 0 is returned? */
3443 pthread_join(thread->thread, NULL);
3445 (void) threadRun(thread);
3446 #endif /* ENABLE_THREAD_SAFETY */
3448 /* thread level stats */
3449 throttle_lag += thread->throttle_lag;
3450 throttle_latency_skipped = threads->throttle_latency_skipped;
3451 latency_late = thread->latency_late;
3452 if (throttle_lag_max > thread->throttle_lag_max)
3453 throttle_lag_max = thread->throttle_lag_max;
3454 INSTR_TIME_ADD(conn_total_time, thread->conn_time);
3456 /* client-level stats */
3457 for (j = 0; j < thread->nstate; j++)
3459 total_xacts += thread->state[j].cnt;
3460 total_latencies += thread->state[j].txn_latencies;
3461 total_sqlats += thread->state[j].txn_sqlats;
3464 disconnect_all(state, nclients);
3467 * XXX We compute results as though every client of every thread started
3468 * and finished at the same time. That model can diverge noticeably from
3469 * reality for a short benchmark run involving relatively many threads.
3470 * The first thread may process notably many transactions before the last
3471 * thread begins. Improving the model alone would bring limited benefit,
3472 * because performance during those periods of partial thread count can
3473 * easily exceed steady state performance. This is one of the many ways
3474 * short runs convey deceptive performance figures.
3476 INSTR_TIME_SET_CURRENT(total_time);
3477 INSTR_TIME_SUBTRACT(total_time, start_time);
3478 printResults(ttype, total_xacts, nclients, threads, nthreads,
3479 total_time, conn_total_time, total_latencies, total_sqlats,
3480 throttle_lag, throttle_lag_max, throttle_latency_skipped,
/*
 * threadRun: work routine for one TState. main() either hands it to
 * pthread_create() or calls it directly.
 *
 * arg is the TState of this thread; thread->state points at the first of
 * thread->nstate CState entries that belong to it.
 *
 * Steps visible in this excerpt:
 *   1. Reset throttle_trigger, throttle_lag, throttle_lag_max and conn_time.
 *   2. Open a log file with fopen(..., "w"), named pgbench_log.<main_pid>
 *      when thread->tid is 0 and pgbench_log.<main_pid>.<tid> otherwise
 *      (the condition that enables logging is not visible here).
 *   3. Call doConnect() for every client, then store the elapsed time since
 *      thread->start_time in thread->conn_time.
 *   4. Pick a script per client with getrand() and call doCustom() once to
 *      start it.
 *   5. Loop: build an fd_set from the client sockets, compute min_usec from
 *      the earliest txn_scheduled of sleeping clients (0 when a client is at
 *      a META_COMMAND), select() with that timeout or with no timeout, then
 *      call doCustom() for each client whose socket is readable or whose
 *      current command is a META_COMMAND.
 *   6. When progress is non-zero and thread->tid is 0, print a progress line
 *      to stderr each time next_report is reached.
 *   7. Close this thread connections with disconnect_all() and add the time
 *      that took to thread->conn_time.
 *
 * remains starts at nstate and is decremented whenever doCustom() returns
 * false or a meta-command failure is detected through st->ecnt.
 *
 * NOTE(review): in the start-up loop, commands is read from
 * sql_files[st->use_file] before st->use_file is reassigned by getrand(), so
 * the following META_COMMAND check may look at a different script than the
 * one doCustom() just ran -- confirm whether that is intended.
 *
 * NOTE(review): the progress figures divide by (count - last_count), which
 * is zero when no transaction finished during the interval -- confirm the
 * resulting output is acceptable.
 *
 * NOTE(review): the lag sum indexes thread[i] for i < progress_nthreads,
 * which presumably relies on the tid 0 TState being the first element of the
 * threads array -- verify against main().
 */
3487 threadRun(void *arg)
3489 TState *thread = (TState *) arg;
3490 CState *state = thread->state;
3491 FILE *logfile = NULL; /* per-thread log file */
3494 int nstate = thread->nstate;
3495 int remains = nstate; /* number of remaining clients */
3498 /* for reporting progress: */
3499 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
3500 int64 last_report = thread_start;
3501 int64 next_report = last_report + (int64) progress * 1000000;
3502 int64 last_count = 0,
3511 * Initialize throttling rate target for all of the thread's clients. It
3512 * might be a little more accurate to reset thread->start_time here too.
3513 * The possible drift seems too small relative to typical throttle delay
3514 * times to worry about it.
3516 INSTR_TIME_SET_CURRENT(start);
3517 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3518 thread->throttle_lag = 0;
3519 thread->throttle_lag_max = 0;
3521 INSTR_TIME_SET_ZERO(thread->conn_time);
3523 /* open log file if requested */
3528 if (thread->tid == 0)
3529 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3531 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3532 logfile = fopen(logpath, "w");
3534 if (logfile == NULL)
3536 fprintf(stderr, "could not open logfile \"%s\": %s\n",
3537 logpath, strerror(errno));
3544 /* make connections to the database */
3545 for (i = 0; i < nstate; i++)
3547 if ((state[i].con = doConnect()) == NULL)
3552 /* time after thread and connections set up */
3553 INSTR_TIME_SET_CURRENT(thread->conn_time);
3554 INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
3556 agg_vals_init(&aggs, thread->start_time);
3558 /* send start up queries in async manner */
3559 for (i = 0; i < nstate; i++)
3561 CState *st = &state[i];
3562 Command **commands = sql_files[st->use_file];
3563 int prev_ecnt = st->ecnt;
3565 st->use_file = getrand(thread, 0, num_files - 1);
3566 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3567 remains--; /* I've aborted */
3569 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3571 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3573 remains--; /* I've aborted */
3582 int maxsock; /* max socket number to be waited */
3586 FD_ZERO(&input_mask);
3589 min_usec = PG_INT64_MAX;
3590 for (i = 0; i < nstate; i++)
3592 CState *st = &state[i];
3593 Command **commands = sql_files[st->use_file];
3596 if (st->con == NULL)
3600 else if (st->sleeping)
3602 if (st->throttling && timer_exceeded)
3604 /* interrupt client which has not started a transaction */
3607 st->throttling = false;
3612 else /* just a nap from the script */
3616 if (min_usec == PG_INT64_MAX)
3620 INSTR_TIME_SET_CURRENT(now);
3621 now_usec = INSTR_TIME_GET_MICROSEC(now);
3624 this_usec = st->txn_scheduled - now_usec;
3625 if (min_usec > this_usec)
3626 min_usec = this_usec;
3629 else if (commands[st->state]->type == META_COMMAND)
3631 min_usec = 0; /* the connection is ready to run */
3635 sock = PQsocket(st->con);
3638 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3642 FD_SET(sock, &input_mask);
3648 /* also wake up to print the next progress report on time */
3649 if (progress && min_usec > 0 && thread->tid == 0)
3651 /* get current time if needed */
3656 INSTR_TIME_SET_CURRENT(now);
3657 now_usec = INSTR_TIME_GET_MICROSEC(now);
3660 if (now_usec >= next_report)
3662 else if ((next_report - now_usec) < min_usec)
3663 min_usec = next_report - now_usec;
3667 * Sleep until we receive data from the server, or a nap-time
3668 * specified in the script ends, or it's time to print a progress
3671 if (min_usec > 0 && maxsock != -1)
3673 int nsocks; /* return from select(2) */
3675 if (min_usec != PG_INT64_MAX)
3677 struct timeval timeout;
3679 timeout.tv_sec = min_usec / 1000000;
3680 timeout.tv_usec = min_usec % 1000000;
3681 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
3684 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3689 /* must be something wrong */
3690 fprintf(stderr, "select() failed: %s\n", strerror(errno));
3695 /* ok, backend returns reply */
3696 for (i = 0; i < nstate; i++)
3698 CState *st = &state[i];
3699 Command **commands = sql_files[st->use_file];
3700 int prev_ecnt = st->ecnt;
3702 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3703 || commands[st->state]->type == META_COMMAND))
3705 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3706 remains--; /* I've aborted */
3709 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3711 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3713 remains--; /* I've aborted */
3719 /* progress report by thread 0 for all threads */
3720 if (progress && thread->tid == 0)
3722 instr_time now_time;
3725 INSTR_TIME_SET_CURRENT(now_time);
3726 now = INSTR_TIME_GET_MICROSEC(now_time);
3727 if (now >= next_report)
3729 /* generate and show report */
3735 int64 run = now - last_report;
3744 * Add up the statistics of all threads.
3746 * XXX: No locking. There is no guarantee that we get an
3747 * atomic snapshot of the transaction count and latencies, so
3748 * these figures can well be off by a small amount. The
3749 * progress is report's purpose is to give a quick overview of
3750 * how the test is going, so that shouldn't matter too much.
3751 * (If a read from a 64-bit integer is not atomic, you might
3752 * get a "torn" read and completely bogus latencies though!)
3754 for (i = 0; i < progress_nclients; i++)
3756 count += state[i].cnt;
3757 lats += state[i].txn_latencies;
3758 sqlats += state[i].txn_sqlats;
3761 for (i = 0; i < progress_nthreads; i++)
3762 lags += thread[i].throttle_lag;
3764 total_run = (now - thread_start) / 1000000.0;
3765 tps = 1000000.0 * (count - last_count) / run;
3766 latency = 0.001 * (lats - last_lats) / (count - last_count);
3767 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3768 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3769 lag = 0.001 * (lags - last_lags) / (count - last_count);
3770 skipped = thread->throttle_latency_skipped - last_skipped;
3773 "progress: %.1f s, %.1f tps, "
3774 "lat %.3f ms stddev %.3f",
3775 total_run, tps, latency, stdev);
3778 fprintf(stderr, ", lag %.3f ms", lag);
3780 fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
3782 fprintf(stderr, "\n");
3786 last_sqlats = sqlats;
3789 last_skipped = thread->throttle_latency_skipped;
3792 * Ensure that the next report is in the future, in case
3793 * pgbench/postgres got stuck somewhere.
3797 next_report += (int64) progress *1000000;
3798 } while (now >= next_report);
3804 INSTR_TIME_SET_CURRENT(start);
3805 disconnect_all(state, nstate);
3806 INSTR_TIME_SET_CURRENT(end);
3807 INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
3814 * Support for duration option: set timer_exceeded after so many seconds.
/*
 * Signal handler that setalarm() registers for SIGALRM. It only sets the
 * timer_exceeded flag, which threadRun() tests to stop throttled clients.
 */
3820 handle_sig_alarm(SIGNAL_ARGS)
3822 timer_exceeded = true;
/*
 * Signal-based variant: registers handle_sig_alarm as the SIGALRM handler
 * through pqsignal(). How the seconds argument is consumed is not visible in
 * this excerpt (presumably an alarm() call follows -- TODO confirm).
 */
3826 setalarm(int seconds)
3828 pqsignal(SIGALRM, handle_sig_alarm);
/*
 * Callback handed to CreateTimerQueueTimer() by the Win32 setalarm(). It
 * sets timer_exceeded; neither parameter is used in the visible body.
 */
3834 static VOID CALLBACK
3835 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3837 timer_exceeded = true;
/*
 * Win32 variant: creates a timer queue and a run-once timer
 * (WT_EXECUTEONLYONCE) whose due time is seconds * 1000 and whose callback
 * is win32_timer_callback. The "failed to set timer" message is printed when
 * seconds exceeds the largest DWORD value divided by 1000, or when
 * CreateTimerQueueTimer() reports failure.
 */
3841 setalarm(int seconds)
3846 /* This function will be called at most once, so we can cheat a bit. */
3847 queue = CreateTimerQueue();
3848 if (seconds > ((DWORD) -1) / 1000 ||
3849 !CreateTimerQueueTimer(&timer, queue,
3850 win32_timer_callback, NULL, seconds * 1000, 0,
3851 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3853 fprintf(stderr, "failed to set timer\n");
3858 /* partial pthread implementation for Windows */
/*
 * Per-thread record used by the Windows pthread emulation. The surrounding
 * functions read and write its handle, routine, arg and result members; only
 * the routine member declaration is visible in this excerpt.
 */
3860 typedef struct win32_pthread
3863 void *(*routine) (void *);
/*
 * Thread entry point passed to _beginthreadex() by pthread_create(). arg is
 * the win32_pthread record; the routine stored in it is called with th->arg
 * and its return value is saved in th->result for pthread_join().
 */
3868 static unsigned __stdcall
3869 win32_pthread_run(void *arg)
3871 win32_pthread *th = (win32_pthread *) arg;
3873 th->result = th->routine(th->arg);
/*
 * Windows stand-in for pthread_create(): allocates a win32_pthread with
 * pg_malloc(), stores start_routine in it, and starts a thread with
 * _beginthreadex() that runs win32_pthread_run on that record. A NULL handle
 * from _beginthreadex() is tested for; what happens on that path, and how
 * attr and the thread out-parameter are used, is not visible in this excerpt.
 */
3879 pthread_create(pthread_t *thread,
3880 pthread_attr_t *attr,
3881 void *(*start_routine) (void *),
3887 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3888 th->routine = start_routine;
3892 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3893 if (th->handle == NULL)
3905 pthread_join(pthread_t th, void **thread_return)
3907 if (th == NULL || th->handle == NULL)
3908 return errno = EINVAL;
3910 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3912 _dosmaperr(GetLastError());
3917 *thread_return = th->result;
3919 CloseHandle(th->handle);